memstored 0.1 = memcached + mpio + Tokyo Cabinet

memstored は memcached のバイナリプロトコルをサポートしたハッシュストレージサーバーです。IO戦略ライブラリmpio の信頼性と性能をテストするために開発しました。

IOに mp::iothreads を使用し、バックエンドには Tokyo Cabinet の抽象データベースAPIを利用しているため、高速でスケーラビリティが高く、かつ柔軟性の高いアーキテクチャになっています。プログラムの大部分はライブラリによって実現されているため、プログラム全体の見通しが良く、行数で見ても非常に小さく収まっています。

svn co http://svn.coderepos.org/share/lang/c/mpio
cd mpio/trunk                    && ./bootstrap && ./configure && make && make install && cd ../../
svn co http://svn.coderepos.org/share/lang/c/memstored
cd memstored/trunk && ./checkout && ./bootstrap && ./configure && make && make install && cd ../../

IOアーキテクチャ

マルチコア時代の高速サーバーの実装で紹介したサンプルプログラムと同様に mp::iothreads を使っています。memstored はスレッド間で共有するリソース(=排他処理が必要)が少ないので、すべてマルチスレッドで記述しています。


新しいクライアントを accept(2) するためのスレッドが走っており、コネクションが確立されるとWorkerスレッド*1にソケットが受け渡されます。

Workerスレッドはクライアントからデータを受け取り、memcached プロトコルのパースとデータベースの操作を行います。そのスレッドでそのまま返信を返します。ここで返信のサイズが非常に大きかったときに書き込みがブロックしてしまうのを防ぐため、一度に書き込みきれなかった場合は Output 用のスレッドに受け渡します。

返信は writev(2) を使って送信しています。アドレスが連続していなくてもカーネル内で効率よく処理してくれるので、ヘッダ + データ という形式のプロトコルを効率よく送信できます。memstored はデータベースからデータを取り出してから返信が完了するまでの間に一度もデータをコピーしません*2

この最適化はデータのサイズが大きいときに効果を発揮しそうです。

ストレージ

ストレージには Tokyo Cabinet の抽象データベースAPIを利用しています。

抽象データベースAPIはオンメモリデータベースAPIとハッシュデータベースAPIとB+木データベースAPIの共通のインターフェイスで、関数 `tcadbopen' でデータベースを開く際のデータベース名で具体的にどの種類のデータベースを扱うかを指定することができます。ハッシュデータベースの名前には接尾辞として ".tch" をつけ、B+木データベースの名前には接尾辞として ".tcb" をつけることで区別されます。チューニングパラメータは、名前の後に "#" で区切って "name=value" の形式で指定します。例えば "casket.tch#bnum=1000000#apow=10" などとします。


Tokyo Cabinet第1版基本仕様書

memstored の引数に渡すデータベース名によって、実際に利用されるデータベースの種類が決定されます。ストレージは性能に非常に大きな影響を与えるので、最適なデータベースを選ぶことが重要です。
"db.tch"と指定すればハッシュデータベース、"*#capsiz=1024m"と指定すれば128MBのオンメモリデータベース、"db.tcb#opts=d"と指定すればDeflateによる圧縮が有効になったB+木データベースになります。

マイクロベンチマーク

memcached のベンチマークツールである memslap を使って、memstored と memcached、Tokyo Tyrant の性能を比較してみました。

memslap --test=get --concurrency=128 [--binary]

あらかじめ1万個のエントリを書き込んでおいて、128本のスレッドでそれぞれ1万回GETを行ったときの経過時間を測定しています。キーのサイズは100バイトで、値のサイズは400バイトです。

server storage protocol elapsed
memstored TC Hash DB (#bnum=20011) binary 16.3 sec
memstored TC on Memory DB binary 15.3 sec
memstored TC B+Tree DB (#bnum=20011) binary 16.8 sec
memcached SLAB binary 15.6 sec
memcached SLAB text 16.8 sec
Tokyo Tyrant TC Hash DB (bnum=20011) text 25.0 esc

とりあえずこのテストでは十分な性能が得られていることが分かります。
値のサイズがもっと大きい場合のテストを行ってみたかったのですが、memcached は1MB以上のキー+値に対応していない & テキストプロトコルは1MB以上の値を許可していない ため、比較ができませんでした。memstored は2^32バイトまでのキー+値に対応しています。

Usage

Usage: memstored [options] -s <database> -l <[addr:]port>

options:
  -h                --help           show this message
  -s                --store          database name. see tcadb(3) for details
  -l <[addr:]port>  --listen         listen address
  -B <number=2048>  --initial-size   allocate this size of buffer first
  -N <number=1024>  --reserve-size   reserve at least this size of buffer
  -W <number=2>     --write-threads  number of threads for asynchronous writing
  -R <number=4>     --read-threads   number of threads for asynchronous reading

*1:プログラム中ではmemstored::Connectionクラスで表現

*2:ユーザー空間では

memstoredの実装詳細 1000行に収まる高速サーバーについて

1000行はテキトーなハッタリだが良くあること ;-)
memstored の本体は memstored.cc ファイル1つに収まっており、その行数は600行程度しかない。memstored のポイントは、少ない行数で高速なサーバーをカンタンに書けたことであった。memstored の実装について詳しく紹介してみる。

memstoredのソースコード
http://svn.coderepos.org/share/lang/c/memstored/trunk/memstored.cc


ソースコードを読むと、処理のほとんどはmp::iothreadsに任せているが、

  • プロトコルのストリームパーサ
  • 読み込み用のバッファリング
  • メインロジック

の3つは、mp::iothreads とはほぼ完全に分離していることが分かる。逆に言えば、この3つを実装しさえすれば、高速なサーバーを記述できることになる。*1


ちなみにプロトコルに MessagePack を使うと、MessagePack の C++ API にはストリームパーサとバッファリング機構が含まれているので、

  • メインロジック

だけを書けば良くなる!


さて、memstoredのmain関数周辺は、以下のようになっている。

static std::auto_ptr<memstored::Server> g_server;

int main(int argc, char* argv[])
{
    memstored::PARAM.parse(argc, (const char**)argv);
    g_server.reset(new memstored::Server());
    g_server->run();
    g_server->join();
    return 0;
}
  1. 引数の解釈
  2. サーバーを初期化
  3. サーバーを開始
  4. サーバーが終了するまで待つ

という処理を行っている。典型的。


引数解釈の部分はどうでもいいので置いておき、2.のサーバーの初期化処理を見てみる:

Server::Server() :
    m_db(PARAM.dbname.c_str()),
    m_sock(PARAM.listen_address)
{
    mp::iothreads::manager::initialize();
    mp::iothreads::reader::initialize(PARAM.read_threads);
    mp::iothreads::writer::initialize(PARAM.write_threads);
    mp::iothreads::listener::initialize();
    //mp::iothreads::connector::initialize(PARAM.connect_threads);

    // signal handling
    // Binary Hacks #52
    sigset_t ss;
    sigemptyset(&ss);
    sigaddset(&ss, SIGHUP);
    sigaddset(&ss, SIGINT);
    sigaddset(&ss, SIGTERM);
    m_signal_thread.reset(new mp::pthread_signal(ss, &Server::signal_end));

    // ignore SIGPIPE
    if( signal(SIGPIPE, SIG_IGN) == SIG_ERR ) {
        throw std::runtime_error(strerror(errno));
    }

    mp::iothreads::listen(m_sock.get(), &Server::accepted, (void*)m_db.get());
}

最初にmp::iothreads::*::initialize()という関数呼び出しが並んでいるが、ここでmp::iothreadsの初期化を行っている。mp::iothreadsはイベント駆動型のサーバーを記述するためのフレームワークで、memstoredの簡潔さはmp::iothreadsに依るところが大きい。

中盤ではシグナルハンドラをセットしている。ここでは Binary HacksHack #52 sigwaitで非同期シグナルを同期的に処理する に載っていたテクニックを使っている。普通シグナルハンドラでは非同期シグナルセーフな関数の呼び出しか volatile sig_atomic_t な変数の操作しかできないが*2、シグナルハンドル専用のスレッドを立ち上げることでこの制限を回避している。詳しくは Binary Hacks か mp::pthread_signalのソースコード参照。


それより重要なのは、最後の mp::iothreads::listen(m_sock.get(), &Server::accepted, (void*)m_db.get()); という一文になる。これを実行すると、m_sock.get()(←これはファイルディスクリプタを返す)で accept(2) し、新しいクライアントが接続してきたら、Server::accepted()関数が呼ばれるようになる。引数には m_db.get() が渡される。

accept(2) や connect(2) のようなブロックする可能性のある関数呼び出しは、基本的にすべてフレームワーク側(mp::iothreads)で行うことになる。ブロックする関数を呼び出しをせずにイベント駆動マシン止めないようにするのが、高速なサーバーを記述する上でのポイントになる。


その Server::accepted() 関数の実装はこの通り:

void Server::accepted(void* db, int fd)
{
    if(fd >= 0) {
        mp::set_nonblock(fd);
        mp::iothreads::add<Connection>(fd, (TCADB*)db);
    }
}

新しいクライアントが接続してくるとこの関数が呼ばれる。

mp::set_nonblock(fd) は、ファイルディスクリプタに O_NONBLOCK フラグをセットしている。

mp::iothreads::add(fd, (TCADB*)db); は、Connectionクラスをイベントハンドラとして、mp::iothreads にファイルディスクリプタを登録している。ここで:

という処理が登録される。

この挙動から分かるように、Connectionクラスのインスタンスの寿命とコネクションの寿命は一致する。つまりConnectionクラスのコンストラクタでコネクションごとに保持すべきリソース(バッファなど)を初期化したり、デストラクタで確実に後始末をするようにしたりすることができる。
これで close()しわすれイベントハンドラメモリリーク を確実に防ぐことができる。


イベントハンドラは複数のスレッドで並列に処理される。つまりConnection::read_event()はマルチスレッドで実行される。

この図の「Inter-thread Communication」が mp::iothreads::add() で行われている。


続いて Connection::read_event() 関数を見てみる。この関数はクライアントからデータが送られてくると呼ばれる。

void Connection::read_event()
try {
    if(m_free < PARAM.buffer_reserve_size) {
        // m_bufferを確保する ... 略 ...
    }

    ssize_t rl = ::read(fd(), m_buffer+m_used, m_free);
    if(rl < 0) {
        if(errno == EAGAIN || errno == EINTR) {
            return;
        } else {
            throw std::runtime_error("read error");
        }
    } else if(rl == 0) {
        throw std::runtime_error("connection closed");
    }

    // ... 略 ...

    while( (ret = memproto_parser_execute(&m_memproto, m_buffer, m_used, &m_off)) > 0) {
        if( (ret = memproto_dispatch(&m_memproto)) <= 0) {
            throw std::runtime_error("unknown command");
        }
    }

    // ... 略 ...
}

まずm_bufferに十分な容量のメモリが確保されているかどうかを確認している。続いてソケットからデータを読み込み、いつものエラー処理を行う。
例外を投げてもソケットはデストラクタで確実にcloseされるので、バンバン例外を投げて良い。


その後 memrpoto_parser_execute()関数と memproto_parser_dispatch()関数 を呼び出しているが、これはmemcachedプロトコルのストリームパーサで紹介したストリームパーサのバイナリプロトコル版である。
「データを次々に投げ込んでいくと内部の状態が遷移していき、ゴールの状態にたどり着くとパース完了、という状態遷移型のパーサ」で、イベント駆動型のサーバーを実装するために適している。


ここからの処理は少し端折るが、プロトコルのパースが成功し、パースされたリクエストパケットが例えばGETリクエストだったとすると、Connection::memproto_getx()関数が呼ばれるようになっている。
というわけで Connection::memprot_getx()関数を見てみる:

void Connection::memproto_getx(memproto_header* h, const char* key, uint16_t keylen)
{
    bool cmd_k = (h->opcode == MEMPROTO_CMD_GETK || h->opcode == MEMPROTO_CMD_GETKQ);
    bool cmd_q = (h->opcode == MEMPROTO_CMD_GETQ || h->opcode == MEMPROTO_CMD_GETKQ);
    int vallen = 0;
    void* val = tcadbget(m_db, key, keylen, &vallen);
    if(val) {
        uint32_t flags = htonl(0);
        send_response(h, MEMPROTO_RES_NO_ERROR,
                key, (cmd_k ? keylen : 0),
                val, vallen,
                (char*)&flags, sizeof(flags),
                0);
    } else if(!cmd_q) {
        send_response_nodata(h, MEMPROTO_RES_KEY_NOT_FOUND, 0);
    }
}

tcadbget()でTokyo Cabinetのデータベースから値を取り出している。もし値が存在すればsend_response(...)を呼び出し、存在しなければ send_response_nodata(...); を呼び出している。

後者はさて置き、前者のsend_response()関数はこのようになっている:

void Connection::send_response(memproto_header* h,
        uint8_t status,
        const char* key, uint16_t keylen,
              void* val, uint32_t vallen,
        const char* extra, uint16_t extralen,
        uint64_t cas)
{
    struct iovec vb[4];
    mp::iothreads::reqvec vr[4];
    unsigned int vi = 0;

    char header[24];
    pack_header(header, status, h->opcode,
            keylen, vallen, extralen,
            h->opaque, cas);

    vb[0].iov_base = header;
    vb[0].iov_len  = sizeof(header);
    vr[0] = mp::iothreads::reqvec();
    ++vi;

    if(extralen > 0) {
        vb[vi].iov_base = const_cast<char*>(extra);
        vb[vi].iov_len  = extralen;
        vr[vi] = mp::iothreads::reqvec();
        ++vi;
    }

    if(keylen > 0) {
        vb[vi].iov_base = const_cast<char*>(key);
        vb[vi].iov_len  = keylen;
        vr[vi] = mp::iothreads::reqvec();
        ++vi;
    }

    if(vallen > 0) {
        vb[vi].iov_base = val;
        vb[vi].iov_len  = vallen;
        vr[vi] = mp::iothreads::reqvec(
                mp::iothreads::writer::finalize_free, val);
        ++vi;
    }

    mp::iothreads::send_datav(fd(), vb, vr, vi);
}

せっせと配列を初期化しているが、最終的に mp::iothreads::send_datav() 関数を呼び出している。この関数はまず writev(2) でデータを送ろうと試みたあと、すべて送信しきれなければ送信専用のスレッドにデータを受け渡す。
ここでもブロックしないのがポイントである。イベント駆動マシンを止めずにスレッドを走らせ続けることができる。

*1:ストリームパーサは [http://www.complang.org/ragel/:title=Ragel] を使うと効率よく書ける。HTTPのストリームパーサが必要なら [http://mongrel.rubyforge.org/:title=Mongrel] の中に Ragel で書かれたストリームパーサが含まれているので参考になる。

*2:http://d.hatena.ne.jp/yupo5656/20040712/p2

同期プロトコルと非同期プロトコル

今のところチラシの裏的な話。


ネットワークプロトコルは、

  1. リクエストした順番通りにレスポンスを返す(リクエストの順番とレスポンスの順番が同期している)
  2. リクエストした順番通りにレスポンスが返ってくるとは限らない(リクエストの順番とレスポンスの順番が同期していない)

の2種類に分けられる。

前者を同期プロトコル、後者を非同期プロトコルと命名してみる。
HTTP や XML-RPC は同期プロトコルであり、JSON-RPC は非同期プロトコルである。TCPも非同期プロトコルだと言える。非同期プロトコルの特徴として、パケットの中にシーケンス番号(かそれに類するもの)が含まれている。


同期・非同期の違いはパイプライン化(リクエストを送ったあと、レスポンスを待たずに次のリクエストを送る)したときに出てくる。同期プロトコルではリクエストの順番通りにレスポンスが返ってくるが、非同期プロトコルでは順不同になる。
パイプライン化すると、2つ以上のリクエストが立て続けに発生したときに、2つ目以降のリクエストで ネットワーク や カーネル空間<=>ユーザー空間のアドレス変換 などの遅延分が隠蔽されて速くなる。負荷が高いときのパフォーマンスが向上する = スケーラビリティが良くなるので、高速なサーバーを実装する上ではぜひサポートしたい。


サーバーを実装することを考えたとき、マルチスレッドでイベント駆動型のアーキテクチャでは同期プロトコルのパイプラインは非常に対応しにくい。イベント駆動では「リクエストを処理しおわったら(=レスポンスを処理し終わったというイベントが発生したら)レスポンスを返す」という形になるので、単純に実装しただけではレスポンスを返す順番を保証できない。
リクエストを処理し終わっても一端キューに溜めておき、順番通りになったところでレスポンスを返すようにする必要がある。これを同期化と命名してみる。


一方クライアントが接続してくるごとにスレッドを立ち上げるアーキテクチャでは、普通に実装すれば同期プロトコルに対応する。


memcachedのテキストプロトコルは同期プロトコルだが、バイナリプロトコルは非同期プロトコルになる可能性がある*1。バイナリプロトコルにはヘッダに opaque というフィールドがあり、リクエストヘッダに入っている opaque の値をレスポンス時にそのまま返す。ここにシーケンス番号を入れればいい。


しかし libmemcached では opaque の値は常に 0 になっている。
memstored はマルチスレッドなイベント駆動型のサーバーで普通に実装しているので、リクエストをパイプライン化されるとlibmemcachedはたぶん混乱する。しかし幸いなことに(?)libmemcachedはパイプライン化しない(^_^;) *2


同期プロトコル vs 非同期プロトコル というのは一筋縄ではいかない問題だと思われる。
同期プロトコルと比べると、非同期プロトコルはクライアントの実装が複雑になる。これは嬉しくない。
逆に同期プロトコルでは、サーバーをマルチスレッドなイベント駆動型で実装しようとすると、実装が大変になる。クライアントごとにスレッドなモデルにするか、あるいはイベント駆動でもcoroutineを使えば良いのだが、どちらにしてもコンテキストスイッチのオーバーヘッドが少なからず発生する。


mpioで「汎用的な同期化ライブラリ」を実装すれば、HTTP や SMTP もサクサク実装できて嬉しかったりするだろうか。

*1:実際のところレスポンスの順番を保証しないと行けないのか、そうではないのかはちょっと良く分からない

*2:バイナリプロトコルで複数キーを同時にgetするときはまた別。これはパイプライン化で実現することが意図されている