正常なシャットダウンと片付け
リスト20-21のコードは、意図した通り、スレッドプールの使用を通してリクエストに非同期に応答できます。
直接使用していないworkers
、id
、thread
フィールドについて警告が出ます。この警告は、現在のコードは何も片付けていないことを思い出させてくれます。
優美さに欠けるctrl-cを使用してメインスレッドを停止させる方法を使用すると、
リクエストの処理中であっても、他のスレッドも停止します。
では、閉じる前に取り掛かっているリクエストを完了できるように、プールの各スレッドに対してjoin
を呼び出すDrop
トレイトを実装します。
そして、スレッドに新しいリクエストの受付を停止し、終了するように教える方法を実装します。
このコードが動いているのを確かめるために、サーバを変更して正常にスレッドプールを終了する前に2つしかリクエストを受け付けないようにします。
ThreadPool
にDrop
トレイトを実装する
スレッドプールにDrop
を実装するところから始めましょう。プールがドロップされると、
スレッドは全てjoinして、作業を完了するのを確かめるべきです。リスト20-23は、Drop
実装の最初の試みを表示しています;
このコードはまだ完全には動きません。
ファイル名: src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
// ワーカー{}を終了します
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
まず、スレッドプールworkers
それぞれを走査します。self
は可変参照であり、worker
を可変化できる必要もあるので、
これには&mut
を使用しています。ワーカーそれぞれに対して、特定のワーカーを終了する旨のメッセージを出力し、
それからjoin
をワーカースレッドに対して呼び出しています。join
の呼び出しが失敗したら、
unwrap
を使用してRustをパニックさせ、正常でないシャットダウンに移行します。
こちらが、このコードをコンパイルする際に出るエラーです:
error[E0507]: cannot move out of borrowed content
--> src/lib.rs:65:13
|
65 | worker.thread.join().unwrap();
| ^^^^^^ cannot move out of borrowed content
各worker
の可変参照しかなく、join
は引数の所有権を奪うためにこのエラーはjoin
を呼び出せないと教えてくれています。
この問題を解決するには、join
がスレッドを消費できるように、thread
を所有するWorker
インスタンスからスレッドをムーブする必要があります。
これをリスト17-15では行いました: Worker
が代わりにOption<thread::JoinHandle<()>>
を保持していれば、
Option
に対してtake
メソッドを呼び出し、Some
列挙子から値をムーブし、その場所にNone
列挙子を残すことができます。
言い換えれば、実行中のWorker
にはthread
にSome
列挙子があり、Worker
を片付けたい時には、
ワーカーが実行するスレッドがないようにSome
をNone
で置き換えるのです。
従って、Worker
の定義を以下のように更新したいことがわかります:
ファイル名: src/lib.rs
#![allow(unused)] fn main() { use std::thread; struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } }
さて、コンパイラを頼りにして他に変更する必要がある箇所を探しましょう。このコードをチェックすると、 2つのエラーが出ます:
error[E0599]: no method named `join` found for type
`std::option::Option<std::thread::JoinHandle<()>>` in the current scope
--> src/lib.rs:65:27
|
65 | worker.thread.join().unwrap();
| ^^^^
error[E0308]: mismatched types
--> src/lib.rs:89:13
|
89 | thread,
| ^^^^^^
| |
| expected enum `std::option::Option`, found struct
`std::thread::JoinHandle`
| help: try using a variant of the expected type: `Some(thread)`
|
= note: expected type `std::option::Option<std::thread::JoinHandle<()>>`
found type `std::thread::JoinHandle<_>`
2番目のエラーを扱いましょう。これは、Worker::new
の最後のコードを指しています; 新しいWorker
を作成する際に、
Some
にthread
の値を包む必要があります。このエラーを修正するために以下の変更を行なってください:
ファイル名: src/lib.rs
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
Worker {
id,
thread: Some(thread),
}
}
}
最初のエラーはDrop
実装内にあります。先ほど、Option
値に対してtake
を呼び出し、
thread
をworker
からムーブする意図があることに触れました。以下の変更がそれを行います:
ファイル名: src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
第17章で議論したように、Option
のtake
メソッドは、Some
列挙子を取り出し、その箇所にNone
を残します。
if let
を使用してSome
を分配し、スレッドを得ています; そして、スレッドに対してjoin
を呼び出します。
ワーカーのスレッドが既にNone
なら、ワーカーはスレッドを既に片付け済みであることがわかるので、
その場合には何も起きません。
スレッドに仕事をリッスンするのを止めるよう通知する
これらの変更によって、コードは警告なしでコンパイルできます。ですが悪い知らせは、このコードが期待したようにはまだ機能しないことです。
鍵は、Worker
インスタンスのスレッドで実行されるクロージャのロジックです: 現時点でjoin
を呼び出していますが、
仕事を求めて永遠にloop
するので、スレッドを終了しません。現在のdrop
の実装でThreadPool
をドロップしようとしたら、
最初のスレッドが完了するのを待機してメインスレッドは永遠にブロックされるでしょう。
この問題を修正するには、スレッドが、実行すべきJob
か、リッスンをやめて無限ループを抜ける通知をリッスンするように、
変更します。Job
インスタンスの代わりに、チャンネルはこれら2つのenum列挙子の一方を送信します。
ファイル名: src/lib.rs
#![allow(unused)] fn main() { struct Job; enum Message { NewJob(Job), Terminate, } }
このMessage
enumはスレッドが実行すべきJob
を保持するNewJob
列挙子か、スレッドをループから抜けさせ、
停止させるTerminate
列挙子のどちらかになります。
チャンネルを調整し、型Job
ではなく、型Message
を使用するようにする必要があります。リスト20-24のようにですね。
ファイル名: src/lib.rs
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
// --snip--
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
Worker {
let thread = thread::spawn(move ||{
loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job.call_box();
},
Message::Terminate => {
// ワーカー{}は停止するよう指示された
println!("Worker {} was told to terminate.", id);
break;
},
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
Message
enumを具体化するために、2箇所でJob
をMessage
に変更する必要があります:
ThreadPool
の定義とWorker::new
のシグニチャです。ThreadPool
のexecute
メソッドは、
仕事をMessage::NewJob
列挙子に包んで送信する必要があります。それから、
Message
がチャンネルから受け取られるWorker::new
で、NewJob
列挙子が受け取られたら、
仕事が処理され、Terminate
列挙子が受け取られたら、スレッドはループを抜けます。
これらの変更と共に、コードはコンパイルでき、リスト20-21の後と同じように機能し続けます。ですが、
Terminate
のメッセージを何も生成していないので、警告が出るでしょう。
Drop
実装をリスト20-25のような見た目に変更してこの警告を修正しましょう。
ファイル名: src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
// 全ワーカーを閉じます
println!("Shutting down all workers.");
for worker in &mut self.workers {
// ワーカー{}を閉じます
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
今では、ワーカーを2回走査しています: 各ワーカーにTerminate
メッセージを送信するために1回と、
各ワーカースレッドにjoin
を呼び出すために1回です。メッセージ送信とjoin
を同じループで即座に行おうとすると、
現在の繰り返しのワーカーがチャンネルからメッセージを受け取っているものであるか保証できなくなってしまいます。
2つの個別のループが必要な理由をよりよく理解するために、2つのワーカーがある筋書きを想像してください。
単独のループで各ワーカーを走査すると、最初の繰り返しでチャンネルに停止メッセージが送信され、
join
が最初のワーカースレッドで呼び出されます。その最初のワーカーが現在、リクエストの処理で忙しければ、
2番目のワーカーがチャンネルから停止メッセージを受け取り、閉じます。最初のワーカーの終了待ちをしたままですが、
2番目のスレッドが停止メッセージを拾ってしまったので、終了することは絶対にありません。デッドロックです!
この筋書きを回避するために、1つのループでまず、チャンネルに対して全てのTerminate
メッセージを送信します;
そして、別のループで全スレッドのjoinを待ちます。一旦停止メッセージを受け取ったら、各ワーカーはチャンネルからのリクエストの受付をやめます。
故に、存在するワーカーと同じ数だけ停止メッセージを送れば、join
がスレッドに対して呼び出される前に、
停止メッセージを各ワーカーが受け取ると確信できるわけです。
このコードが動いているところを確認するために、main
を変更してサーバを正常に閉じる前に2つしかリクエストを受け付けないようにしましょう。
リスト20-26のようにですね。
ファイル名: src/bin/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
現実世界のWebサーバには、たった2つリクエストを受け付けた後にシャットダウンしてほしくはないでしょう。 このコードは、単に正常なシャットダウンとクリーンアップが正しく機能することを示すだけです。
take
メソッドは、Iterator
トレイトで定義されていて、最大でも繰り返しを最初の2つの要素だけに制限します。
ThreadPool
はmain
の末端でスコープを抜け、drop
実装が実行されます。
cargo run
でサーバを開始し、3つリクエストを行なってください。3番目のリクエストはエラーになるはずで、
端末にはこのような出力が目撃できるはずです:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 1.0 secs
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 3 got a job; executing.
Shutting down.
Sending terminate message to all workers.
Shutting down all workers.
Shutting down worker 0
Worker 1 was told to terminate.
Worker 2 was told to terminate.
Worker 0 was told to terminate.
Worker 3 was told to terminate.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
ワーカーとメッセージの順番は異なる可能性があります。どうやってこのコードが動くのかメッセージからわかります:
ワーカー0と3が最初の2つのリクエストを受け付け、そして3番目のリクエストではサーバは接続の受け入れをやめます。
main
の最後でThreadPool
がスコープを抜ける際、Drop
実装が割り込み、プールが全ワーカーに停止するよう指示します。
ワーカーはそれぞれ、停止メッセージを確認した時にメッセージを出力し、それからスレッドプールは各ワーカースレッドを閉じるjoin
を呼び出します。
この特定の実行のある面白い側面に注目してください: ThreadPool
はチャンネルに停止メッセージを送信しますが、
どのワーカーがそのメッセージを受け取るよりも前に、ワーカー0のjoinを試みています。ワーカー0はまだ停止メッセージを受け取っていなかったので、
メインスレッドはワーカー0が完了するまで待機してブロックされます。その間に、各ワーカーは停止メッセージを受け取ります。
ワーカー0が完了したら、メインスレッドは残りのワーカーが完了するのを待機します。その時点で全ワーカーは停止メッセージを受け取った後で、
閉じることができたのです。
おめでとうございます!プロジェクトを完成させました; スレッドプールを使用して非同期に応答する基本的なWebサーバができました。 サーバの正常なシャットダウンを行うことができ、プールの全スレッドを片付けます。
参考までに、こちらが全コードです:
ファイル名: src/bin/main.rs
extern crate hello;
use hello::ThreadPool;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::fs::File;
use std::thread;
use std::time::Duration;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
// 閉じます
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let mut file = File::open(filename).unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).unwrap();
let response = format!("{}{}", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
ファイル名: src/lib.rs
#![allow(unused)] fn main() { use std::thread; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; enum Message { NewJob(Job), Terminate, } pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Message>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<FnBox + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender, } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.send(Message::NewJob(job)).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { println!("Sending terminate message to all workers."); for _ in &mut self.workers { self.sender.send(Message::Terminate).unwrap(); } println!("Shutting down all workers."); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker { let thread = thread::spawn(move ||{ loop { let message = receiver.lock().unwrap().recv().unwrap(); match message { Message::NewJob(job) => { println!("Worker {} got a job; executing.", id); job.call_box(); }, Message::Terminate => { println!("Worker {} was told to terminate.", id); break; }, } } }); Worker { id, thread: Some(thread), } } } }
ここでできることはまだあるでしょう!よりこのプロジェクトを改善したいのなら、こちらがアイディアの一部です:
ThreadPool
とその公開メソッドにもっとドキュメンテーションを追加する。- ライブラリの機能のテストを追加する。
unwrap
の呼び出しをもっと頑健なエラー処理に変更する。ThreadPool
を使用してWebリクエスト以外のなんらかの作業を行う。- https://crates.io でスレッドプールのクレートを探して、そのクレートを代わりに使用して似たWebサーバを実装する。 そして、APIと頑健性を我々が実装したものと比較する。
まとめ
よくやりました!本の最後に到達しました!Rustのツアーに参加していただき、感謝の辞を述べたいです。 もう、ご自身のRustプロジェクトや他の方のプロジェクトのお手伝いをする準備ができています。 あなたがこれからのRustの旅で遭遇する、あらゆる困難の手助けを是非とも行いたいRustaceanたちの温かいコミュニティがあることを心に留めておいてくださいね。