読者です 読者をやめる 読者になる 読者になる

非同期双方向TCPコネクションプールの実装メモ

TCPで、1回接続したコネクションをとっておいて、後で使い回したい。つまりコネクションプールを作りたい。これをどうやるか。4/29のエントリの続きです。

V-FIELDでは、データを待ち受けるときはselect()を使って、溜めてあるソケットに変化があったら、スレッドプールからスレッドを取り出して処理を行う、ということを行っていました。データを送りたいときは普通に溜めてあるソケットを使ってデータを送るのですが、両側から同時にデータが送信される可能性があるので、一度小さなデータをやりとりしてネゴシエーションしてから、実際のデータを送ります。コネクションプールの実装方法としては標準的な方法かな、と思っています。

それが、どうもselect()にはいろいろと問題があるらしいので、今度は違う実装にしたいと思っています。そこで、aio_read()。普通はselect()の次はepollかkqueue、となるらしいですが、aio_read()の方が面白そうなので、と。


aio_read()は、「関数ポインタ」「バッファ」「ソケット」を含んだ構造体を渡すと、そのソケットに何かデータが届いたときに、関数ポインタを実行してくれるというもの。このとき先の構造体を引数として渡してくれます。バッファにはデータが入っています。ソケットからバッファにデータを入れる処理や、関数を実行するときにスレッドを立ち上げる処理をカーネル内でやってくれるらしく、速いらしいです。

aio_read()は、データを待ち受けるときに使う。こちらからデータを送りたいときは、まず「どこかの変数」に、どこにデータを送りたいかを書いておく。次にaio_cancel()という関数を使って、aio_read()の予約をキャンセルする。するとaio_read()に登録されていた関数ポインタは、「キャンセルされました」という情報と共に実行される。このとき「どこかの変数」を調べて、どこにデータを送りたいがためにキャンセルされたのかを調べる。調べたら、データが送られてくるのを待つ。方やaio_cancel()を発行したスレッドは、データを待っているスレッドとは別に動いているので、そのままデータを送る。送れば相手は何かデータを返してくる。返ってきたら、データを待っているスレッドがそれを受け取る。ここで期待したヘッダが付いていない場合、コネクションが衝突したと判断する。

なにやら複雑になっていますが、aio_cancel()でaio_read()をキャンセルしたら、そのまま自分で(aio_cancel()を呼び出したスレッドで)データを送って受ければ良いのでは、とも思うわけですが、キャンセルしたとしてもせっかくaio_read()のおかげでスレッドが1つ起動するので、これを有効活用したい。シングルコアなCPUではスレッドを2つ使おうが1つにしようが性能は変わらない(むしろ2つだと遅くなる)わけですが、今後はx86でもマルチコア化は必定なので、並列処理できるところはどんどん並列化した方が良いはず。それで排他処理が必要になると遅くなってしまうわけですが、今回の場合はデータの送信とデータの待ち受けなので、排他処理は必要ないはず。(データの待ち受け側は、データを全部送り終わったのかどうか知らないといけないけど、それは接続相手から送られてくるデータで分かる。よってcondition不要、ロック不要)



前提条件として、接続先のアドレスはNodeAddressというクラスを使って表現します。これはIPアドレスとポート番号が19バイトの固定長バイト列に入っているものです。(IPv4IPv6に両対応しています)

class NodeAddress {  // 一部
public:
    inline bool isIPv6(void) const throw();
    inline const struct in6_addr* getin6addr(void) const throw();
    inline const struct in_addr   getin4addr(void) const throw();
    inline const uint16_t getPortByNetworkByteOrder(void) const throw();
private:
    char m_data[19];
};

struct NodeAddressHash {  // ハッシュ関数
    size_t operator() (const NodeAddress& node) const throw();
};

さて、以上の仕組みを実装しようと既に2週間ほど経っているのですが、なかなか進んでいません。大きな問題は、aio_read()渡した各種ポインタが、一度どこか得体の知れないところに飛んでいってしまうこと。そして、突然関数ポインタが呼ばれて返ってくる。返ってくるまでの間にメモリを解放してはいけないし、かといって要らなくなれば解放しないとメモリリークになる。aio_read()にスマートポインタが渡せれば、と思うものの、そうはいかない。



struct ASHandlerFunction {
    virtual ~ASHandlerFunction() throw() {}
    virtual void operator() (AIOSocket& socket, char* initbuf, size_t initbuf_len) = 0;
};

struct ASSendFunction {
    virtual ~ASSendFunction() throw() {}
    virtual void operator() (AIOSocket& socket) = 0;
};

class ASStreamManagerIMPL {
public:
    // データ待ち受け用
    typedef uint8_t magic_type;
    void registHandler(magic_type magic, ASHandlerFunction* handler);
public:
    // データ送信用
    void negotiate(const NodeAddress& to, ASSendFunction* proc);

    // 本当はこうしたいものの…
    // template <typename Func>
    // void negotiate(const NodeAddress& to, Func proc);

private:
    class ManagedEntry : private boost::noncopyable {
    public:
        // アクセサ未定
    private:
        NegotiateRequest m_req;
        boost::scoped_ptr<AIOContext> m_aiocb;
    };

    class NegotiateRequest : private boost::noncopyable {
    public:
        inline bool isRequested() const;
        void run(AIOSocket& socket);
    public:
        void request(ASSendFunction* proc);
    private:
        boost::scoped_ptr<ASSendFunction> m_proc;
    };

public:
    typedef stdext::hash_multimap<NodeAddress, boost::shared_ptr<ManagedEntry>, NodeAddressHash> managed_type;
private:
    // プールとそのロック
    managed_type m_managed;
    boost::mutex m_managed_lock;
};

class AIOContext : boost::noncopyable {
public:
    // アクセサ未定

public:
    // 内部バッファを新しく確保したバッファと入れ替える。前のバッファを返す。
    char* switchBuffer(void);

private:
    // バッファ    // scoped_ptrは中身を入れ替えられない
    std::auto_array<char> m_buffer;

     // ソケット
    int m_socket;

    // 関数ポインタ
    ASHandlerFunc* handler;

    // プールのコンテナへのイテレータ。ソケットをclose()したらプールから取り除かないといけない
    ASStreamManagerIMPL::managed_type::iterator m_iterator;
};

class AIOSocket {
private:
    // m_contexはASStreamManagerIMPL::m_managedの中身の
    // ManagedEntryの持っているメンバ変数へのポインタなので、
    // こちらの方が寿命が長いとSegmentation Faultで落ちる
    // AIOSocketは中(AIOContext)にバッファを持っているのがポイント
    AIOContext* m_context;
public:
    size_t write(const void* buf, size_t count);
    size_t read(void* buf, size_t count);
    size_t sendfile(int in_fd, off_t *offset, size_t count);
    // ...
};

こちらからデータを送信したいときは、ASStraemManager::negotiate()を使う。送りたい宛先のアドレスと、キャンセルしたaio_readスレッドから呼んで欲しい関数(proc)を渡す。すると、negotiate()を呼び出したスレッドと、procは、別のスレッドで実行される。procは関数オブジェクトなので、メンバ変数にデータを持たせておくことも可。関数オブジェクトはtemplateを使ってASSendFunctionを継承しなくてもいいようにしたいところなのだけど、そのまま呼ぶわけではなく1回コンテナに入れないといけないので、型情報を維持するにはこの方法しか思いつかない。ただ継承したものを基底クラスで保持するときはポインタで持っていないといけないので、どうしてもまたメモリ管理が必要になってしまう。むむむ。

これだけのデータメンバが必要なことは分かるのですが、1回得体の知れないところに飛んでいってしまう以上、ポインタで持っていないといけないところが厄介。うーむ、やっぱりGCが欲い…そもそもaio_系がGCに対応していないのでダメですが。





もう一つ問題なのが、コネクションをプールしているコンテナ(ASStreamManagerIMPL::m_managed)のロック粒度が大きすぎないか?ということ。どこかと通信を開始するたびにこのコンテナをロックするので、ここのロック粒度は小さくしたい。つまり、コンテナを小分けにしたい。どう分けると良いか?

それから、hash_multimap(SGI STLのハッシュテーブル)のような複雑なコンテナは実は要らないのでは?という疑問。ハッシュテーブルは検索が速いのは良いのですが、検索する前に1回ハッシュを計算しないといけないので、そのオーバーヘッドが実は大きいのかもしれない。それにコネクションプールと言ってもLAN内でしか使わず、1台のノードについて1本、多くてもせいぜい2本しかコネクションしか張らないので、エントリの数はがんばっても200かそこらです。これくらいの数でハッシュテーブルは要るのか、要らないのか…

それから、V-FIELDの挙動を調べていて分かったのですが、コネクションを張り直す(ネゴシエーションが衝突する)ことは、ほとんどない。それからノードの追加や離脱は頻繁には発生しない。そのため、エントリの挿入や削除の頻度は多くない。それよりも参照(検索)が圧倒的に多い。というわけで、これはソート済みvector(Effective STLの第23項に書いてあった)が向いていそう。

コンテナのキーに使うNodeAddressは、内部データを一つの固定長配列で持っているという、完全効率指向のクラスにしているので、operator<にはmemcmpが使える。memcmpといえばgccでは組み込み関数で、最適化が効いて速そうな気がする。

ただソート済みvectorにすると今度は、挿入や削除のときにイテレータが無効化されてしまう。そのためソケットをclose()したのでコンテナからエントリを削除したい、というときに、どのエントリを削除すればいいのか分からなくなってしまう。うーむ。NodeAddressで検索→ポインタを線形検索でいいかな。どうせ頻度は多くないわけだし。