正常なシャットダウンと片付け

リスト20-21のコードは、意図した通り、スレッドプールの使用を通してリクエストに非同期に応答できます。 直接使用していないworkersidthreadフィールドについて警告が出ます。この警告は、現在のコードは何も片付けていないことを思い出させてくれます。 優美さに欠けるctrl-cを使用してメインスレッドを停止させる方法を使用すると、 リクエストの処理中であっても、他のスレッドも停止します。

では、閉じる前に取り掛かっているリクエストを完了できるように、プールの各スレッドに対してjoinを呼び出すDropトレイトを実装します。 そして、スレッドに新しいリクエストの受付を停止し、終了するように教える方法を実装します。 このコードが動いているのを確かめるために、サーバを変更して正常にスレッドプールを終了する前に2つしかリクエストを受け付けないようにします。

ThreadPoolDropトレイトを実装する

スレッドプールにDropを実装するところから始めましょう。プールがドロップされると、 スレッドは全てjoinして、作業を完了するのを確かめるべきです。リスト20-23は、Drop実装の最初の試みを表示しています; このコードはまだ完全には動きません。

ファイル名: src/lib.rs

impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { // ワーカー{}を終了します println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }

リスト20-23: スレッドプールがスコープを抜けた時にスレッドをjoinさせる

まず、スレッドプールworkersそれぞれを走査します。selfは可変参照であり、workerを可変化できる必要もあるので、 これには&mutを使用しています。ワーカーそれぞれに対して、特定のワーカーを終了する旨のメッセージを出力し、 それからjoinをワーカースレッドに対して呼び出しています。joinの呼び出しが失敗したら、 unwrapを使用してRustをパニックさせ、正常でないシャットダウンに移行します。

こちらが、このコードをコンパイルする際に出るエラーです:

error[E0507]: cannot move out of borrowed content --> src/lib.rs:65:13 | 65 | worker.thread.join().unwrap(); | ^^^^^^ cannot move out of borrowed content

workerの可変参照しかなく、joinは引数の所有権を奪うためにこのエラーはjoinを呼び出せないと教えてくれています。 この問題を解決するには、joinがスレッドを消費できるように、threadを所有するWorkerインスタンスからスレッドをムーブする必要があります。 これをリスト17-15では行いました: Workerが代わりにOption<thread::JoinHandle<()>>を保持していれば、 Optionに対してtakeメソッドを呼び出し、Some列挙子から値をムーブし、その場所にNone列挙子を残すことができます。 言い換えれば、実行中のWorkerにはthreadSome列挙子があり、Workerを片付けたい時には、 ワーカーが実行するスレッドがないようにSomeNoneで置き換えるのです。

従って、Workerの定義を以下のように更新したいことがわかります:

ファイル名: src/lib.rs

#![allow(unused)] fn main() { use std::thread; struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } }

さて、コンパイラを頼りにして他に変更する必要がある箇所を探しましょう。このコードをチェックすると、 2つのエラーが出ます:

error[E0599]: no method named `join` found for type `std::option::Option<std::thread::JoinHandle<()>>` in the current scope --> src/lib.rs:65:27 | 65 | worker.thread.join().unwrap(); | ^^^^ error[E0308]: mismatched types --> src/lib.rs:89:13 | 89 | thread, | ^^^^^^ | | | expected enum `std::option::Option`, found struct `std::thread::JoinHandle` | help: try using a variant of the expected type: `Some(thread)` | = note: expected type `std::option::Option<std::thread::JoinHandle<()>>` found type `std::thread::JoinHandle<_>`

2番目のエラーを扱いましょう。これは、Worker::newの最後のコードを指しています; 新しいWorkerを作成する際に、 Somethreadの値を包む必要があります。このエラーを修正するために以下の変更を行なってください:

ファイル名: src/lib.rs

impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --snip-- Worker { id, thread: Some(thread), } } }

最初のエラーはDrop実装内にあります。先ほど、Option値に対してtakeを呼び出し、 threadworkerからムーブする意図があることに触れました。以下の変更がそれを行います:

ファイル名: src/lib.rs

impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }

第17章で議論したように、Optiontakeメソッドは、Some列挙子を取り出し、その箇所にNoneを残します。 if letを使用してSomeを分配し、スレッドを得ています; そして、スレッドに対してjoinを呼び出します。 ワーカーのスレッドが既にNoneなら、ワーカーはスレッドを既に片付け済みであることがわかるので、 その場合には何も起きません。

スレッドに仕事をリッスンするのを止めるよう通知する

これらの変更によって、コードは警告なしでコンパイルできます。ですが悪い知らせは、このコードが期待したようにはまだ機能しないことです。 鍵は、Workerインスタンスのスレッドで実行されるクロージャのロジックです: 現時点でjoinを呼び出していますが、 仕事を求めて永遠にloopするので、スレッドを終了しません。現在のdropの実装でThreadPoolをドロップしようとしたら、 最初のスレッドが完了するのを待機してメインスレッドは永遠にブロックされるでしょう。

この問題を修正するには、スレッドが、実行すべきJobか、リッスンをやめて無限ループを抜ける通知をリッスンするように、 変更します。Jobインスタンスの代わりに、チャンネルはこれら2つのenum列挙子の一方を送信します。

ファイル名: src/lib.rs

#![allow(unused)] fn main() { struct Job; enum Message { NewJob(Job), Terminate, } }

このMessage enumはスレッドが実行すべきJobを保持するNewJob列挙子か、スレッドをループから抜けさせ、 停止させるTerminate列挙子のどちらかになります。

チャンネルを調整し、型Jobではなく、型Messageを使用するようにする必要があります。リスト20-24のようにですね。

ファイル名: src/lib.rs

pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Message>, } // --snip-- impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.send(Message::NewJob(job)).unwrap(); } } // --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker { let thread = thread::spawn(move ||{ loop { let message = receiver.lock().unwrap().recv().unwrap(); match message { Message::NewJob(job) => { println!("Worker {} got a job; executing.", id); job.call_box(); }, Message::Terminate => { // ワーカー{}は停止するよう指示された println!("Worker {} was told to terminate.", id); break; }, } } }); Worker { id, thread: Some(thread), } } }

リスト20-24: Message値を送受信し、WorkerMessage::Terminateを受け取ったら、ループを抜ける

Message enumを具体化するために、2箇所でJobMessageに変更する必要があります: ThreadPoolの定義とWorker::newのシグニチャです。ThreadPoolexecuteメソッドは、 仕事をMessage::NewJob列挙子に包んで送信する必要があります。それから、 Messageがチャンネルから受け取られるWorker::newで、NewJob列挙子が受け取られたら、 仕事が処理され、Terminate列挙子が受け取られたら、スレッドはループを抜けます。

これらの変更と共に、コードはコンパイルでき、リスト20-21の後と同じように機能し続けます。ですが、 Terminateのメッセージを何も生成していないので、警告が出るでしょう。 Drop実装をリスト20-25のような見た目に変更してこの警告を修正しましょう。

ファイル名: src/lib.rs

impl Drop for ThreadPool { fn drop(&mut self) { println!("Sending terminate message to all workers."); for _ in &mut self.workers { self.sender.send(Message::Terminate).unwrap(); } // 全ワーカーを閉じます println!("Shutting down all workers."); for worker in &mut self.workers { // ワーカー{}を閉じます println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }

リスト20-25: 各ワーカースレッドに対してjoinを呼び出す前にワーカーにMessage::Terminateを送信する

今では、ワーカーを2回走査しています: 各ワーカーにTerminateメッセージを送信するために1回と、 各ワーカースレッドにjoinを呼び出すために1回です。メッセージ送信とjoinを同じループで即座に行おうとすると、 現在の繰り返しのワーカーがチャンネルからメッセージを受け取っているものであるか保証できなくなってしまいます。

2つの個別のループが必要な理由をよりよく理解するために、2つのワーカーがある筋書きを想像してください。 単独のループで各ワーカーを走査すると、最初の繰り返しでチャンネルに停止メッセージが送信され、 joinが最初のワーカースレッドで呼び出されます。その最初のワーカーが現在、リクエストの処理で忙しければ、 2番目のワーカーがチャンネルから停止メッセージを受け取り、閉じます。最初のワーカーの終了待ちをしたままですが、 2番目のスレッドが停止メッセージを拾ってしまったので、終了することは絶対にありません。デッドロックです!

この筋書きを回避するために、1つのループでまず、チャンネルに対して全てのTerminateメッセージを送信します; そして、別のループで全スレッドのjoinを待ちます。一旦停止メッセージを受け取ったら、各ワーカーはチャンネルからのリクエストの受付をやめます。 故に、存在するワーカーと同じ数だけ停止メッセージを送れば、joinがスレッドに対して呼び出される前に、 停止メッセージを各ワーカーが受け取ると確信できるわけです。

このコードが動いているところを確認するために、mainを変更してサーバを正常に閉じる前に2つしかリクエストを受け付けないようにしましょう。 リスト20-26のようにですね。

ファイル名: src/bin/main.rs

fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); }

リスト20-26: ループを抜けることで、2つのリクエストを処理した後にサーバを閉じる

現実世界のWebサーバには、たった2つリクエストを受け付けた後にシャットダウンしてほしくはないでしょう。 このコードは、単に正常なシャットダウンとクリーンアップが正しく機能することを示すだけです。

takeメソッドは、Iteratorトレイトで定義されていて、最大でも繰り返しを最初の2つの要素だけに制限します。 ThreadPoolmainの末端でスコープを抜け、drop実装が実行されます。

cargo runでサーバを開始し、3つリクエストを行なってください。3番目のリクエストはエラーになるはずで、 端末にはこのような出力が目撃できるはずです:

$ cargo run Compiling hello v0.1.0 (file:///projects/hello) Finished dev [unoptimized + debuginfo] target(s) in 1.0 secs Running `target/debug/hello` Worker 0 got a job; executing. Worker 3 got a job; executing. Shutting down. Sending terminate message to all workers. Shutting down all workers. Shutting down worker 0 Worker 1 was told to terminate. Worker 2 was told to terminate. Worker 0 was told to terminate. Worker 3 was told to terminate. Shutting down worker 1 Shutting down worker 2 Shutting down worker 3

ワーカーとメッセージの順番は異なる可能性があります。どうやってこのコードが動くのかメッセージからわかります: ワーカー0と3が最初の2つのリクエストを受け付け、そして3番目のリクエストではサーバは接続の受け入れをやめます。 mainの最後でThreadPoolがスコープを抜ける際、Drop実装が割り込み、プールが全ワーカーに停止するよう指示します。 ワーカーはそれぞれ、停止メッセージを確認した時にメッセージを出力し、それからスレッドプールは各ワーカースレッドを閉じるjoinを呼び出します。

この特定の実行のある面白い側面に注目してください: ThreadPoolはチャンネルに停止メッセージを送信しますが、 どのワーカーがそのメッセージを受け取るよりも前に、ワーカー0のjoinを試みています。ワーカー0はまだ停止メッセージを受け取っていなかったので、 メインスレッドはワーカー0が完了するまで待機してブロックされます。その間に、各ワーカーは停止メッセージを受け取ります。 ワーカー0が完了したら、メインスレッドは残りのワーカーが完了するのを待機します。その時点で全ワーカーは停止メッセージを受け取った後で、 閉じることができたのです。

おめでとうございます!プロジェクトを完成させました; スレッドプールを使用して非同期に応答する基本的なWebサーバができました。 サーバの正常なシャットダウンを行うことができ、プールの全スレッドを片付けます。

参考までに、こちらが全コードです:

ファイル名: src/bin/main.rs

extern crate hello; use hello::ThreadPool; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; use std::fs::File; use std::thread; use std::time::Duration; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } // 閉じます println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 1024]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let mut file = File::open(filename).unwrap(); let mut contents = String::new(); file.read_to_string(&mut contents).unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }

ファイル名: src/lib.rs

#![allow(unused)] fn main() { use std::thread; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; enum Message { NewJob(Job), Terminate, } pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Message>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<FnBox + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender, } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.send(Message::NewJob(job)).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { println!("Sending terminate message to all workers."); for _ in &mut self.workers { self.sender.send(Message::Terminate).unwrap(); } println!("Shutting down all workers."); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker { let thread = thread::spawn(move ||{ loop { let message = receiver.lock().unwrap().recv().unwrap(); match message { Message::NewJob(job) => { println!("Worker {} got a job; executing.", id); job.call_box(); }, Message::Terminate => { println!("Worker {} was told to terminate.", id); break; }, } } }); Worker { id, thread: Some(thread), } } } }

ここでできることはまだあるでしょう!よりこのプロジェクトを改善したいのなら、こちらがアイディアの一部です:

  • ThreadPoolとその公開メソッドにもっとドキュメンテーションを追加する。
  • ライブラリの機能のテストを追加する。
  • unwrapの呼び出しをもっと頑健なエラー処理に変更する。
  • ThreadPoolを使用してWebリクエスト以外のなんらかの作業を行う。
  • https://crates.io でスレッドプールのクレートを探して、そのクレートを代わりに使用して似たWebサーバを実装する。 そして、APIと頑健性を我々が実装したものと比較する。

まとめ

よくやりました!本の最後に到達しました!Rustのツアーに参加していただき、感謝の辞を述べたいです。 もう、ご自身のRustプロジェクトや他の方のプロジェクトのお手伝いをする準備ができています。 あなたがこれからのRustの旅で遭遇する、あらゆる困難の手助けを是非とも行いたいRustaceanたちの温かいコミュニティがあることを心に留めておいてくださいね。