セッションサーバーの実装 - Internal Partty!.org

Partty!.orgの中核は、ターミナルの操作画面を中継するセッションサーバーです。セッションサーバーはC++で書いてあります。
ターミナルの操作画面をホスト(partty.orgコマンド)から受け取り、Flexやtelnetのゲストに転送します。


セッションサーバーは主に以下の3つの部分から構成されています:

Multiplexer
1つのホストからターミナルの画面データを受け取り、複数のゲストに転送します。また、複数のゲストからキー入力を受け取り、ホストに転送します。
Server
ホストからの接続を受け付け、Multiplexerを起動(fork(2))します。
Gate
ゲストからの接続を受け付け、Multiplexerにファイルディスクリプタを受け渡します。

関係を図にすると↓こんな感じになります。


Multiplexerの動作方法は単純で、select(2)やpoll(2)などの、いわゆるIO多重化のサンプルプログラムのようなプログラムです。

  • ホストからデータ(ターミナル画面)が届いたら、(1)すべてのゲストにコピーする、(2)録画データに追記する
  • ゲストからデータ(キー入力)が届いたら、ホストにコピーする
  • Gateからデータ(新しいゲストから接続)が届いたら、ゲストのリストに加える


基本的には上の動作をしているだけですが、応答速度を良くするために、ゲスト1台ごとにバッファ(転送待ちバッファ)をもうけています。ホストからデータが届いたとき、すぐにゲストに転送できればすぐに転送しますが、そうでない場合はそのゲストの転送待ちバッファに溜めておきます。転送待ちバッファにデータが溜まっている間は、そのゲストを書き込み待ちで監視し、書き込み可能になったときにデータを送ります。


ポイントはIO多重化に直接selectやepollを使うのではなく、mpio::eventで抽象化しているところです。これによってイベント待ち部分のコードがいくぶんすっきり書けています。

int MultiplexerIMPL::run(void)
{
        int fd;
        short event;
        while(1) {
                while( mpev.next(&fd, &event) ) {   // ←mpio::event
                        if( fd == host ) {  // Host -> Guests
                                io_host(fd, event);
                        } else if( fd == gate ) {  // Gate
                                io_gate(fd, event);
                        } else {    // Gate -> Host
                                io_guest(fd, event, writable_guest[fd]);
                        }
                }
                if( mpev.wait() < 0 ) { 
                        return -1;
                }
        }
        return 0;
}


mpio::eventは非常に簡単に使えるのでオススメです。



細かいところでは、ゲストごとに持つデータ(転送待ちバッファとか)を配列で保持しており、ファイルディスクリプタを添え字にしてO(1)で取り出せるようにしています。が、ファイルディスクリプタはint型だからといって、配列の大きさをINT_MAXにするわけにはいかないので(ポインタの配列にするだけで8GBになる!)、配列は可変長になるようにしています。
そこでC++なら可変長配列にはstd::vectorを使うところですが、std::vectorは配列の大きさが変わるときに要素のコピーが発生します。コピーされたくない場合はオブジェクトのポインタを入れるようにするわけですが*1、小さなオブジェクトをいちいちポインタでアクセスするのはどうも精神衛生上よろしくない。std::mapにしてもいいですが、オブジェクトにアクセスするためにいちいち二分探索するのも精神衛生上よろしくない。


そんな理由で、CHUNK = [格納するオブジェクトのサイズ × CHUNK_SIZE], ARRAY = [ CHUNK, CHUNK, CHUNK, ... ]という、一種の固定長メモリプールのような感じで管理しています。配列を拡張するときは、配列自体を拡張するのではなく、新しいCHUNKを追加します。これなら配列を拡張しても(CHUNKを追加しても)既に追加されているオブジェクトのアドレスは変わらないので、コピーコンストラクタを走らせなくて済みます。
オブジェクトへのアクセスはARRAY[index / CHUNK_SIZE][index % CHUNK_SIZE]と除算と剰余でアクセスできるので、O(1)で安心。


このコンテナは、先頭から順にオブジェクトを詰めて行かなくてもいい言うことで、sparse_arrayというクラス名になっています。

template <typename T>
class sparse_array {
public:
        typedef size_t size_type;
        sparse_array();
        ~sparse_array();
        inline void set(size_type index);    // オブジェクトのセット(Tのデフォルトコンストラクタでセット)
MP_ARGS_BEGIN
        template <MP_ARGS_TEMPLATE>  // オブジェクトのセット(TのコンストラクタにMP_ARGS_PARAMSを渡してセット)
        inline void set(size_type index, MP_ARGS_PARAMS);
MP_ARGS_END
        inline void reset(size_type index);    // オブジェクトの削除
        inline T& data(size_type index);    // オブジェクトの参照
        inline const T& data(size_type index) const;    // オブジェクトのconst参照
        inline bool test(size_type index) const;    // オブジェクトが格納されているか否か検査
        inline size_type capacity(void) const;
private:
        // ...
};


MP_ARGS_BEGINやMP_ARGS_PARAMSといった怪しげな記述がありますが、これは↓こう展開されます。

        template <typename A1>
        inline void set(size_type index, A1 a1);

        template <typename A1, typename A2>
        inline void set(size_type index, A1 a1, A2 a2);

        template <typename A1, typename A2, typename A3>
        inline void set(size_type index, A1 a1, A2 a2, A3 a3);

        // ...

sparse_array::set()は可変長個の引数をとり、それがTのコンストラクタに渡されます。オブジェクトを追加するときにもコピーコンストラクタは走らせません。


MP_ARGS_*プリプロセッサは、正統C++ならマクロでがんばるところなのでしょうが、まぁそう肩肘張らなくてもいいかなと思い、rubyで処理しています。

NEED_PREPROCESS = event_class.h event_impl.h dispatch.h ios.h io.h mempool.h sparse_array.h
all: $(NEED_PREPROCESS)

%.h: %.pre.h
	ruby -e '\
		def args(n, &block) ;\
			Array.new(n) {|i| yield i+1 } .join(", ") ;\
		end ;\
		code = ARGF.read ;\
		code.gsub!(/^MP_ARGS_BEGIN$$(.*?)^MP_ARGS_END$$/m) {|s| ;\
			re = [] ;\
			1.upto(15) {|n| \
				m = s.split("\n")[1..-2].join("\n") ;\
				m.gsub! /MP_ARGS_TEMPLATE/,   args(n) {|i| "typename A#{i}" } ;\
				m.gsub! /MP_ARGS_PARAMS/, args(n) {|i| "A#{i} a#{i}" } ;\
				m.gsub! /MP_ARGS_FUNC/,       args(n) {|i| "a#{i}" } ;\
				re << m ;\
			} ;\
			re.join("\n") ;\
		} ;\
		puts code' \
	$< > $@

また今度書くネタ:

  • Serverの実装とか
  • GateからMultiplexerへのファイルディスクリプタの転送
  • Hostの実装とか。ttyを全部キャプチャする方法
  • Flexでターミナルエミュレータを実装した件
  • FlexでTelnetクライアントを実装した件
  • 録画データのデータ構造とか
  • FlexでCapttyプレイヤーを実装した件
  • Ramazeで書いたWebインターフェイス部分いろいろ
  • はてブを一覧表示するJavaScript

*1:boost::ptr_vectorとか