irbから学ぶRubyの並列処理 ~ forkからWebSocketまで

ブログを下記に移転しました。デザイン変更により移転先では記事が一層読みやすくなっていますので、よろしければ移動をお願い致します。

irbから学ぶRubyの並列処理 ~ forkからWebSocketまで : melborne.github.com

                                                                                      • -

世の中は並列化花ざかりだよ
人間はシングルタスクのままなのに
プログラミングするときは
マルチタスクが要求されるなんて
世知辛い世の中になったものだね


でも情報革命は始まったばかりだから
愚痴ってばかりもいられないよ
自分がその波にうまく乗れないとしても
うまく乗ってる人の様を
間近で見てみたいと思うんだ



そんなわけで..


Rubyのfork Thread Reactor EventMachine
WebSocketなどの並列化について少し学んだので
自分の理解をここにまとめておくよ

REPL

irbRubyにおける対話型の実行環境だよ
これは一般にはREPLと呼ばれてるんだ
REPLはユーザの入力を
読み取り(Read)
評価し(Eval)
出力する(Print) 処理を
繰り返すよ(Loop)


irbのコードは5000行にものぼるらしいけど
その核心は次のように1行で書けるよ

 loop{ puts eval gets }

getsでユーザ入力を読み取り
evalで評価し
putsで出力する処理を
loopで繰り返す
これじゃGEPLだけどね:)


このコードを保存して(gepl.rb)
実行してみよう

$ ruby gepl.rb
%w(ruby lisp haskell).map(&:upcase)
RUBY
LISP
HASKELL       

"hello, repl!".gsub('r','g')
hello, gepl!  

ちゃんと動いてるね
Ctrl+Cで終了するよ


通常loopは無限ループを生成するけど
先のコードではgetsのところで処理が止まり
ユーザからの入力を待ち受ける
ここがポイントだよ


ちなみにこのコードは
その入出力を明示的にして
次のようにも書けるね

loop do
  input = $stdin.gets
  output = eval(input)
  $stdout.puts output
end

デフォルトでグローバル変数$stdinと$stdoutには
標準入力 標準出力がセットされてるから
キーボードからの入力が読み取られ
ディスプレイに出力がなされるんだ

マルチユーザーREPL

REPLは1ユーザに対する対話環境だよ
でも複数ユーザで使えたらもっとうれしいよね
どうすればいい?


そうだよ
入出力と評価(eval)を切り離せばいいんだよ
いわゆるクライアント・サーバー方式だね
クライアントからの入力をサーバーに渡して評価し
結果をクライアントに出力する


じゃあ早速REPLサーバーを書いてみるよ

#repl_server.rb
require "socket"

server = TCPServer.new(60000)
loop do
  client = server.accept   # clientからの接続を待つ
  
  begin
    loop { client.puts eval client.gets }
  rescue
  ensure
    client.close
  end
end

Rubyならこんなに簡単に書けちゃうんだ
TCPServer.newでサーバーインスタンスを生成し
acceptメソッドでクライアントからの接続を待ち受けるよ
クライアントが接続したら
getsでユーザからの入力を評価し結果をユーザに返す
接続したクライアント(これをソケットと呼ぶよ)からgetsし
ソケットにputsしてるところがポイントだよ


$stdin $stdoutの参照先を
クライアントのソケットに切り替えるやり方にすると
最初のコードとの違いがはっきりするかもね

#repl_server.rb
require "socket"

server = TCPServer.new(60000)
loop do
  client = server.accept
  begin
    $stdin, $stdout = client, client
    loop { puts eval gets }
  rescue
  ensure
    client.close
    $stdin = STDIN
    $stdout = STDOUT
  end
end

ensure節ではこれらの後処理をしているよ


サーバーを立ち上げて
クライアントから接続してみようよ
telnetを使うね

$ telnet localhost 60000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
3.times { puts "Hello, friend" }
Hello, friend
Hello, friend
Hello, friend
3

いい感じだね
Ctrl+]に続きquitでtelnetの接続を切るよ

並列化REPL

でも先のサーバーには大きな問題があるよ
誰か一人が接続していると
他の人が接続できないつまり
複数の人が同時に使えないんだよ
先のコードで1つの接続がacceptされると
loop内のgetsは
その接続先ユーザからの入力を待ち続けることになる
でそのユーザの接続が切れてはじめて
処理はループされacceptで
別の接続を待ち受けられるようになるんだ
これは大問題だね
複数のターミナルから接続して試してみればわかるよ


さあここで並列化の出番だよ


Rubyで並列化を実現するにはいくつかの方法があるよ
ちょっとどんなやり方があるか考えてみてくれる?

個別接続による並列化

もっとも単純な方法は処理が終わるたびに
ユーザからの接続を毎回切る方法だよ
前のユーザの接続が切れれば
サーバーは別の接続を待てるからね
まあ
これを並列化と呼ぶのはどうかとも思うけど..


コードは次のような感じになるかな

require "socket"

server = TCPServer.new(60000)
loop do
  client = server.accept
  client.puts eval client.gets
  client.close
end

evalしたものをクライアントに返したら
そのソケットを閉じる
これによってそのクライアントの接続は切れるから
別のクライアントからの接続を待ち受けられるようになる


構成がシンプルでいいんだけど
ユーザにとってはちょっと面倒だよね
使うたびに接続し直さなきゃならないからね
なんかWebサーバーみたいだよね..

forkによる並列化

2つ目は複数のプロセスを起動する方法だよ
Rubyでプロセスを並列化するにはKernel#forkを使うよ


forkのブロックで囲まれたコードは別プロセスで起動されるから
loop{}のところをforkのブロックに投げればよさそうだね
やってみるよ

require "socket"

server = TCPServer.new(60000)
loop do
  client = server.accept
  fork do    # 別プロセスで起動
    begin
      loop { client.puts eval client.gets }
    rescue
    ensure
      client.close
    end
  end
end

acceptでクライアントが接続すると
forkで別プロセスが起動されて
その中でgetsの待ち受けがされるけど
メインプロセスは外側のループで先頭に戻り
これでacceptで別のクライアントの接続を待てるね


じゃあ複数のtelnetから接続して試してみよう

$ telnet localhost 60000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1 + 2
3
---------------------------------------
$ telnet localhost 60000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
[2011,10,1].join '-'
2011-10-1

うまくいったね


念のためこの状況で
複数のプロセスが立ってるか確認してみるよ

$ ps aux | grep repl_server
keyes     1335   0.3  0.1  2448356   1176 s001  R+    3:01PM   0:00.90 ruby repl_server.rb
keyes     1303   0.3  0.0  2448356   1040 s001  S+    2:59PM   0:01.30 ruby repl_server.rb
keyes     1301   0.3  0.2  2448356   3756 s001  R+    2:59PM   0:01.34 ruby repl_server.rb

3つのプロセスが立ってるのがわかるね


ただプロセスは個々に独立したメモリ空間を専有するから
接続ユーザ数が多くなるとちょっと心配だよね
またチャットサーバーのように
ユーザ間での情報のやり取りが必要な場合
プロセス間で通信させなきゃならないから
そんなときはちょっと厄介そうだよね

Threadによる並列化

3つ目はスレッドを使う方法だよ
スレッドは1つのプロセス内で処理を並走させる仕組みだよ
並走する処理は同じプロセス内にあるから
その間でのデータ共有が容易という利点があるんだ


じゃあThreadクラスを使ったサーバーを書いてみるよ

require "socket"

server = TCPServer.new(60000)
loop do
  client = server.accept
  
  Thread.new(client) do |cl|
    begin
      loop { cl.puts eval cl.gets }
    rescue
    ensure
      cl.close
    end
  end
end

forkをThread.newに変えればいいだけだから簡単だね
ただスレッドは同じプロセス内で並走するから
acceptしたclientをブロック引数を通して
ちゃんと渡さないと問題が生じるよ


同じように複数のtelnetから接続してみるよ

Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
[*1..10].select{|i| i.even? }
2
4
6
8
10
-------------------------------
Connected to localhost.
Escape character is '^]'.
Array.ancestors
Array
Enumerable
Object
Kernel
BasicObject
-------------------------------
$ ps aux |grep repl_server
keyes     1712   0.3  0.2  2451964   3808 s001  S+    7:03PM   0:00.24 ruby repl_server.rb

プロセスは1つのままだってこと確認できるよね


前でスレッドがマルチプロセスよりも
データ共有が容易って書いたから
そのサンプルも書いてみるよ
サーバーからの出力を
接続しているすべてのクライアントに出力する例だよ

require "socket"

server = TCPServer.new(60000)
clients = []   # 接続クライアントの管理用配列
loop do
  client = server.accept
  clients << client  # 接続クライアントを登録
  
  Thread.new(client) do |cl|
    begin
      loop do
        output = eval cl.gets
        clients.each { |c| c.puts output }  # 結果を全クライアントに配信
      end
    rescue
    ensure
      cl.close
      clients.delete(cl) # 切断したクライアントを管理対象外に
    end
  end
end

接続クライアント管理用の配列を用意して
結果を全員にブロードキャストすればいいね
簡単だね


ただスレッドモデルはスレッド間で共有するデータを
書き換えるような場合の取り扱いがちょっと厄介だよ
それとやっぱり各スレッドごとに
いつ来るかわからないデータを待っている
というのが無駄といえば無駄だよね


Reactorパターンによる並列化

4つ目はReactorパターンを使って並列化する方法だよ
Reactorパターンというのは簡単に言うと
一箇所でいろいろなイベントを待ち受けて
イベントが来たらこれに反応(リアクト)して
その種類に応じた処理を実行するモデルのことだよ
RubyでReactorパターンを実現するには
IO.selectメソッド(またはKernel#select)を使うよ


早速Reactor版REPLサーバーを書いてみるよ
REPLサーバーにおけるイベントには
サーバーに対するものつまりクライアントの接続と
クライアントに対するものつまりソケットへのデータ入力があるよ
これらをsocketsという配列で管理しよう

require "socket"

server = TCPServer.new(60000)
sockets = [server]
loop do
  r_sockets = IO.select(sockets)[0] # すべてのイベントを待ち受ける
  r_sockets.each do |socket|
    case socket
    when TCPServer     # サーバーに対するクライアントの接続があったとき
      client = socket.accept
      sockets << client
    when TCPSocket     # クライアントに対するデータ入力があったとき
      unless socket.eof?
        socket.puts eval socket.gets
      else
        socket.close
        sockets.delete(socket)
      end
    end
  end
end

IO.selectは登録したソケットに対する
入力/出力/例外のイベントを待ち受け
そのイベントが発生したソケットを返すけど
返り値はイベント別のソケットの配列になっているよ
ここでは入力イベントだけに関心があるから
配列の第一要素のみ取り出してるよ


そしてcase式でイベントのあった
ソケットの種類に応じて処理を切り分けてるよ
つまりサーバーがクライアントからの接続を受けたときは
TCPServerの節に入って
sockets配列に接続のあったクライアントが登録され
ループでselectに戻って次のイベントを待つよ
最初のクライアントの接続時にはsockets配列には
serverしか登録されていないから
処理は必ずここに来ることになるよ


一方クライアントにデータの入力があったときは
TCPSocketの節に入って入力データの処理をするよ
入力データがあるときはそれを評価し結果を返し
無いときはソケットを閉じてその接続を解放するよ
そしてループでまたselectに戻って次のイベントを待つよ


Reactorパターンでは
すべてのクライアントの接続は維持されたままなのに
処理が並走しないつまり単一プロセス単一スレッドで
複数クライアントからの要求に応じることができる
という点がユニークだよ
このモデルなら処理が並走することはないので
共有データを書き換えるようなことも簡単にできるよね

EventMachineによる並列化

ただwhen式でのソケットの切り分け処理が
面倒といえば面倒だよね
でも安心していいよ
EventMachineというライブラリを使えば
これが驚異的に簡単にできちゃうんだよ

require "eventmachine"

EM.run do
  EM.start_server('localhost', 60000) do |c|
    def c.receive_data(data)
      send_data eval(data)
    end
  end
end

EventMachineはRreactorパターンによる
イベント駆動型のI/Oインタフェースを提供するライブラリだよ
JavaScriptのNode.jsみたいなものなんだろうね


もうコードを見れば分かると思うけど
EM.runでイベントループが開始されて
クライアントからのデータ入力があると
receive_dataメソッドが呼び出されるので
ここでsend_dataを呼んでevalした入力を返せばいいんだ


EventMachineを使えば
チャットサーバーだって簡単に書けちゃうんだ

require "eventmachine"

module Chat
  @@channel = EM::Channel.new
  def post_init
    puts "-- someone connected"
    @sid = @@channel.subscribe { |data| send_data ">> #{data}" }
  end

  def receive_data(data)
    @@channel.push data
  end

  def unbind
    puts "-- someone disconnected from the server"
    @@channel.unsubscribe(@sid)
  end
end

EM.run do
  EM.start_server('localhost', 60000, Chat)
end

EMサーバーはクライアントの接続があるたびに
その引数でセットしたChatモジュールをインスタンス化して
その監視対象として登録するよ*1
インスタンスのpost_initメソッドはその接続時に
unbindメソッドはその切断時に呼び出され
receive_dataは先の例と同様にデータ受信時に呼び出されるよ


データをブロードキャストするにはEM::Channelを使うよ
subscribeでそのクライアントに対する処理を登録して
pushで呼び出せばいいんだ


gem install eventmachineして
telnetから試してみてね

WebSocket

ここまでくるとWebサーバー上でも
この並列化技術を使いたいと考えるのが人情だよね
そう
これこそがWebSocketなんだよ


そしてうれしいことにEventMachineには
そのためのプラグインem-websocketがあるんだ
gem install em-websocketして使うよ
サーバー側のコードは次のような感じだよ

require 'em-websocket'

EM.run {
  @channel = EM::Channel.new

  EM::WebSocket.start(:host => "localhost", :port => 60000) do |ws|
    sid = nil
    ws.onopen { sid = @channel.subscribe { |msg| ws.send msg } }
    ws.onmessage { |msg| @channel.push "#{sid}: #{msg}" }
    ws.onclose { @channel.unsubscribe(sid) }
  end
}

EM::WebSocket.startでサーバーインスタンスを立ち上げて
クライアントからの接続を待ち受けるよ
クライアントからの接続があるとonopenが呼ばれるから
ここでchannelに
メッセージをブロードキャストする処理を登録するよ
クライアントがテキストを送信するとonmessageが呼ばれるから
それをchannelにpushして登録した処理を呼ぶよ


次にクライアントサイドのコードだよ

<html>
  <head>
    <script src='http://ajax.googleapis.com/ajax/libs/jquery/1.6.2/jquery.min.js'></script>
    <script>
      $(document).ready(function(){
        function debug (str) { $("#debug").append("<p>"+str+"</p>") };
        
        ws = new WebSocket("ws://localhost:60000/");
        ws.onopen = function() { debug("Welcome to Chattata!") };
        ws.onmessage = function(evt) { $("#msglist").append("<p>"+evt.data+"</p>") };
        ws.onclose = function() { debug("socket closed") };

        $("form").submit(function(){
          var msg = $("input#msg");
          ws.send(msg.val());
          msg.val('');
          return false;
        });
      });
    </script>
  </head>
  <body>
    <div id="debug"></div>
    <form>
      <input id="msg" type="text"></input>
    </form>
    <div id="msglist"></div>
  </body>
</html>

クライアント側では
サーバーに接続するソケットをインスタンス化するよ
ユーザがテキストを送信すると$("form").submitが呼ばれ
その内容はws.sendでソケットに送り出されるよ
これによってサーバー側のchannelに登録された処理が呼ばれ
接続されているクライアントに
テキストがブロードキャストされるよ
クライアント側ではこれをonmessageで受けて
テキストをwindow上に表示するよ


じゃあ試してみるよ


良い感じだね!
WebSocketはHTML5の新しい規格だから
対応ブラウザか確認してね


ちょっと長い投稿になっちゃったけど
最後まで付き合ってくれてありがとう

*1:モジュールをインスタンス化って変だよね..