分散ファイルシステムのインターフェース
少しずつ分散ファイルシステムの実装を進めています。aio_read()を使ったTCPコネクションマネージャの実装がだいたいできました。なんだかんだで既に3500行を突破…。
IOインターフェースについて。
ファイルシステムなら、インターフェースとしてopen()やwrite()やread()があって、stat()があって、となりますが、SQLだとSQL文でアクセスすることになります。しかしどちらにしても、アクセスはwriteとread(とロック)に分けらます。それをうまく分散してくれるインフラさえあれば、そのインフラにアクセスするインターフェースは取り替えられるはずです。
そこで、インターフェースは切り替え可能なアーキテクチャにしようと思っています。ある名前から対応する値を取り出すことができる、いわゆる連想配列を提供するだけで、インターフェースは後付けします。
「名前」は可変長の文字列で、「値」は可変長のバイト列です。(名前もバイト列でOK)
ファイルシステムインターフェースは、「名前」=「ファイルパス」、「値」=「ファイルのメタデータと中身」として、分散インフラにアクセスします。たとえば、「値」の先頭から32バイトはパーミッションやタイムスタンプなどのメタデータで、33バイト目以降をファイルの中身とします。
一方で、ストレージインターフェースもセットで取り替え可能に。「値」の先頭から32バイトへの書き込みがあればファイルのパーミッションやタイムスタンプを変更し、33バイト目以降に書き込みがあればファイルの中身を変更します。
アプリケーション ↓ chown("/path/to/hoge", 0, 0) +---------アクセスインターフェース----------+ ↓ write("/path/to/hoge"の0バイト目から2バイトに、"00"をwrite) 分散インフラ ↓ write("/path/to/hoge"の0バイト目から2バイトに、"00"をwrite)を保持せよ +--------ストレージインターフェース---------+ ↓ chown("/path/to/hoge", 0, 0) HDD、別のファイルシステム、メモリ…
アクセスインターフェースから分散インフラのアクセス方法は、基本的にwriteとreadだけになるわけですが、トランザクションはスイッチで切り替えられる必要があります。
write系では、おそらくdirect_write()、buffered_write()、atomic_write()、の3種類になります。これは5/2のエントリに書いた、「流れてきたデータを下層のファイルシステムに書き込む方法は3つ」の3つに対応します。
direct_write()では書き込み中に読み込みがあったとき、書き込みかけのデータが読み込まれます。buffered_write()でも、書き込みかけのデータが読み込まれますが、direct_write()よりもその確率が低い(direct_とbuffered_は片方だけになるかもしれません)。atomic_write()は、書き込みかけのデータが読み込まれることはありません(その代わり遅い)。
2回連続してwriteするときに、1回もwriteしていないときのデータか、2回writeした後のデータしか読み込めてはいけないようにするには、最初にロックフラグを立てるなりの処理をすることになります。(分散インフラでlockもサポートするべきかもしれない)
read系では、direct_read()、cooperative_read()、filtered_read()になると思います。
direct_read()は、atomic_write()されているデータであろうが無視して書き込みかけデータを読み込みます。atomic_write()の努力が水の泡になるわけですが、オーバーヘッドは最も小さい。cooperative_read()は、atomic_write()されているデータであれば、書き込みかけのデータがあれば、書き込みが完了するまで待ってから読み込みます。また、読み込み中にwriteされないようにします。
filtered_read()は、読み込み時にフィルタをかけるcooperative_read()です。これは高速な検索をしたいときに使うことになると思います。データを全部readしてから自前で検索するようにすると、巨大なデータの中から小さなデータを探し出したいときは、大量のトラフィックが無駄になります。そこでread先で検索してもらって、結果だけ受け取ります。
これは呼び出し方からして特殊になるはずで、↓のような感じになると思います。
// 参考 cooperative_read() void cooperative_read( const char* key, uint64_t from, uint32_t len, void* buffer ); // bufferの長さはlen // filtered_read() void filtered_read( const char* key, int filter_id, const void* filter_arg, uint32_t filter_arg_len, FilteredResult* result ); // 事前にbufferは確保しなくていい。結果はresultに入る template <typename Function> void regist_filter(int id, Function filter); struct FilteredResult : private boost::noncopyable { std::vector< std::pair<boost::shared_array<char>, uint32_t> > buffer_vector; // バッファとその長さの組の配列 };
filtered_read()は、filter_argを引数としてフィルタ関数を実行して、その結果をresultに入れます。resultは、バッファとその長さの組が配列になっていて、フィルタをかけた後の結果が入っています。どんな形式で入っているかは、フィルタ関数(filter)が決めます。フィルタ関数には、おそらくストレージインターフェースのメンバ関数を使うことになると思います。
フィルタ関数は、あらかじめregist_filter_functionでフィルタIDを登録しておきます。呼び出すときはそのフィルタIDを指定して呼び出します。(つまり、すべてのノードが同じフィルタIDに同じ関数を登録していないとおかしなことになります。すべてのノードは同じアクセスインターフェースを使っていないといけない)
atomic_write()もfiltered_read()もアイディアだけで、実装するとしてもかなり先になると思います。ファイルシステムとして使うならdirect_write()(またはbuffered_write())とdirect_read()しか必要ないので、まずはここから。