読者です 読者をやめる 読者になる 読者になる

非同期双方向TCPコネクションマネージャ

日記改題。

分散ファイルシステムはまだプロトコルの設計が終わっていませんが、ちまちまとコードを書き始めています。昨日と今日は非同期で双方向なTCPコネクションマネージャ(コネクションプール?)を作ってみました…まだできていませんが。

V-FIELDではselect()を使っていますが、select()は遅いらしいので、aio_*(aio_read、aio_write…)です。epollやkqueueなんてものもありますが、aio_*の方が面白そうなので。



インターフェースは↓こんな感じです。

class ASStreamManager {
public:
        ASStreamManager(uint16_t listen_port,  unsigned int max_connection);
        ~ASStreamManager() throw();
public:
        typedef uint8_t magic_type;

        template<typename ASHandlerFunc>
                void registHandler(magic_type magic, ASHandlerFunc handler);

        negotiate(const NodeAddress& to, magic_type magic, NotifyClient<bool>& negotiate_notify);
};

class ASHandlerBase {
public:
        virtual ~ASHanlderBase() throw() {}
        virtual void operator() (AIOSocket& sock, size_t initbuflen) = 0;
};

はてなのシンタックスカラーリング]は素晴らしい)



飛んできたパケットの先頭1バイトを読んで、0だったら関数A、1だったらB、2だったらC…というのは良くあるパターンだと思いますが、そう言った関数をASStreamManager::registHandler()で登録しておきます。パケットが飛んできたときに、別のスレッドで登録しておいたASHandlerFuncが呼ばれます。

ASHandlerFuncの引数には、通信してきた相手につながっているソケット(sock)と、最初に読み込まれたバッファ(sock.buffer())とその長さ(initbuflen)が渡されます。



こちらから他のノードにデータを送りたいときは、ASStreamManager::negotiate()を使います。

ここで、negotiate()はネゴシエーションが終わっていなくてもすぐに返ってきます。ブロックしません。

ASStreamManagerは双方向で使うコネクションプールで、「相手から接続されたコネクション」も「こちらから接続しにいったコネクション」も、両方プールして使います。そのため、こちらから通信を開始したときに、相手側も同時に同じコネクションを使って通信を開始する可能性があります。衝突を回避するために、こちらからmagicを送った後に、magicと対応するack_magicが返ってこなかったら、別のコネクションを使うようにします(ネゴシエーション)。衝突したコネクションは切断します。

V-FIELDに実装したコネクションマネージャは、ack_magicを待って、ネゴシエーションが成功したことを確認してから、実際に送りたいデータを送り始めます。しかしASStreamManagerは、ネゴシエーションが成功したことを確認する前から、データを送り始められます。ネゴシエーションが成功したかどうかは、negotiate_notifyを使って非同期で知らせます。(negotiate_notify.wait()とすると、ネゴシエーションが終わるまでブロックして、成功か失敗かが返ってくる)

  1. magicを送る
  2. ack_magicを待つ
  3. データを送る
  4. データを受け取る

が、

  1. magicを送る
  2. データを送る
  3. ack_magicを待つ
  4. データを受け取る

になります。

1と2、3と4は同じパケットにまとめられるので、1ホップ削れます。こちらから大きなデータを送るという場合には、遅延ゼロで送りっぱなしにできます。

送っておいて実はネゴシエーションが失敗していたという場合には、送る前の状態にロールバックしないといけませんが、そこは通常の例外処理も同じでしょう。


ついでに、AIOSocketはバッファを持っています(サイズはASStreamManager::STREAM_BUFFER_SIZEで、おそらく16KB)。これはaio_readのために確保したバッファで、自由に使い回せます。つまり、メモリの確保と解放のオーバーヘッドを無くせます。


ソースコードはもう少し先に分散ファイルシステムと一緒に公開します。