シングルスレッドサーバをマルチスレッド化する

現状、サーバはリクエストを順番に処理します。つまり、最初のが処理し終わるまで、2番目の接続は処理しないということです。 サーバが受け付けるリクエストの量が増えるほど、この連続的な実行は、最適ではなくなるでしょう。 サーバが処理するのに長い時間がかかるリクエストを受け付けたら、新しいリクエストは迅速に処理できても、 続くリクエストは長いリクエストが完了するまで待たなければならなくなるでしょう。これを修正する必要がありますが、 まずは、実際に問題が起こっているところを見ます。

現在のサーバの実装で遅いリクエストをシミュレーションする

処理が遅いリクエストが現在のサーバ実装に対して行われる他のリクエストにどう影響するかに目を向けます。 リスト20-10は、応答する前に5秒サーバをスリープさせる遅いレスポンスをシミュレーションした /sleepへのリクエストを扱う実装です。

ファイル名: src/main.rs


# #![allow(unused_variables)]
#fn main() {
use std::thread;
use std::time::Duration;
# use std::io::prelude::*;
# use std::net::TcpStream;
# use std::fs::File;
// --snip--

fn handle_connection(mut stream: TcpStream) {
#     let mut buffer = [0; 512];
#     stream.read(&mut buffer).unwrap();
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    // --snip--
}
#}

リスト20-10: /sleepを認識して5秒間スリープすることで遅いリクエストをシミュレーションする

このコードはちょっと汚いですが、シミュレーション目的には十分です。2番目のリクエストsleepを作成し、 そのデータをサーバは認識します。ifブロックの後に else ifを追加し、/sleepへのリクエストを確認しています。 そのリクエストが受け付けられると、サーバは成功のHTMLページを描画する前に5秒間スリープします。

我々のサーバがどれだけ基礎的か見て取れます: 本物のライブラリは、もっと冗長でない方法で複数のリクエストの認識を扱うでしょう!

cargo runでサーバを開始してください。それから2つブラウザのウインドウを開いてください: 1つは、 http://localhost:7878/ 用、そしてもう1つはhttp://localhost:7878/sleep 用です。 以前のように / URIを数回入力したら、素早く応答するでしょう。しかし、/sleepを入力し、それから / をロードしたら、 sleepがロードする前にきっかり5秒スリープし終わるまで、/ は待機するのを目撃するでしょう。

より多くのリクエストが遅いリクエストの背後に回ってしまうのを回避するようWebサーバが動く方法を変える方法は複数あります; これから実装するのは、スレッドプールです。

スレッドプールでスループットを向上させる

スレッドプールは、待機し、タスクを処理する準備のできた一塊りの大量に生成されたスレッドです。 プログラムが新しいタスクを受け取ったら、プールのスレッドのどれかをタスクにあてがい、 そのスレッドがそのタスクを処理します。 プールの残りのスレッドは、最初のスレッドが処理中にやってくる他のあらゆるタスクを扱うために利用可能です。 最初のスレッドがタスクの処理を完了したら、新しいタスクを処理する準備のできたアイドル状態のスレッドプールに戻ります。 スレッドプールにより、並行で接続を処理でき、サーバのスループットを向上させます。

プール内のスレッド数は、小さい数字に制限し、DoS攻撃から保護します; リクエストが来た度に新しいスレッドをプログラムが生成したら、 1000万リクエストをサーバに行う誰かが、サーバのリソースを使い尽くし、リクエストの処理を停止に追い込むことで、 大混乱を招くことができてしまうでしょう。

無制限にスレッドを大量生産するのではなく、プールに固定された数のスレッドを待機させます。リクエストが来る度に、 処理するためにプールに送られます。プールは、やって来るリクエストのキューを管理します。 プールの各スレッドがこのキューからリクエストを取り出し、リクエストを処理し、そして、別のリクエストをキューに要求します。 この設計により、Nリクエストを並行して処理でき、ここでNはスレッド数です。各スレッドが実行に時間のかかるリクエストに応答したら、 続くリクエストはそれでも、キュー内で待機させられてしまいますが、その地点に到達する前に扱える時間のかかるリクエスト数を増加させました。

このテクニックは、Webサーバのスループットを向上させる多くの方法の1つに過ぎません。探求する可能性のある他の選択肢は、 fork/joinモデルと、シングルスレッドの非同期I/Oモデルです。この話題にご興味があれば、他の解決策についてもっと読み、 Rustで実装を試みることができます; Rustのような低レベル言語であれば、これらの選択肢全部が可能なのです。

スレッドプールを実装し始める前に、プールを使うのはどんな感じになるはずなのかについて語りましょう。コードの設計を試みる際、 クライアントのインターフェイスをまず書くことは、設計を導く手助けになることがあります。呼び出したいように構成されるよう、 コードのAPIを記述してください; そして、機能を実装してから公開APIの設計をするのではなく、その構造内で機能を実装してください。

第12章のプロジェクトでTDDを使用したように、ここではCompiler Driven Development(コンパイラ駆動開発)を使用します。 欲しい関数を呼び出すコードを書き、それからコンパイラの出すエラーを見てコードが動くように次に何を変更すべきかを決定します。

各リクエストに対してスレッドを立ち上げられる場合のコードの構造

まず、全接続に対して新しいスレッドを確かに生成した場合にコードがどんな見た目になるかを探求しましょう。 先ほど述べたように、無制限にスレッドを大量生産する可能性があるという問題のため、これは最終的な計画ではありませんが、 開始点です。リスト20-11は、新しいスレッドを立ち上げてforループ内で各ストリームを扱うためにmainに行う変更を示しています。

ファイル名: src/main.rs

# use std::thread;
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
#
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}
# fn handle_connection(mut stream: TcpStream) {}

リスト20-11: 各ストリームに対して新しいスレッドを立ち上げる

第16章で学んだように、thread::spawnは新しいスレッドを生成し、それからクロージャ内のコードを新しいスレッドで実行します。 このコードを実行してブラウザで /sleepをロードし、それからもう2つのブラウザのタブで / をロードしたら、 確かに / へのリクエストは、/sleepが完了するのを待機しなくても済むことがわかるでしょう。 ですが、前述したように、無制限にスレッドを生成することになるので、これは最終的にシステムを参らせてしまうでしょう。

有限数のスレッド用に似たインターフェイスを作成する

スレッドからスレッドプールへの変更にAPIを使用するコードへの大きな変更が必要ないように、 スレッドプールには似た、馴染み深い方法で動作してほしいです。リスト20-12は、 thread::spawnの代わりに使用したいThreadPool構造体の架空のインターフェイスを表示しています。

ファイル名: src/main.rs

# use std::thread;
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
# struct ThreadPool;
# impl ThreadPool {
#    fn new(size: u32) -> ThreadPool { ThreadPool }
#    fn execute<F>(&self, f: F)
#        where F: FnOnce() + Send + 'static {}
# }
#
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}
# fn handle_connection(mut stream: TcpStream) {}

リスト20-12: ThreadPoolの理想的なインターフェイス

ThreadPool::newを使用して設定可能なスレッド数で新しいスレッドプールを作成し、今回の場合は4です。 それからforループ内で、pool.executeは、プールが各ストリームに対して実行すべきクロージャを受け取るという点で、 thread::spawnと似たインターフェイスです。pool.executeを実装する必要があるので、 クロージャを取り、実行するためにプール内のスレッドに与えます。このコードはまだコンパイルできませんが、 コンパイラがどう修正したらいいかガイドできるように試してみます。

コンパイラ駆動開発でThreadPool構造体を構築する

リスト20-12の変更をsrc/main.rsに行い、それから開発を駆動するためにcargo checkからのコンパイラエラーを活用しましょう。 こちらが得られる最初のエラーです:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool`
(エラー: 解決に失敗しました。未定義の型またはモジュール`ThreadPool`を使用しています)
  --> src\main.rs:10:16
   |
10 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ Use of undeclared type or module
   `ThreadPool`

error: aborting due to previous error

よろしい!このエラーはThreadPool型かモジュールが必要なことを教えてくれているので、今構築します。 ThreadPoolの実装は、Webサーバが行う仕事の種類とは独立しています。従って、helloクレートをバイナリクレートからライブラリクレートに切り替え、 ThreadPoolの実装を保持させましょう。ライブラリクレートに変更後、 個別のスレッドプールライブラリをWebリクエストを提供するためだけではなく、スレッドプールでしたいあらゆる作業にも使用できます。

以下を含むsrc/lib.rsを生成してください。これは、現状存在できる最も単純なThreadPoolの定義です:

ファイル名: src/lib.rs


# #![allow(unused_variables)]
#fn main() {
pub struct ThreadPool;
#}

それから新しいディレクトリ、src/binを作成し、src/main.rsに根付くバイナリクレートをsrc/bin/main.rsに移動してください。 そうすると、ライブラリクレートがhelloディレクトリ内でプライマリクレートになります; それでも、 cargo runsrc/bin/main.rsのバイナリを実行することはできます。main.rsファイルを移動後、 編集してライブラリクレートを持ち込み、以下のコードをsrc/bin/main.rsの先頭に追記してThreadPoolをスコープに導入してください:

ファイル名: src/bin/main.rs

extern crate hello;
use hello::ThreadPool;

このコードはまだ動きませんが、再度それを確認して扱う必要のある次のエラーを手に入れましょう:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for type
`hello::ThreadPool` in the current scope
(エラー: 現在のスコープで型`hello::ThreadPool`の関数または関連アイテムに`new`というものが見つかりません)
 --> src/bin/main.rs:13:16
   |
13 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ function or associated item not found in
   `hello::ThreadPool`

このエラーは、次に、ThreadPoolに対してnewという関連関数を作成する必要があることを示唆しています。 また、newには4を引数として受け入れる引数1つがあり、ThreadPoolインスタンスを返すべきということも知っています。 それらの特徴を持つ最も単純なnew関数を実装しましょう:

ファイル名: src/lib.rs


# #![allow(unused_variables)]
#fn main() {
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}
#}

size引数の型として、usizeを選択しました。何故なら、マイナスのスレッド数は、何も筋が通らないことを知っているからです。 また、この4をスレッドのコレクションの要素数として使用し、第3章の「整数型」節で議論したように、これはusizeのあるべき姿であることも知っています。

コードを再度確認しましょう:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
(警告: 未使用の変数: `size`)
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
  --> src/bin/main.rs:18:14
   |
18 |         pool.execute(|| {
   |              ^^^^^^^

今度は、警告とエラーが出ました。一時的に警告は無視して、ThreadPoolexecuteメソッドがないためにエラーが発生しました。 「有限数のスレッド用に似たインターフェイスを作成する」節で我々のスレッドプールは、 thread::spawnと似たインターフェイスにするべきと決定したことを思い出してください。 さらに、execute関数を実装するので、与えられたクロージャを取り、実行するようにプールの待機中のスレッドに渡します。

ThreadPoolexecuteメソッドをクロージャを引数として受け取るように定義します。 第13章の「ジェネリック引数とFnトレイトを使用してクロージャを保存する」節から、 3つの異なるトレイトでクロージャを引数として取ることができることを思い出してください: FnFnMutFnOnceです。 ここでは、どの種類のクロージャを使用するか決定する必要があります。最終的には、 標準ライブラリのthread::spawn実装に似たことをすることがわかっているので、 thread::spawnのシグニチャで引数にどんな境界があるか見ることができます。ドキュメンテーションは、以下のものを示しています:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static

F型引数がここで関心のあるものです; T型引数は戻り値と関係があり、関心はありません。spawnは、 Fのトレイト境界としてFnOnceを使用していることが確認できます。これはおそらく、我々が欲しているものでもあるでしょう。 というのも、最終的にはexecuteで得た引数をspawnに渡すからです。さらにFnOnceは使用したいトレイトであると自信を持つことができます。 リクエストを実行するスレッドは、そのリクエストのクロージャを1回だけ実行し、これはFnOnceOnceに合致するからです。

F型引数にはまた、トレイト境界のSendとライフタイム境界の'staticもあり、この状況では有用です: あるスレッドから別のスレッドにクロージャを移動するのにSendが必要で、スレッドの実行にどれくらいかかるかわからないので、 'staticも必要です。ThreadPoolにこれらの境界のジェネリックな型Fの引数を取るexecuteメソッドを生成しましょう:

ファイル名: src/lib.rs


# #![allow(unused_variables)]
#fn main() {
# pub struct ThreadPool;
impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}
#}

それでも、FnOnceの後に()を使用しています。このFnOnceは引数を取らず、値も返さないクロージャを表すからです。 関数定義同様に、戻り値の型はシグニチャから省略できますが、引数がなくても、カッコは必要です。

またもや、これがexecuteメソッドの最も単純な実装です: 何もしませんが、 コードがコンパイルできるようにしようとしているだけです。再確認しましょう:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

warning: unused variable: `f`
 --> src/lib.rs:8:30
  |
8 |     pub fn execute<F>(&self, f: F)
  |                              ^
  |
  = note: to avoid this warning, consider using `_f` instead

これで警告を受け取るだけになり、コンパイルできるようになりました!しかし、cargo runを試して、 ブラウザでリクエストを行うと、章の冒頭で見かけたエラーがブラウザに現れることに注意してください。 ライブラリは、まだ実際にexecuteに渡されたクロージャを呼び出していないのです!

注釈: HaskellやRustなどの厳密なコンパイラがある言語についての格言として「コードがコンパイルできたら、 動作する」というものをお聴きになったことがある可能性があります。ですが、この格言は普遍的に当てはまるものではありません。 このプロジェクトはコンパイルできますが、全く何もしません!本物の完璧なプロジェクトを構築しようとしているのなら、 ここがユニットテストを書き始めて、コードがコンパイルでき、かつ欲しい振る舞いを保持していることを確認するのに良い機会でしょう。

newでスレッド数を検査する

newexecuteの引数で何もしていないので、警告が出続けます。欲しい振る舞いでこれらの関数の本体を実装しましょう。 まずはじめに、newを考えましょう。先刻、size引数に非負整数型を選択しました。負のスレッド数のプールは、 全く道理が通らないからです。しかしながら、0スレッドのプールも全く意味がわかりませんが、0も完全に合法なusizeです。 ThreadPoolインスタンスを返す前にsizeが0よりも大きいことを確認するコードを追加し、リスト20-13に示したように、 assert!マクロを使用することで0を受け取った時にプログラムをパニックさせます。

ファイル名: src/lib.rs


# #![allow(unused_variables)]
#fn main() {
# pub struct ThreadPool;
impl ThreadPool {
    /// 新しいThreadPoolを生成する。
    ///
    /// sizeがプールのスレッド数です。
    ///
    /// # パニック
    ///
    /// sizeが0なら、`new`関数はパニックします。
    ///
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
}
#}

リスト20-13: ThreadPool::newを実装してsizeが0ならパニックする

doc commentでThreadPoolにドキュメンテーションを追加しました。第14章で議論したように、 関数がパニックする場面を声高に叫ぶセクションを追加することで、 いいドキュメンテーションの実践に(なら)っていることに注意してください。 試しにcargo doc --openを実行し、ThreadPool構造体をクリックして、newの生成されるドキュメンテーションがどんな感じが確かめてください!

ここでしたようにassert!マクロを追加する代わりに、リスト12-9のI/OプロジェクトのConfig::newのように、 newResultを返させることもできるでしょう。しかし、今回の場合、スレッドなしでスレッドプールを作成しようとするのは、 回復不能なエラーであるべきと決定しました。野心を感じるのなら、以下のシグニチャのnewも書いてみて、両者を比較してみてください:

pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {

スレッドを格納するスペースを生成する

今や、プールに格納する合法なスレッド数を知る方法ができたので、ThreadPool構造体を返す前にスレッドを作成して格納できます。 ですが、どのようにスレッドを「格納」するのでしょうか?もう一度、thread::spawnシグニチャを眺めてみましょう:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static

spawn関数は、JoinHandle<T>を返し、ここでTは、クロージャが返す型です。試しに同じようにJoinHandleを使ってみて、 どうなるか見てみましょう。我々の場合、スレッドプールに渡すクロージャは接続を扱い、何も返さないので、 Tはユニット型()になるでしょう。

リスト20-14のコードはコンパイルできますが、まだスレッドは何も生成しません。ThreadPoolの定義を変更して、 thread::JoinHandle<()>インスタンスのベクタを保持し、sizeキャパシティのベクタを初期化し、 スレッドを生成する何らかのコードを実行するforループを設定し、それらを含むThreadPoolインスタンスを返します。

ファイル名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // スレッドを生成してベクタに格納する
            // create some threads and store them in the vector
        }

        ThreadPool {
            threads
        }
    }

    // --snip--
}

リスト20-14: ThreadPoolにスレッドを保持するベクタを生成する

ライブラリクレート内でstd::threadをスコープに導入しました。ThreadPoolのベクタの要素の型として、 thread::JoinHandleを使用しているからです。

一旦、合法なサイズを受け取ったら、ThreadPoolsize個の要素を保持できる新しいベクタを生成します。 この本ではまだ、with_capacity関数を使用したことがありませんが、これはVec::newと同じ作業をしつつ、 重要な違いがあります: ベクタに予めスペースを確保しておくのです。ベクタにsize個の要素を格納する必要があることはわかっているので、 このメモリ確保を前もってしておくと、Vec::newよりも少しだけ効率的になります。Vec::newは、 要素が挿入されるにつれて、自身のサイズを変更します。

再びcargo checkを実行すると、もういくつか警告が出るものの、成功するはずです。

ThreadPoolからスレッドにコードを送信する責任を負うWorker構造体

リスト20-14のforループにスレッドの生成に関するコメントを残しました。ここでは、実際にスレッドを生成する方法に目を向けます。 標準ライブラリはスレッドを生成する手段としてthread::spawnを提供し、thread::spawnは、 生成されるとすぐにスレッドが実行すべき何らかのコードを得ることを予期します。ところが、我々の場合、 スレッドを生成して、後ほど送信するコードを待機してほしいです。標準ライブラリのスレッドの実装は、 それをするいかなる方法も含んでいません; それを手動で実装しなければなりません。

この新しい振る舞いを管理するスレッドとThreadPool間に新しいデータ構造を導入することでこの振る舞いを実装します。 このデータ構造をWorkerと呼び、プール実装では一般的な用語です。レストランのキッチンで働く人々を思い浮かべてください: 労働者は、お客さんからオーダーが来るまで待機し、それからそれらのオーダーを取り、満たすことに責任を負います。

スレッドプールにJoinHanlde<()>インスタンスのベクタを格納する代わりに、Worker構造体のインスタンスを格納します。 各Workerが単独のJoinHandle<()>インスタンスを格納します。そして、Workerに実行するコードのクロージャを取り、 既に走っているスレッドに実行してもらうために送信します。ログを取ったり、デバッグする際にプールの異なるワーカーを区別できるように、 各ワーカーにidも付与します。

ThreadPoolを生成する際に発生することに以下の変更を加えましょう。このようにWorkerをセットアップした後に、 スレッドにクロージャを送信するコードを実装します:

  1. idJoinHandle<()>を保持するWorker構造体を定義する。
  2. ThreadPoolを変更し、Workerインスタンスのベクタを保持する。
  3. id番号を取り、idと空のクロージャで大量生産されるスレッドを保持するWorkerインスタンスを返すWorker::new関数を定義する。
  4. ThreadPool::newforループカウンタを使用してidを生成し、そのidで新しいWorkerを生成し、ベクタにワーカーを格納する。

挑戦に積極的ならば、リスト20-15のコードを見る前にご自身でこれらの変更を実装してみてください。

いいですか?こちらが先ほどの変更を行う1つの方法を行ったリスト20-15です。

ファイル名: src/lib.rs


# #![allow(unused_variables)]
#fn main() {
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers
        }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker {
            id,
            thread,
        }
    }
}
#}

リスト20-15: ThreadPoolを変更してスレッドを直接保持するのではなく、Workerインスタンスを保持する

ThreadPoolのフィールド名をthreadsからworkersに変更しました。JoinHandle<()>インスタンスではなく、 Workerインスタンスを保持するようになったからです。forループのカウンタをWorker::newへの引数として使用し、 それぞれの新しいWorkerworkersというベクタに格納します。

外部のコード(src/bin/main.rsのサーバなど)は、ThreadPool内でWorker構造体を使用していることに関する実装の詳細を知る必要はないので、 Worker構造体とそのnew関数は非公開にしています。Worker::new関数は与えたidを使用し、 空のクロージャを使って新しいスレッドを立ち上げることで生成されるJoinHandle<()>インスタンスを格納します。

このコードはコンパイルでき、ThreadPool::newへの引数として指定した数のWorkerインスタンスを格納します。 ですがそれでもexecuteで得るクロージャを処理してはいません。次は、それをする方法に目を向けましょう。

チャンネル経由でスレッドにリクエストを送信する

さて、thread::spawnに与えられたクロージャが確かに何もしない問題に取り組みましょう。現在、 executeメソッドで実行したいクロージャを得ています。ですが、ThreadPoolの生成中、Workerそれぞれを生成する際に、 実行するクロージャをthread::spawnに与える必要があります。

作ったばかりのWorker構造体にThreadPoolが保持するキューから実行するコードをフェッチして、 そのコードをスレッドが実行できるように送信してほしいです。

第16章でこのユースケースにぴったりであろうチャンネル(2スレッド間コミュニケーションをとる単純な方法)について学びました。 チャンネルをキューの仕事として機能させ、executeThreadPoolからWorkerインスタンスに仕事を送り、 これが仕事をスレッドに送信します。こちらが計画です:

  1. ThreadPoolはチャンネルを生成し、チャンネルの送信側に就く。
  2. Workerそれぞれは、チャンネルの受信側に就く。
  3. チャンネルに送信したいクロージャを保持する新しいJob構造体を生成する。
  4. executeメソッドは、実行したい仕事をチャンネルの送信側に送信する。
  5. スレッド内で、Workerはチャンネルの受信側をループし、受け取ったあらゆる仕事のクロージャを実行する。

ThreadPool::new内でチャンネルを生成し、ThreadPoolインスタンスに送信側を保持することから始めましょう。リスト20-16のようにですね。 今の所、Job構造体は何も保持しませんが、チャンネルに送信する種類の要素になります。

ファイル名: src/lib.rs


# #![allow(unused_variables)]
#fn main() {
# use std::thread;
// --snip--
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}
#
# struct Worker {
#     id: usize,
#     thread: thread::JoinHandle<()>,
# }
#
# impl Worker {
#     fn new(id: usize) -> Worker {
#         let thread = thread::spawn(|| {});
#
#         Worker {
#             id,
#             thread,
#         }
#     }
# }
#}

リスト20-18: ThreadPoolを変更してJobインスタンスを送信するチャンネルの送信側を格納する

ThreadPool::new内で新しいチャンネルを生成し、プールに送信側を保持させています。これはコンパイルに成功しますが、 まだ警告があります。

スレッドプールがワーカーを生成する際に各ワーカーにチャンネルの受信側を試しに渡してみましょう。 受信側はワーカーが大量生産するスレッド内で使用したいことがわかっているので、クロージャ内でreceiver引数を参照します。 リスト20-17のコードはまだ完璧にはコンパイルできません。

ファイル名: src/lib.rs

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker {
            id,
            thread,
        }
    }
}

リスト20-17: チャンネルの受信側をワーカーに渡す

多少些細で率直な変更を行いました: チャンネルの受信側をWorker::newに渡し、それからクロージャの内側で使用しています。

このコードのチェックを試みると、このようなエラーが出ます:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:27:42
   |
27 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here in
   previous iteration of loop
   |
   = note: move occurs because `receiver` has type
   `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait

このコードは、receiverを複数のWorkerインスタンスに渡そうとしています。第16章を思い出すように、これは動作しません: Rustが提供するチャンネル実装は、複数の生成者、単独の消費者です。要するに、 チャンネルの消費側をクローンするだけでこのコードを修正することはできません。たとえできたとしても、 使用したいテクニックではありません; 代わりに、全ワーカー間で単独のreceiverを共有することで、 スレッド間に仕事を分配したいです。

さらに、チャンネルキューから仕事を取り出すことは、receiverを可変化することに関連するので、 スレッドには、receiverを共有して変更する安全な方法が必要です; さもなくば、 競合状態に陥る可能性があります(第16章で講義しました)。

第16章で議論したスレッド安全なスマートポインタを思い出してください: 複数のスレッドで所有権を共有しつつ、 スレッドに値を可変化させるためには、Arc<Mutex<T>>を使用する必要があります。Arc型は、 複数のワーカーに受信者を所有させ、Mutexにより、1度に受信者から1つの仕事をたった1つのワーカーが受け取ることを保証します。 リスト20-18は、行う必要のある変更を示しています。

ファイル名: src/lib.rs


# #![allow(unused_variables)]
#fn main() {
# use std::thread;
# use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
// --snip--

# pub struct ThreadPool {
#     workers: Vec<Worker>,
#     sender: mpsc::Sender<Job>,
# }
# struct Job;
#
impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }

    // --snip--
}

# struct Worker {
#     id: usize,
#     thread: thread::JoinHandle<()>,
# }
#
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
#         let thread = thread::spawn(|| {
#            receiver;
#         });
#
#         Worker {
#             id,
#             thread,
#         }
    }
}
#}

リスト20-18: ArcMutexを使用してワーカー間でチャンネルの受信側を共有する

ThreadPool::newで、チャンネルの受信側をArcMutexに置いています。新しいワーカーそれぞれに対して、 Arcをクローンして参照カウントを跳ね上げているので、ワーカーは受信側の所有権を共有することができます。

これらの変更でコードはコンパイルできます!ゴールはもうすぐそこです!

executeメソッドを実装する

最後にThreadPoolexecuteメソッドを実装しましょう。 Jobも構造体からexecuteが受け取るクロージャの型を保持するトレイトオブジェクトの型エイリアスに変更します。 第19章の「型エイリアスで型同義語を生成する」節で議論したように、型エイリアスにより長い型を短くできます。 リスト20-19をご覧ください。

ファイル名: src/lib.rs


# #![allow(unused_variables)]
#fn main() {
// --snip--
# pub struct ThreadPool {
#     workers: Vec<Worker>,
#     sender: mpsc::Sender<Job>,
# }
# use std::sync::mpsc;
# struct Worker {}

type Job = Box<FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--
#}

リスト20-19: 各クロージャを保持するBoxに対してJob型エイリアスを生成し、それからチャンネルに仕事を送信する

executeで得たクロージャを使用して新しいJobインスタンスを生成した後、その仕事をチャンネルの送信側に送信しています。 送信が失敗した時のためにsendに対してunwrapを呼び出しています。これは例えば、全スレッドの実行を停止させるなど、 受信側が新しいメッセージを受け取るのをやめてしまったときなどに起こる可能性があります。現時点では、 スレッドの実行を止めることはできません: スレッドは、プールが存在する限り実行し続けます。 unwrapを使用している理由は、失敗する場合が起こらないとわかっているからですが、コンパイラにはわかりません。

ですが、まだ完全にやり終えたわけではありません!ワーカー内でthread::spawnに渡されているクロージャは、 それでもチャンネルの受信側を参照しているだけです。その代わりに、クロージャには永遠にループし、 チャンネルの受信側に仕事を要求し、仕事を得たらその仕事を実行してもらう必要があります。 リスト20-20に示した変更をWorker::newに行いましょう。

ファイル名: src/lib.rs

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                // ワーカー{}は仕事を得ました; 実行します
                println!("Worker {} got a job; executing.", id);

                (*job)();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

リスト20-20: ワーカーのスレッドで仕事を受け取り、実行する

ここで、まずreceiverに対してlockを呼び出してミューテックスを獲得し、それからunwrapを呼び出して、 エラーの際にはパニックします。ロックの獲得は、ミューテックスが毒された状態なら失敗する可能性があり、 これは、他のどれかのスレッドがロックを保持している間に、解放するのではなく、パニックした場合に起き得ます。 この場面では、unwrapを呼び出してこのスレッドをパニックさせるのは、取るべき正当な行動です。 このunwrapをあなたにとって意味のあるエラーメッセージを伴うexpectに変更することは、ご自由に行なってください。

ミューテックスのロックを獲得できたら、recvを呼び出してチャンネルからJobを受け取ります。 最後のunwrapもここであらゆるエラーを超えていき、これはチャンネルの送信側を保持するスレッドが閉じた場合に発生する可能性があり、 受信側が閉じた場合にsendメソッドがErrを返すのと似ています。

recvの呼び出しはブロックするので、まだ仕事がなければ、現在のスレッドは、仕事が利用可能になるまで待機します。 Mutex<T>により、ただ1つのWorkerスレッドのみが一度に仕事の要求を試みることを保証します。

理論的には、このコードはコンパイルできるはずです。残念ながら、Rustコンパイラはまだ完全ではなく、 このようなエラーが出ます:

error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be
statically determined
(エラー: std::ops::FnOnce() + std::marker::Sendの値をムーブできません:
std::ops::FnOnce() + std::marker::Sendのサイズを静的に決定できません)
  --> src/lib.rs:63:17
   |
63 |                 (*job)();
   |                 ^^^^^^

問題が非常に謎めいているので、エラーも非常に謎めいています。Box<T>に格納されたFnOnceクロージャを呼び出すためには(Job型エイリアスがそう)、 呼び出す際にクロージャがselfの所有権を奪うので、 クロージャは自身をBox<T>からムーブする必要があります。一般的に、RustはBox<T>から値をムーブすることを許可しません。 コンパイラには、Box<T>の内側の値がどれほどの大きさなのか見当がつかないからです: 第15章でBox<T>に格納して既知のサイズの値を得たい未知のサイズの何かがあるためにBox<T>を正確に使用したことを思い出してください。

リスト17-15で見かけたように、記法self: Box<Self>を使用するメソッドを書くことができ、 これにより、メソッドはBox<T>に格納されたSelf値の所有権を奪うことができます。 それがまさしくここで行いたいことですが、残念ながらコンパイラはさせてくれません: クロージャが呼び出された際に振る舞いを実装するRustの一部は、self: Box<Self>を使用して実装されていないのです。 故に、コンパイラはまだこの場面においてself: Box<Self>を使用してクロージャの所有権を奪い、 クロージャをBox<T>からムーブできることを理解していないのです。

Rustは、コンパイラが改善できる箇所ではまだ、発展途上にありますが、将来的にリスト20-20のコードは、 ただ単純にうまく動くはずです。あなたのような方がこれや他の問題を修正するのに取り掛かっています!この本を完了したら、 是非ともあなたにも参加していただきたいです。

ですがとりあえず、手頃なトリックを使ってこの問題を回避しましょう。この場合、self: Box<Self>で、 Box<T>の内部の値の所有権を奪うことができることをコンパイラに明示的に教えてあげます; そして、一旦クロージャの所有権を得たら、呼び出せます。これには、 シグニチャにself: Box<Self>を使用するcall_boxというメソッドのある新しいトレイトFnBoxを定義すること、 FnOnce()を実装する任意の型に対してFnBoxを定義すること、型エイリアスを新しいトレイトを使用するように変更すること、 Workercall_boxメソッドを使用するように変更することが関連します。これらの変更は、 リスト20-21に表示されています。

ファイル名: src/lib.rs

trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<FnBox + Send + 'static>;

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

リスト20-21: 新しいトレイトFnBoxを追加してBox<FnOnce()>の現在の制限を回避する

まず、FnBoxという新しいトレイトを作成します。このトレイトにはcall_boxという1つのメソッドがあり、 これは、self: Box<Self>を取ってselfの所有権を奪い、Box<T>から値をムーブする点を除いて、 他のFn*トレイトのcallメソッドと類似しています。

次に、FnOnce()トレイトを実装する任意の型Fに対してFnBoxトレイトを実装します。実質的にこれは、 あらゆるFnOnce()クロージャがcall_boxメソッドを使用できることを意味します。call_boxの実装は、 (*self)()を使用してBox<T>からクロージャをムーブし、クロージャを呼び出します。

これでJob型エイリアスには、新しいトレイトのFnBoxを実装する何かのBoxである必要が出てきました。 これにより、クロージャを直接呼び出す代わりにJob値を得た時にWorkercall_boxを使えます。 任意のFnOnce()クロージャに対してFnBoxトレイトを実装することは、チャンネルに送信する実際の値は何も変えなくてもいいことを意味します。 もうコンパイラは、我々が行おうとしていることが平気なことであると認識できます。

このトリックは非常にこそこそしていて複雑です。完璧に筋が通らなくても心配しないでください; いつの日か、完全に不要になるでしょう。

このトリックの実装で、スレッドプールは動く状態になります!cargo runを実行し、 リクエストを行なってください:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never used: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: #[warn(dead_code)] on by default

warning: field is never used: `id`
  --> src/lib.rs:61:5
   |
61 |     id: usize,
   |     ^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

warning: field is never used: `thread`
  --> src/lib.rs:62:5
   |
62 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

    Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

成功!もう非同期に接続を実行するスレッドプールができました。絶対に4つ以上のスレッドが生成されないので、 サーバが多くのリクエストを受け取っても、システムは過負荷にならないでしょう。/sleepにリクエストを行なっても、 サーバは他のスレッドに実行させることで他のリクエストを提供できるでしょう。

第18章でwhile letループを学んだ後で、なぜリスト20-22に示したようにワーカースレッドのコードを記述しなかったのか、 不思議に思っている可能性があります。

ファイル名: src/lib.rs

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

リスト20-22: while letを使用したもう1つのWorker::newの実装

このコードはコンパイルでき、動きますが、望み通りのスレッドの振る舞いにはなりません: 遅いリクエストがそれでも、他のリクエストが処理されるのを待機させてしまうのです。理由はどこか捉えがたいものです: Mutex構造体には公開のunlockメソッドがありません。ロックの所有権が、 lockメソッドが返すLockResult<MutexGuard<T>>内のMutexGuard<T>のライフタイムに基づくからです。 コンパイル時には、ロックを保持していない限り、借用精査機はそうしたら、Mutexに保護されるリソースにはアクセスできないという規則を強制できます。 しかし、この実装は、MutexGuard<T>のライフタイムについて熟考しなければ、 意図したよりもロックが長い間保持される結果になり得ます。while式の値がブロックの間中スコープに残り続けるので、 ロックはjob.call_boxの呼び出し中保持されたままになり、他のワーカーが仕事を受け取れなくなるのです。

代わりにloopを使用し、ロックと仕事をブロックの外ではなく、内側で獲得することで、 lockメソッドが返すMutexGuardlet job文が終わると同時にドロップされます。 これにより、複数のリクエストを並行で提供し、ロックはrecvの呼び出しの間は保持されるけれども、 job.call_boxの呼び出しの前には解放されることを保証します。