分散ファイルシステムのインターフェース

少しずつ分散ファイルシステムの実装を進めています。aio_read()を使ったTCPコネクションマネージャの実装がだいたいできました。なんだかんだで既に3500行を突破…。



IOインターフェースについて。

ファイルシステムなら、インターフェースとしてopen()やwrite()やread()があって、stat()があって、となりますが、SQLだとSQL文でアクセスすることになります。しかしどちらにしても、アクセスはwriteとread(とロック)に分けらます。それをうまく分散してくれるインフラさえあれば、そのインフラにアクセスするインターフェースは取り替えられるはずです。


そこで、インターフェースは切り替え可能なアーキテクチャにしようと思っています。ある名前から対応する値を取り出すことができる、いわゆる連想配列を提供するだけで、インターフェースは後付けします。
「名前」は可変長の文字列で、「値」は可変長のバイト列です。(名前もバイト列でOK)

ファイルシステムインターフェースは、「名前」=「ファイルパス」、「値」=「ファイルのメタデータと中身」として、分散インフラにアクセスします。たとえば、「値」の先頭から32バイトはパーミッションやタイムスタンプなどのメタデータで、33バイト目以降をファイルの中身とします。

一方で、ストレージインターフェースもセットで取り替え可能に。「値」の先頭から32バイトへの書き込みがあればファイルのパーミッションやタイムスタンプを変更し、33バイト目以降に書き込みがあればファイルの中身を変更します。

             アプリケーション
                    ↓ chown("/path/to/hoge", 0, 0)
+---------アクセスインターフェース----------+
                    ↓ write("/path/to/hoge"の0バイト目から2バイトに、"00"をwrite)
               分散インフラ
                    ↓ write("/path/to/hoge"の0バイト目から2バイトに、"00"をwrite)を保持せよ
+--------ストレージインターフェース---------+
                    ↓ chown("/path/to/hoge", 0, 0)
        HDD、別のファイルシステム、メモリ…


アクセスインターフェースから分散インフラのアクセス方法は、基本的にwriteとreadだけになるわけですが、トランザクションはスイッチで切り替えられる必要があります。

write系では、おそらくdirect_write()、buffered_write()、atomic_write()、の3種類になります。これは5/2のエントリに書いた、「流れてきたデータを下層のファイルシステムに書き込む方法は3つ」の3つに対応します。

direct_write()では書き込み中に読み込みがあったとき、書き込みかけのデータが読み込まれます。buffered_write()でも、書き込みかけのデータが読み込まれますが、direct_write()よりもその確率が低い(direct_とbuffered_は片方だけになるかもしれません)。atomic_write()は、書き込みかけのデータが読み込まれることはありません(その代わり遅い)。

2回連続してwriteするときに、1回もwriteしていないときのデータか、2回writeした後のデータしか読み込めてはいけないようにするには、最初にロックフラグを立てるなりの処理をすることになります。(分散インフラでlockもサポートするべきかもしれない)



read系では、direct_read()、cooperative_read()、filtered_read()になると思います。
direct_read()は、atomic_write()されているデータであろうが無視して書き込みかけデータを読み込みます。atomic_write()の努力が水の泡になるわけですが、オーバーヘッドは最も小さい。cooperative_read()は、atomic_write()されているデータであれば、書き込みかけのデータがあれば、書き込みが完了するまで待ってから読み込みます。また、読み込み中にwriteされないようにします。

filtered_read()は、読み込み時にフィルタをかけるcooperative_read()です。これは高速な検索をしたいときに使うことになると思います。データを全部readしてから自前で検索するようにすると、巨大なデータの中から小さなデータを探し出したいときは、大量のトラフィックが無駄になります。そこでread先で検索してもらって、結果だけ受け取ります。

これは呼び出し方からして特殊になるはずで、↓のような感じになると思います。

// 参考 cooperative_read()
void cooperative_read( const char* key,
                       uint64_t from,
                       uint32_t len,
                       void* buffer  );
                       // bufferの長さはlen

// filtered_read()
void filtered_read( const char* key,
                    int filter_id,
                    const void* filter_arg,
                    uint32_t filter_arg_len,
                    FilteredResult* result   );
                    // 事前にbufferは確保しなくていい。結果はresultに入る

template <typename Function>
void regist_filter(int id, Function filter);

struct FilteredResult : private boost::noncopyable {
        std::vector< std::pair<boost::shared_array<char>, uint32_t> > buffer_vector;
        // バッファとその長さの組の配列
};

filtered_read()は、filter_argを引数としてフィルタ関数を実行して、その結果をresultに入れます。resultは、バッファとその長さの組が配列になっていて、フィルタをかけた後の結果が入っています。どんな形式で入っているかは、フィルタ関数(filter)が決めます。フィルタ関数には、おそらくストレージインターフェースのメンバ関数を使うことになると思います。

フィルタ関数は、あらかじめregist_filter_functionでフィルタIDを登録しておきます。呼び出すときはそのフィルタIDを指定して呼び出します。(つまり、すべてのノードが同じフィルタIDに同じ関数を登録していないとおかしなことになります。すべてのノードは同じアクセスインターフェースを使っていないといけない)



atomic_write()もfiltered_read()もアイディアだけで、実装するとしてもかなり先になると思います。ファイルシステムとして使うならdirect_write()(またはbuffered_write())とdirect_read()しか必要ないので、まずはここから。