Last active
August 29, 2015 14:10
-
-
Save kazeburo/68cd958bba2402e95346 to your computer and use it in GitHub Desktop.
ruby port of Starlet and Parallel::Prefork
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# gem install pico_http_parser proc-wait3
# start_server --port 8080 --signal-on-hup=USR1 -- rackup -r ./starlet.rb -E production -s Starlet \
#   -O MaxWorkers=1 -O MaxRequestPerChild=1000 -O MinRequestPerChild=500 -O SpawnInterval=1 config.ru
#
require 'rubygems' | |
require 'rack' | |
require 'pico_http_parser' | |
require 'stringio' | |
require 'socket' | |
require 'rack/utils' | |
require 'io/nonblock' | |
require 'proc/wait3' | |
# Minimal prefork process manager (Ruby port of Perl's Parallel::Prefork).
#
# The manager keeps up to "max_workers" forked children alive, re-forking as
# they exit, until one of the signals listed in "trap_signals" is received.
# It then forwards the mapped signal to the workers and #start returns.
#
# Options (String keys):
#   "max_workers"          - maximum number of live workers (default 10)
#   "spawn_interval"       - seconds to wait after a successful fork (default 0)
#   "err_respawn_interval" - seconds to wait after a failed fork or after a
#                            current-generation worker exits non-zero (default 1)
#   "trap_signals"         - { received signal name => signal to send }, where
#                            the value is either a name or [name, interval];
#                            an interval > 0 signals the workers one at a time,
#                            that many seconds apart.
class PreforkEngine
  attr_reader :signal_received

  # Stores the merged options and installs the manager's signal handlers.
  def initialize(options = {})
    defaults = {
      "max_workers" => 10,
      "spawn_interval" => 0,
      "err_respawn_interval" => 1,
      "trap_signals" => {
        "TERM" => "TERM"
      }
    }
    @options = defaults.merge(options)
    @signal_received = ""
    @manager_pid = ""
    @generation = 0
    @_no_adjust_until = 0.0
    @in_child = false
    @worker_pids = {}
    @delayed_task = nil
    @delayed_task_at = nil

    @options["trap_signals"].each_key do |name|
      Signal.trap(name) { |signo| @signal_received = Signal.signame(signo) }
    end
    # Empty handler; presumably installed so that a child's exit interrupts
    # the manager's sleep in _wait -- confirm before removing.
    Signal.trap("CHLD") {}
  end

  # Runs the manager loop.
  #
  # In the manager: returns true once a trapped signal has been received and
  # forwarded to the workers.
  # In a worker: when a block is given it is called and the process then exits
  # through #finish; without a block, returns false so the caller can run the
  # worker code itself.
  #
  # Raises RuntimeError when called from inside a worker.
  def start(&block)
    raise "cannot start another process while you are in child process" if @in_child

    @manager_pid = $$
    @signal_received = ""
    @generation += 1

    # main loop
    while @signal_received.empty?
      # Recompute on every pass. While the spawn delay is still running the
      # action must be 0; it must not be nil or left over from the last pass.
      action = @_no_adjust_until <= Time.now.to_f ? _decide_action : 0
      if action > 0
        # start a new worker
        # XXX: todo before_fork
        pid = nil
        begin
          pid = fork
        rescue => e
          warn "fork failed:#{e}"
          _update_spawn_delay(@options["err_respawn_interval"])
          next
        end
        if pid.nil?
          # child: restore default signal handling before running worker code
          @in_child = true
          @options["trap_signals"].each_key { |name| Signal.trap(name, "SIG_DFL") }
          Signal.trap("CHLD", "SIG_DFL")
          exit!(0) unless @signal_received.empty?
          if block
            block.call
            finish(true)
          end
          return false
        end
        # parent
        # XXX: todo after fork
        @worker_pids[pid] = @generation
        _update_spawn_delay(@options["spawn_interval"])
      end

      res = _wait
      next unless res

      exit_pid = res[0]
      status = res[1]
      _on_child_reap(exit_pid, status)
      if @worker_pids.delete(exit_pid) == @generation && status != 0
        _update_spawn_delay(@options["err_respawn_interval"])
      end
    end

    # send signals to workers
    if (action = _action_for(@signal_received))
      sig, interval = action
      if interval > 0
        pids = @worker_pids.keys.sort
        @delayed_task = proc {
          pid = pids.shift
          # Skip workers reaped since the list was captured; their pid may
          # no longer belong to one of our children.
          pid = pids.shift while pid && !@worker_pids.key?(pid)
          Process.kill(sig, pid) if pid
          if pids.empty?
            @delayed_task = nil
            @delayed_task_at = nil
          else
            @delayed_task_at = Time.now.to_f + interval
          end
        }
        @delayed_task_at = 0.0
        @delayed_task.call
      else
        signal_all_children(sig)
      end
    end
    true
  end # start

  # Terminates a worker immediately with +exit_status+ (nil is treated as
  # true). Raises RuntimeError when called in the manager process.
  def finish(exit_status)
    exit_status = true if exit_status.nil?
    raise "finish() shouldn't be called within the manager process\n" if @manager_pid == $$
    exit!(exit_status)
  end # finish

  # Sends +sig+ to every tracked worker, in ascending pid order.
  def signal_all_children(sig)
    @worker_pids.keys.sort.each do |pid|
      Process.kill(sig, pid)
    end
  end # signal_all_children

  # Number of workers currently tracked (forked and not yet reaped).
  def num_workers
    @worker_pids.size
  end

  # Returns 1 when another worker should be forked, 0 otherwise.
  def _decide_action
    num_workers < @options["max_workers"] ? 1 : 0
  end # _decide_action

  # Hook invoked for every reaped child; intentionally empty for now.
  def _on_child_reap(pid, status)
    # XXX todo on_child_reap
  end

  # Runs the pending delayed task for as long as it is due.
  # Returns the seconds left until the next run, or nil when none is pending.
  def _handle_delayed_task
    loop do
      return nil unless @delayed_task
      timeleft = @delayed_task_at - Time.now.to_f
      return timeleft if timeleft > 0
      @delayed_task.call
    end
  end # _handle_delayed_task

  # Looks up the configured reaction to +sig+.
  # Returns [signal_to_send, interval] (interval defaults to 0), or nil when
  # +sig+ is not one of the trapped signals.
  def _action_for(sig)
    return nil unless @options["trap_signals"].key?(sig)
    t = @options["trap_signals"][sig]
    t = [t, 0] unless t.is_a?(Enumerable)
    t
  end

  # Blocks until every tracked worker has been reaped. Always returns 0.
  def wait_all_children
    # XXX todo timeout
    until @worker_pids.empty?
      res = _wait
      next unless res
      exit_pid = res[0]
      status = res[1]
      # Use the status returned by _wait (as #start does), not `$?`.
      _on_child_reap(exit_pid, status) if @worker_pids.delete(exit_pid)
    end
    0
  end # wait_all_children

  # Forbids forking until +secs+ seconds from now; a nil +secs+ clears the delay.
  def _update_spawn_delay(secs)
    @_no_adjust_until = secs ? Time.now.to_f + secs : 0.0
  end

  # Waits for a child to exit.
  #
  # When a delayed task or a pending fork imposes a deadline, sleeps until
  # then and polls without blocking; otherwise blocks in Process.wait3
  # (provided by the proc-wait3 gem). Returns whatever wait3 returns -- the
  # callers read [0] as the pid and [1] as the exit status -- or nil when
  # interrupted by a signal. NOTE(review): the non-blocking poll is assumed to
  # return nil when no child has exited; confirm against proc-wait3.
  def _wait
    # XXX always blocking
    delayed_task_sleep = _handle_delayed_task
    delayed_fork_sleep =
      _decide_action > 0 ? [@_no_adjust_until - Time.now.to_f, 0].max : nil
    sleep_secs = [delayed_task_sleep, delayed_fork_sleep, _max_wait].compact.min
    begin
      if sleep_secs
        sleep(sleep_secs)
        Process.wait3(Process::WNOHANG)
      else
        Process.wait3(0)
      end
    rescue Errno::EINTR
      nil
    end
  end # _wait

  # Upper bound in seconds for a single _wait sleep; nil means no bound.
  def _max_wait
    nil
  end
end # class
module Rack
  module Handler
    # Preforking HTTP/1.0 Rack handler (Ruby port of Perl's Starlet).
    #
    # The listening socket is created here or inherited through the
    # SERVER_STARTER_PORT environment variable. Worker processes are managed by
    # PreforkEngine; each worker accepts connections one at a time, serves one
    # request per connection and always answers with "Connection: close".
    class Starlet
      DEFAULT_OPTIONS = {
        :Host => '0.0.0.0',
        :Port => 9292,
        :MaxWorkers => 10,
        :Timeout => 300,
        :MaxRequestPerChild => 100,
        :MinRequestPerChild => nil,
        :SpawnInterval => nil,
        :ErrRespawnInterval => nil
      }.freeze

      # Shared empty request body for requests without a positive Content-Length.
      NULLIO = StringIO.new("").set_encoding('BINARY')

      # Rack handler entry point: sets up the listener and runs the workers.
      # Returns once the manager has been told to stop and all workers exited.
      def self.run(app, options = {})
        slf = new(options)
        slf.setup_listener
        slf.run_worker(app)
      end

      # Option descriptions advertised to the rackup command line.
      def self.valid_options
        {
          "Host=HOST" => "Hostname to listen on (default: 0.0.0.0)",
          "Port=PORT" => "Port to listen on (default: 9292)",
          "MaxWorkers=NUM" => "Number of worker processes (default: 10)",
          "Timeout=SECONDS" => "Socket read/write timeout in seconds (default: 300)",
          "MaxRequestPerChild=NUM" => "Requests a worker serves before exiting (default: 100)",
          "MinRequestPerChild=NUM" => "If set, each worker's request limit is picked at random between this and MaxRequestPerChild",
          "SpawnInterval=SECONDS" => "Delay between worker forks; also enables staggered worker shutdown on USR1",
          "ErrRespawnInterval=SECONDS" => "Delay before re-forking after a worker failure",
        }
      end

      def initialize(options = {})
        @options = DEFAULT_OPTIONS.merge(options)
        @server = nil
        @_is_tcp = false
        @_using_defer_accept = false
        @_is_deferred_accept = false
      end

      # Splits a SERVER_STARTER_PORT entry ("host:port=fd", "port=fd" or
      # "path=fd") into [host, port, fd]. host is nil when the entry carries
      # no "host:" part; in that case port is returned as the raw String.
      def _parse_server_starter_port(spec)
        hostport, fd = spec.split("=", 2)
        if (m = hostport.match(/\A(.*):(\d+)\z/))
          # m[0] is the whole match; the captures start at index 1.
          [m[1], m[2].to_i, fd.to_i]
        else
          [nil, hostport, fd.to_i]
        end
      end

      # Creates the listening socket, or adopts the one described by
      # SERVER_STARTER_PORT, and on Linux tries to enable TCP_DEFER_ACCEPT.
      def setup_listener
        if (spec = ENV["SERVER_STARTER_PORT"])
          host, port, fd = _parse_server_starter_port(spec)
          @options[:Host] = host if host
          @options[:Port] = port
          @server = TCPServer.for_fd(fd)
          @_is_tcp = true unless @server.local_address.unix?
        end
        if @server.nil?
          @server = TCPServer.new(@options[:Host], @options[:Port])
          @server.setsockopt(:SOCKET, :REUSEADDR, 1)
          @_is_tcp = true
        end
        if RUBY_PLATFORM.match(/linux/) && @_is_tcp
          # 9 is the value the original code passed for this option on Linux.
          defer_accept = Socket.const_defined?(:TCP_DEFER_ACCEPT) ? Socket::TCP_DEFER_ACCEPT : 9
          begin
            @server.setsockopt(Socket::IPPROTO_TCP, defer_accept, 1)
            @_using_defer_accept = true
          rescue SystemCallError
            # Best effort: keep serving without deferred accept.
            @_using_defer_accept = false
          end
        end
      end

      # Runs the prefork manager until TERM or USR1 is received, then waits
      # for all workers. Any other trapped signal (HUP) only restarts the workers.
      def run_worker(app)
        pm_args = {
          "max_workers" => @options[:MaxWorkers].to_i,
          "trap_signals" => {
            "TERM" => 'TERM',
            "HUP" => 'TERM',
          },
        }
        if @options[:SpawnInterval]
          pm_args["trap_signals"]["USR1"] = ["TERM", @options[:SpawnInterval].to_i]
          pm_args["spawn_interval"] = @options[:SpawnInterval].to_i
        end
        if @options[:ErrRespawnInterval]
          pm_args["err_respawn_interval"] = @options[:ErrRespawnInterval].to_i
        end
        pe = PreforkEngine.new(pm_args)
        until pe.signal_received.match(/^(TERM|USR1)$/)
          pe.start do
            srand
            accept_loop(app)
          end
        end
        pe.wait_all_children
      end

      # Number of requests this worker serves before exiting. With
      # MinRequestPerChild set, a random integer between the two limits
      # (inclusive) is returned so that workers do not all recycle at once.
      def _calc_reqs_per_child
        max = @options[:MaxRequestPerChild].to_i
        min = @options[:MinRequestPerChild]
        return max unless min
        lo, hi = [min.to_i, max].minmax
        rand(lo..hi)
      end

      # Socket read/write timeout in seconds, taken from the :Timeout option.
      def _io_timeout
        (@options[:Timeout] || 300).to_f
      end

      # Worker main loop: accepts and serves connections until the
      # per-worker request limit is reached or a TERM has been received.
      def accept_loop(app)
        @can_exit = true
        @term_received = 0
        proc_req_count = 0
        Signal.trap('TERM') do
          # Idle: exit at once. Busy: remember the request and exit after the
          # current response; a second TERM forces the exit.
          exit!(true) if @can_exit
          @term_received += 1
          exit!(true) if @term_received > 1
        end
        Signal.trap('PIPE', 'IGNORE')
        max_reqs = _calc_reqs_per_child
        while proc_req_count < max_reqs
          @can_exit = true
          # A TERM that arrived while the previous request was being served
          # only bumped the counter; honour it before blocking in accept.
          break if @term_received > 0
          connection = @server.accept
          begin
            connection.nonblock(true) do
              peeraddr = nil
              peerport = 0
              if @_is_tcp
                connection.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
                peer = connection.peeraddr
                peeraddr = peer[3] # numeric address
                peerport = peer[1].to_s
              end
              proc_req_count += 1
              @_is_deferred_accept = @_using_defer_accept
              env = {
                'SERVER_NAME' => @options[:Host],
                'SERVER_PORT' => @options[:Port].to_s,
                'REMOTE_ADDR' => peeraddr,
                'REMOTE_PORT' => peerport,
                'rack.version' => [0, 1],
                'rack.errors' => STDERR,
                'rack.multithread' => false,
                'rack.multiprocess' => false,
                'rack.run_once' => false,
                'rack.url_scheme' => 'http'
              }
              handle_connection(env, connection, app)
            end
          ensure
            connection.close
          end
        end
      end

      # Reads one request from +connection+, calls +app+ and writes the
      # response. Returns true when a response was attempted, nil when the
      # request could not be read (timeout, peer closed, parse error).
      def handle_connection(env, connection, app)
        buf = String.new
        loop do
          readed = read_timeout(connection)
          # Timeout or closed peer: give up instead of looping forever.
          return nil if readed.nil?
          @can_exit = false
          buf << readed
          reqlen = PicoHTTPParser.parse_http_request(buf, env)
          next if reqlen == -2 # request is incomplete, read more
          return nil if reqlen < 0 # parse error

          # force downgrade to 1.0
          env["SERVER_PROTOCOL"] = "HTTP/1.0"
          cl = env["CONTENT_LENGTH"].to_i
          if cl > 0
            input = _read_body(connection, buf.byteslice(reqlen..-1), cl)
            return nil if input.nil?
            env["rack.input"] = input
          else
            env["rack.input"] = NULLIO
          end
          status, header, body = app.call(env)
          _send_response(connection, status, header, body)
          return true
        end
      end

      # Collects a request body of at least +cl+ bytes, starting with the
      # bytes already read past the header (+leftover+). Returns a rewound
      # binary StringIO, or nil if the connection stalls or closes first.
      def _read_body(connection, leftover, cl)
        input = StringIO.new(String.new).set_encoding('BINARY')
        pending = leftover
        while cl > 0
          if pending.bytesize > 0
            chunk = pending
            pending = ""
          else
            chunk = read_timeout(connection)
            return nil if chunk.nil?
          end
          input << chunk
          cl -= chunk.bytesize
        end
        input.rewind
        input
      end

      # Serialises the Rack response triplet as HTTP/1.0 and writes it out.
      # Any Connection header from the app is replaced by "Connection: close".
      def _send_response(connection, status, header, body)
        code = status.to_i
        reason = Rack::Utils::HTTP_STATUS_CODES[code] || ""
        res_header = "HTTP/1.0 #{code} #{reason}\r\nConnection: close\r\n"
        header.each do |k, vs|
          next if k.downcase == "connection"
          # A value may be an Array or a String holding several lines
          # separated by "\n"; emit one header line for each.
          Array(vs).each do |v|
            lines = v.to_s.split("\n")
            lines = [""] if lines.empty?
            lines.each { |line| res_header << "#{k}: #{line}\r\n" }
          end
        end
        res_header << "\r\n"
        begin
          if body.is_a?(Array) && body.length == 1 && body[0].bytesize < 40960
            # Small single-part body: send header and body in one write.
            write_all(connection, res_header.b << body[0].b)
          elsif write_all(connection, res_header)
            body.each do |part|
              break unless write_all(connection, part)
            end
          end
        ensure
          body.close if body.respond_to?(:close)
        end
      end

      # Reads up to 4096 bytes from +connection+. Returns the data, or nil on
      # timeout, end of stream or socket error. The initial wait is skipped
      # for the first read on a connection accepted with deferred accept.
      def read_timeout(connection)
        skip_wait = @_is_deferred_accept
        @_is_deferred_accept = false
        loop do
          unless skip_wait
            return nil unless IO.select([connection], nil, nil, _io_timeout)
          end
          skip_wait = false
          begin
            return connection.sysread(4096)
          rescue IO::WaitReadable, Errno::EAGAIN, Errno::EINTR
            next # nothing to read yet: wait and retry
          rescue IOError, SystemCallError
            return nil # closed or broken connection
          end
        end
      end

      # Writes as much of +buf+ as the socket accepts. Returns the number of
      # bytes written, or nil on timeout or socket error.
      def write_timeout(connection, buf)
        loop do
          begin
            return connection.syswrite(buf)
          rescue IO::WaitWritable, Errno::EAGAIN, Errno::EINTR
            # Socket buffer full: wait until writable, then retry.
            return nil unless IO.select(nil, [connection], nil, _io_timeout)
          rescue IOError, SystemCallError
            return nil # closed or broken connection
          end
        end
      end

      # Writes all of +buf+. Returns buf.bytesize on success, nil on failure.
      def write_all(connection, buf)
        total = buf.bytesize
        off = 0
        while off < total
          chunk = off.zero? ? buf : buf.byteslice(off, total - off)
          written = write_timeout(connection, chunk)
          return nil if written.nil?
          off += written
        end
        total
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment