メッセージ受け渡しを使ってスレッド間でデータを転送する
人気度を増してきている安全な並行性を保証する一つのアプローチがメッセージ受け渡しで、 スレッドやアクターがデータを含むメッセージを相互に送り合うことでやり取りします。 こちらが、Go言語のドキュメンテーションのスローガンにある考えです: 「メモリを共有することでやり取りするな; 代わりにやり取りすることでメモリを共有しろ」
メッセージ送信による並行性を達成するために、Rustの標準ライブラリはチャンネルの実装を提供しています。 チャンネルは、あるスレッドから別のスレッドへデータを送信する手段である、普遍的なプログラミング概念です。
プログラミングのチャンネルは、川などの、向きのある水路のように考えることができます(訳注: channelは日常語としては水路を意味します)。 アヒルのおもちゃなどを川に置いたら、水路の終端まで下流に流れていきます。
チャンネルは、2分割できます: 転送機と受信機です。転送機はアヒルのおもちゃを川に置く上流になり、 受信機は、アヒルのおもちゃが行き着く下流になります。コードのある箇所が送信したいデータとともに転送機のメソッドを呼び出し、 別の部分がメッセージが到着していないか受信側を調べます。転送機と受信機のどちらかがドロップされると、 チャンネルは閉じられたと言います。
ここで、1つのスレッドが値を生成し、それをチャンネルに送信し、別のスレッドがその値を受け取り、 出力するプログラムに取り掛かります。チャンネルを使用してスレッド間に単純な値を送り、 機能の説明を行います。一旦そのテクニックに慣れてしまえば、チャットシステムや、 多くのスレッドが計算の一部を担い、結果をまとめる1つのスレッドにその部分を送るようなシステムなど、 任意のスレッドが互いに通信する必要があるシステムに対して、チャンネルを使用できるでしょう。
まず、リスト16-6において、チャンネルを生成するものの、何もしません。 チャンネル越しにどんな型の値を送りたいのかコンパイラがわからないため、 これはまだコンパイルできないことに注意してください。
ファイル名: src/main.rs
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
リスト16-6: チャンネルを生成し、2つの部品をtxとrxに代入する
mpsc::channel関数で新しいチャンネルを生成しています; mpscはmultiple producer, single consumerを表しています。
簡潔に言えば、Rustの標準ライブラリがチャンネルを実装している方法は、1つのチャンネルが値を生成する複数の送信側と、
その値を消費するたった1つの受信側を持つことができるということを意味します。
複数の小川が互いに合わさって1つの大きな川になるところを想像してください:
どの小川を通っても、送られたものは最終的に1つの川に行き着きます。今は、1つの生成器から始めますが、
この例が動作するようになったら、複数の生成器を追加します。
mpsc::channel関数はタプルを返し、1つ目の要素は、送信側(転送機)、2つ目の要素は受信側(受信機)になります。
txとrxという略称は、多くの分野で伝統的に転送機と受信機にそれぞれ使用されているので、
変数をそのように名付けて、各終端を示します。タプルを分配するパターンを伴うlet文を使用しています;
let文でパターンを使用することと分配については、第18章で議論しましょう。今のところは、
このようにlet文を使うと、mpsc::channelで返ってくるタプルの部品を抽出するのが便利になるということだけ知っておいてください。
立ち上げたスレッドがメインスレッドとやり取りするように、転送機を立ち上げたスレッドに移動し、 1文字列を送らせましょう。リスト16-7のようにですね。川の上流にアヒルのおもちゃを置いたり、 チャットのメッセージをあるスレッドから別のスレッドに送るみたいですね。
ファイル名: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
リスト16-7: txを立ち上げたスレッドに移動し、「やあ」を送る
今回も、thread::spawnを使用して新しいスレッドを生成し、それからmoveを使用して、
立ち上げたスレッドがtxを所有するようにクロージャにtxをムーブしています。立ち上げたスレッドは、
メッセージをチャンネルを通して送信できるように、転送機を所有する必要があります。
転送機には、送信したい値を取るsendメソッドがあります。sendメソッドはResult<T, E>型を返すので、
既に受信機がドロップされ、値を送信する場所がなければ、送信処理はエラーを返します。
この例では、エラーの場合には、パニックするようにunwrapを呼び出しています。ですが、実際のアプリケーションでは、
ちゃんと扱うでしょう: 第9章に戻ってちゃんとしたエラー処理の方法を再確認してください。
リスト16-8において、メインスレッドのチャンネルの受信機から値を得ます。 アヒルのおもちゃを川の終端で水から回収したり、チャットメッセージを受信するみたいですね。
ファイル名: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); // "取得しました: {}" println!("Got: {}", received); }
リスト16-8: 「やあ」の値をメインスレッドで受け取り、出力する
受信機には有用なメソッドが2つあります: recvとtry_recvです。
receiveの省略形であるrecvを使っています。これは、メインスレッドの実行をブロックし、
値がチャンネルを流れてくるまで待機します。一旦値が送信されたら、recvはそれをResult<T, E>に含んで返します。
転送機が閉じたら、recvはエラーを返し、もう値は来ないと通知します。
try_recvメソッドはブロックせず、代わりに即座にResult<T, E>を返します:
メッセージがあったら、それを含むOk値、今回は何もメッセージがなければ、Err値です。
メッセージを待つ間にこのスレッドにすることが他にあれば、try_recvは有用です:
try_recvを頻繁に呼び出し、メッセージがあったら処理し、それ以外の場合は、
再度チェックするまでちょっとの間、他の作業をするループを書くことができるでしょう。
この例では、簡潔性のためにrecvを使用しました; メッセージを待つこと以外にメインスレッドがすべき作業はないので、
メインスレッドをブロックするのは適切です。
リスト16-8のコードを実行したら、メインスレッドから値が出力されるところを目撃するでしょう:
Got: hi
完璧です!
チャンネルと所有権の転送
安全な並行コードを書く手助けをしてくれるので、所有権規則は、メッセージ送信で重要な役割を担っています。
並行プログラミングでエラーを回避することは、Rustプログラム全体で所有権について考える利点です。
実験をしてチャンネルと所有権がともに動いて、どう問題を回避するかをお見せしましょう:
val値を立ち上げたスレッドで、チャンネルに送った後に使用を試みます。
リスト16-9のコードのコンパイルを試みて、このコードが許容されない理由を確認してください:
ファイル名: src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
// "valは{}です"
println!("val is {}", val);
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
リスト16-9: チャンネルに送信後にvalの使用を試みる
ここで、tx.send経由でチャンネルに送信後にvalを出力しようとしています。これを許可するのは、悪い考えです:
一旦、値が他のスレッドに送信されたら、再度値を使用しようとする前にそのスレッドが変更したりドロップできてしまいます。
可能性として、その別のスレッドの変更により、矛盾していたり存在しないデータのせいでエラーが発生したり、
予期しない結果になるでしょう。ですが、リスト16-9のコードのコンパイルを試みると、Rustはエラーを返します:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
(エラー: ムーブされた値の借用: `val`)
--> src/main.rs:10:31
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
| (`val`は`Copy`トレイトを実装しない`String`型を持つので、ムーブが発生します)
9 | tx.send(val).unwrap();
| --- value moved here
| (値はここでムーブされます)
10 | println!("val is {}", val);
| ^^^ value borrowed here after move
| (ここでムーブ後に借用されます)
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
help: consider cloning the value if the performance cost is acceptable
(ヘルプ: パフォーマンスコストが許容できる場合は、クローンすることを検討してください)
|
9 | tx.send(val.clone()).unwrap();
| ++++++++
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
並行性のミスがコンパイルエラーを招きました。send関数は引数の所有権を奪い、
値がムーブされると、受信側が所有権を得るのです。これにより、送信後に誤って再度値を使用するのを防いでくれます;
所有権システムが、万事問題ないことを確認してくれます。
複数の値を送信し、受信側が待機するのを確かめる
リスト16-8のコードはコンパイルでき、動きましたが、2つの個別のスレッドがお互いにチャンネル越しに会話していることは、 明瞭に示されませんでした。リスト16-10において、リスト16-8のコードが並行に動いていることを証明する変更を行いました: 立ち上げたスレッドは、複数のメッセージを送信し、各メッセージ間で、1秒待機します。
ファイル名: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
// スレッドからやあ (hi from the thread)
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
リスト16-10: 複数のメッセージを送信し、メッセージ間で停止する
今回は、メインスレッドに送信したい文字列のベクタを立ち上げたスレッドが持っています。
それらを繰り返し、各々個別に送信し、Durationの値1秒とともにthread::sleep関数を呼び出すことで、
メッセージ間で停止します。
メインスレッドにおいて、最早recv関数を明示的に呼んではいません: 代わりに、
rxをイテレータとして扱っています。受信した値それぞれを出力します。
チャンネルが閉じられると、繰り返しも終わります。
リスト16-10のコードを走らせると、各行の間に1秒の待機をしつつ、以下のような出力を目の当たりにするはずです:
Got: hi
Got: from
Got: the
Got: thread
メインスレッドのforループには停止したり、遅れせたりするコードは何もないので、
メインスレッドが立ち上げたスレッドから値を受け取るのを待機していることがわかります。
転送機をクローンして複数の生成器を作成する
mpscは、mutiple producer, single consumerの頭字語であると前述しました。
mpscを使い、リスト16-10のコードを拡張して、全ての値を同じ受信機に送信する複数のスレッドを生成しましょう。
転送機をクローンすることでそうすることができます。リスト16-11のようにですね:
ファイル名: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
// 君のためにもっとメッセージを (more messages for you)
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
// --snip--
}
リスト16-11: 複数の生成器から複数のメッセージを送信する
今回、最初のスレッドを立ち上げる前に、転送機に対してcloneを呼び出しています。
これにより、最初に立ち上げたスレッドに渡せる新しい転送機が得られます。
元の転送機は、2番目に立ち上げたスレッドに渡します。これにより2つスレッドが得られ、
1つの受信機にそれぞれ異なるメッセージを送信します。
コードを実行すると、出力は以下のようなものになるはずです:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
システム次第で、別の順番で値が出る可能性もあります。並行性が面白いと同時に難しい部分でもあります。
異なるスレッドで色々な値を与えてthread::sleepで実験をしたら、走らせるたびにより非決定的になり、
毎回異なる出力をするでしょう。
チャンネルの動作方法を見たので、他の並行性に目を向けましょう。