memstoredの実装詳細 1000行に収まる高速サーバーについて

1000行はテキトーなハッタリだが良くあること ;-)
memstored の本体は memstored.cc ファイル1つに収まっており、その行数は600行程度しかない。memstored のポイントは、少ない行数で高速なサーバーをカンタンに書けたことであった。memstored の実装について詳しく紹介してみる。

memstoredのソースコード
http://svn.coderepos.org/share/lang/c/memstored/trunk/memstored.cc


ソースコードを読むと、処理のほとんどはmp::iothreadsに任せているが、

  • プロトコルのストリームパーサ
  • 読み込み用のバッファリング
  • メインロジック

の3つは、mp::iothreads とはほぼ完全に分離していることが分かる。逆に言えば、この3つを実装しさえすれば、高速なサーバーを記述できることになる。*1


ちなみにプロトコルに MessagePack を使うと、MessagePack の C++ API にはストリームパーサとバッファリング機構が含まれているので、

  • メインロジック

だけを書けば良くなる!


さて、memstoredのmain関数周辺は、以下のようになっている。

static std::auto_ptr<memstored::Server> g_server;

int main(int argc, char* argv[])
{
    memstored::PARAM.parse(argc, (const char**)argv);
    g_server.reset(new memstored::Server());
    g_server->run();
    g_server->join();
    return 0;
}
  1. 引数の解釈
  2. サーバーを初期化
  3. サーバーを開始
  4. サーバーが終了するまで待つ

という処理を行っている。典型的。


引数解釈の部分はどうでもいいので置いておき、2.のサーバーの初期化処理を見てみる:

Server::Server() :
    m_db(PARAM.dbname.c_str()),
    m_sock(PARAM.listen_address)
{
    mp::iothreads::manager::initialize();
    mp::iothreads::reader::initialize(PARAM.read_threads);
    mp::iothreads::writer::initialize(PARAM.write_threads);
    mp::iothreads::listener::initialize();
    //mp::iothreads::connector::initialize(PARAM.connect_threads);

    // signal handling
    // Binary Hacks #52
    sigset_t ss;
    sigemptyset(&ss);
    sigaddset(&ss, SIGHUP);
    sigaddset(&ss, SIGINT);
    sigaddset(&ss, SIGTERM);
    m_signal_thread.reset(new mp::pthread_signal(ss, &Server::signal_end));

    // ignore SIGPIPE
    if( signal(SIGPIPE, SIG_IGN) == SIG_ERR ) {
        throw std::runtime_error(strerror(errno));
    }

    mp::iothreads::listen(m_sock.get(), &Server::accepted, (void*)m_db.get());
}

最初にmp::iothreads::*::initialize()という関数呼び出しが並んでいるが、ここでmp::iothreadsの初期化を行っている。mp::iothreadsはイベント駆動型のサーバーを記述するためのフレームワークで、memstoredの簡潔さはmp::iothreadsに依るところが大きい。

中盤ではシグナルハンドラをセットしている。ここでは Binary HacksHack #52 sigwaitで非同期シグナルを同期的に処理する に載っていたテクニックを使っている。普通シグナルハンドラでは非同期シグナルセーフな関数の呼び出しか volatile sig_atomic_t な変数の操作しかできないが*2、シグナルハンドル専用のスレッドを立ち上げることでこの制限を回避している。詳しくは Binary Hacks か mp::pthread_signalのソースコード参照。


それより重要なのは、最後の mp::iothreads::listen(m_sock.get(), &Server::accepted, (void*)m_db.get()); という一文になる。これを実行すると、m_sock.get()(←これはファイルディスクリプタを返す)で accept(2) し、新しいクライアントが接続してきたら、Server::accepted()関数が呼ばれるようになる。引数には m_db.get() が渡される。

accept(2) や connect(2) のようなブロックする可能性のある関数呼び出しは、基本的にすべてフレームワーク側(mp::iothreads)で行うことになる。ブロックする関数を呼び出しをせずにイベント駆動マシン止めないようにするのが、高速なサーバーを記述する上でのポイントになる。


その Server::accepted() 関数の実装はこの通り:

void Server::accepted(void* db, int fd)
{
    if(fd >= 0) {
        mp::set_nonblock(fd);
        mp::iothreads::add<Connection>(fd, (TCADB*)db);
    }
}

新しいクライアントが接続してくるとこの関数が呼ばれる。

mp::set_nonblock(fd) は、ファイルディスクリプタに O_NONBLOCK フラグをセットしている。

mp::iothreads::add(fd, (TCADB*)db); は、Connectionクラスをイベントハンドラとして、mp::iothreads にファイルディスクリプタを登録している。ここで:

という処理が登録される。

この挙動から分かるように、Connectionクラスのインスタンスの寿命とコネクションの寿命は一致する。つまりConnectionクラスのコンストラクタでコネクションごとに保持すべきリソース(バッファなど)を初期化したり、デストラクタで確実に後始末をするようにしたりすることができる。
これで close()しわすれイベントハンドラメモリリーク を確実に防ぐことができる。


イベントハンドラは複数のスレッドで並列に処理される。つまりConnection::read_event()はマルチスレッドで実行される。

この図の「Inter-thread Communication」が mp::iothreads::add() で行われている。


続いて Connection::read_event() 関数を見てみる。この関数はクライアントからデータが送られてくると呼ばれる。

void Connection::read_event()
try {
    if(m_free < PARAM.buffer_reserve_size) {
        // m_bufferを確保する ... 略 ...
    }

    ssize_t rl = ::read(fd(), m_buffer+m_used, m_free);
    if(rl < 0) {
        if(errno == EAGAIN || errno == EINTR) {
            return;
        } else {
            throw std::runtime_error("read error");
        }
    } else if(rl == 0) {
        throw std::runtime_error("connection closed");
    }

    // ... 略 ...

    while( (ret = memproto_parser_execute(&m_memproto, m_buffer, m_used, &m_off)) > 0) {
        if( (ret = memproto_dispatch(&m_memproto)) <= 0) {
            throw std::runtime_error("unknown command");
        }
    }

    // ... 略 ...
}

まずm_bufferに十分な容量のメモリが確保されているかどうかを確認している。続いてソケットからデータを読み込み、いつものエラー処理を行う。
例外を投げてもソケットはデストラクタで確実にcloseされるので、バンバン例外を投げて良い。


その後 memrpoto_parser_execute()関数と memproto_parser_dispatch()関数 を呼び出しているが、これはmemcachedプロトコルのストリームパーサで紹介したストリームパーサのバイナリプロトコル版である。
「データを次々に投げ込んでいくと内部の状態が遷移していき、ゴールの状態にたどり着くとパース完了、という状態遷移型のパーサ」で、イベント駆動型のサーバーを実装するために適している。


ここからの処理は少し端折るが、プロトコルのパースが成功し、パースされたリクエストパケットが例えばGETリクエストだったとすると、Connection::memproto_getx()関数が呼ばれるようになっている。
というわけで Connection::memprot_getx()関数を見てみる:

void Connection::memproto_getx(memproto_header* h, const char* key, uint16_t keylen)
{
    bool cmd_k = (h->opcode == MEMPROTO_CMD_GETK || h->opcode == MEMPROTO_CMD_GETKQ);
    bool cmd_q = (h->opcode == MEMPROTO_CMD_GETQ || h->opcode == MEMPROTO_CMD_GETKQ);
    int vallen = 0;
    void* val = tcadbget(m_db, key, keylen, &vallen);
    if(val) {
        uint32_t flags = htonl(0);
        send_response(h, MEMPROTO_RES_NO_ERROR,
                key, (cmd_k ? keylen : 0),
                val, vallen,
                (char*)&flags, sizeof(flags),
                0);
    } else if(!cmd_q) {
        send_response_nodata(h, MEMPROTO_RES_KEY_NOT_FOUND, 0);
    }
}

tcadbget()でTokyo Cabinetのデータベースから値を取り出している。もし値が存在すればsend_response(...)を呼び出し、存在しなければ send_response_nodata(...); を呼び出している。

後者はさて置き、前者のsend_response()関数はこのようになっている:

void Connection::send_response(memproto_header* h,
        uint8_t status,
        const char* key, uint16_t keylen,
              void* val, uint32_t vallen,
        const char* extra, uint16_t extralen,
        uint64_t cas)
{
    struct iovec vb[4];
    mp::iothreads::reqvec vr[4];
    unsigned int vi = 0;

    char header[24];
    pack_header(header, status, h->opcode,
            keylen, vallen, extralen,
            h->opaque, cas);

    vb[0].iov_base = header;
    vb[0].iov_len  = sizeof(header);
    vr[0] = mp::iothreads::reqvec();
    ++vi;

    if(extralen > 0) {
        vb[vi].iov_base = const_cast<char*>(extra);
        vb[vi].iov_len  = extralen;
        vr[vi] = mp::iothreads::reqvec();
        ++vi;
    }

    if(keylen > 0) {
        vb[vi].iov_base = const_cast<char*>(key);
        vb[vi].iov_len  = keylen;
        vr[vi] = mp::iothreads::reqvec();
        ++vi;
    }

    if(vallen > 0) {
        vb[vi].iov_base = val;
        vb[vi].iov_len  = vallen;
        vr[vi] = mp::iothreads::reqvec(
                mp::iothreads::writer::finalize_free, val);
        ++vi;
    }

    mp::iothreads::send_datav(fd(), vb, vr, vi);
}

せっせと配列を初期化しているが、最終的に mp::iothreads::send_datav() 関数を呼び出している。この関数はまず writev(2) でデータを送ろうと試みたあと、すべて送信しきれなければ送信専用のスレッドにデータを受け渡す。
ここでもブロックしないのがポイントである。イベント駆動マシンを止めずにスレッドを走らせ続けることができる。

*1:ストリームパーサは [http://www.complang.org/ragel/:title=Ragel] を使うと効率よく書ける。HTTPのストリームパーサが必要なら [http://mongrel.rubyforge.org/:title=Mongrel] の中に Ragel で書かれたストリームパーサが含まれているので参考になる。

*2:http://d.hatena.ne.jp/yupo5656/20040712/p2