memcachedプロトコルのストリームパーサ

memcachedクライアントはほとんどの言語で実装されており、key-valueベースの何かを作るときにはmemcacheプロトコルをサポートしておくと、クライアントを実装する手間が省けるのでイケてます。

しかしmemcachedのテキストプロトコルのような「行」が主体となっているプロトコルは、スレッドを使った実装では比較的簡単に処理できるのですが(fgets(3)を使うなど)、selectやepollなどを使ったイベント駆動型の実装では非常に面倒なことになります。(一度パースしてみて、どうも全部データが到着していないようなら一度状態を変数に保存して、次にデータが到着したら変数から状態を復元して…)

イベント駆動型の実装では、データを次々に投げ込んでいくと内部の状態が遷移していき、ゴールの状態にたどり着くとパース完了、という状態遷移型のパーサが必要になります。そこで、Ragel State Machine Compilerを使うと高速なストリームパーサを作ることができます。*1


このRagelを使ってmemcachedのテキストプロトコルのパーサを書いてみました*2。CのAPIRubyバインディングを実装しています。
mp-memcached.tar.gzからダウンロードできます。

#include <memcache_parser.h>

// コールバック関数
static int memcache_get(const char** key, unsigned* key_len, unsigned keys,
        void* user)
{
    // ....
}

static memcache_parser parser;

int init(void)
{
    // コールバック関数をセット
    memcache_parser_callback callback = {
        memcache_get,
        memcache_set,
        memcache_replace,
        memcache_append,
        memcache_prepend,
        memcache_cas,
        memcache_delete,
    };
    // パーサを初期化
    memcache_parser_init(&parser, &callback, NULL);
}

static size_t nread = 0;

// データが来たら呼ばれる関数
void receive_data(char* stream, size_t stream_len) {
    // ストリーム処理
    memcache_parser_execute(&parser, stream, stream_len, &nread);

    // ...
}
require 'memcache_parser'

class MemcacheConnection
  def initialize
    # プロキシオブジェクトはself
    @parser = MemcacheParser::Parser.new(self)

    @buffer = ''
    @nread = 0
  end

  # コールバックメソッド
  # memcache_getという名前にしておくと呼ばれる
  def memcache_get(keys)
    # ...
  end

  def memcache_set(key, data, flags, exptime, noreply)
    # ...
  end

  # データが来たら呼ばれるメソッド
  def receive_data(data)
    @buffer << data
    # ↓ここでコールバックメソッドが呼ばれる
    @nread = @parser.execute(@buffer, @nread)
    if @parser.finished?
      @buffer.slice!(0..-1)
      @nread = 0
    end
  rescue
    # パースエラー → 例外
    send_data "CLIENT_ERROR #{$!}\r\n"
    raise
  end
end

memcache_parser + mpio + Tokyo Cabinet

このRagelで作ったmemcacheプロトコルパーサと、Tokyo Cabinetを組み合わせて、memcachedモドキを作ってみました。memcachedモドキと言うか、getとsetだけをサポートしたハッシュストレージです :-p

IOにはmpioを使い、スケーラビリティの高いイベント駆動型のIOで動作します。


250行足らずの小さいプログラムです。以下にソースコードを貼り付けました。
mp::multiplexは最近(今日)開発したIOライブラリで、read/writeとlisten、connect、タイマーをライブラリ内で管理してくれます(予定&未完成)。コールバック関数を登録していくことでネットワークプログラムを記述していきます。別に開発している、マルチスレッドな並列IOとシングルスレッドなコールバックを組み合わせたIOライブラリ(mp::iothreads)と同じインターフェイスで使える点がポイントで、プログラムを変更することなく環境に合わせて最適なIO戦略を選べるようになる予定です。


現バージョンのmpioライブラリを含めたソースコード一式は、mp-memcached.tar.gzからダウンロードできます。mpioの最新バージョンはCodeRepos://lang/c/mpioにあります。

#include <mp/multiplex.h>      // mpio multiplex io strategy
#include <memcache_parser.h>   // memcache protocol stream parser
#include <tchdb.h>             // TokyoCabinet hash database
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <sstream>
#include <iostream>
#include "uniext.h"            // listen_tcp()


// mp::multiplexストリームハンドラ
// クライアント1コネクションごとにオブジェクトが作られる
class MemcacheUser : public mp::multiplex::handler {
public:
    static const size_t INITIAL_SPACE = 2048;
    static const size_t MIN_SPACE = 1024;

    MemcacheUser(int fd, TCHDB* db) :
        mp::multiplex::handler(fd),
        m_buffer( (char*)malloc(INITIAL_SPACE) ),
        m_allocated(INITIAL_SPACE),
        m_used(0),
        m_nparsed(0),
        m_db(db)
    {
        if(!m_buffer) { throw std::bad_alloc(); }

        // memcacheプロトコルパーサーを初期化
        // コールバック関数を登録
        memcache_parser_callback callback = {
            &MemcacheUser::s_memcache_get,
            &MemcacheUser::s_memcache_set,
            NULL,  // replace
            NULL,  // append
            NULL,  // prepend
            NULL,  // cas
            NULL,  // delete
        };
        memcache_parser_init(&m_parser, &callback, this);
    }

    ~MemcacheUser()
    {
        std::cout << "connection closed" << std::endl;
        free(m_buffer);
    }

public:
    // readできそうになると呼ばれる
    void read_event()
    {
        // 最低でもMIN_SPACEバイトのバッファを確保
        if(m_allocated - m_used < MIN_SPACE) {
            size_t nsize = m_allocated*2;
            void* tmp = realloc(m_buffer, nsize);
            if(!tmp) { throw std::bad_alloc(); }
            m_buffer = (char*)tmp;
            m_allocated = nsize;
        }

        // readしてみる
        ssize_t rl = ::read(fd(), m_buffer+m_used, m_allocated-m_used);
        if(rl < 0) {
            if(errno == EAGAIN || errno == EINTR) {
                return;
            } else {
                throw std::runtime_error("read error");
            }
        } else if(rl == 0) {
            throw std::runtime_error("connection closed");
        }
        m_used += rl;  // rlバイト読み込めた

        // stream parsing
        // パースでき次第コールバック関数が呼ばれる
        int ret = memcache_parser_execute(&m_parser, m_buffer, m_used, &m_nparsed);
        if(ret < 0) {
            throw std::runtime_error("parse error");
        } else if(ret == 0) {
            return;
        } else {
            if(m_nparsed == m_used) {
                m_used = 0;
            } else {
                std::memmove(m_buffer, m_buffer+m_nparsed, m_used-m_nparsed);
            }
            m_nparsed = 0;
        }
    }

private:
    void memcache_get(const char** keys, unsigned* key_lens, unsigned key_num)
    {
        for(unsigned i=0; i < key_num; ++i) {
            const char* key = keys[i];
            unsigned key_len = key_lens[i];
    
            // TokyoCabinetからvalueを取得  FIXME バッファは使い回した方が速い
            int data_len;
            char* data = (char*)tchdbget(m_db, key, key_len, &data_len);
            if(data) {
                // テキストを作成  FIXME おそい
                std::ostringstream stream;
                stream << "VALUE ";
                stream.write(key, key_len);
                stream << " 0 " << data_len << "\r\n";
                stream.write(data, data_len);
                stream << "\r\n";
    
                std::string s(stream.str());
                mp::multiplex::send_data(fd(), s.data(), s.size());
    
                free(data);
            }
        }
        mp::multiplex::send_data(fd(), "END\r\n", 5);
    }

    void memcache_set(const char* key, unsigned key_len,
            unsigned short flags, uint64_t exptime,
            const char* data, unsigned data_len,
            bool noreply)
    {
        if(flags != 0 || exptime != 0 || noreply) {
            static const char* NOT_SUPPORTED_REPLY("CLIENT_ERROR no options are supported\r\n");
            mp::multiplex::send_data(fd(), NOT_SUPPORTED_REPLY, strlen(NOT_SUPPORTED_REPLY));
        }

        // TokyoCabinetにdataを格納
        if( tchdbput(m_db, key, key_len, data, data_len) ) {
            mp::multiplex::send_data(fd(), "STORED\r\n", 8);
        } else {
            static const char* STORE_FAILED = "SERVER_ERROR can't stored\r\n";
            mp::multiplex::send_data(fd(), STORE_FAILED, strlen(STORE_FAILED));
        }
    }

private:
    // buffer
    char* m_buffer;
    size_t m_allocated;
    size_t m_used;

    // stream parser
    memcache_parser m_parser;
    size_t m_nparsed;

    // TokyoCabinet
    TCHDB* m_db;

private:
    // トランポリン
    static int s_memcache_get(const char** key, unsigned* key_lens, unsigned key_num,
            void* user)
    {
        reinterpret_cast<MemcacheUser*>(user)->memcache_get(key, key_lens, key_num);
        return 0;
    }

    // トランポリン
    static int s_memcache_set(const char* key, unsigned key_len,
            unsigned short flags, uint64_t exptime,
            const char* data, unsigned data_len,
            bool noreply,
            void* user)
    {
        reinterpret_cast<MemcacheUser*>(user)->memcache_set(key, key_len,
                flags, exptime,
                data, data_len,
                noreply);
        return 0;
    }
};


class Md {
public:
    Md(unsigned short listen_port, const char* dbpath) :
            m_db(tchdbnew())
    {
        if(!m_db) { std::runtime_error("can't create tchdb"); }
        tchdbopen(m_db, dbpath, HDBOWRITER|HDBOCREAT);

        int lsock = listen_tcp(listen_port);
        mp::multiplex::initialize();
        mp::multiplex::listen(lsock,
                &mp::object_callback<void (int)>::mem_fun<Md, &Md::new_user>,
                this);
    }

    ~Md()
    {
        tchdbclose(m_db);
        tchdbdel(m_db);
    }

public:
    void run() { mp::multiplex::run(); }

private:
    TCHDB* m_db;

private:
    // accept()したら呼ばれる
    void new_user(int fd)
    {
        if(fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
            throw std::runtime_error("can't set nonblocking mode");
        }
        mp::multiplex::add<MemcacheUser>(fd, m_db);
    }
};


void usage()
{
    std::cout << "usage: a.out <port> <db path>.hdb" << std::endl;
}

int main(int argc, char* argv[])
{
    if(argc != 3) {
        usage();
        return 1;
    }

    int port = atoi(argv[1]);
    if(port <= 0) {
        usage();
        return 1;
    }

    const char* dbpath = argv[2];

    Md app(port, dbpath);
    app.run();

    return 0;
}

*1:ちなみに[http://d.hatena.ne.jp/viver/20080816/p1:title=MessagePack]のデシリアライザは最初はRagelで実装していたのですが、最終的にはMessagePackの仕様の特性を利用したパーサーを自前で書いています。コードは非常に読みにくいですが、とても高速です;-) [http://svn.coderepos.org/share/lang/c/msgpack/trunk/msgpack/unpack/inline_impl.h:title=msgpack inline_impl.h]

*2:incr, decl, statsは未実装。memcachedのクローンを実装するのではなく、memcachedのプロトコルをしゃべるストレージなどを実装することを目的にしているので、たぶんすべてのコマンドは実装しません。