irbから学ぶRubyの並列処理 ~ forkからWebSocketまで
ブログを下記に移転しました。デザイン変更により移転先では記事が一層読みやすくなっていますので、よろしければ移動をお願い致します。
irbから学ぶRubyの並列処理 ~ forkからWebSocketまで : melborne.github.com
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- -
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
世の中は並列化花ざかりだよ
人間はシングルタスクのままなのに
プログラミングするときは
マルチタスクが要求されるなんて
世知辛い世の中になったものだね
でも情報革命は始まったばかりだから
愚痴ってばかりもいられないよ
自分がその波にうまく乗れないとしても
うまく乗ってる人の様を
間近で見てみたいと思うんだ
そんなわけで..
Rubyのfork Thread Reactor EventMachine
WebSocketなどの並列化について少し学んだので
自分の理解をここにまとめておくよ
REPL
irbはRubyにおける対話型の実行環境だよ
これは一般にはREPLと呼ばれてるんだ
REPLはユーザの入力を
読み取り(Read)
評価し(Eval)
出力する(Print) 処理を
繰り返すよ(Loop)
irbのコードは5000行にものぼるらしいけど
その核心は次のように1行で書けるよ
loop{ puts eval gets }
getsでユーザ入力を読み取り
evalで評価し
putsで出力する処理を
loopで繰り返す
これじゃGEPLだけどね:)
このコードを保存して(gepl.rb)
実行してみよう
$ ruby gepl.rb %w(ruby lisp haskell).map(&:upcase) RUBY LISP HASKELL "hello, repl!".gsub('r','g') hello, gepl!
ちゃんと動いてるね
Ctrl+Cで終了するよ
通常loopは無限ループを生成するけど
先のコードではgetsのところで処理が止まり
ユーザからの入力を待ち受ける
ここがポイントだよ
ちなみにこのコードは
その入出力を明示的にして
次のようにも書けるね
loop do input = $stdin.gets output = eval(input) $stdout.puts output end
デフォルトでグローバル変数$stdinと$stdoutには
標準入力 標準出力がセットされてるから
キーボードからの入力が読み取られ
ディスプレイに出力がなされるんだ
マルチユーザーREPL
REPLは1ユーザに対する対話環境だよ
でも複数ユーザで使えたらもっとうれしいよね
どうすればいい?
そうだよ
入出力と評価(eval)を切り離せばいいんだよ
いわゆるクライアント・サーバー方式だね
クライアントからの入力をサーバーに渡して評価し
結果をクライアントに出力する
じゃあ早速REPLサーバーを書いてみるよ
#repl_server.rb require "socket" server = TCPServer.new(60000) loop do client = server.accept # clientからの接続を待つ begin loop { client.puts eval client.gets } rescue ensure client.close end end
Rubyならこんなに簡単に書けちゃうんだ
TCPServer.newでサーバーインスタンスを生成し
acceptメソッドでクライアントからの接続を待ち受けるよ
クライアントが接続したら
getsでユーザからの入力を評価し結果をユーザに返す
接続したクライアント(これをソケットと呼ぶよ)からgetsし
ソケットにputsしてるところがポイントだよ
$stdin $stdoutの参照先を
クライアントのソケットに切り替えるやり方にすると
最初のコードとの違いがはっきりするかもね
#repl_server.rb require "socket" server = TCPServer.new(60000) loop do client = server.accept begin $stdin, $stdout = client, client loop { puts eval gets } rescue ensure client.close $stdin = STDIN $stdout = STDOUT end end
ensure節ではこれらの後処理をしているよ
サーバーを立ち上げて
クライアントから接続してみようよ
telnetを使うね
$ telnet localhost 60000 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. 3.times { puts "Hello, friend" } Hello, friend Hello, friend Hello, friend 3
いい感じだね
Ctrl+]に続きquitでtelnetの接続を切るよ
並列化REPL
でも先のサーバーには大きな問題があるよ
誰か一人が接続していると
他の人が接続できないつまり
複数の人が同時に使えないんだよ
先のコードで1つの接続がacceptされると
loop内のgetsは
その接続先ユーザからの入力を待ち続けることになる
でそのユーザの接続が切れてはじめて
処理はループされacceptで
別の接続を待ち受けられるようになるんだ
これは大問題だね
複数のターミナルから接続して試してみればわかるよ
さあここで並列化の出番だよ
Rubyで並列化を実現するにはいくつかの方法があるよ
ちょっとどんなやり方があるか考えてみてくれる?
個別接続による並列化
もっとも単純な方法は処理が終わるたびに
ユーザからの接続を毎回切る方法だよ
前のユーザの接続が切れれば
サーバーは別の接続を待てるからね
まあ
これを並列化と呼ぶのはどうかとも思うけど..
コードは次のような感じになるかな
require "socket" server = TCPServer.new(60000) loop do client = server.accept client.puts eval client.gets client.close end
evalしたものをクライアントに返したら
そのソケットを閉じる
これによってそのクライアントの接続は切れるから
別のクライアントからの接続を待ち受けられるようになる
構成がシンプルでいいんだけど
ユーザにとってはちょっと面倒だよね
使うたびに接続し直さなきゃならないからね
なんかWebサーバーみたいだよね..
forkによる並列化
2つ目は複数のプロセスを起動する方法だよ
Rubyでプロセスを並列化するにはKernel#forkを使うよ
forkのブロックで囲まれたコードは別プロセスで起動されるから
loop{}のところをforkのブロックに投げればよさそうだね
やってみるよ
require "socket" server = TCPServer.new(60000) loop do client = server.accept fork do # 別プロセスで起動 begin loop { client.puts eval client.gets } rescue ensure client.close end end end
acceptでクライアントが接続すると
forkで別プロセスが起動されて
その中でgetsの待ち受けがされるけど
メインプロセスは外側のループで先頭に戻り
これでacceptで別のクライアントの接続を待てるね
じゃあ複数のtelnetから接続して試してみよう
$ telnet localhost 60000 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. 1 + 2 3 --------------------------------------- $ telnet localhost 60000 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. [2011,10,1].join '-' 2011-10-1
うまくいったね
念のためこの状況で
複数のプロセスが立ってるか確認してみるよ
$ ps aux | grep repl_server keyes 1335 0.3 0.1 2448356 1176 s001 R+ 3:01PM 0:00.90 ruby repl_server.rb keyes 1303 0.3 0.0 2448356 1040 s001 S+ 2:59PM 0:01.30 ruby repl_server.rb keyes 1301 0.3 0.2 2448356 3756 s001 R+ 2:59PM 0:01.34 ruby repl_server.rb
3つのプロセスが立ってるのがわかるね
ただプロセスは個々に独立したメモリ空間を専有するから
接続ユーザ数が多くなるとちょっと心配だよね
またチャットサーバーのように
ユーザ間での情報のやり取りが必要な場合
プロセス間で通信させなきゃならないから
そんなときはちょっと厄介そうだよね
Threadによる並列化
3つ目はスレッドを使う方法だよ
スレッドは1つのプロセス内で処理を並走させる仕組みだよ
並走する処理は同じプロセス内にあるから
その間でのデータ共有が容易という利点があるんだ
じゃあThreadクラスを使ったサーバーを書いてみるよ
require "socket" server = TCPServer.new(60000) loop do client = server.accept Thread.new(client) do |cl| begin loop { cl.puts eval cl.gets } rescue ensure cl.close end end end
forkをThread.newに変えればいいだけだから簡単だね
ただスレッドは同じプロセス内で並走するから
acceptしたclientをブロック引数を通して
ちゃんと渡さないと問題が生じるよ
同じように複数のtelnetから接続してみるよ
Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. [*1..10].select{|i| i.even? } 2 4 6 8 10 ------------------------------- Connected to localhost. Escape character is '^]'. Array.ancestors Array Enumerable Object Kernel BasicObject ------------------------------- $ ps aux |grep repl_server keyes 1712 0.3 0.2 2451964 3808 s001 S+ 7:03PM 0:00.24 ruby repl_server.rb
プロセスは1つのままだってこと確認できるよね
前でスレッドがマルチプロセスよりも
データ共有が容易って書いたから
そのサンプルも書いてみるよ
サーバーからの出力を
接続しているすべてのクライアントに出力する例だよ
require "socket" server = TCPServer.new(60000) clients = [] # 接続クライアントの管理用配列 loop do client = server.accept clients << client # 接続クライアントを登録 Thread.new(client) do |cl| begin loop do output = eval cl.gets clients.each { |c| c.puts output } # 結果を全クライアントに配信 end rescue ensure cl.close clients.delete(cl) # 切断したクライアントを管理対象外に end end end
接続クライアント管理用の配列を用意して
結果を全員にブロードキャストすればいいね
簡単だね
ただスレッドモデルはスレッド間で共有するデータを
書き換えるような場合の取り扱いがちょっと厄介だよ
それとやっぱり各スレッドごとに
いつ来るかわからないデータを待っている
というのが無駄といえば無駄だよね
Reactorパターンによる並列化
4つ目はReactorパターンを使って並列化する方法だよ
Reactorパターンというのは簡単に言うと
一箇所でいろいろなイベントを待ち受けて
イベントが来たらこれに反応(リアクト)して
その種類に応じた処理を実行するモデルのことだよ
RubyでReactorパターンを実現するには
IO.selectメソッド(またはKernel#select)を使うよ
早速Reactor版REPLサーバーを書いてみるよ
REPLサーバーにおけるイベントには
サーバーに対するものつまりクライアントの接続と
クライアントに対するものつまりソケットへのデータ入力があるよ
これらをsocketsという配列で管理しよう
require "socket" server = TCPServer.new(60000) sockets = [server] loop do r_sockets = IO.select(sockets)[0] # すべてのイベントを待ち受ける r_sockets.each do |socket| case socket when TCPServer # サーバーに対するクライアントの接続があったとき client = socket.accept sockets << client when TCPSocket # クライアントに対するデータ入力があったとき unless socket.eof? socket.puts eval socket.gets else socket.close sockets.delete(socket) end end end end
IO.selectは登録したソケットに対する
入力/出力/例外のイベントを待ち受け
そのイベントが発生したソケットを返すけど
返り値はイベント別のソケットの配列になっているよ
ここでは入力イベントだけに関心があるから
配列の第一要素のみ取り出してるよ
そしてcase式でイベントのあった
ソケットの種類に応じて処理を切り分けてるよ
つまりサーバーがクライアントからの接続を受けたときは
TCPServerの節に入って
sockets配列に接続のあったクライアントが登録され
ループでselectに戻って次のイベントを待つよ
最初のクライアントの接続時にはsockets配列には
serverしか登録されていないから
処理は必ずここに来ることになるよ
一方クライアントにデータの入力があったときは
TCPSocketの節に入って入力データの処理をするよ
入力データがあるときはそれを評価し結果を返し
無いときはソケットを閉じてその接続を解放するよ
そしてループでまたselectに戻って次のイベントを待つよ
Reactorパターンでは
すべてのクライアントの接続は維持されたままなのに
処理が並走しないつまり単一プロセス単一スレッドで
複数クライアントからの要求に応じることができる
という点がユニークだよ
このモデルなら処理が並走することはないので
共有データを書き換えるようなことも簡単にできるよね
EventMachineによる並列化
ただwhen式でのソケットの切り分け処理が
面倒といえば面倒だよね
でも安心していいよ
EventMachineというライブラリを使えば
これが驚異的に簡単にできちゃうんだよ
require "eventmachine" EM.run do EM.start_server('localhost', 60000) do |c| def c.receive_data(data) send_data eval(data) end end end
EventMachineはRreactorパターンによる
イベント駆動型のI/Oインタフェースを提供するライブラリだよ
JavaScriptのNode.jsみたいなものなんだろうね
もうコードを見れば分かると思うけど
EM.runでイベントループが開始されて
クライアントからのデータ入力があると
receive_dataメソッドが呼び出されるので
ここでsend_dataを呼んでevalした入力を返せばいいんだ
EventMachineを使えば
チャットサーバーだって簡単に書けちゃうんだ
require "eventmachine" module Chat @@channel = EM::Channel.new def post_init puts "-- someone connected" @sid = @@channel.subscribe { |data| send_data ">> #{data}" } end def receive_data(data) @@channel.push data end def unbind puts "-- someone disconnected from the server" @@channel.unsubscribe(@sid) end end EM.run do EM.start_server('localhost', 60000, Chat) end
EMサーバーはクライアントの接続があるたびに
その引数でセットしたChatモジュールをインスタンス化して
その監視対象として登録するよ*1
各インスタンスのpost_initメソッドはその接続時に
unbindメソッドはその切断時に呼び出され
receive_dataは先の例と同様にデータ受信時に呼び出されるよ
データをブロードキャストするにはEM::Channelを使うよ
subscribeでそのクライアントに対する処理を登録して
pushで呼び出せばいいんだ
gem install eventmachineして
telnetから試してみてね
WebSocket
ここまでくるとWebサーバー上でも
この並列化技術を使いたいと考えるのが人情だよね
そう
これこそがWebSocketなんだよ
そしてうれしいことにEventMachineには
そのためのプラグインem-websocketがあるんだ
gem install em-websocketして使うよ
サーバー側のコードは次のような感じだよ
require 'em-websocket' EM.run { @channel = EM::Channel.new EM::WebSocket.start(:host => "localhost", :port => 60000) do |ws| sid = nil ws.onopen { sid = @channel.subscribe { |msg| ws.send msg } } ws.onmessage { |msg| @channel.push "#{sid}: #{msg}" } ws.onclose { @channel.unsubscribe(sid) } end }
EM::WebSocket.startでサーバーインスタンスを立ち上げて
クライアントからの接続を待ち受けるよ
クライアントからの接続があるとonopenが呼ばれるから
ここでchannelに
メッセージをブロードキャストする処理を登録するよ
クライアントがテキストを送信するとonmessageが呼ばれるから
それをchannelにpushして登録した処理を呼ぶよ
次にクライアントサイドのコードだよ
<html> <head> <script src='http://ajax.googleapis.com/ajax/libs/jquery/1.6.2/jquery.min.js'></script> <script> $(document).ready(function(){ function debug (str) { $("#debug").append("<p>"+str+"</p>") }; ws = new WebSocket("ws://localhost:60000/"); ws.onopen = function() { debug("Welcome to Chattata!") }; ws.onmessage = function(evt) { $("#msglist").append("<p>"+evt.data+"</p>") }; ws.onclose = function() { debug("socket closed") }; $("form").submit(function(){ var msg = $("input#msg"); ws.send(msg.val()); msg.val(''); return false; }); }); </script> </head> <body> <div id="debug"></div> <form> <input id="msg" type="text"></input> </form> <div id="msglist"></div> </body> </html>
クライアント側では
サーバーに接続するソケットをインスタンス化するよ
ユーザがテキストを送信すると$("form").submitが呼ばれ
その内容はws.sendでソケットに送り出されるよ
これによってサーバー側のchannelに登録された処理が呼ばれ
接続されているクライアントに
テキストがブロードキャストされるよ
クライアント側ではこれをonmessageで受けて
テキストをwindow上に表示するよ
じゃあ試してみるよ
良い感じだね!
WebSocketはHTML5の新しい規格だから
対応ブラウザか確認してね
ちょっと長い投稿になっちゃったけど
最後まで付き合ってくれてありがとう