EventMachine: Scalable Non-Blocking I/O in Ruby
require 'socket'
include Socket::Constants
socket = Socket.new(AF_INET, SOCK_STREAM, 0)
sockaddr = Socket.pack_sockaddr_in(2200, 'localhost')
socket.connect(sockaddr)
socket.puts "Hello from script 2."
puts "The server said, '#{socket.readline.chomp}'"
socket.close
Simple TCP Server
TCPServer#accept to accept connections from new clients
TCPSocket#read*
require 'socket'
server = TCPServer.new(2202)
while client = server.accept
  msg = client.readline
  client.write "You said: #{msg}"
  client.close
end
Non-Blocking I/O
Alternative to threads: simply don’t block
require 'socket'
server = TCPServer.new(2202)
clients = []   # list of clients
buffers = {}   # inbound buffer per client
while true
  sockets = [server] + clients
  readable, writable = IO.select(sockets)
  readable.each do |sock|
    begin
      if sock == server
        clients << server.accept_nonblock
      else
        client, buf = sock, buffers[sock] ||= ''
        # read whatever data is available (the 1024-byte chunk size is an assumption)
        buf << client.read_nonblock(1024)
        # reply only once a full line has been received
        if buf =~ /^.+?\r?\n/
          client.write "You said: #{buf}"
          client.close
          buffers.delete(client)
          clients.delete(client)
        end
      end
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      # socket would block, try again later
    end
  end
end
EM does Non-Blocking I/O
handles low level sockets for you
inbound/outbound buffers for maximal throughput
efficient i/o with writev/readv
epoll & kqueue support
module EchoServer
  def post_init
    @buf = ''
  end
  def receive_data(data)
    @buf << data
    if @buf =~ /^.+?\r?\n/
      send_data "You said: #{@buf}"
      close_connection_after_writing
    end
  end
end
require 'eventmachine'
EM.run do
  EM.start_server '0.0.0.0', 2202, EchoServer
end
So, what’s a Reactor?
while reactor_running?
expired_timers.each{ |timer| timer.process }
new_network_io.each{ |io| io.process }
end
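The loop above can be made concrete with a toy reactor. This is a minimal sketch in plain Ruby, not EventMachine's implementation: the ToyReactor class and its add_timer, stop and run methods are hypothetical names, and it only processes timers (no network I/O).

```ruby
# Toy reactor: one loop that fires expired timers.
# Hypothetical sketch; a real reactor also polls sockets for new I/O.
class ToyReactor
  def initialize
    @timers = []      # [fire_at, callback] pairs
    @running = false
  end

  # Store the block; it is invoked later, from inside the loop.
  def add_timer(seconds, &blk)
    @timers << [now + seconds, blk]
  end

  def stop
    @running = false
  end

  def run
    @running = true
    yield self if block_given?
    while @running
      expired, @timers = @timers.partition { |at, _| at <= now }
      expired.sort_by(&:first).each { |_, blk| blk.call }
      sleep 0.001 if @running   # avoid spinning; a real reactor waits in select/epoll
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

log = []
ToyReactor.new.run do |r|
  log << 1
  r.add_timer(0.02) { log << 3; r.stop }
  r.add_timer(0.01) { log << 2 }
  log << :scheduled
end
```

Both timer blocks run after the setup block has finished, which is why :scheduled is logged before 2 and 3.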
different from how you usually use ruby blocks. the block
is stored and invoked later (it’s asynchronous)
synchronous block:
puts(1)
1.times{ puts(2) }
puts(3)
asynchronous block:
puts(1)
operation{ puts(3) }
puts(2)
Events are simple
Instead of waiting for something to happen before
executing code,
Put that code in a proc,
Invoke the proc whenever something happens
vs
queue.on_push = proc{ use(queue.pop) }
But: evented code is hard
nested blocks are hard to parse
can’t use exceptions: error handling requires more work
url = db.find_url
response = http.get(url)
email.send(response)
puts 'email sent'
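Why exceptions stop working can be shown without EventMachine at all. The sketch below is a plain-Ruby illustration under stated assumptions: operation and the pending array are hypothetical stand-ins for an async API and the reactor's internal event queue.

```ruby
pending = []  # stored callbacks, standing in for the reactor's event queue

# Hypothetical async operation: stores the block instead of running it.
def operation(queue, &blk)
  queue << blk
end

caught_at_call_site = false
begin
  operation(pending) { raise 'boom' }
rescue RuntimeError
  caught_at_call_site = true   # never reached: the block has not run yet
end

# Later, the "reactor" invokes the stored block; the error surfaces here,
# long after the begin/rescue above has been left.
caught_in_reactor = false
pending.each do |blk|
  begin
    blk.call
  rescue RuntimeError => e
    caught_in_reactor = (e.message == 'boom')
  end
end
```

Because the rescue at the call site never sees the error, evented code has to pass failures along explicitly, for example through a separate error callback.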
EM.stop
stop the reactor and continue execution of the ruby script
EM.run{
puts "reactor started!"
EM.stop
}
EM.threadpool_size
instead of running the reactor in a thread, you can run
specific pieces of code inside a thread pool
useful for legacy blocking code, like database access
threadpool_size defaults to 20
EM does not use any threads by default, only when
you call EM.defer
EM.defer
on first invocation, spawns up EM.threadpool_size
threads in a pool
all the threads read procs from a queue and execute
them in parallel to the reactor thread
optional second proc is invoked with the result of the
first proc, but back inside the reactor thread
EM.defer(proc{
# running in a worker thread
result = long_blocking_call
result = process(result)
result
}, proc{ |result|
# back in the reactor thread
use(result)
})
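The hand-off between the worker thread and the reactor thread can be sketched with the standard library alone. This is not how EM.defer is implemented; the defer lambda and the results queue are hypothetical, and a single worker thread replaces the pool.

```ruby
# Hypothetical sketch of the EM.defer idea: run `operation` on a worker
# thread, then hand its result back to the main ("reactor") thread,
# which invokes `callback`.
results = Queue.new   # thread-safe hand-off back to the main thread

defer = lambda do |operation, callback|
  Thread.new { results << [callback, operation.call] }
end

reactor_thread = Thread.current
seen = {}

defer.call(
  proc {
    seen[:worker] = Thread.current    # running in a worker thread
    21 * 2                            # stands in for long_blocking_call
  },
  proc { |result|
    seen[:callback] = Thread.current  # back in the main thread
    seen[:result] = result
  }
)

# Blocking on the queue keeps the sketch short; a real reactor would keep
# servicing other events while the worker is busy.
callback, result = results.pop
callback.call(result)
```

The first proc never touches reactor state directly; only its return value crosses back, which is what makes the pattern safe for legacy blocking code.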
Timing inside the reactor
EM::Timer
basic rule: don’t block the reactor
sleep(1) will block the reactor
use timers instead
EM.add_timer or EM::Timer.new
EM.add_periodic_timer or EM::PeriodicTimer.new
EM::PeriodicTimer.new(0.25) do
  sleep 1   # timer won’t fire every 0.25s
end
timer = EM::Timer.new(5){
  puts 'wont happen'
}
timer.cancel   # cancel timer so it never fires
Managing events in the reactor
EM::Callback
many ways to specify event handlers
operation{ do_something }
operation(proc{ do_something })
operation(&method(:do_something))
operation(obj.method(:do_something))
EM::Queue
async queue
#pop does not return a value, takes a block instead
if the queue isn’t empty, block is invoked right away
otherwise, it is invoked when someone calls #push
q = EM::Queue.new
q.pop{ |item| use(item) }
q = EM::Queue.new
processor = proc{ |item|   # “recursive” proc
  use(item){
    q.pop(&processor)      # pop next item
  }
}
q.pop(&processor)          # pop first item
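The pop/push behaviour described for EM::Queue can be imitated in a few lines of plain Ruby. ToyQueue is a hypothetical, single-threaded sketch of those semantics only; it calls blocks directly and does not involve a reactor.

```ruby
# Hypothetical sketch of an async queue: #pop takes a block instead of
# returning a value.
class ToyQueue
  def initialize
    @items = []
    @waiting = []   # blocks passed to #pop while the queue was empty
  end

  def pop(&blk)
    if @items.empty?
      @waiting << blk          # invoked later, when someone calls #push
    else
      blk.call(@items.shift)   # queue not empty: invoke right away
    end
    nil
  end

  def push(item)
    if (blk = @waiting.shift)
      blk.call(item)
    else
      @items << item
    end
    nil
  end
end

seen = []
q = ToyQueue.new
processor = proc { |item|
  seen << item
  q.pop(&processor)   # ask for the next item
}
q.pop(&processor)     # queue is empty, so the proc is stored
q.push(:a)            # invokes the stored proc with :a
q.push(:b)            # the proc re-registered itself, so it sees :b too
```

The "recursive" proc is what turns a one-shot pop into a continuous consumer: each invocation registers interest in the next item.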
EM::Channel
subscribers get a copy of each message published to
the channel
subscribe and unsubscribe at will
channel = EM::Channel.new
sid = channel.subscribe{ |msg|
p [:got, msg]
}
channel.push('hello world')
channel.unsubscribe(sid)
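The subscribe/push/unsubscribe cycle can be sketched without EventMachine. ToyChannel below is a hypothetical illustration of the fan-out idea; it hands every subscriber the same message object and runs the blocks immediately, which a real channel need not do.

```ruby
# Hypothetical pub/sub sketch: every subscriber block sees every message.
class ToyChannel
  def initialize
    @subs = {}
    @next_id = 0
  end

  # Returns an id that can later be passed to #unsubscribe.
  def subscribe(&blk)
    id = (@next_id += 1)
    @subs[id] = blk
    id
  end

  def unsubscribe(id)
    @subs.delete(id)
  end

  def push(msg)
    @subs.values.each { |blk| blk.call(msg) }
  end
end

got = []
channel = ToyChannel.new
a = channel.subscribe { |msg| got << [:a, msg] }
b = channel.subscribe { |msg| got << [:b, msg] }
channel.push('hello world')
channel.unsubscribe(a)
channel.push('bye')
```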
Reacting with subprocesses
EM.system
run external commands without blocking
block receives stdout and exitstatus
EM.system('ls'){ |output,status|
puts output if status.exitstatus == 0
}
EM.popen
lower level API used by EM.system
takes a handler and streams stdout to your handler
methods for each event (instead of creating and passing around procs)
handler module/class is instantiated by the reactor
use instance variables to keep state
preferred way to write and organize EM code
module Handler
  def db_find_url
    db.find_url(method(:http_get))
  end
  def http_get(url)
    http.get(url, method(:email_send))
  end
  def email_send(response)
    email.send(response, method(:email_sent))
  end
  def email_sent
    puts 'email sent'
  end
end
Networking with the reactor
Network Servers and Clients
TCP Server: EM.start_server (and EM.stop_server)
TCP Client: EM.connect (and EM.bind_connect)
UDP Socket: EM.open_datagram_socket
# blocking version
require 'socket'
server = TCPServer.new(2202)
while client = server.accept
  msg = client.readline   # client.readline blocks until it receives a newline
  client.write "You said: #{msg}"
  client.close
end

# EventMachine version
require 'eventmachine'
module EchoHandler
  def post_init
    puts "-- someone connected"
    @buf = ''
  end
  def receive_data(data)
    # with non-blocking i/o, you get whatever data is available
    @buf << data
    # TCP is a stream: must verify that a full line was received
    while line = @buf.slice!(/(.+)\r?\n/)
      send_data "You said: #{line.chomp}\n"
    end
  end
  def unbind
    puts "-- someone disconnected"
  end
end
EM.run{
  EM.start_server(
    '0.0.0.0', 2202, EchoHandler
  )
}
TCP is a Stream
sender:   send_data("hello")
          send_data("world")
network
receiver: receive_data("he")
          receive_data("llowo")
          receive_data("rld")
def receive_data(data)
  send_data "You said: #{data}"
end
def receive_data(data)
  (@buf ||= '') << data                # append to buffer
  if line = @buf.slice!(/(.+)\r?\n/)   # parse out lines
    send_data "You said: #{line}"
  end
end
def receive_data(data)
  @buf ||= BufferedTokenizer.new("\n")   # included in EM
  @buf.extract(data).each do |line|
    send_data "You said: #{line}"
  end
end
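The buffering idea can be tried outside a reactor by feeding arbitrary chunks into a small line buffer. LineBuffer is a hypothetical helper written for this sketch (it is not BufferedTokenizer), and the chunk boundaries, including where the newlines fall, are made up for illustration.

```ruby
# Hypothetical helper: accumulates stream chunks and returns complete lines.
class LineBuffer
  def initialize
    @buf = +''
  end

  # Append a chunk; return every complete line now available,
  # without its trailing newline. Partial lines stay buffered.
  def extract(chunk)
    @buf << chunk
    lines = []
    while (line = @buf.slice!(/\A.*?\r?\n/))
      lines << line.chomp
    end
    lines
  end
end

buffer = LineBuffer.new
chunks = ["he", "llo\nwo", "rld\n"]   # "hello\nworld\n" as the network might deliver it
received = chunks.map { |chunk| buffer.extract(chunk) }
```

The first chunk yields nothing because no newline has arrived yet; each later chunk completes exactly one line, regardless of how the sender originally split its writes.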
EM::Protocols
module RubyServer
  include EM::P::ObjectProtocol   # ObjectProtocol defines receive_data
  def receive_object(obj)
    send_object({'you said' => obj})
  end
end
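A protocol that delivers whole objects has to solve the same framing problem as line parsing. One common approach, sketched here with only the standard library, is to prefix each Marshal-serialized object with its byte length. The ObjectFramer class is hypothetical and the wire format is an assumption made for illustration, not a description of what EM::P::ObjectProtocol does internally.

```ruby
# Hypothetical length-prefixed framing for serialized Ruby objects.
class ObjectFramer
  def initialize
    @buf = ''.b   # binary buffer for bytes received so far
  end

  # Frame layout (assumed): 4-byte big-endian length, then Marshal payload.
  def self.frame(obj)
    payload = Marshal.dump(obj)
    [payload.bytesize, payload].pack('Na*')
  end

  # Feed in whatever bytes arrived; return all objects completed so far.
  # Note: Marshal.load must never be used on untrusted input.
  def receive_data(data)
    @buf << data
    objects = []
    while @buf.bytesize >= 4
      size = @buf.unpack1('N')
      break if @buf.bytesize < 4 + size   # payload not fully received yet
      objects << Marshal.load(@buf.byteslice(4, size))
      @buf = @buf.byteslice(4 + size, @buf.bytesize)
    end
    objects
  end
end

wire = ObjectFramer.frame({ 'you said' => 'hi' }) + ObjectFramer.frame([1, 2, 3])
framer = ObjectFramer.new
# Deliver the stream in arbitrary 5-byte chunks, as a network might.
objects = wire.bytes.each_slice(5).flat_map do |bytes|
  framer.receive_data(bytes.pack('C*'))
end
```

With the length known up front, the receiver never has to scan for a delimiter, so payloads may contain any bytes.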
External
HttpRequest
Redis
CouchDB
WebSocket
Cassandra
http://wiki.github.com/eventmachine/eventmachine/protocol-implementations
Doing more with the reactor
Other EM Features
EM.watch and EM.attach
use external file descriptors with the reactor
EM.watch_file
watch files and directories for changes
events: file_modified, file_deleted, file_moved
EM.watch_process
watch processes
events: process_forked, process_exited
EM.open_keyboard
receive_data from stdin
Using the reactor
Using EM in your webapp
Run EM in a thread
Use an EM enabled webserver
Thin or Rainbows!
call EM API methods directly from your actions
Use async_sinatra for streaming and delayed
responses
http://github.com/raggi/async_sinatra
aget '/delay/:n' do |n|
  EM.add_timer(n.to_i){
    body "delayed for #{n}s"
  }
end
aget '/proxy' do
  url = params[:url]
  http = EM::HttpRequest.new(url).get
  http.callback{ |response|
    body response.body
  }
end
DEMO
simple line based chat server
EM::Channel to distribute chat messages
/say uses EM.system and EM::Queue to invoke `say` on the server
EM::HttpRequest and Yajl to include tweets in the chat
telnet 10.0.123.89 1337
http://gist.github.com/329682
More Information
Homepage: http://rubyeventmachine.com
Group: http://groups.google.com/group/eventmachine
RDoc: http://eventmachine.rubyforge.org
Github: http://github.com/eventmachine/eventmachine
IRC: #eventmachine on irc.freenode.net
These slides: http://timetobleed.com
Questions?
@tmm1
github.com/tmm1