メッセージ受け渡しを使ってスレッド間でデータを転送する
人気度を増してきている安全な並行性を保証する一つのアプローチがメッセージ受け渡しで、 スレッドやアクターがデータを含むメッセージを相互に送り合うことでやり取りします。 こちらが、Go言語のドキュメンテーションのスローガンにある考えです: 「メモリを共有することでやり取りするな; 代わりにやり取りすることでメモリを共有しろ」
メッセージ送信並行性を達成するためにRustに存在する一つの主な道具は、チャンネルで、 Rustの標準ライブラリが実装を提供しているプログラミング概念です。プログラミングのチャンネルは、 水の流れのように考えることができます。小川とか川ですね。アヒルのおもちゃやボートみたいなものを流れに置いたら、 水路の終端まで下流に流れていきます。
プログラミングにおけるチャンネルは、2分割できます: 転送機と受信機です。転送機はアヒルのおもちゃを川に置く上流になり、 受信機は、アヒルのおもちゃが行き着く下流になります。コードのある箇所が送信したいデータとともに転送機のメソッドを呼び出し、 別の部分がメッセージが到着していないか受信側を調べます。転送機と受信機のどちらかがドロップされると、 チャンネルは閉じられたと言います。
ここで、1つのスレッドが値を生成し、それをチャンネルに送信し、別のスレッドがその値を受け取り、 出力するプログラムに取り掛かります。チャンネルを使用してスレッド間に単純な値を送り、 機能の説明を行います。一旦、そのテクニックに慣れてしまえば、チャンネルを使用してチャットシステムや、 多くのスレッドが計算の一部を担い、結果をまとめる1つのスレッドにその部分を送るようなシステムを実装できるでしょう。
まず、リスト16-6において、チャンネルを生成するものの、何もしません。 チャンネル越しにどんな型の値を送りたいのかコンパイラがわからないため、 これはまだコンパイルできないことに注意してください。
ファイル名: src/main.rs
use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); tx.send(()).unwrap(); }
mpsc::channel
関数で新しいチャンネルを生成しています; mpsc
はmultiple producer, single consumerを表しています。
簡潔に言えば、Rustの標準ライブラリがチャンネルを実装している方法は、1つのチャンネルが値を生成する複数の送信側と、
その値を消費するたった1つの受信側を持つことができるということを意味します。
複数の小川が互いに合わさって1つの大きな川になるところを想像してください:
どの小川を通っても、送られたものは最終的に1つの川に行き着きます。今は、1つの生成器から始めますが、
この例が動作するようになったら、複数の生成器を追加します。
mpsc::channel
関数はタプルを返し、1つ目の要素は、送信側、2つ目の要素は受信側になります。
tx
とrx
という略称は、多くの分野で伝統的に転送機と受信機にそれぞれ使用されているので、
変数をそのように名付けて、各終端を示します。タプルを分配するパターンを伴うlet
文を使用しています;
let
文でパターンを使用することと分配については、第18章で議論しましょう。このようにlet
文を使うと、
mpsc::channel
で返ってくるタプルの部品を抽出するのが便利になります。
立ち上げたスレッドがメインスレッドとやり取りするように、転送機を立ち上げたスレッドに移動し、 1文字列を送らせましょう。リスト16-7のようにですね。川の上流にアヒルのおもちゃを置いたり、 チャットのメッセージをあるスレッドから別のスレッドに送るみたいですね。
ファイル名: src/main.rs
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
今回も、thread::spawn
を使用して新しいスレッドを生成し、それからmove
を使用して、
立ち上げたスレッドがtx
を所有するようにクロージャにtx
をムーブしています。立ち上げたスレッドは、
メッセージをチャンネルを通して送信できるように、チャンネルの送信側を所有する必要があります。
転送側には、送信したい値を取るsend
メソッドがあります。send
メソッドはResult<T, E>
型を返すので、
既に受信側がドロップされ、値を送信する場所がなければ、送信処理はエラーを返します。
この例では、エラーの場合には、パニックするようにunwrap
を呼び出しています。ですが、実際のアプリケーションでは、
ちゃんと扱うでしょう: 第9章に戻ってちゃんとしたエラー処理の方法を再確認してください。
リスト16-8において、メインスレッドのチャンネルの受信側から値を得ます。 アヒルのおもちゃを川の終端で水から回収したり、チャットメッセージを取得するみたいですね。
ファイル名: src/main.rs
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); // 値は{}です println!("Got: {}", received); }
チャンネルの受信側には有用なメソッドが2つあります: recv
とtry_recv
です。
receiveの省略形であるrecv
を使っています。これは、メインスレッドの実行をブロックし、
値がチャンネルを流れてくるまで待機します。一旦値が送信されたら、recv
はそれをResult<T, E>
に含んで返します。
チャンネルの送信側が閉じたら、recv
はエラーを返し、もう値は来ないと通知します。
try_recv
メソッドはブロックせず、代わりに即座にResult<T, E>
を返します:
メッセージがあったら、それを含むOk
値、今回は何もメッセージがなければ、Err
値です。
メッセージを待つ間にこのスレッドにすることが他にあれば、try_recv
は有用です:
try_recv
を頻繁に呼び出し、メッセージがあったら処理し、それ以外の場合は、
再度チェックするまでちょっとの間、他の作業をするループを書くことができるでしょう。
この例では、簡潔性のためにrecv
を使用しました; メッセージを待つこと以外にメインスレッドがすべき作業はないので、
メインスレッドをブロックするのは適切です。
リスト16-8のコードを実行したら、メインスレッドから値が出力されるところを目撃するでしょう:
Got: hi
完璧です!
チャンネルと所有権の転送
安全な並行コードを書く手助けをしてくれるので、所有権規則は、メッセージ送信で重要な役割を担っています。
並行プログラミングでエラーを回避することは、Rustプログラム全体で所有権について考える利点です。
実験をしてチャンネルと所有権がともに動いて、どう問題を回避するかをお見せしましょう:
val
値を立ち上げたスレッドで、チャンネルに送った後に使用を試みます。
リスト16-9のコードのコンパイルを試みて、このコードが許容されない理由を確認してください:
ファイル名: src/main.rs
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
// valは{}
println!("val is {}", val);
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
ここで、tx.send
経由でチャンネルに送信後にval
を出力しようとしています。これを許可するのは、悪い考えです:
一旦、値が他のスレッドに送信されたら、再度値を使用しようとする前にそのスレッドが変更したりドロップできてしまいます。
可能性として、その別のスレッドの変更により、矛盾していたり存在しないデータのせいでエラーが発生したり、
予期しない結果になるでしょう。ですが、リスト16-9のコードのコンパイルを試みると、Rustはエラーを返します:
error[E0382]: use of moved value: `val`
--> src/main.rs:10:31
|
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {}", val);
| ^^^ value used here after move
|
= note: move occurs because `val` has type `std::string::String`, which does
not implement the `Copy` trait
並行性のミスがコンパイルエラーを招きました。send
関数は引数の所有権を奪い、
値がムーブされると、受信側が所有権を得るのです。これにより、送信後に誤って再度値を使用するのを防いでくれます;
所有権システムが、万事問題ないことを確認してくれます。
複数の値を送信し、受信側が待機するのを確かめる
リスト16-8のコードはコンパイルでき、動きましたが、2つの個別のスレッドがお互いにチャンネル越しに会話していることは、 明瞭に示されませんでした。リスト16-10において、リスト16-8のコードが並行に動いていることを証明する変更を行いました: 立ち上げたスレッドは、複数のメッセージを送信し、各メッセージ間で、1秒待機します。
ファイル名: src/main.rs
use std::thread; use std::sync::mpsc; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { // スレッドからやあ(hi from the thread) let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } }
今回は、メインスレッドに送信したい文字列のベクタを立ち上げたスレッドが持っています。
それらを繰り返し、各々個別に送信し、Duration
の値1秒とともにthread::sleep
関数を呼び出すことで、
メッセージ間で停止します。
メインスレッドにおいて、最早recv
関数を明示的に呼んではいません: 代わりに、
rx
をイテレータとして扱っています。受信した値それぞれを出力します。
チャンネルが閉じられると、繰り返しも終わります。
リスト16-10のコードを走らせると、各行の間に1秒の待機をしつつ、以下のような出力を目の当たりにするはずです:
Got: hi
Got: from
Got: the
Got: thread
メインスレッドのfor
ループには停止したり、遅れせたりするコードは何もないので、
メインスレッドが立ち上げたスレッドから値を受け取るのを待機していることがわかります。
転送機をクローンして複数の生成器を作成する
mpsc
は、mutiple producer, single consumerの頭字語であると前述しました。
mpsc
を使い、リスト16-10のコードを拡張して、全ての値を同じ受信機に送信する複数のスレッドを生成しましょう。
チャンネルの転送の片割れをクローンすることでそうすることができます。リスト16-11のようにですね:
ファイル名: src/main.rs
use std::thread; use std::sync::mpsc; use std::time::Duration; fn main() { // --snip-- let (tx, rx) = mpsc::channel(); let tx1 = mpsc::Sender::clone(&tx); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { // 君のためにもっとメッセージを(more messages for you) let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } // --snip-- }
今回、最初のスレッドを立ち上げる前に、チャンネルの送信側に対してclone
を呼び出しています。
これにより、最初に立ち上げたスレッドに渡せる新しい送信ハンドルが得られます。
元のチャンネルの送信側は、2番目に立ち上げたスレッドに渡します。これにより2つスレッドが得られ、
それぞれチャンネルの受信側に異なるメッセージを送信します。
コードを実行すると、出力は以下のようなものになるはずです:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
別の順番で値が出る可能性もあります; システム次第です。並行性が面白いと同時に難しい部分でもあります。
異なるスレッドで色々な値を与えてthread::sleep
で実験をしたら、走らせるたびにより非決定的になり、
毎回異なる出力をするでしょう。
チャンネルの動作方法を見たので、他の並行性に目を向けましょう。