読者です 読者をやめる 読者になる 読者になる

マルチコア時代の高速サーバーの実装

特にサーバー用途では、CPUがシングルコアに戻ってくることは考えにくい。

マルチコアCPUの性能を活かすにはマルチスレッドに対応したサーバーの実装が必要になるわけですが、マルチスレッドなプログラミングは往々にして「高負荷になると固まる」とか「たまに落ちる」といった悩ましいバグと戦わなければならず、イヤです。
かといってシングルスレッドでは、近い将来 32コアCPU! などが出てきたとき、たぶん性能を発揮できません。

そこで、そこそこデバッグしやすく、それでいて多コアCPUでもスケールするという落としどころを模索しているのですが、ボトルネックはネットワークIO周りにあるだろう*1という前提の元で、ネットワークIO部分だけをマルチスレッドで動かし、それ以外の部分をシングルスレッドで動かすというアーキテクチャを考えています。
ロジックの部分はマルチスレッドで書いても共有リソースにアクセスする度にロックしなければならないので、並列性が上がらず、実装コストに見合うほど性能は上がらない、のではなかろうか。


図にすると↓こんな感じです。


ソケットのpolling(epoll/kqueue/...)と、実際に読み込んだデータのバッファリング、そしてプロトコルのパースをInputの段階で行います。Dispatchの段階でパースされたメッセージを解釈し、どんな処理を行うかを決定します。Logicの段階で実際に処理を行い、Outputの段階で出力用のバッファリングと実際の送信を行います。

TheC10kProblemの分類で言えば、1. 各スレッドが複数のクライアントを受け付ける. そしてノンブロッキング I/O と レベル・トリガ型の完了通知(level-triggered readiness notification)を利用するですが、実際に処理を行うLogicの段階から見ると、非同期IOのように見えます。


このアーキテクチャの利点は、ロジックの部分をシングルスレッドで書けるという点です。ロックやタイミングの問題を考える必要が無く、気楽にプログラミングができます。LLとの相性も良好です。


悩みどころは、スレッド間通信とコンテキストスイッチは遅いというところです。平均の負荷が低めで、高スループットよりも低遅延を重視するアプリケーションの場合は、全部シングルスレッドの方が適しているかもしれません。

それから、必然的にイベント駆動型のプログラミングモデルになります。一本のスレッドで最後までクライアントの面倒を見るようなプロトコルだと、実装が大変かもしれません*2JSON-RPCのような非同期なプロトコルならとても自然に書けます。

また、データに区切りがあるメッセージ指向なプロトコルは自然に処理できますが、データが連続的に流れてくるようなストリーム指向なプロトコルは合いません。ストリームを適当に区切ってメッセージ単位にし、イベント駆動で動かすことになります。


このアーキテクチャの中で性能を上げるためには、

  • Input・Dispatchの段階でできるだけ多くの処理をする(並列性が良くなる)
  • 1回のメッセージの粒度を大きくする(コンテキストスイッチが減る)
  • パイプライン処理が可能なプロトコルにする(スレッド間通信の遅延を隠蔽できる)

といったポイントがありそうです。


使いどころ

このアーキテクチャの使いどころですが、たとえばmemcachedのようなメモリキャッシュサーバーや、分散ストレージサーバー、メールサーバー、リバースプロキシなどの実装にはうまく使えると思います。
HTTPはパイプライン処理はできますがレスポンスは同期的でストリーム指向なので、リバースプロキシとリアルサーバー間の通信はHTTPにこだわらない方が良さそうです。
Webサーバーの実装には適さない気がしますが、Cometサーバーの実装には合いそうです。


mp::iothreads + Ruby

このアーキテクチャのポイントは、ロジックの部分をシングルスレッドで書ける点にあります。シングルスレッドなので、Rubyも使えます。
というわけで、プロトコルにMessagePackを使って、ロジックの部分をRubyで記述するサンプルを書いてみました。基盤部分の実装はmp::iothreadsを使っています。

logic.rb

ストレージサーバーのようなものを想定しています。[メソッド名, 引数, id]というメッセージを受け取り、[返り値, エラー, id]という結果を返します(返り値とエラーのどちらか一方はnil)。
["set", ["key", "value"], id]でデータをキーを保存し、["get", ["key"], id]でデータを取り出します。

require 'rubygems'
require 'msgpack'

@db = {}

def receive_request(method, params_raw, id)
  params = MessagePack.unpack(params_raw)

  if method == "get"
    key = *params
    return [@db[key], nil, id].to_msgpack

  elsif method == "set"
    key, data = *params
    @db[key] = data
    return ["stored", nil, id].to_msgpack

  end

  raise "unknown method #{method}"
rescue Exception
  [nil, $!.to_s, id].to_msgpack
end
rpc.cc

上のRubyのコードを呼び出すC++のコードです。mp::iothreadsを使っているので、記述しているのはプロトコルをパースする部分と、Rubyのコードを呼び出す部分がほとんどです。バッファリングとプロトコルのパースもMessagePackのC++ライブラリがやってくれるので、総量は200行に満たない小さなプログラムになっています。

#include "ruby.h"                // Ruby
#include <mp/iothreads.h>        // mpio iothreads
#include <msgpack.hpp>           // MessagePack
#include <mp/utility.h>          // mp::set_nonblock
#include <boost/shared_ptr.hpp>  // MessagePackオブジェクトの寿命管理
#include <boost/scoped_ptr.hpp>
#include "simple_buffer.h"       // ちょっとしたバッファ
#include "uniext.h"              // listen_tcp();
#include <iostream>


typedef msgpack::object msgobj;
typedef boost::shared_ptr<msgpack::zone> msgzone;

struct request_t {
  request_t(int from, msgzone _life) : fd(from), life(_life) {}

  int fd;
  msgpack::type::raw_ref method;
  msgobj params;
  uint32_t id;
  msgzone life;

  void msgpack_unpack(const msgpack::type::tuple<msgpack::type::raw_ref, msgobj, uint32_t>& t)
  {
    method = t.get<0>();
    params = t.get<1>();
    id     = t.get<2>();
  }
};


// Logicクラス
class RubyServer {
public:
  // メッセージが届いたら呼ばれる
  void receive_request(request_t req)
  try {
    // request_tをRubyのVALUE型に変換
    VALUE method = rb_str_new(req.method.ptr, req.method.size);
    VALUE id = LONG2FIX(req.id);
    VALUE params;
    {
      simple_buffer buf;
      msgpack::pack(req.params, buf);
      params = rb_str_new(buf.data(), buf.size());
    }

    // receive_requestメソッドを呼ぶ
    // FIXME Rubyの例外をキャッチしていない
    VALUE ret = rb_funcall(
        rb_eval_string("self"), rb_intern("receive_request"),
        3, method, params, id);

    // 返り値を送信する
    char* sendbuf = (char*)malloc(RSTRING_LEN(ret));
    if(!sendbuf) { throw std::bad_alloc(); }
    memcpy(sendbuf, RSTRING_PTR(ret), RSTRING_LEN(ret));

    // Outputはmp::iothreadsに丸投げ
    mp::iothreads::send_data(req.fd, sendbuf, RSTRING_LEN(ret),
        mp::iothreads::writer::finalize_free, sendbuf);

  } catch (std::runtime_error& e) {
    std::cout << e.what() << std::endl;
    throw;
  } catch (...) {
    std::cout << "unknown error" << std::endl;
    throw;
  }
};

static boost::scoped_ptr<RubyServer> g_server( new RubyServer() );


// Input/Dispatch用クラス
// バッファリングとプロトコルのパースをマルチスレッドで処理する
class Connection : public mp::iothreads::handler {
public:
  Connection(int fd) : mp::iothreads::handler(fd) {
    std::cout << "connection established" << std::endl;
  }
  ~Connection() {
    std::cout << "connection closed" << std::endl;
  }

public:
  // readできそうになったら呼ばれる
  void read_event() {
    // MessagePackストリームデシリアライザ
    m_pac.reserve_buffer(1024);

    ssize_t rl = ::read(fd(), m_pac.buffer(), m_pac.buffer_capacity());
    if(rl < 0) {
      if(errno == EAGAIN || errno == EINTR) {
        return;
      } else {
        throw std::runtime_error("read error");
      }
    } else if(rl == 0) {
      throw std::runtime_error("connection closed");
    }

    m_pac.buffer_consumed(rl);

    while(m_pac.execute()) {
      // デシリアライズできた receive_request()を呼ぶ
      msgobj msg(m_pac.data());
      msgzone life(m_pac.release_zone());
      m_pac.reset();
      receive_request(msg, life);
    }
  }

private:
  // メッセージが届いたら呼ばれる
  void receive_request(msgobj msg, msgzone life) {
    std::cout << "receive request: " << msg << std::endl;
    // MessagePackのメッセージを静的型に変換する
    request_t req(fd(), life);
    msgpack::convert(req, msg);

    // LogicスレッドでRubyServer::receive_requestを実行する
    mp::iothreads::submit(
        &mp::object_callback<void (request_t)>::
              mem_fun<RubyServer, &RubyServer::receive_request>,
        g_server.get(),
        req
        );
  }

private:
  msgpack::unpacker m_pac;
};


// accept()したら呼ばれる
static void new_connection(void*, int fd)
{
  mp::set_nonblock(fd);
  // iothreadsに登録
  // read()できそうになったらConnection::read_event()が呼ばれる
  mp::iothreads::add<Connection>(fd);
}


// シグナルハンドラ
void finished(void*)
{
  std::cout << "end" << std::endl;
}

void signal_end(int)
{
  mp::iothreads::end();
  mp::iothreads::submit(finished, (void*)NULL);
}

int main()
{
  // Rubyを初期化
  char* rbargv[] = {"", "-e", "", NULL};
  ruby_init();
  ruby_options(3, rbargv);
  rb_eval_string("require 'logic'");  // logic.rbを読み込む

  // 4320/tcpでlisten
  int lsock = listen_tcp(4320);
  mp::set_nonblock(lsock);

  // mp::iothreadsを初期化
  mp::iothreads::manager::initialize();
  mp::iothreads::writer::initialize(2);   // スレッド数を指定
  mp::iothreads::reader::initialize(2);   // スレッド数を指定
  mp::iothreads::listener::initialize();
  //mp::iothreads::connector::initialize(2); // iothreads::connectは使わない

  // シグナルハンドラを登録
  signal(SIGTERM, signal_end);
  signal(SIGINT,  signal_end);

  // accept()できたらnew_connection()が呼ばれる
  mp::iothreads::listen(lsock, new_connection, (void*)NULL);

  mp::iothreads::run();
}
client.rb

メッセージを送信するクライアントです。返り値は単に表示しているだけで、少々手抜き :-p

require 'msgpack'

# サーバーに接続
sock = TCPSocket.new('127.0.0.1', 4320)

# 返り値を表示するスレッド
th = Thread.new {
  begin
    # MessagePackストリームデシリアライザ
    rdbuf = ''
    pac = MessagePack::Unpacker.new
    buffer = ''
    nread = 0
    while true
      sock.sysread(1024, rdbuf)
      buffer << rdbuf
      while !buffer.empty?
        nread = pac.execute(buffer, nread)
        if pac.finished?
          msg = pac.data
          # FIXME 表示しているだけ
          puts "message reached: #{msg.inspect}"
          pac.reset
          buffer.slice!(0, nread)
          nread = 0
          next unless buffer.empty?
        end
      end
    end
  rescue
    p $!
  end
}

# [メソッド, 引数, id]

idseq = 0
sock.write( ["set", ["key0", "value0"], idseq+=1].to_msgpack )
sock.write( ["set", ["key1", "value1"], idseq+=1].to_msgpack )
sock.write( ["set", ["key2", "value2"], idseq+=1].to_msgpack )
sock.write( ["set", ["key3", "value3"], idseq+=1].to_msgpack )

sleep 0.1

sock.write( ["get", ["key0"], idseq+=1].to_msgpack )
sock.write( ["get", ["key1"], idseq+=1].to_msgpack )
sock.write( ["get", ["key2"], idseq+=1].to_msgpack )
sock.write( ["get", ["key3"], idseq+=1].to_msgpack )

th.join


サーバーを起動し、クライアントを実行すると、

message reached: ["stored", nil, 1]
message reached: ["stored", nil, 2]
message reached: ["stored", nil, 3]
message reached: ["stored", nil, 4]
message reached: ["value0", nil, 5]
message reached: ["value1", nil, 6]
message reached: ["value2", nil, 7]
message reached: ["value3", nil, 8]

という結果が返ってきます。確かにRubyのコードが呼び出され、実行されていることが分かります。


Rubyとうまく融合できるのは、汎用的なシリアライズ形式(MessagePack)を使っているところがポイントでしょうか。プロトタイプはRubyで書き、性能と例外処理が必要になってきたところでC++で清書するという開発スタイルでも、途中でプロトコルを変更せずにスムーズに行えそうです。

*1:実際にはディスクIOがボトルネックだったりもするわけですが、キャッシュに載れば十分速いし…

*2:coroutineを使えば案外普通に書けるかもしれません