状態共有並行性

メッセージ受け渡しは、並行性を扱う素晴らしい方法ですが、唯一の方法ではありません。 他の方法としては、複数のスレッドが同一の共有されたデータにアクセスする方法があるでしょう。 Go言語ドキュメンテーションのスローガンのこの部分を再び考えてください: 「メモリを共有することでやり取りするな。」

メモリを共有することでやり取りするとはどんな感じなのでしょうか?さらに、 なぜメッセージ受け渡しに熱狂的な人は、メモリ共有を使うなと警告するのでしょうか?

ある意味では、どんなプログラミング言語のチャンネルも単独の所有権に類似しています。 一旦チャンネルに値を転送したら、その値は最早使用することがないからです。 メモリ共有並行性は、複数の所有権に似ています: 複数のスレッドが同時に同じメモリ位置にアクセスできるのです。 第15章でスマートポインタが複数の所有権を可能にするのを目の当たりにしたように、 異なる所有者を管理する必要があるので、複数の所有権は複雑度を増させます。 Rustの型システムと所有権規則は、この管理を正しく行う大きな助けになります。 例として、メモリ共有を行うより一般的な並行性の基本型の一つであるミューテックスを見てみましょう。

ミューテックスを使用して一度に1つのスレッドからデータにアクセスすることを許可する

ミューテックスは、どんな時も1つのスレッドにしかなんらかのデータへのアクセスを許可しないというように、 mutual exclusion(相互排他)の省略形です。ミューテックスにあるデータにアクセスするには、 ミューテックスのロックを所望することでアクセスしたいことをまず、スレッドは通知しなければなりません。 ロックとは、現在誰がデータへの排他的アクセスを行なっているかを追跡するミューテックスの一部をなすデータ構造です。 故に、ミューテックスはロックシステム経由で保持しているデータを死守する(guarding)と解説されます。

ミューテックスは、2つの規則を覚えておく必要があるため、難しいという評判があります:

  • データを使用する前にロックの獲得を試みなければならない。
  • ミューテックスが死守しているデータの使用が終わったら、他のスレッドがロックを獲得できるように、 データをアンロックしなければならない。

ミューテックスを現実世界の物で例えるなら、マイクが1つしかない会議のパネルディスカッションを思い浮かべてください。 パネリストが発言できる前に、マイクを使用したいと申し出たり、通知しなければなりません。マイクを受け取ったら、 話したいだけ話し、それから次に発言を申し出たパネリストにマイクを手渡します。パネリストが発言し終わった時に、 マイクを手渡すのを忘れていたら、誰も他の人は発言できません。共有されているマイクの管理がうまくいかなければ、 パネルは予定通りに機能しないでしょう!

ミューテックスの管理は、正しく行うのに著しく技巧を要することがあるので、多くの人がチャンネルに熱狂的になるわけです。 しかしながら、Rustの型システムと所有権規則のおかげで、ロックとアンロックをおかしくすることはありません。

Mutex<T>のAPI

ミューテックスの使用方法の例として、ミューテックスをシングルスレッドの文脈で使うことから始めましょう。 リスト16-12のようにですね:

ファイル名: src/main.rs

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

リスト16-12: 簡潔性のためにMutex<T>のAPIをシングルスレッドの文脈で探究する

多くの型同様、newという関連関数を使用してMutex<T>を生成します。ミューテックス内部のデータにアクセスするには、 lockメソッドを使用してロックを獲得します。この呼び出しは、現在のスレッドをブロックするので、 ロックを得られる順番が来るまで何も作業はできません。

ロックを保持している他のスレッドがパニックしたら、lockの呼び出しは失敗するでしょう。その場合、 誰もロックを取得することは叶わないので、unwrapすると決定し、そのような状況になったら、 このスレッドをパニックさせます。

ロックを獲得した後、今回の場合、numと名付けられていますが、戻り値を中に入っているデータへの可変参照として扱うことができます。 型システムにより、mの値を使用する前にロックを獲得していることが確認されます。mの型はMutex<i32>であってi32ではないので、 i32を使用できるようにするには、lockを呼び出さなければならないのです。忘れることはあり得ません; 型システムにより、それ以外の場合に内部のi32にアクセスすることは許されません。

お察しかもしれませんが、Mutex<T>はスマートポインタです。より正確を期すなら、 lockを呼び出すとLockResultに包まれた形でMutexGuardというスマートポインタを返却し、 これをunwrapの呼び出しによって処理しました。MutexGuardスマートポインタが、 内部のデータを指すDerefを実装しています; このスマートポインタはさらにMutexGuardがスコープを外れた時に、 自動的にロックを解除するDrop実装もしていて、これが内部スコープの終わりで発生します。 結果として、ロックの解除が自動的に行われるので、ロックの解除を忘れ、 ミューテックスが他のスレッドで使用されるのを阻害するリスクを負いません。

ロックをドロップした後、ミューテックスの値を出力し、内部のi32の値を6に変更できたことが確かめられるのです。

複数のスレッド間でMutex<T>を共有する

さて、Mutex<T>を使って複数のスレッド間で値を共有してみましょう。10個のスレッドを立ち上げ、 各々カウンタの値を1ずつインクリメントさせるので、カウンタは0から10まで上がります。 次のリスト16-13の例はコンパイルエラーになりますが、そのエラーを使用して、Mutex<T>の使用法と、 コンパイラがそれを正しく活用する手助けをしてくれる方法について学びます。

ファイル名: src/main.rs

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

リスト16-13: Mutex<T>により死守されているカウンタを10個のスレッドがそれぞれインクリメントする

リスト16-12のように、counter変数を生成してMutex<T>の内部にi32を保持しています。 次に、数値の範囲を走査して10個のスレッドを生成しています。thread::spawnを使用して、 全スレッドに同じクロージャを与えています。このクロージャは、スレッド内にカウンタをムーブし、 lockメソッドを呼ぶことでMutex<T>のロックを獲得し、それからミューテックスの値に1を足します。 スレッドがクロージャを実行し終わったら、numはスコープ外に出てロックを解除するので、 他のスレッドが獲得できるわけです。

メインスレッドで全てのjoinハンドルを収集します。それからリスト16-2のように、各ハンドルに対してjoinを呼び出し、 全スレッドが終了するのを確かめています。その時点で、メインスレッドはロックを獲得し、このプログラムの結果を出力します。

この例はコンパイルできないでしょうと仄めかしました。では、理由を探りましょう!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
(エラー: ムーブされた値の借用: `counter`)
  --> src/main.rs:21:29
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
   |                (`counter`は`Copy`トレイトを実装しない`Mutex<i32>`型を持つので、ムーブが発生します)
...
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved into closure here, in previous iteration of loop
   |                                           (値は、ループの前回の反復時に、ここでクロージャ内にムーブされます)
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value borrowed here after move
   |                                    (値はここでムーブ後に借用されています)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

エラーメッセージには、counter値はループの前回の反復時にムーブされたと書いてあります。 コンパイラは、ロックcounterの所有権を複数のスレッドに移動することはできないと教えてくれているのです。 第15章で議論した、複数所有権の手法を使って、コンパイラエラーを修正しましょう。

複数のスレッドで複数の所有権

第15章で、スマートポインタのRc<T>を使用して参照カウントの値を作ることで、1つの値に複数の所有者を与えました。 同じことをここでもして、どうなるか見ましょう。リスト16-14でRc<T>Mutex<T>を包含し、 所有権をスレッドに移す前にRc<T>をクローンします。

ファイル名: src/main.rs

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

リスト16-14: Rc<T>を使用して複数のスレッドにMutex<T>を所有させようとする

再三、コンパイルし……別のエラーが出ました!コンパイラはいろんなことを教えてくれています。

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
(エラー: `Rc<Mutex<i32>>`はスレッド間で安全に送信できません)
  --> src/main.rs:11:36
   |
11 |           let handle = thread::spawn(move || {
   |                        ------------- ^------
   |                        |             |
   |  ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
   | |                      |            (この`{[closure@src/main.rs:11:36: 11:43}`の中で)
   | |                      |
   | |                      required by a bound introduced by this call
   | |                     (この呼び出しによって導入される境界によって必要とされます)
12 | |             let mut num = counter.lock().unwrap();
13 | |
14 | |             *num += 1;
15 | |         });
   | |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
   |            (`Rc<Mutex<i32>>`はスレッド間で安全に送信できません)
   |
   = help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
   =(ヘルプ: `{closure@src/main.rs:11:36: 11:43}`の中で、トレイト`Send`は`Rc<Mutex<i32>>`に対して実装されていません
note: required because it's used within this closure
(注釈: このクロージャの中で使用されているので、要求されます)
  --> src/main.rs:11:36
   |
11 |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^
note: required by a bound in `spawn`
(注釈: `spawn`の境界によって要求されます)
  --> /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/thread/mod.rs:678:1

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

おお、このエラーメッセージはとても長ったらしいですね!注目すべき重要な部分はここです: `Rc<Mutex<i32>>` cannot be sent between threads safely。コンパイラは、 その理由も伝えてくれています: the trait `Send` is not implemented for `Rc<Mutex<i32>>`Sendについては、次の節で語ります: スレッドとともに使用している型が並行な場面で使われることを意図したものであることを保証するトレイトの1つです。

残念ながら、Rc<T>はスレッド間で共有するには安全ではないのです。Rc<T>が参照カウントを管理する際、 cloneが呼び出されるたびにカウントを追加し、クローンがドロップされるたびにカウントを差し引きます。 しかし、並行基本型を使用してカウントの変更が別のスレッドに妨害されないことを確認していないのです。 これは間違ったカウントにつながる可能性があり、今度はメモリリークや、使用し終わる前に値がドロップされることにつながる可能性のある潜在的なバグです。 必要なのは、いかにもRc<T>のようだけれども、参照カウントへの変更をスレッドセーフに行うものです。

Arc<T>で原子的な参照カウント

幸いなことに、Arc<T>Rc<T>のような並行な状況で安全に使用できる型ですaatomicを表し、原子的に参照カウントする型を意味します。アトミックは、 ここでは詳しく講義しない並行性の別の基本型です: 詳細は、 std::sync::atomicの標準ライブラリドキュメンテーションを参照されたし。現時点では、 アトミックは、基本型のように動くけれども、スレッド間で共有しても安全なことだけ知っていれば良いです。

そうしたらあなたは、なぜ全ての基本型がアトミックでなく、標準ライブラリの型も標準でArc<T>を使って実装されていないのか疑問に思う可能性があります。 その理由は、スレッド安全性が、本当に必要な時だけ支払いたいパフォーマンスの犠牲とともに得られるものだからです。 シングルスレッドで値に処理を施すだけなら、アトミックが提供する保証を強制する必要がない方がコードはより速く走るのです。

例に回帰しましょう: Arc<T>Rc<T>のAPIは同じなので、use行とnewの呼び出しとcloneの呼び出しを変更して、 プログラムを修正します。リスト16-15は、ようやくコンパイルでき、動作します:

ファイル名: src/main.rs

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

リスト16-15: Arc<T>を使用してMutex<T>をラップし、所有権を複数のスレッド間で共有できるようにする

このコードは、以下のように出力します:

Result: 10

やりました!0から10まで数え上げました。これは、あまり印象的ではないように思えるかもしれませんが、 本当にMutex<T>とスレッド安全性についていろんなことを教えてくれました。このプログラムの構造を使用して、 カウンタをインクリメントする以上の複雑な処理を行うこともできるでしょう。この手法を使えば、 計算を独立した部分に小分けにし、その部分をスレッドに分割し、それからMutex<T>を使用して、 各スレッドに最終結果を更新させることができます。

単純な数値演算を行おうとしているなら、Mutex<T>型よりも単純な、 標準ライブラリのstd::sync::atomicモジュールによって提供される型があることに注意してください。 これらの型はプリミティブ型に対する安全かつ並行的なアトミックアクセスを提供します。 この例ではMutex<T>がどのように機能するかに集中できるように、 プリミティブ型に対してMutex<T>を使用することをあえて選択しました。

RefCell<T>/Rc<T>Mutex<T>/Arc<T>の類似性

counterは不変なのに、その内部にある値への可変参照を得ることができたことに気付いたでしょうか; つまり、Mutex<T>は、Cell系のように内部可変性を提供するわけです。 第15章でRefCell<T>を使用してRc<T>の内容を可変化できるようにしたのと同様に、 Mutex<T>を使用してArc<T>の内容を可変化しているのです。

気付いておくべき別の詳細は、Mutex<T>を使用する際にあらゆる種類のロジックエラーからは、 コンパイラは保護してくれないということです。第15章でRc<T>は、循環参照を生成してしまうリスクを伴い、 そうすると、2つのRc<T>の値がお互いを参照し合い、メモリリークを引き起こしてしまうことを思い出してください。 同様に、Mutex<T>デッドロックを生成するリスクを伴っています。これは、処理が2つのリソースをロックする必要があり、 2つのスレッドがそれぞれにロックを1つ獲得して永久にお互いを待ちあってしまうときに起こります。 デッドロックに興味があるのなら、デッドロックのあるRustプログラムを組んでみてください; それからどんな言語でもいいので、ミューテックスに対してデッドロックを緩和する方法を調べて、 Rustで是非、それを実装してみてください。Mutex<T>MutexGuardに関する標準ライブラリのAPIドキュメンテーションは、 役に立つ情報を提供してくれます。

SendSyncトレイトと、それらを独自の型で使用する方法について語って、この章を締めくくります。