第101回カーネル読書会

先日楽天タワーで開催された第101回カーネル読書会で、分散KVS kumofs の設計と実装に関して発表しました。
少々準備不足で十分な発表ができたか不安ではありますが、懇親会で大いに議論が盛り上がったのは良かったです。


そんなわけで、発表資料を公開しました。

デモ用コード

デモ用のコードをいくつか用意していたので、ここに掲載します。
エラー処理などなどはテキトーですが、とりあえず動きます。

MessagePackシリアライズC++

kumofsで利用しているシリアライズライブラリMessagePackを使って、オブジェクトをシリアライズするコードです。
シリアライズした結果は標準出力(ファイルディスクリプタ1番)に書き出します。

#include <msgpack.hpp>
#include <unistd.h>
#include <vector>
#include <iostream>

int main(void)
{
    // [1, 2, 3]
    std::vector<int> vec;
    vec.push_back(1);
    vec.push_back(2);
    vec.push_back(3);

    msgpack::sbuffer sbuf;  // バッファ simple buffer
    msgpack::pack(sbuf, vec);

    write(1, sbuf.data(), sbuf.size());
}
MessagePackデシリアライズ & pretty print(Ruby

標準入力からMessagePackでシリアライズされたデータを読み込んで、デシリアライズした結果を人間に読みやすい形に整形して表示するコードです。
Rubyを使って数行で書けます。

#!/usr/bin/env ruby
require 'rubygems'
require 'msgpack'

data = STDIN.read
obj = MessagePack::unpack(data)
p obj
MessagePackデシリアライズ & pretty print&型変換(C++

標準入力からデータを読み込んでデシリアライズしてpretty printした後に、std::vector<int> に変換するコードです。
C++シリアライズしたデータをRubyでデシリアライズすることができます。逆も可。

#include <msgpack.hpp>
#include <unistd.h>
#include <vector>
#include <iostream>

int main(void)
{
    char buf[1024];
    ssize_t size = read(0, buf, sizeof(buf));

    msgpack::zone z;
    msgpack::object obj;
    msgpack::unpack(buf, size, NULL, &z, &obj);

    std::cout << "pretry print: " << obj << std::endl;

    try {
        std::vector<int> vec;
        obj.convert(&vec);

        std::cout << "convert to std::vector<int>:" << std::endl;
        std::cout << "  size =" << vec.size() << std::endl;
        for(size_t i=0; i < vec.size(); ++i) {
            std::cout << "  vec[" << i << "] = " << vec[i] << std::endl;
        }

    } catch(msgpack::type_error& e) {
        std::cout << "type mismatch. std::vector<int> is expected" << std::endl;
    }
}
MessagePackシリアライズ ゼロコピー版(C++

msgpack::sbufferの代わりにmsgpack::vrefbufferを使って、文字列をコピーせずにシリアライズして標準出力するコードです。
writevを使って出力します。

#include <msgpack.hpp>
#include <unistd.h>
#include <vector>
#include <string>
#include <iostream>

int main(void)
{
    // ["a"*64, "b"*64]
    std::vector<std::string> vec;
    char aaa[64];  memset(aaa, 'a', sizeof(aaa));
    char bbb[64];  memset(bbb, 'b', sizeof(bbb));
    vec.push_back(std::string(aaa, sizeof(aaa)));
    vec.push_back(std::string(bbb, sizeof(bbb)));

    msgpack::vrefbuffer vbuf;  // バッファ
    msgpack::pack(vbuf, vec);

    const struct iovec* iov = vbuf.vector();
    size_t iov_size         = vbuf.vector_size();

    writev(1, iov, iov_size);
}
MessagePackストリームデシリアライズC++

標準入力からデータを読み込みつつ、オブジェクトを1つデシリアライズできたらon_message関数を呼び出すコードです。
バッファリングやメッセージの切り出しは、すべてMessagePackのライブラリが面倒を見てくれます。

#include <msgpack.hpp>
#include <unistd.h>
#include <vector>
#include <string>
#include <iostream>
#include <errno.h>
#include <memory>

typedef std::auto_ptr<msgpack::zone> auto_zone;

void on_message(msgpack::object msg, auto_zone z)
{
    std::cout << "message reached: " << msg << std::endl;
}

int main(void)
{
    msgpack::unpacker pac;

    while(true) {
        pac.reserve_buffer(32);

        ssize_t count = read(0, pac.buffer(), pac.buffer_capacity());

        if(count <= 0) {
            if(count == 0) { return 0; }
            if(errno == EAGAIN || errno == EINTR) { return -1; }
            throw std::runtime_error(strerror(errno));
        }

        pac.buffer_consumed(count);

        while(pac.execute()) {
            msgpack::object msg = pac.data();
            auto_zone z(pac.release_zone());
            pac.reset();
            on_message(msg, z);
        }
    }
}
MessagePack-RPCサーバ(Ruby

MessagePack-RPCライブラリを使ってサーバを実装する例です。
set_asyncは、非同期に結果を返します。

#!/usr/bin/env ruby
require 'rubygems'
require 'msgpack/rpc'  # gem install msgpack-rpc

class MyServer
  def initialize(svr)
    @svr = svr
    @hash = Hash.new
  end
  def get(key)
    @hash[key]
  end
  def set(key, val)
    @hash[key] = val
  end
  def set_async(key, val)
    as = MessagePack::RPC::AsyncResult.new
    @svr.submit do
      @hash[key] = val
      as.result(val)
    end
    as
  end
end

svr = MessagePack::RPC::Server.new
svr.listen('0.0.0.0', 9090, MyServer.new(svr))
svr.run
MessagePack-RPCクライアント(Ruby

MessagePack-RPCライブラリを使ってサーバを実装する例です。
非同期型の呼び出し(sendとjoin)を使うと、並列して複数の関数呼び出しを行うことができます。

#!/usr/bin/env ruby
require 'rubygems'
require 'msgpack/rpc'  # gem install msgpack-rpc

cli = MessagePack::RPC::Client.new('127.0.0.1', 9090)

cli.call(:set, "key1", "val1")
p cli.call(:get, "key1")

kv = (1..100).map {|i| ["key#{i}", "val#{i}"] }

# 同期呼び出し
# set key1 ->
# ret key1 <-
# set key2 ->
# ret key2 <-
# ...
kv.map {|key, val|
  cli.call(:set, key, val)
}

# 非同期呼び出し
# set key1 ->
# set key2 ->
# ...
# ret key1 <-
# ret key2 <-
# ...
kv.map {|key, val|
  cli.send(:set, key, val)
}.map {|req|
  req.join.result
}
MessagePack-RPCプロキシ(Ruby

MessagePack-RPCを使ってプロキシサーバを実装する例です。イベント駆動型のサーバです。プロキシなのでクライアント的な部分も含んでいますが、イベント駆動型で動作するように、ブロックしないコールバック型の呼び出しを使っています。
また、セッションプーリングを利用しています(get_sessionのところ)。一度作成したセッションやコネクションは何度も使い回すことで、コネクションを確立する遅延を削減します。

#!/usr/bin/env ruby
require 'rubygems'
require 'msgpack/rpc'  # gem install msgpack-rpc

class MyProxy
  def initialize(svr)
    @svr = svr
  end
  def get_proxy(host, port, key)
    s = @svr.get_session(host, port)
    as = MessagePack::RPC::AsyncResult.new
    s.callback(:get, key) do |err, res|
      as.result(res, err)
    end
    as
  end
end

svr = MessagePack::RPC::Server.new
svr.listen('0.0.0.0', 9091, MyProxy.new(svr))
svr.run
色つきログ

kumofsで実際に利用している、ログを出力するコードです。
MessagePackでデシリアライズしたオブジェクトは、未知の(バグった)型のデータでも(Rubyのコードのように動的型のまま扱うことで)prettyに整形して表示できます。ログには自動的にタイムスタンプや関数名が付与されます。

#include <msgpack.hpp>
#include <unistd.h>
#include <vector>
#include <string>
#include <iostream>
#include <errno.h>
#include <memory>
#include "cclog/cclog.h"
#include "cclog/cclog_tty.h"

typedef std::auto_ptr<msgpack::zone> auto_zone;

void on_message(msgpack::object msg, auto_zone z)
{
    std::cout << "message reached: " << msg << std::endl;
    LOG_TRACE("message reached: ", msg, "  (level TRACE)");
    LOG_DEBUG("message reached: ", msg, "  (level DEBUG)");
    LOG_INFO ("message reached: ", msg, "  (level INFO)");
    LOG_WARN ("message reached: ", msg, "  (level WARN)");
    LOG_ERROR("message reached: ", msg, "  (level ERROR)");
}

int main(void)
{
    cclog::reset(new cclog_tty(cclog::TRACE, std::cout));

    msgpack::unpacker pac;

    while(true) {
        pac.reserve_buffer(32);

        ssize_t count = read(0, pac.buffer(), pac.buffer_capacity());

        if(count <= 0) {
            if(count == 0) { return 0; }
            if(errno == EAGAIN || errno == EINTR) { return -1; }
            throw std::runtime_error(strerror(errno));
        }

        pac.buffer_consumed(count);

        while(pac.execute()) {
            msgpack::object msg = pac.data();
            auto_zone z(pac.release_zone());
            pac.reset();
            on_message(msg, z);
        }
    }
}


実行すると↓こんな感じになります:

ログにタイムスタンプ、ファイル名、行数が自動的に付与されます。TRACEレベルとDEBUGレベルのログには関数名も付いています。