読者です 読者をやめる 読者になる 読者になる

MessagePack for C++


この記事はもう古いです。「MessagePack for C++ template版」を見てください。


バイナリシリアライズ形式「MessagePack」C++APIを開発しています。
Thriftなどのコードジェネレータを利用して静的なクラスを生成するものとは差別化を図り、IDLを記述しないで使える動的型にこだわりました。


ストリームデシリアライザには、ちょっと高度なバッファ機構を実装してあります。バッファをmalloc()したりrealloc()したりというコードを書く必要が無く、ライブラリの中で自動的にバッファリングされます。

一方シリアライザはバッファを自分で実装するようになっています(面倒なときはstd::ostreamを使えます)。普通にメモリに追記していきつつ適宜realloc()する方法の他にも、バッファリングせずに直接ファイルに書き出したり、ソケットに書き込んだりと、環境に合わせて最適な出力方法を選択できます。柔軟でありつつtemplateで最適化されるので、オーバーヘッドは生じません。


オブジェクトの寿命はメモリプールで一括管理されます。メモリプールの寿命にさえ気をつければ、細々としたオブジェクトの寿命を気にする必要はありません。GCほど柔軟ではありませんが、たぶんGCより高速です。


CodeRepos://lang/c/msgpackからSVNでダウンロードできます。

$ svn co http://svn.coderepos.org/share/lang/c/msgpack/trunk msgpack-svn
$ cd msgpack-svn
$ ./bootstrap
$ ./configure --prefix=/path/to/install
$ make -j3
$ make install

サンプルコード

RPC風のメッセージをシリアライズ・デシリアライズするサンプルです。

  • sample.cpp
#include <msgpack.hpp>
#include <vector>
#include <memory>
#include <iostream>
#include <sstream>

void rpc_request(
    msgpack::raw method,  // struct msgpack::raw { const char* ptr; size_t len; };
    msgpack::object args,
    unsigned int id)
{
    std::cout << "request\n";
    std::cout << "  id:      " << id << "\n";
    std::cout << "  method:  "; std::cout.write(method.ptr, method.len) << "\n";
    std::cout << "  args:    " << args << "\n";
    std::cout << std::endl;
}

void rpc_response(
    msgpack::object result,
    msgpack::object error,
    unsigned int id)
{
    std::cout << "response\n";
    std::cout << "  id:      " << id << "\n";
    std::cout << "  result:  " << result << "\n";
    std::cout << "  error:   " << error  << "\n";
    std::cout << std::endl;
}


int main(void)
{
    using msgpack::object;

    // ここにメッセージをシリアライズする
    // R write(Pointer, Size); な関数が実装されたクラスなら何でもOK
    std::stringstream stream;

    // クライアント側コード

    {  // stream << [true, "GetColor", ["apple", "lemon"], 1].to_msgpack
        msgpack::zone z;
        object msg = z.narray(
                z.ntrue(),
                z.nraw_cstr_ref("GetColor"),
                z.narray(
                    z.nraw_cstr_ref("apple"),
                    z.nraw_cstr_ref("lemon")
                ),
                z.nu32(1)
            );
        msg.pack(stream);
    }

    {  // stream << [false, {"apple"=>"red", "lemon"=>"yellow"}, nil, 1].to_msgpack
        msgpack::zone z;
        object msg = z.narray(
                z.nfalse(),
                z.nmap(
                    z.nraw_cstr_ref("apple"), z.nraw_cstr_ref("red"),
                    z.nraw_cstr_ref("lemon"), z.nraw_cstr_ref("yellow")
                ),
                z.nnil(),
                z.nu32(1)
            );
        msg.pack(stream);
    }


    // サーバー側コード

    // ストリームデシリアライザ
    msgpack::unpacker upk;

    while(stream.good()) {  // streamが尽きるまでループ
        // 1. バッファを確保
        upk.reserve_buffer(8*1024);

        // 2. バッファにデータを読み込む
        stream.read(upk.buffer(), upk.buffer_capacity());

        // 3. 実際に読み込めたバイト数を指定する
        upk.buffer_consumed(stream.gcount());

        // 4. false が返るまで execute() を繰り返し呼ぶ
        while(upk.execute()) {

            // 5.1. デシリアライズされたオブジェクトを取り出す
            object msg = upk.data();

            // 5.2. オブジェクトの寿命は zone を delete するまで
            std::auto_ptr<msgpack::zone> z(upk.release_zone());
            // release_zone() の返り値は delete しないとメモリリークするので、
            // std::auto_ptr や boost::shared_ptr などで受ける

            // msgpack::object は iostream に出力できる
            // puts "rpc message: #{msg.inspect}"
            std::cout << "rpc message: " << msg << std::endl;

            try {
                // dynamic_cast要らず。暗黙の型変換
                std::vector<object>& rpc(msg);

                if(rpc.at(0)) {
                    rpc_request(rpc.at(1),      // 暗黙の型変換
                                rpc.at(2),
                                rpc.at(3));     // 暗黙の型変換
                } else {
                    rpc_response(rpc.at(1),
                                 rpc.at(2),
                                 rpc.at(3));    // 暗黙の型変換
                }

            } catch (std::bad_cast&) {     // 型変換に失敗
                std::cout << "invalid message" << std::endl;
            } catch (std::range_error&) {  // std::vector<T>::at()が範囲オーバー
                std::cout << "invalid message" << std::endl;
            }

            // 5.3. ストリームデシリアライザをリセット
            upk.reset();
        }
    }

}
コンパイル
$ # libmsgpackだけスタティックリンクする on Linux
$ g++ -g -Wall -O4 sample.cpp -o sample -Wl,-Bstatic -lmsgpack -Wl,-Bdynamic

$ # libmsgpackだけスタティックリンクする on Mac OS X
$ # Mac OS X で部分的にスタティックリンクってどうやるんでしょう…
$ g++ -g -Wall -O4 sample.cpp -o sample /usr/local/lib/libmsgpack.a
実行結果
$ ./sample
rpc message: [true, "GetColor", ["apple", "lemon"], 1]
request
  id:      1
  method:  GetColor
  args:    ["apple", "lemon"]

rpc message: [false, {"apple"=>"red", "lemon"=>"yellow"}, nil, 1]
response
  id:      1
  result:  {"apple"=>"red", "lemon"=>"yellow"}
  error:   nil

提供されるクラス

msgpack::object

動的型のオブジェクト。
msgpack::object::xi32() や msgpack::object::xarray() などのメンバ関数で int32_t や std::vector& などの型にキャストできるほか、暗黙の型変換も実装されています。キャストに失敗すると msgpack::type_error(std::bad_castを継承している)を投げます。

msgpack::object::pack(Stream& s)でシリアライズできます。

std::ostream::operator<< に対応しており、Rubyのinspectメソッド風に整形して表示できます。

msgpack::zone

固定長のメモリプール + ファイナライザ。
objectをzoneから確保していくと、zoneを解放すると同時にobjectがすべて解放されます。
msgpack::zone::u32(uint32_t val) や msgpack::zone::narray() などのメンバ関数で新しいオブジェクトを確保できます。
msgpack::zone::narray(...) と msgpack::zone::nmap(...) は型安全な可変長引数(16個まで)をサポートしており、引数のオブジェクトが中に入った配列や連想配列を作成できます。

msgpack::packer

ストリームシリアライザ。
template引数には、R write(ConstPointer, SomeSizeType) という型のメンバ関数を実装したクラスを指定し、コンストラクタでそのクラスのインスタンスへの参照を渡します。Rは任意の型。ConstPointerはconst char*やconst void*など。SomeSizeTypeはsize_tやintなど。
Streamは自分で実装することもできますが、std::ostreamがこの条件を満たしているので、std::stringstreamやstd::ofstreamなどを使うこともできます。

msgpack::object::pack(Stream& s)(またはmsgpack::pack(Stream& s, object o))の方が使いやすいので、パフォーマンスが要求されるとき以外はストリームシリアライザを使うことはないと思います :-p

msgpack::unpacker

バッファ付きストリームデシリアライザ。
使い方はサンプルコード参照。ストリーム処理が必要ないは、msgpack::object msgpack::unpack(const char* data, size_t len, msgpack::zone& z) でデシリアライズできます。

シリアライズに失敗するとmsgpack::unpack_error(std::runtime_errorを継承している)を投げます。