並行性

並行性と並列性はコンピュータサイエンスにおいて極めて重要なトピックであり、現在では産業界でもホットトピックです。 コンピュータはどんどん多くのコアを持つようになってきていますが、多くのプログラマはまだそれを十分に使いこなす準備ができていません。

Rustのメモリ安全性の機能は、Rustの並行性の話においても適用されます。 Rustプログラムは並行であっても、メモリ安全でなければならず、データ競合を起こさないのです。 Rustの型システムはこの問題を扱うことができ、並行なコードをコンパイル時に確かめるための強力な方法を与えます。

Rustが備えている並行性の機能について語る前に、理解しておくべき重要なことがあります。 それは、Rustは十分にローレベルであるため、その大部分は、言語によってではなく、標準ライブラリによって提供されるということです。 これは、もしRustの並行性の扱い方に気に入らないところがあれば、代わりの方法を実装できるということを意味します。 mio はこの原則を行動で示している実例です。

背景: SendSync

並行性を確かめるのは難しいことです。 Rustには、コードを確かめるのを支援する強力で静的な型システムがあります。 そしてRustは、並行になりうるコードの理解を助ける2つのトレイトを提供します。

Send

最初に取り上げるトレイトは Send です。 型 TSend を実装していた場合、 この型のものはスレッド間で安全に受け渡しされる所有権を持てることを意味します。

これはある種の制約を強制させる際に重要です。 例えば、もし2つのスレッドをつなぐチャネルがあり、そのチャネルを通じてデータを別のスレッドに送れるようにしたいとします。 このときには、その型について Send が実装されているかを確かめます。

逆に、スレッドセーフでない FFI でライブラリを包んでいて、 Send を実装したくなかったとします。 このときコンパイラは、そのライブラリが現在のスレッドの外にいかないよう強制することを支援してくれるでしょう。

Sync

2つ目のトレイトは Sync といいます。 型 TSync を実装していた場合、この型のものは共有された参照を通じて複数スレッドから並行に使われたとしても、必ずメモリ安全であることを意味します。 そのため、 interior mutability を持たない型はもともと Sync であるといえます。 そのような型としては、 u8 などの単純なプリミティブ型やそれらを含む合成型などがあります。

スレッドをまたいで参照を共有するために、Rustは Arc<T> というラッパ型を提供しています。 TSendSync の両方を実装している時かつその時に限り、 Arc<T>SendSync を実装します。 例えば、型 Arc<RefCell<U>> のオブジェクトをスレッドをまたいで受け渡すことはできません。 なぜなら、 RefCellSync を実装していないため、 Arc<RefCell<U>>Send を実装しないためです。

これらの2つのトレイトのおかげで、コードの並行性に関する性質を強く保証するのに型システムを使うことができます。 ただ、それがどうしてかということを示す前に、まずどうやって並行なRustプログラムをつくるかということを学ぶ必要があります!

スレッド

Rustの標準ライブラリはスレッドのためのライブラリを提供しており、それによりRustのコードを並列に走らせることができます。 これが std::thread を使う基本的な例です。

use std::thread; fn main() { thread::spawn(|| { println!("Hello from a thread!"); }); }
use std::thread;

fn main() {
    thread::spawn(|| {
        println!("Hello from a thread!");
    });
}

thread::spawn() というメソッドは クロージャ を受け取り、それを新たなスレッドで実行します。 そして、元のスレッドにハンドルを返します。 このハンドルは、子スレッドが終了するのを待機しその結果を取り出すのに使うことが出来ます。

use std::thread; fn main() { let handle = thread::spawn(|| { "Hello from a thread!" }); println!("{}", handle.join().unwrap()); }
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        "Hello from a thread!"
    });

    println!("{}", handle.join().unwrap());
}

クロージャは環境から変数を捕捉出来るので、スレッドにデータを取り込もうとすることも出来ます。

use std::thread; fn main() { let x = 1; thread::spawn(|| { println!("x is {}", x); }); }
use std::thread;

fn main() {
    let x = 1;
    thread::spawn(|| {
        println!("x is {}", x);
    });
}

しかし、これはエラーです。

5:19: 7:6 error: closure may outlive the current function, but it
                 borrows `x`, which is owned by the current function
...
5:19: 7:6 help: to force the closure to take ownership of `x` (and any other referenced variables),
          use the `move` keyword, as shown:
      thread::spawn(move || {
          println!("x is {}", x);
      });

これはクロージャはデフォルトで変数を参照で捕捉するためクロージャは x への参照 のみを捕捉するからです。 これは問題です。なぜならスレッドは x のスコープよに長生きするかもしれないのでダングリングポインタを招きかねません。

これを直すにはエラーメッセージにあるように move クロージャを使います。 move クロージャは こちら で詳細に説明されていますが、基本的には変数を環境からクロージャへムーブします。

use std::thread; fn main() { let x = 1; thread::spawn(move || { println!("x is {}", x); }); }
use std::thread;

fn main() {
    let x = 1;
    thread::spawn(move || {
        println!("x is {}", x);
    });
}

多くの言語はスレッドを実行できますが、それはひどく危険です。 shared mutable stateによって引き起こされるエラーをいかに防ぐかを丸々あつかった本もあります。 Rustはこれについて型システムによって、コンパイル時にデータ競合を防ぐことで支援します。 それでは、実際にどうやってスレッド間での共有を行うかについて話しましょう。

訳注: "shared mutable state" は 「共有されたミュータブルな状態」という意味ですが、定型句として、訳さずそのまま使用しています。

安全な Shared Mutable State

Rustの型システムのおかげで、「安全な shared mutable state」という嘘のようにきこえる概念があらわれます。 shared mutable state がとてもとても悪いものであるということについて、多くのプログラマの意見は一致しています。

このようなことを言った人がいます。

Shared mutable state is the root of all evil. Most languages attempt to deal with this problem through the 'mutable' part, but Rust deals with it by solving the 'shared' part.

訳: shared mutable state は諸悪の根源だ。 多くの言語は mutable の部分を通じてこの問題に対処しようとしている。 しかし、Rustは shared の部分を解決することで対処する。

ポインタの誤った使用の防止には 所有権のシステム が役立ちますが、このシステムはデータ競合を排除する際にも同様に一役買います。 データ競合は、並行性のバグの中で最悪なものの一つです。

例として、多くの言語で起こりうるようなデータ競合を含んだRustプログラムをあげます。 これは、コンパイルが通りません。

use std::thread; use std::time::Duration; fn main() { let mut data = vec![1, 2, 3]; for i in 0..3 { thread::spawn(move || { data[i] += 1; }); } thread::sleep(Duration::from_millis(50)); }
use std::thread;
use std::time::Duration;

fn main() {
    let mut data = vec![1, 2, 3];

    for i in 0..3 {
        thread::spawn(move || {
            data[i] += 1;
        });
    }

    thread::sleep(Duration::from_millis(50));
}

以下のようなエラーがでます。

8:17 error: capture of moved value: `data`
        data[i] += 1;
        ^~~~

Rustはこれが安全でないだろうと知っているのです! もし、各スレッドに data への参照があり、スレッドごとにその参照の所有権があるとしたら、3人の所有者がいることになってしまうのです! data は最初の spawn の呼び出しで main からムーブしてしまっているので、ループ内の続く呼び出しはこの変数を使えないのです。

この例では配列の異ったインデックスにアクセスしているのでデータ競合は起きません。 しかしこの分離性はコンパイル時に決定出来ませんし i が定数や乱数だった時にデータ競合が起きます。

そのため、1つの値に対して2つ以上の所有権を持った参照を持てるような型が必要です。 通常、この用途には Rc<T> を使います。これは所有権の共有を提供する参照カウントの型です。 実行時にある程度の管理コストを払って、値への参照の数をカウントします。 なので名前に参照カウント(reference count) が付いているのです。

Rc<T> に対して clone() を呼ぶと新たな所有権を持った参照を返し、内部の参照カウント数を増やします。 スレッドそれぞれで clone() を取ります:

use std::thread; use std::time::Duration; use std::rc::Rc; fn main() { let mut data = Rc::new(vec![1, 2, 3]); for i in 0..3 { // create a new owned reference // 所有権を持った参照を新たに作る let data_ref = data.clone(); // use it in a thread // スレッド内でそれを使う thread::spawn(move || { data_ref[i] += 1; }); } thread::sleep(Duration::from_millis(50)); }
use std::thread;
use std::time::Duration;
use std::rc::Rc;

fn main() {
    let mut data = Rc::new(vec![1, 2, 3]);

    for i in 0..3 {
        // 所有権を持った参照を新たに作る
        let data_ref = data.clone();

        // スレッド内でそれを使う
        thread::spawn(move || {
            data_ref[i] += 1;
        });
    }

    thread::sleep(Duration::from_millis(50));
}

これは動作せず、以下のようなエラーを出します:

13:9: 13:22 error: the trait bound `alloc::rc::Rc<collections::vec::Vec<i32>> : core::marker::Send`
            is not satisfied
...
13:9: 13:22 note: `alloc::rc::Rc<collections::vec::Vec<i32>>`
            cannot be sent between threads safely

エラーメッセージで言及があるように、 Rc は安全に別のスレッドに送ることが出来ません。 これは内部の参照カウントがスレッドセーフに管理されていないのでデータ競合を起こし得るからです。

この問題を解決するために、 Arc<T> を使います。Rustの標準のアトミックな参照カウント型です。

「アトミック」という部分は Arc<T> が複数スレッドから安全にアクセスできることを意味しています。 このためにコンパイラは、内部のカウントの更新には、データ競合が起こりえない分割不能な操作が用いられることを保証します。

要点は Arc<T>スレッド間 で所有権を共有可能にする型ということです。

use std::thread; use std::sync::Arc; use std::time::Duration; fn main() { let mut data = Arc::new(vec![1, 2, 3]); for i in 0..3 { let data = data.clone(); thread::spawn(move || { data[i] += 1; }); } thread::sleep(Duration::from_millis(50)); }
use std::thread;
use std::sync::Arc;
use std::time::Duration;

fn main() {
    let mut data = Arc::new(vec![1, 2, 3]);

    for i in 0..3 {
        let data = data.clone();
        thread::spawn(move || {
            data[i] += 1;
        });
    }

    thread::sleep(Duration::from_millis(50));
}

前回と同様に clone() を使って所有権を持った新たなハンドルを作っています。 そして、このハンドルは新たなスレッドに移動されます。

そうすると... まだ、エラーがでます。

<anon>:11:24 error: cannot borrow immutable borrowed content as mutable
<anon>:11                    data[i] += 1;
                             ^~~~

Arc<T> が保持する値はデフォルトでイミュータブルです。 スレッド間での 共有 はしてくれますがスレッドが絡んだ時の共有されたミュータブルなデータはデータ競合を引き起こし得ます。

通常イミュータブルな位置のものをミュータブルにしたい時は Cell<T> 又は RefCell<T> が実行時のチェックあるいは他の方法で安全に変更する手段を提供してくれる(参考: 保障を選ぶ)のでそれを使います。 しかしながら Rc と同じくこれらはスレッドセーフではありません。 これらを使おうとすると Sync でない旨のエラーが出てコンパイルに失敗します。

スレッド間で共有された値を安全に変更出来る型、例えばどの瞬間でも同時に1スレッドしか内容の値を変更できないことを保障する型が必要そうです。

そのためには、 Mutex<T> 型を使うことができます!

これが動くバージョンです。

use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; fn main() { let data = Arc::new(Mutex::new(vec![1, 2, 3])); for i in 0..3 { let data = data.clone(); thread::spawn(move || { let mut data = data.lock().unwrap(); data[i] += 1; }); } thread::sleep(Duration::from_millis(50)); }
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let data = Arc::new(Mutex::new(vec![1, 2, 3]));

    for i in 0..3 {
        let data = data.clone();
        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            data[i] += 1;
        });
    }

    thread::sleep(Duration::from_millis(50));
}

i の値はクロージャへ束縛(コピー)されるだけで、スレッド間で共有されるわけではないことに注意してください。

ここではmutexを「ロック」しているのです。 mutex(「mutual exclusion(訳注: 相互排他)」の略)は前述の通り同時に1つのスレッドからのアクセスしか許しません。 値にアクセスしようと思ったら、 lock() を使います。これは値を使い終わるまでmutexを「ロック」して他のどのスレッドもロック出来ない(そして値に対して何も出来ない)ようにします。 もし既にロックされているmutexをロックしようとすると別のスレッドがロックを解放するまで待ちます。

ここでの「解放」は暗黙的です。ロックの結果(この場合は data)がスコープを出ると、ロックは自動で解放されます

Mutexlockメソッドは次のシグネチャを持つことを気をつけて下さい。

fn main() { fn lock(&self) -> LockResult<MutexGuard<T>> }
fn lock(&self) -> LockResult<MutexGuard<T>>

そして、 SendMutexGuard<T> に対して実装されていないため、ガードはスレッドの境界をまたげず、ロックの獲得と解放のスレッドローカル性が保証されています。

それでは、スレッドの中身をさらに詳しく見ていきましょう。

use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; fn main() { let data = Arc::new(Mutex::new(vec![1, 2, 3])); for i in 0..3 { let data = data.clone(); thread::spawn(move || { let mut data = data.lock().unwrap(); data[i] += 1; }); } thread::sleep(Duration::from_millis(50)); }
thread::spawn(move || {
    let mut data = data.lock().unwrap();
    data[i] += 1;
});

まず、 lock() を呼び、mutex のロックを獲得します。 これは失敗するかもしれないため、Result<T, E> が返されます。 そして、今回は単なる例なので、データへの参照を得るためにそれを unwrap() します。 実際のコードでは、ここでもっとちゃんとしたエラーハンドリングをするでしょう。 そうしたら、ロックを持っているので、自由に値を変更できます。

最後の部分で、スレッドが実行されている間、短いタイマで待機しています。 しかし、これはよろしくないです。 というのも、ちょうどよい待機時間を選んでいた可能性より、必要以上に長い時間待ってしまっていたり、十分に待っていなかったりする可能性の方が高いからです。 適切な待ち時間というのは、プログラムを実行した際に、実際に計算が終わるまでどれだけの時間がかかったかに依存します。

タイマに代わるより良い選択肢は、Rust標準ライブラリによって提供されている、スレッドがお互いに同期するためのメカニズムを用いることです。 それでは、そのようなものの一つについて話しましょう。 チャネルです。

チャネル

このコードが、適当な時間を待つ代わりに、同期のためにチャネルを使ったバージョンです。

use std::sync::{Arc, Mutex}; use std::thread; use std::sync::mpsc; fn main() { let data = Arc::new(Mutex::new(0)); // `tx` is the "transmitter" or "sender" // `rx` is the "receiver" // `tx` は送信(transmitter) // `rx` は受信(receiver) let (tx, rx) = mpsc::channel(); for _ in 0..10 { let (data, tx) = (data.clone(), tx.clone()); thread::spawn(move || { let mut data = data.lock().unwrap(); *data += 1; tx.send(()).unwrap(); }); } for _ in 0..10 { rx.recv().unwrap(); } }
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;

fn main() {
    let data = Arc::new(Mutex::new(0));

    // `tx` は送信(transmitter)
    // `rx` は受信(receiver)
    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let (data, tx) = (data.clone(), tx.clone());

        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            *data += 1;

            tx.send(()).unwrap();
        });
    }

    for _ in 0..10 {
        rx.recv().unwrap();
    }
}

mpsc::channel() メソッドを使って、新たなチャネルを生成しています。 そして、ただの () をチャネルを通じて send し、それが10個戻ってくるのを待機します。

このチャネルはシグナルを送っているだけですが、 Send であるデータならばなんでもこのチャネルを通じて送れます!

use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); for i in 0..10 { let tx = tx.clone(); thread::spawn(move || { let answer = i * i; tx.send(answer).unwrap(); }); } for _ in 0..10 { println!("{}", rx.recv().unwrap()); } }
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    for i in 0..10 {
        let tx = tx.clone();

        thread::spawn(move || {
            let answer = i * i;

            tx.send(answer).unwrap();
        });
    }

    for _ in 0..10 {
        println!("{}", rx.recv().unwrap());
    }
}

ここでは、10個のスレッドを生成し、それぞれに数値 ( spawn() したときの i ) の2乗を計算させ、その答えをチャネルを通じて send() で送り返させています。

パニック

panic! は現在実行中のスレッドをクラッシュさせます。 Rustのスレッドは独立させるための単純なメカニズムとして使うことができます。

fn main() { use std::thread; let handle = thread::spawn(move || { panic!("oops!"); }); let result = handle.join(); assert!(result.is_err()); }
use std::thread;

let handle = thread::spawn(move || {
    panic!("oops!");
});

let result = handle.join();

assert!(result.is_err());

Thread.join()Result を返し、これによってスレッドがパニックしたかどうかをチェックできます。