Skip to content

Instantly share code, notes, and snippets.

@kazeburo
Last active August 29, 2015 14:10
Show Gist options
  • Save kazeburo/68cd958bba2402e95346 to your computer and use it in GitHub Desktop.
Save kazeburo/68cd958bba2402e95346 to your computer and use it in GitHub Desktop.
ruby port of Starlet and Parallel::Prefork
# -*- coding: utf-8 -*-
# gem install pico_http_parser proc-wait3
# start_server --port 8080 --signal-on-hup=USR1 -- rackup -r ./starlet.rb -E production -s Starlet \
# -O MaxWorkers=1 -O MaxRequestPerChild=1000 -O MinRequestPerChild=500 -O SpawnInterval=1 config.ru
#
require 'rubygems'
require 'rack'
require 'pico_http_parser'
require 'stringio'
require 'socket'
require 'rack/utils'
require 'io/nonblock'
require 'proc/wait3'
class PreforkEngine
attr_reader :signal_received
def initialize(options={})
defaults = {
"max_workers" => 10,
"spawn_interval" => 0,
"err_respawn_interval" => 1,
"trap_signals" => {
"TERM" => "TERM"
}
}
@options = defaults.merge(options)
@signal_received = ""
@manager_pid = ""
@generation = 0
@_no_adjust_until = 0.0
@in_child = false
@worker_pids = {}
@options["trap_signals"].each do |k,kv|
Signal.trap(k) { |signo|
@signal_received = Signal.signame(signo)
}
end
Signal.trap("CHLD") {
#do nothing
}
end
def start(&block)
@manager_pid = $$
@signal_received = ""
@generation += 1
raise "cannot start another process while you are in child process" if @in_child
# main loop
while @signal_received.size == 0
action = self._decide_action() if @_no_adjust_until <= Time.now.to_f
if action > 0 then
# start a new worker
# XXX: todo before_fork
pid = nil
begin
pid = fork
rescue => e
# fork failed
warn "fork failed:#{e}";
self._update_spawn_delay(@options["err_respawn_interval"])
next
end
if pid == nil then
@in_child = true
@options["trap_signals"].each do |k,kv|
Signal.trap(k,"SIG_DFL")
end
Signal.trap("CHLD","SIG_DFL")
exit!(0) if @signal_received.size > 0
if block then
block.call if block
self.finish(true)
end
return false
end
# parent
# XXX: todo after fork
@worker_pids[pid] = @generation
self._update_spawn_delay(@options["spawn_interval"])
end
if res = self._wait() then
exit_pid = res[0]
status = res[1]
self._on_child_reap(exit_pid, status)
if @worker_pids.delete(exit_pid) == @generation && status != 0 then
self._update_spawn_delay(@options["err_respawn_interval"])
end
end
end
# send signals to workers
if action = self._action_for(@signal_received) then
sig = action[0]
interval = action[1]
if interval > 0 then
pids = @worker_pids.keys.sort
@delayed_task = proc {
pid = pids.shift
Process.kill(sig, pid)
if pids.empty? then
@delayed_task = nil
@delayed_task_at = nil
else
@delayed_task_at = Time.now.to_f + interval
end
}
@delayed_task_at = 0.0
@delayed_task.call
else
self.signal_all_children(sig)
end
end
return true
end #start
def finish(exit_status)
exit_status = true if exit_status == nil
raise "finish() shouln't be called within the manager process\n" if @manager_pid == $$
exit!(exit_status)
end #finish
def signal_all_children(sig)
@worker_pids.keys.sort.each do |pid|
Process.kill(sig,pid)
end
end #signal_all_children
def num_workers
return @worker_pids.keys.length
end
def _decide_action
return 1 if self.num_workers < @options["max_workers"]
return 0
end #_decide_action
def _on_child_reap(pid,status)
#XXX todo on_child_reap
end
def _handle_delayed_task
while true
return nil if !@delayed_task
timeleft = @delayed_task_at - Time.now.to_f;
return timeleft if timeleft > 0
@delayed_task.call
end
end #_handle_delayed_task
def _action_for(sig)
return nil if !@options["trap_signals"].has_key?(sig)
t = @options["trap_signals"][sig]
t = [t,0] if !t.kind_of?(Enumerable)
return t
end
def wait_all_children
#XXX todo timeout
while !@worker_pids.keys.empty?
if res = self._wait() then
if @worker_pids.delete(res[0]) then
self._on_child_reap(res[0],$?.status)
end
end
end
return 0
end # wait_all_children
def _update_spawn_delay(secs)
@_no_adjust_until = secs ? Time.now.to_f + secs : 0.0
end
def _wait()
#XXX always blocking
delayed_task_sleep = self._handle_delayed_task()
delayed_fork_sleep = self._decide_action > 0 ? [@_no_adjust_until - Time.now.to_f,0].max : nil
sleep_secs = [delayed_task_sleep,delayed_fork_sleep,self._max_wait].select {|v| v != nil}
begin
if sleep_secs.min != nil then
sleep(sleep_secs.min)
# nonblock
return Process.wait3(1)
else
#block
return Process.wait3(0)
end
rescue Errno::EINTR
# nothing
end
return nil
end #_wait
def _max_wait
return nil
end
end #class
module Rack
module Handler
class Starlet
DEFAULT_OPTIONS = {
:Host => '0.0.0.0',
:Port => 9292,
:MaxWorkers => 10,
:Timeout => 300,
:MaxRequestPerChild => 100,
:MinRequestPerChild => nil,
:SpawnInterval => nil,
:ErrRespawnInterval => nil
}
NULLIO = StringIO.new("").set_encoding('BINARY')
def self.run(app, options={})
slf = new(options)
slf.setup_listener()
slf.run_worker(app)
end
def self.valid_options
{
"Host=HOST" => "Hostname to listen on (default: 0.0.0.0)",
"Port=PORT" => "Port to listen on (default: 9292)",
}
end
def initialize(options={})
@options = DEFAULT_OPTIONS.merge(options)
@server = nil
@_is_tcp = false
@_using_defer_accept = false
end
def setup_listener()
if ENV["SERVER_STARTER_PORT"] then
hostport, fd = ENV["SERVER_STARTER_PORT"].split("=",2)
if m = hostport.match(/(.*):(\d+)/) then
@options[:Host] = m[0]
@options[:Port] = m[1].to_i
else
@options[:Port] = hostport
end
@server = TCPServer.for_fd(fd.to_i)
@_is_tcp = true if !@server.local_address.unix?
end
if @server == nil
@server = TCPServer.new(@options[:Host], @options[:Port])
@server.setsockopt(:SOCKET, :REUSEADDR, 1)
@_is_tcp = true
end
if RUBY_PLATFORM.match(/linux/) && @_is_tcp == true then
begin
@server.setsockopt(Socket::IPPROTO_TCP, 9, 1)
@_using_defer_accept = true
end
end
end
def run_worker(app)
pm_args = {
"max_workers" => @options[:MaxWorkers].to_i,
"trap_signals" => {
"TERM" => 'TERM',
"HUP" => 'TERM',
},
}
if @options[:SpawnInterval] then
pm_args["trap_signals"]["USR1"] = ["TERM", @options[:SpawnInterval].to_i]
pm_args["spawn_interval"] = @options[:SpawnInterval].to_i
end
if @options[:ErrRespawnInterval] then
pm_args["err_respawn_interval"] = @options[:ErrRespawnInterval].to_i
end
pe = PreforkEngine.new(pm_args)
while !pe.signal_received.match(/^(TERM|USR1)$/)
pe.start {
srand
self.accept_loop(app)
}
end
pe.wait_all_children
end
def _calc_reqs_per_child
max = @options[:MaxRequestPerChild].to_i
if min = @options[:MinRequestPerChild] then
return (max - (max - min.to_i + 1) * rand).to_i
end
return max.to_i
end
def accept_loop(app)
@can_exit = true
@term_received = 0
proc_req_count = 0
Signal.trap('TERM') {
if @can_exit then
exit!(true)
end
@term_received += 1
if @can_exit || @term_received > 1 then
exit!(true)
end
}
Signal.trap('PIPE', 'IGNORE')
max_reqs = self._calc_reqs_per_child()
while proc_req_count < max_reqs
@can_exit = true
connection = @server.accept
begin
connection.nonblock(true) {
peeraddr = nil
peerport = 0
if @_is_tcp then
connection.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
peer = connection.peeraddr
peeraddr = peer[2],
peerport = peer[1].to_s
end
proc_req_count += 1
@_is_deferred_accept = @_using_defer_accept
env = {
'SERVER_NAME' => @options[:Host],
'SERVER_PORT' => @options[:Port].to_s,
'REMOTE_ADDR' => peeraddr,
'REMOTE_PORT' => peerport,
'rack.version' => [0,1],
'rack.errors' => STDERR,
'rack.multithread' => false,
'rack.multiprocess' => false,
'rack.run_once' => false,
'rack.url_scheme' => 'http'
}
self.handle_connection(env, connection, app)
}
ensure
connection.close
end
end
end
def handle_connection(env, connection, app)
buf = ""
while true
readed = self.read_timeout(connection)
if readed == nil then
next
end
@can_exit = false
buf += readed
reqlen = PicoHTTPParser.parse_http_request(buf,env)
if reqlen >= 0 then
# force donwgrade to 1.0
env["SERVER_PROTOCOL"] = "HTTP/1.0"
# handle request
if (cl = env["CONTENT_LENGTH"].to_i) > 0 then
buf = buf.unpack('C*').slice(reqlen..-1).pack('C*')
buffer = StringIO.new("").set_encoding('BINARY')
while cl > 0
chunk = ""
if buf.bytesize > 0 then
chunk = buf
buf = ""
else
readed = self.read_timeout(connection)
if readed == nil then
return
end
chunk += readed
end
buffer << chunk
cl -= chunk.bytesize
end
buffer.rewind
env["rack.input"] = buffer
else
env["rack.input"] = NULLIO
end
status, header, body = app.call(env)
res_header = "HTTP/1.0 "+status.to_s+" "+Rack::Utils::HTTP_STATUS_CODES[status]+"\r\nConnection: close\r\n"
header.each do |k,vs|
if k.downcase == "connection" then
next
end
res_header += k + ": " + vs + "\r\n"
end
res_header += "\r\n"
if body.length == 1 && body[0].bytesize < 40960 then
ret = self.write_all(connection,res_header+body[0])
else
ret = self.write_all(connection,res_header)
body.each do |part|
self.write_all(connection,part)
end
end
return true
elsif reqlen == -2 then
# request is incomplete, do nothing
else
# error
return nil
end
end
end
def read_timeout(connection)
if @_is_deferred_accept then
@_is_deferred_accept = false
else
if !IO.select([connection],nil,nil,300) then
return nil
end
end
while true
begin
buf = connection.sysread(4096)
return buf
rescue IO::WaitReadable
# retry
break
rescue EOFError
# closed
return nil
end
end
end
def write_timeout(connection, buf)
while true
begin
len = connection.syswrite(buf)
return len
rescue IO::WaitReadable
# retry
break
rescue EOFError
# closed
return nil
end
end
if !IO.select(nil,[connection],nil,300) then
return nil
end
end
def write_all(connection, buf)
off = 0
while buf.bytesize - off > 0
ret = self.write_timeout(connection,buf.unpack('C*').slice(off..-1).pack('C*'))
if ret == nil then
return nil
end
off += ret
end
return buf.bytesize
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment