シングルスレッドサーバをマルチスレッド化する
現状、サーバはリクエストを順番に処理します。つまり、最初の接続が処理し終わるまで、2番目の接続は処理しないということです。 サーバが受け付けるリクエストの量が増えるほど、この連続的な実行は、最適ではなくなるでしょう。 サーバが処理するのに長い時間がかかるリクエストを受け付けたら、新しいリクエストは迅速に処理できても、 続くリクエストは長いリクエストが完了するまで待たなければならなくなるでしょう。これを修正する必要がありますが、 まずは、実際に問題が起こっているところを見ます。
現在のサーバの実装で遅いリクエストをシミュレーションする
処理が遅いリクエストが現在のサーバ実装に対して行われる他のリクエストにどう影響するかに目を向けます。 リスト20-10は、応答する前に5秒サーバをスリープさせる遅いレスポンスをシミュレーションした /sleepへのリクエストを扱う実装です。
ファイル名: src/main.rs
# #![allow(unused_variables)] #fn main() { use std::thread; use std::time::Duration; # use std::io::prelude::*; # use std::net::TcpStream; # use std::fs::File; // --snip-- fn handle_connection(mut stream: TcpStream) { # let mut buffer = [0; 1024]; # stream.read(&mut buffer).unwrap(); // --snip-- let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; // --snip-- } #}
このコードはちょっと汚いですが、シミュレーション目的には十分です。2番目のリクエストsleep
を作成し、
そのデータをサーバは認識します。if
ブロックの後に else if
を追加し、/sleepへのリクエストを確認しています。
そのリクエストが受け付けられると、サーバは成功のHTMLページを描画する前に5秒間スリープします。
我々のサーバがどれだけ基礎的か見て取れます: 本物のライブラリは、もっと冗長でない方法で複数のリクエストの認識を扱うでしょう!
cargo run
でサーバを開始してください。それから2つブラウザのウインドウを開いてください: 1つは、
http://localhost:7878/ 用、そしてもう1つはhttp://localhost:7878/sleep 用です。
以前のように / URIを数回入力したら、素早く応答するでしょう。しかし、/sleepを入力し、それから / をロードしたら、
sleep
がロードする前にきっかり5秒スリープし終わるまで、/ は待機するのを目撃するでしょう。
より多くのリクエストが遅いリクエストの背後に回ってしまうのを回避するようWebサーバが動く方法を変える方法は複数あります; これから実装するのは、スレッドプールです。
スレッドプールでスループットを向上させる
スレッドプールは、待機し、タスクを処理する準備のできた一塊りの大量に生成されたスレッドです。 プログラムが新しいタスクを受け取ったら、プールのスレッドのどれかをタスクにあてがい、 そのスレッドがそのタスクを処理します。 プールの残りのスレッドは、最初のスレッドが処理中にやってくる他のあらゆるタスクを扱うために利用可能です。 最初のスレッドがタスクの処理を完了したら、アイドル状態のスレッドプールに戻り、新しいタスクを処理する準備ができます。 スレッドプールにより、並行で接続を処理でき、サーバのスループットを向上させます。
プール内のスレッド数は、小さい数字に制限し、DoS(Denial of Service; サービスの拒否)攻撃から保護します; リクエストが来た度に新しいスレッドをプログラムに生成させたら、 1000万リクエストをサーバに行う誰かが、サーバのリソースを使い尽くし、リクエストの処理を停止に追い込むことで、 大混乱を招くことができてしまうでしょう。
無制限にスレッドを大量生産するのではなく、プールに固定された数のスレッドを待機させます。リクエストが来る度に、
処理するためにプールに送られます。プールは、やって来るリクエストのキューを管理します。
プールの各スレッドがこのキューからリクエストを取り出し、リクエストを処理し、そして、別のリクエストをキューに要求します。
この設計により、N
リクエストを並行して処理でき、ここでN
はスレッド数です。各スレッドが実行に時間のかかるリクエストに応答していたら、
続くリクエストはそれでも、キュー内で待機させられてしまうこともありますが、その地点に到達する前に扱える時間のかかるリクエスト数を増加させました。
このテクニックは、Webサーバのスループットを向上させる多くの方法の1つに過ぎません。探究する可能性のある他の選択肢は、 fork/joinモデルと、シングルスレッドの非同期I/Oモデルです。この話題にご興味があれば、他の解決策についてもっと読み、 Rustで実装を試みることができます; Rustのような低レベル言語であれば、これらの選択肢全部が可能なのです。
スレッドプールを実装し始める前に、プールを使うのはどんな感じになるはずなのかについて語りましょう。コードの設計を試みる際、 クライアントのインターフェイスをまず書くことは、設計を導く手助けになることがあります。呼び出したいように構成されるよう、 コードのAPIを記述してください; そして、機能を実装してから公開APIの設計をするのではなく、その構造内で機能を実装してください。
第12章のプロジェクトでTDDを使用したように、ここではCompiler Driven Development(コンパイラ駆動開発)を使用します。 欲しい関数を呼び出すコードを書き、それからコンパイラの出すエラーを見てコードが動くように次に何を変更すべきかを決定します。
各リクエストに対してスレッドを立ち上げられる場合のコードの構造
まず、全接続に対して新しいスレッドを確かに生成した場合にコードがどんな見た目になるかを探究しましょう。
先ほど述べたように、無制限にスレッドを大量生産する可能性があるという問題のため、これは最終的な計画ではありませんが、
開始点です。リスト20-11は、新しいスレッドを立ち上げてfor
ループ内で各ストリームを扱うためにmain
に行う変更を示しています。
ファイル名: src/main.rs
# use std::thread; # use std::io::prelude::*; # use std::net::TcpListener; # use std::net::TcpStream; # fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } # fn handle_connection(mut stream: TcpStream) {}
第16章で学んだように、thread::spawn
は新しいスレッドを生成し、それからクロージャ内のコードを新しいスレッドで実行します。
このコードを実行してブラウザで /sleepをロードし、それからもう2つのブラウザのタブで / をロードしたら、
確かに / へのリクエストは、/sleepが完了するのを待機しなくても済むことがわかるでしょう。
ですが、前述したように、無制限にスレッドを生成することになるので、これは最終的にシステムを参らせてしまうでしょう。
有限数のスレッド用に似たインターフェイスを作成する
スレッドからスレッドプールへの変更にAPIを使用するコードへの大きな変更が必要ないように、
スレッドプールには似た、馴染み深い方法で動作してほしいです。リスト20-12は、
thread::spawn
の代わりに使用したいThreadPool
構造体の架空のインターフェイスを表示しています。
ファイル名: src/main.rs
# use std::thread; # use std::io::prelude::*; # use std::net::TcpListener; # use std::net::TcpStream; # struct ThreadPool; # impl ThreadPool { # fn new(size: u32) -> ThreadPool { ThreadPool } # fn execute<F>(&self, f: F) # where F: FnOnce() + Send + 'static {} # } # fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } } # fn handle_connection(mut stream: TcpStream) {}
ThreadPool::new
を使用して設定可能なスレッド数で新しいスレッドプールを作成し、今回の場合は4です。
それからfor
ループ内で、pool.execute
は、プールが各ストリームに対して実行すべきクロージャを受け取るという点で、
thread::spawn
と似たインターフェイスです。pool.execute
を実装する必要があるので、
これはクロージャを取り、実行するためにプール内のスレッドに与えます。このコードはまだコンパイルできませんが、
コンパイラがどう修正したらいいかガイドできるように試してみます。
コンパイラ駆動開発でThreadPool
構造体を構築する
リスト20-12の変更をsrc/main.rsに行い、それから開発を駆動するためにcargo check
からのコンパイラエラーを活用しましょう。
こちらが得られる最初のエラーです:
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool`
(エラー: 解決に失敗しました。未定義の型またはモジュール`ThreadPool`を使用しています)
--> src\main.rs:10:16
|
10 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^^^^^^ Use of undeclared type or module
`ThreadPool`
error: aborting due to previous error
よろしい!このエラーはThreadPool
型かモジュールが必要なことを教えてくれているので、今構築します。
ThreadPool
の実装は、Webサーバが行う仕事の種類とは独立しています。従って、hello
クレートをバイナリクレートからライブラリクレートに切り替え、
ThreadPool
の実装を保持させましょう。ライブラリクレートに変更後、
個別のスレッドプールライブラリをWebリクエストを提供するためだけではなく、スレッドプールでしたいあらゆる作業にも使用できます。
以下を含むsrc/lib.rsを生成してください。これは、現状存在できる最も単純なThreadPool
の定義です:
ファイル名: src/lib.rs
# #![allow(unused_variables)] #fn main() { pub struct ThreadPool; #}
それから新しいディレクトリ、src/binを作成し、src/main.rsに根付くバイナリクレートをsrc/bin/main.rsに移動してください。
そうすると、ライブラリクレートがhelloディレクトリ内で主要クレートになります; それでも、
cargo run
でsrc/bin/main.rsのバイナリを実行することはできます。main.rsファイルを移動後、
編集してライブラリクレートを持ち込み、以下のコードをsrc/bin/main.rsの先頭に追記してThreadPool
をスコープに導入してください:
ファイル名: src/bin/main.rs
extern crate hello;
use hello::ThreadPool;
このコードはまだ動きませんが、再度それを確認して扱う必要のある次のエラーを手に入れましょう:
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for type
`hello::ThreadPool` in the current scope
(エラー: 現在のスコープで型`hello::ThreadPool`の関数または関連アイテムに`new`というものが見つかりません)
--> src/bin/main.rs:13:16
|
13 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^^^^^^ function or associated item not found in
`hello::ThreadPool`
このエラーは、次に、ThreadPool
に対してnew
という関連関数を作成する必要があることを示唆しています。
また、new
には4
を引数として受け入れる引数1つがあり、ThreadPool
インスタンスを返すべきということも知っています。
それらの特徴を持つ最も単純なnew
関数を実装しましょう:
ファイル名: src/lib.rs
# #![allow(unused_variables)] #fn main() { pub struct ThreadPool; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } } #}
size
引数の型として、usize
を選択しました。何故なら、マイナスのスレッド数は、何も筋が通らないことを知っているからです。
また、この4をスレッドのコレクションの要素数として使用し、第3章の「整数型」節で議論したように、これはusize
のあるべき姿であることも知っています。
コードを再度確認しましょう:
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
(警告: 未使用の変数: `size`)
--> src/lib.rs:4:16
|
4 | pub fn new(size: usize) -> ThreadPool {
| ^^^^
|
= note: #[warn(unused_variables)] on by default
= note: to avoid this warning, consider using `_size` instead
error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
--> src/bin/main.rs:18:14
|
18 | pool.execute(|| {
| ^^^^^^^
今度は、警告とエラーが出ました。一時的に警告は無視して、ThreadPool
にexecute
メソッドがないためにエラーが発生しました。
「有限数のスレッド用に似たインターフェイスを作成する」節で我々のスレッドプールは、
thread::spawn
と似たインターフェイスにするべきと決定したことを思い出してください。
さらに、execute
関数を実装するので、与えられたクロージャを取り、実行するようにプールの待機中のスレッドに渡します。
ThreadPool
にexecute
メソッドをクロージャを引数として受け取るように定義します。
第13章の「ジェネリック引数とFn
トレイトを使用してクロージャを保存する」節から、
3つの異なるトレイトでクロージャを引数として取ることができることを思い出してください: Fn
、FnMut
、FnOnce
です。
ここでは、どの種類のクロージャを使用するか決定する必要があります。最終的には、
標準ライブラリのthread::spawn
実装に似たことをすることがわかっているので、
thread::spawn
のシグニチャで引数にどんな境界があるか見ることができます。ドキュメンテーションは、以下のものを示しています:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
F
型引数がここで関心のあるものです; T
型引数は戻り値と関係があり、関心はありません。spawn
は、
F
のトレイト境界としてFnOnce
を使用していることが確認できます。これはおそらく、我々が欲しているものでもあるでしょう。
というのも、最終的にはexecute
で得た引数をspawn
に渡すからです。さらにFnOnce
は使用したいトレイトであると自信を持つことができます。
リクエストを実行するスレッドは、そのリクエストのクロージャを1回だけ実行し、これはFnOnce
のOnce
に合致するからです。
F
型引数にはまた、トレイト境界のSend
とライフタイム境界の'static
もあり、この状況では有用です:
あるスレッドから別のスレッドにクロージャを移動するのにSend
が必要で、スレッドの実行にどれくらいかかるかわからないので、
'static
も必要です。ThreadPool
にこれらの境界のジェネリックな型F
の引数を取るexecute
メソッドを生成しましょう:
ファイル名: src/lib.rs
# #![allow(unused_variables)] #fn main() { # pub struct ThreadPool; impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { } } #}
それでも、FnOnce
の後に()
を使用しています。このFnOnce
は引数を取らず、値も返さないクロージャを表すからです。
関数定義同様に、戻り値の型はシグニチャから省略できますが、引数がなくても、カッコは必要です。
またもや、これがexecute
メソッドの最も単純な実装です: 何もしませんが、
コードがコンパイルできるようにしようとしているだけです。再確認しましょう:
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
--> src/lib.rs:4:16
|
4 | pub fn new(size: usize) -> ThreadPool {
| ^^^^
|
= note: #[warn(unused_variables)] on by default
= note: to avoid this warning, consider using `_size` instead
warning: unused variable: `f`
--> src/lib.rs:8:30
|
8 | pub fn execute<F>(&self, f: F)
| ^
|
= note: to avoid this warning, consider using `_f` instead
これで警告を受け取るだけになり、コンパイルできるようになりました!しかし、cargo run
を試して、
ブラウザでリクエストを行うと、章の冒頭で見かけたエラーがブラウザに現れることに注意してください。
ライブラリは、まだ実際にexecute
に渡されたクロージャを呼び出していないのです!
注釈: HaskellやRustなどの厳密なコンパイラがある言語についての格言として「コードがコンパイルできたら、 動作する」というものをお聴きになったことがある可能性があります。ですが、この格言は普遍的に当てはまるものではありません。 このプロジェクトはコンパイルできますが、全く何もしません!本物の完璧なプロジェクトを構築しようとしているのなら、 ここが単体テストを書き始めて、コードがコンパイルでき、かつ欲しい振る舞いを保持していることを確認するのに良い機会でしょう。
new
でスレッド数を検査する
new
とexecute
の引数で何もしていないので、警告が出続けます。欲しい振る舞いでこれらの関数の本体を実装しましょう。
まずはじめに、new
を考えましょう。先刻、size
引数に非負整数型を選択しました。負のスレッド数のプールは、
全く道理が通らないからです。しかしながら、0スレッドのプールも全く意味がわかりませんが、0も完全に合法なusize
です。
ThreadPool
インスタンスを返す前にsize
が0よりも大きいことを確認するコードを追加し、リスト20-13に示したように、
assert!
マクロを使用することで0を受け取った時にプログラムをパニックさせます。
ファイル名: src/lib.rs
# #![allow(unused_variables)] #fn main() { # pub struct ThreadPool; impl ThreadPool { /// 新しいThreadPoolを生成する。 /// /// sizeがプールのスレッド数です。 /// /// # パニック /// /// sizeが0なら、`new`関数はパニックします。 /// /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); ThreadPool } // --snip-- } #}
doc commentでThreadPool
にドキュメンテーションを追加しました。第14章で議論したように、
関数がパニックすることもある場面を声高に叫ぶセクションを追加することで、
いいドキュメンテーションの実践に倣っていることに注意してください。
試しにcargo doc --open
を実行し、ThreadPool
構造体をクリックして、new
の生成されるドキュメンテーションがどんな見た目か確かめてください!
ここでしたようにassert!
マクロを追加する代わりに、リスト12-9のI/OプロジェクトのConfig::new
のように、
new
にResult
を返させることもできるでしょう。しかし、今回の場合、スレッドなしでスレッドプールを作成しようとするのは、
回復不能なエラーであるべきと決定しました。野心を感じるのなら、以下のシグニチャのnew
も書いてみて、両者を比較してみてください:
pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
スレッドを格納するスペースを生成する
今や、プールに格納する合法なスレッド数を知る方法ができたので、ThreadPool
構造体を返す前にスレッドを作成して格納できます。
ですが、どのようにスレッドを「格納」するのでしょうか?もう一度、thread::spawn
シグニチャを眺めてみましょう:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
spawn
関数は、JoinHandle<T>
を返し、ここでT
は、クロージャが返す型です。試しに同じようにJoinHandle
を使ってみて、
どうなるか見てみましょう。我々の場合、スレッドプールに渡すクロージャは接続を扱い、何も返さないので、
T
はユニット型()
になるでしょう。
リスト20-14のコードはコンパイルできますが、まだスレッドは何も生成しません。ThreadPool
の定義を変更して、
thread::JoinHandle<()>
インスタンスのベクタを保持し、size
キャパシティのベクタを初期化し、
スレッドを生成する何らかのコードを実行するfor
ループを設定し、それらを含むThreadPool
インスタンスを返します。
ファイル名: src/lib.rs
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// スレッドを生成してベクタに格納する
// create some threads and store them in the vector
}
ThreadPool {
threads
}
}
// --snip--
}
ライブラリクレート内でstd::thread
をスコープに導入しました。ThreadPool
のベクタの要素の型として、
thread::JoinHandle
を使用しているからです。
一旦、合法なサイズを受け取ったら、ThreadPool
はsize
個の要素を保持できる新しいベクタを生成します。
この本ではまだ、with_capacity
関数を使用したことがありませんが、これはVec::new
と同じ作業をしつつ、
重要な違いがあります: ベクタに予めスペースを確保しておくのです。ベクタにsize
個の要素を格納する必要があることはわかっているので、
このメモリ確保を前もってしておくと、Vec::new
よりも少しだけ効率的になります。Vec::new
は、
要素が挿入されるにつれて、自身のサイズを変更します。
再びcargo check
を実行すると、もういくつか警告が出るものの、成功するはずです。
ThreadPool
からスレッドにコードを送信する責任を負うWorker
構造体
リスト20-14のfor
ループにスレッドの生成に関するコメントを残しました。ここでは、実際にスレッドを生成する方法に目を向けます。
標準ライブラリはスレッドを生成する手段としてthread::spawn
を提供し、thread::spawn
は、
生成されるとすぐにスレッドが実行すべき何らかのコードを得ることを予期します。ところが、我々の場合、
スレッドを生成して、後ほど送信するコードを待機してほしいです。標準ライブラリのスレッドの実装は、
それをするいかなる方法も含んでいません; それを手動で実装しなければなりません。
この新しい振る舞いを管理するスレッドとThreadPool
間に新しいデータ構造を導入することでこの振る舞いを実装します。
このデータ構造をWorker
と呼び、プール実装では一般的な用語です。レストランのキッチンで働く人々を思い浮かべてください:
労働者は、お客さんからオーダーが来るまで待機し、それからそれらのオーダーを取り、満たすことに責任を負います。
スレッドプールにJoinHanlde<()>
インスタンスのベクタを格納する代わりに、Worker
構造体のインスタンスを格納します。
各Worker
が単独のJoinHandle<()>
インスタンスを格納します。そして、Worker
に実行するコードのクロージャを取り、
既に走っているスレッドに実行してもらうために送信するメソッドを実装します。ログを取ったり、デバッグする際にプールの異なるワーカーを区別できるように、
各ワーカーにid
も付与します。
ThreadPool
を生成する際に発生することに以下の変更を加えましょう。このようにWorker
をセットアップした後に、
スレッドにクロージャを送信するコードを実装します:
id
とJoinHandle<()>
を保持するWorker
構造体を定義する。ThreadPool
を変更し、Worker
インスタンスのベクタを保持する。id
番号を取り、id
と空のクロージャで大量生産されるスレッドを保持するWorker
インスタンスを返すWorker::new
関数を定義する。ThreadPool::new
でfor
ループカウンタを使用してid
を生成し、そのid
で新しいWorker
を生成し、ベクタにワーカーを格納する。
挑戦に積極的ならば、リスト20-15のコードを見る前にご自身でこれらの変更を実装してみてください。
いいですか?こちらが先ほどの変更を行う1つの方法を行ったリスト20-15です。
ファイル名: src/lib.rs
# #![allow(unused_variables)] #fn main() { use std::thread; pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } } // --snip-- } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread, } } } #}
ThreadPool
のフィールド名をthreads
からworkers
に変更しました。JoinHandle<()>
インスタンスではなく、
Worker
インスタンスを保持するようになったからです。for
ループのカウンタをWorker::new
への引数として使用し、
それぞれの新しいWorker
をworkers
というベクタに格納します。
外部のコード(src/bin/main.rsのサーバなど)は、ThreadPool
内でWorker
構造体を使用していることに関する実装の詳細を知る必要はないので、
Worker
構造体とそのnew
関数は非公開にしています。Worker::new
関数は与えたid
を使用し、
空のクロージャを使って新しいスレッドを立ち上げることで生成されるJoinHandle<()>
インスタンスを格納します。
このコードはコンパイルでき、ThreadPool::new
への引数として指定した数のWorker
インスタンスを格納します。
ですがそれでも、execute
で得るクロージャを処理してはいません。次は、それをする方法に目を向けましょう。
チャンネル経由でスレッドにリクエストを送信する
さて、thread::spawn
に与えられたクロージャが全く何もしないという問題に取り組みましょう。現在、
execute
メソッドで実行したいクロージャを得ています。ですが、ThreadPool
の生成中、Worker
それぞれを生成する際に、
実行するクロージャをthread::spawn
に与える必要があります。
作ったばかりのWorker
構造体にThreadPool
が保持するキューから実行するコードをフェッチして、
そのコードをスレッドが実行できるように送信してほしいです。
第16章でこのユースケースにぴったりであろうチャンネル(2スレッド間コミュニケーションをとる単純な方法)について学びました。
チャンネルをキューの仕事として機能させ、execute
はThreadPool
からWorker
インスタンスに仕事を送り、
これが仕事をスレッドに送信します。こちらが計画です:
ThreadPool
はチャンネルを生成し、チャンネルの送信側に就く。Worker
それぞれは、チャンネルの受信側に就く。- チャンネルに送信したいクロージャを保持する新しい
Job
構造体を生成する。 execute
メソッドは、実行したい仕事をチャンネルの送信側に送信する。- スレッド内で、
Worker
はチャンネルの受信側をループし、受け取ったあらゆる仕事のクロージャを実行する。
ThreadPool::new
内でチャンネルを生成し、ThreadPool
インスタンスに送信側を保持することから始めましょう。リスト20-16のようにですね。
今の所、Job
構造体は何も保持しませんが、チャンネルに送信する種類の要素になります。
ファイル名: src/lib.rs
# #![allow(unused_variables)] #fn main() { # use std::thread; // --snip-- use std::sync::mpsc; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers, sender, } } // --snip-- } # # struct Worker { # id: usize, # thread: thread::JoinHandle<()>, # } # # impl Worker { # fn new(id: usize) -> Worker { # let thread = thread::spawn(|| {}); # # Worker { # id, # thread, # } # } # } #}
ThreadPool::new
内で新しいチャンネルを生成し、プールに送信側を保持させています。これはコンパイルに成功しますが、
まだ警告があります。
スレッドプールがワーカーを生成する際に各ワーカーにチャンネルの受信側を試しに渡してみましょう。
受信側はワーカーが大量生産するスレッド内で使用したいことがわかっているので、クロージャ内でreceiver
引数を参照します。
リスト20-17のコードはまだ完璧にはコンパイルできません。
ファイル名: src/lib.rs
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
// --snip--
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker {
id,
thread,
}
}
}
多少些細で単純な変更を行いました: チャンネルの受信側をWorker::new
に渡し、それからクロージャの内側で使用しています。
このコードのチェックを試みると、このようなエラーが出ます:
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:27:42
|
27 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here in
previous iteration of loop
|
= note: move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
このコードは、receiver
を複数のWorker
インスタンスに渡そうとしています。第16章を思い出すように、これは動作しません:
Rustが提供するチャンネル実装は、複数の生成者、単独の消費者です。要するに、
チャンネルの消費側をクローンするだけでこのコードを修正することはできません。たとえできたとしても、
使用したいテクニックではありません; 代わりに、全ワーカー間で単独のreceiver
を共有することで、
スレッド間に仕事を分配したいです。
さらに、チャンネルキューから仕事を取り出すことは、receiver
を可変化することに関連するので、
スレッドには、receiver
を共有して変更する安全な方法が必要です; さもなくば、
競合状態に陥る可能性があります(第16章で講義しました)。
第16章で議論したスレッド安全なスマートポインタを思い出してください: 複数のスレッドで所有権を共有しつつ、
スレッドに値を可変化させるためには、Arc<Mutex<T>>
を使用する必要があります。Arc
型は、
複数のワーカーに受信者を所有させ、Mutex
により、1度に受信者から1つの仕事をたった1つのワーカーが受け取ることを保証します。
リスト20-18は、行う必要のある変更を示しています。
ファイル名: src/lib.rs
# #![allow(unused_variables)] #fn main() { # use std::thread; # use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; // --snip-- # pub struct ThreadPool { # workers: Vec<Worker>, # sender: mpsc::Sender<Job>, # } # struct Job; # impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender, } } // --snip-- } # struct Worker { # id: usize, # thread: thread::JoinHandle<()>, # } # impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --snip-- # let thread = thread::spawn(|| { # receiver; # }); # # Worker { # id, # thread, # } } } #}
ThreadPool::new
で、チャンネルの受信側をArc
とMutex
に置いています。新しいワーカーそれぞれに対して、
Arc
をクローンして参照カウントを跳ね上げているので、ワーカーは受信側の所有権を共有することができます。
これらの変更でコードはコンパイルできます!ゴールはもうすぐそこです!
execute
メソッドを実装する
最後にThreadPool
にexecute
メソッドを実装しましょう。
Job
も構造体からexecute
が受け取るクロージャの型を保持するトレイトオブジェクトの型エイリアスに変更します。
第19章の「型エイリアスで型同義語を生成する」節で議論したように、型エイリアスにより長い型を短くできます。
リスト20-19をご覧ください。
ファイル名: src/lib.rs
# #![allow(unused_variables)] #fn main() { // --snip-- # pub struct ThreadPool { # workers: Vec<Worker>, # sender: mpsc::Sender<Job>, # } # use std::sync::mpsc; # struct Worker {} type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.send(job).unwrap(); } } // --snip-- #}
execute
で得たクロージャを使用して新しいJob
インスタンスを生成した後、その仕事をチャンネルの送信側に送信しています。
送信が失敗した時のためにsend
に対してunwrap
を呼び出しています。これは例えば、全スレッドの実行を停止させるなど、
受信側が新しいメッセージを受け取るのをやめてしまったときなどに起こる可能性があります。現時点では、
スレッドの実行を止めることはできません: スレッドは、プールが存在する限り実行し続けます。
unwrap
を使用している理由は、失敗する場合が起こらないとわかっているからですが、コンパイラにはわかりません。
ですが、まだやり終えたわけではありませんよ!ワーカー内でthread::spawn
に渡されているクロージャは、
それでもチャンネルの受信側を参照しているだけです。その代わりに、クロージャには永遠にループし、
チャンネルの受信側に仕事を要求し、仕事を得たらその仕事を実行してもらう必要があります。
リスト20-20に示した変更をWorker::new
に行いましょう。
ファイル名: src/lib.rs
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
// ワーカー{}は仕事を得ました; 実行します
println!("Worker {} got a job; executing.", id);
(*job)();
}
});
Worker {
id,
thread,
}
}
}
ここで、まずreceiver
に対してlock
を呼び出してミューテックスを獲得し、それからunwrap
を呼び出して、
エラーの際にはパニックします。ロックの獲得は、ミューテックスが毒された状態なら失敗する可能性があり、
これは、他のどれかのスレッドがロックを保持している間に、解放するのではなく、パニックした場合に起き得ます。
この場面では、unwrap
を呼び出してこのスレッドをパニックさせるのは、取るべき正当な行動です。
このunwrap
をあなたにとって意味のあるエラーメッセージを伴うexpect
に変更することは、ご自由に行なってください。
ミューテックスのロックを獲得できたら、recv
を呼び出してチャンネルからJob
を受け取ります。
最後のunwrap
もここであらゆるエラーを超えていき、これはチャンネルの送信側を保持するスレッドが閉じた場合に発生する可能性があり、
受信側が閉じた場合にsend
メソッドがErr
を返すのと似ています。
recv
の呼び出しはブロックするので、まだ仕事がなければ、現在のスレッドは、仕事が利用可能になるまで待機します。
Mutex<T>
により、ただ1つのWorker
スレッドのみが一度に仕事の要求を試みることを保証します。
理論的には、このコードはコンパイルできるはずです。残念ながら、Rustコンパイラはまだ完全ではなく、 このようなエラーが出ます:
error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be
statically determined
(エラー: std::ops::FnOnce() + std::marker::Sendの値をムーブできません:
std::ops::FnOnce() + std::marker::Sendのサイズを静的に決定できません)
--> src/lib.rs:63:17
|
63 | (*job)();
| ^^^^^^
問題が非常に謎めいているので、エラーも非常に謎めいています。Box<T>
に格納されたFnOnce
クロージャを呼び出すためには(Job
型エイリアスがそう)、
呼び出す際にクロージャがself
の所有権を奪うので、
クロージャは自身をBox<T>
からムーブする必要があります。一般的に、RustはBox<T>
から値をムーブすることを許可しません。
コンパイラには、Box<T>
の内側の値がどれほどの大きさなのか見当がつかないからです:
第15章でBox<T>
に格納して既知のサイズの値を得たい未知のサイズの何かがあるためにBox<T>
を正確に使用したことを思い出してください。
リスト17-15で見かけたように、記法self: Box<Self>
を使用するメソッドを書くことができ、
これにより、メソッドはBox<T>
に格納されたSelf
値の所有権を奪うことができます。
それがまさしくここで行いたいことですが、残念ながらコンパイラはさせてくれません:
クロージャが呼び出された際に振る舞いを実装するRustの一部は、self: Box<Self>
を使用して実装されていないのです。
故に、コンパイラはまだこの場面においてself: Box<Self>
を使用してクロージャの所有権を奪い、
クロージャをBox<T>
からムーブできることを理解していないのです。
Rustはまだコンパイラの改善途上にあり、リスト20-20のコードは、 将来的にうまく動くようになるべきです。まさしくあなたのような方がこれや他の問題を修正しています!この本を完了したら、 是非ともあなたにも参加していただきたいです。
ですがとりあえず、手頃なトリックを使ってこの問題を回避しましょう。この場合、self: Box<Self>
で、
Box<T>
の内部の値の所有権を奪うことができることをコンパイラに明示的に教えてあげます;
そして、一旦クロージャの所有権を得たら、呼び出せます。これには、
シグニチャにself: Box<Self>
を使用するcall_box
というメソッドのある新しいトレイトFnBox
を定義すること、
FnOnce()
を実装する任意の型に対してFnBox
を定義すること、型エイリアスを新しいトレイトを使用するように変更すること、
Worker
をcall_box
メソッドを使用するように変更することが関連します。これらの変更は、
リスト20-21に表示されています。
ファイル名: src/lib.rs
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
type Job = Box<FnBox + Send + 'static>;
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job.call_box();
}
});
Worker {
id,
thread,
}
}
}
まず、FnBox
という新しいトレイトを作成します。このトレイトにはcall_box
という1つのメソッドがあり、
これは、self: Box<Self>
を取ってself
の所有権を奪い、Box<T>
から値をムーブする点を除いて、
他のFn*
トレイトのcall
メソッドと類似しています。
次に、FnOnce()
トレイトを実装する任意の型F
に対してFnBox
トレイトを実装します。実質的にこれは、
あらゆるFnOnce()
クロージャがcall_box
メソッドを使用できることを意味します。call_box
の実装は、
(*self)()
を使用してBox<T>
からクロージャをムーブし、クロージャを呼び出します。
これでJob
型エイリアスには、新しいトレイトのFnBox
を実装する何かのBox
である必要が出てきました。
これにより、クロージャを直接呼び出す代わりにJob
値を得た時にWorker
のcall_box
を使えます。
任意のFnOnce()
クロージャに対してFnBox
トレイトを実装することは、チャンネルに送信する実際の値は何も変えなくてもいいことを意味します。
もうコンパイラは、我々が行おうとしていることが平気なことであると認識できます。
このトリックは非常にこそこそしていて複雑です。完璧に筋が通らなくても心配しないでください; いつの日か、完全に不要になるでしょう。
このトリックの実装で、スレッドプールは動く状態になります!cargo run
を実行し、
リクエストを行なってください:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never used: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
warning: field is never used: `id`
--> src/lib.rs:61:5
|
61 | id: usize,
| ^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
warning: field is never used: `thread`
--> src/lib.rs:62:5
|
62 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
成功!もう非同期に接続を実行するスレッドプールができました。絶対に4つ以上のスレッドが生成されないので、 サーバが多くのリクエストを受け取っても、システムは過負荷にならないでしょう。/sleepにリクエストを行なっても、 サーバは他のスレッドに実行させることで他のリクエストを提供できるでしょう。
第18章でwhile let
ループを学んだ後で、なぜリスト20-22に示したようにワーカースレッドのコードを記述しなかったのか、
不思議に思っている可能性があります。
ファイル名: src/lib.rs
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {} got a job; executing.", id);
job.call_box();
}
});
Worker {
id,
thread,
}
}
}
このコードはコンパイルでき、動きますが、望み通りのスレッドの振る舞いにはなりません:
遅いリクエストがそれでも、他のリクエストが処理されるのを待機させてしまうのです。理由はどこか捉えがたいものです:
Mutex
構造体には公開のunlock
メソッドがありません。ロックの所有権が、
lock
メソッドが返すLockResult<MutexGuard<T>>
内のMutexGuard<T>
のライフタイムに基づくからです。
コンパイル時には、ロックを保持していない限り、借用チェッカーはそうしたら、Mutex
に保護されるリソースにはアクセスできないという規則を強制できます。
しかし、この実装は、MutexGuard<T>
のライフタイムについて熟考しなければ、
意図したよりもロックが長い間保持される結果になり得ます。while
式の値がブロックの間中スコープに残り続けるので、
ロックはjob.call_box
の呼び出し中保持されたままになり、つまり、他のワーカーが仕事を受け取れなくなるのです。
代わりにloop
を使用し、ロックと仕事をブロックの外ではなく、内側で獲得することで、
lock
メソッドが返すMutexGuard
はlet job
文が終わると同時にドロップされます。
これにより、複数のリクエストを並行で提供し、ロックはrecv
の呼び出しの間は保持されるけれども、
job.call_box
の呼び出しの前には解放されることを保証します。