第101回カーネル読書会
先日楽天タワーで開催された第101回カーネル読書会で、分散KVS kumofs の設計と実装に関して発表しました。
少々準備不足で十分な発表ができたか不安ではありますが、懇親会で大いに議論が盛り上がったのは良かったです。
そんなわけで、発表資料を公開しました。
デモ用コード
デモ用のコードをいくつか用意していたので、ここに掲載します。
エラー処理などなどはテキトーですが、とりあえず動きます。
MessagePackシリアライズ(C++)
kumofsで利用しているシリアライズライブラリMessagePackを使って、オブジェクトをシリアライズするコードです。
シリアライズした結果は標準出力(ファイルディスクリプタ1番)に書き出します。
#include <msgpack.hpp> #include <unistd.h> #include <vector> #include <iostream> int main(void) { // [1, 2, 3] std::vector<int> vec; vec.push_back(1); vec.push_back(2); vec.push_back(3); msgpack::sbuffer sbuf; // バッファ simple buffer msgpack::pack(sbuf, vec); write(1, sbuf.data(), sbuf.size()); }
MessagePackデシリアライズ & pretty print(Ruby)
標準入力からMessagePackでシリアライズされたデータを読み込んで、デシリアライズした結果を人間に読みやすい形に整形して表示するコードです。
Rubyを使って数行で書けます。
#!/usr/bin/env ruby require 'rubygems' require 'msgpack' data = STDIN.read obj = MessagePack::unpack(data) p obj
MessagePackデシリアライズ & pretty print&型変換(C++)
標準入力からデータを読み込んでデシリアライズしてpretty printした後に、std::vector<int> に変換するコードです。
C++でシリアライズしたデータをRubyでデシリアライズすることができます。逆も可。
#include <msgpack.hpp> #include <unistd.h> #include <vector> #include <iostream> int main(void) { char buf[1024]; ssize_t size = read(0, buf, sizeof(buf)); msgpack::zone z; msgpack::object obj; msgpack::unpack(buf, size, NULL, &z, &obj); std::cout << "pretry print: " << obj << std::endl; try { std::vector<int> vec; obj.convert(&vec); std::cout << "convert to std::vector<int>:" << std::endl; std::cout << " size =" << vec.size() << std::endl; for(size_t i=0; i < vec.size(); ++i) { std::cout << " vec[" << i << "] = " << vec[i] << std::endl; } } catch(msgpack::type_error& e) { std::cout << "type mismatch. std::vector<int> is expected" << std::endl; } }
MessagePackシリアライズ ゼロコピー版(C++)
msgpack::sbufferの代わりにmsgpack::vrefbufferを使って、文字列をコピーせずにシリアライズして標準出力するコードです。
writevを使って出力します。
#include <msgpack.hpp> #include <unistd.h> #include <vector> #include <string> #include <iostream> int main(void) { // ["a"*64, "b"*64] std::vector<std::string> vec; char aaa[64]; memset(aaa, 'a', sizeof(aaa)); char bbb[64]; memset(bbb, 'b', sizeof(bbb)); vec.push_back(std::string(aaa, sizeof(aaa))); vec.push_back(std::string(bbb, sizeof(bbb))); msgpack::vrefbuffer vbuf; // バッファ msgpack::pack(vbuf, vec); const struct iovec* iov = vbuf.vector(); size_t iov_size = vbuf.vector_size(); writev(1, iov, iov_size); }
MessagePackストリームデシリアライズ(C++)
標準入力からデータを読み込みつつ、オブジェクトを1つデシリアライズできたらon_message関数を呼び出すコードです。
バッファリングやメッセージの切り出しは、すべてMessagePackのライブラリが面倒を見てくれます。
#include <msgpack.hpp> #include <unistd.h> #include <vector> #include <string> #include <iostream> #include <errno.h> #include <memory> typedef std::auto_ptr<msgpack::zone> auto_zone; void on_message(msgpack::object msg, auto_zone z) { std::cout << "message reached: " << msg << std::endl; } int main(void) { msgpack::unpacker pac; while(true) { pac.reserve_buffer(32); ssize_t count = read(0, pac.buffer(), pac.buffer_capacity()); if(count <= 0) { if(count == 0) { return 0; } if(errno == EAGAIN || errno == EINTR) { return -1; } throw std::runtime_error(strerror(errno)); } pac.buffer_consumed(count); while(pac.execute()) { msgpack::object msg = pac.data(); auto_zone z(pac.release_zone()); pac.reset(); on_message(msg, z); } } }
MessagePack-RPCサーバ(Ruby)
MessagePack-RPCライブラリを使ってサーバを実装する例です。
set_asyncは、非同期に結果を返します。
#!/usr/bin/env ruby require 'rubygems' require 'msgpack/rpc' # gem install msgpack-rpc class MyServer def initialize(svr) @svr = svr @hash = Hash.new end def get(key) @hash[key] end def set(key, val) @hash[key] = val end def set_async(key, val) as = MessagePack::RPC::AsyncResult.new @svr.submit do @hash[key] = val as.result(val) end as end end svr = MessagePack::RPC::Server.new svr.listen('0.0.0.0', 9090, MyServer.new(svr)) svr.run
MessagePack-RPCクライアント(Ruby)
MessagePack-RPCライブラリを使ってサーバを実装する例です。
非同期型の呼び出し(sendとjoin)を使うと、並列して複数の関数呼び出しを行うことができます。
#!/usr/bin/env ruby require 'rubygems' require 'msgpack/rpc' # gem install msgpack-rpc cli = MessagePack::RPC::Client.new('127.0.0.1', 9090) cli.call(:set, "key1", "val1") p cli.call(:get, "key1") kv = (1..100).map {|i| ["key#{i}", "val#{i}"] } # 同期呼び出し # set key1 -> # ret key1 <- # set key2 -> # ret key2 <- # ... kv.map {|key, val| cli.call(:set, key, val) } # 非同期呼び出し # set key1 -> # set key2 -> # ... # ret key1 <- # ret key2 <- # ... kv.map {|key, val| cli.send(:set, key, val) }.map {|req| req.join.result }
MessagePack-RPCプロキシ(Ruby)
MessagePack-RPCを使ってプロキシサーバを実装する例です。イベント駆動型のサーバです。プロキシなのでクライアント的な部分も含んでいますが、イベント駆動型で動作するように、ブロックしないコールバック型の呼び出しを使っています。
また、セッションプーリングを利用しています(get_sessionのところ)。一度作成したセッションやコネクションは何度も使い回すことで、コネクションを確立する遅延を削減します。
#!/usr/bin/env ruby require 'rubygems' require 'msgpack/rpc' # gem install msgpack-rpc class MyProxy def initialize(svr) @svr = svr end def get_proxy(host, port, key) s = @svr.get_session(host, port) as = MessagePack::RPC::AsyncResult.new s.callback(:get, key) do |err, res| as.result(res, err) end as end end svr = MessagePack::RPC::Server.new svr.listen('0.0.0.0', 9091, MyProxy.new(svr)) svr.run
色つきログ
kumofsで実際に利用している、ログを出力するコードです。
MessagePackでデシリアライズしたオブジェクトは、未知の(バグった)型のデータでも(Rubyのコードのように動的型のまま扱うことで)prettyに整形して表示できます。ログには自動的にタイムスタンプや関数名が付与されます。
#include <msgpack.hpp> #include <unistd.h> #include <vector> #include <string> #include <iostream> #include <errno.h> #include <memory> #include "cclog/cclog.h" #include "cclog/cclog_tty.h" typedef std::auto_ptr<msgpack::zone> auto_zone; void on_message(msgpack::object msg, auto_zone z) { std::cout << "message reached: " << msg << std::endl; LOG_TRACE("message reached: ", msg, " (level TRACE)"); LOG_DEBUG("message reached: ", msg, " (level DEBUG)"); LOG_INFO ("message reached: ", msg, " (level INFO)"); LOG_WARN ("message reached: ", msg, " (level WARN)"); LOG_ERROR("message reached: ", msg, " (level ERROR)"); } int main(void) { cclog::reset(new cclog_tty(cclog::TRACE, std::cout)); msgpack::unpacker pac; while(true) { pac.reserve_buffer(32); ssize_t count = read(0, pac.buffer(), pac.buffer_capacity()); if(count <= 0) { if(count == 0) { return 0; } if(errno == EAGAIN || errno == EINTR) { return -1; } throw std::runtime_error(strerror(errno)); } pac.buffer_consumed(count); while(pac.execute()) { msgpack::object msg = pac.data(); auto_zone z(pac.release_zone()); pac.reset(); on_message(msg, z); } } }
ログにタイムスタンプ、ファイル名、行数が自動的に付与されます。TRACEレベルとDEBUGレベルのログには関数名も付いています。