@ledsun blog

無味の味は佳境に入らざればすなわち知れず

RubyでWebSocketクライアントを書く その3

RubyでWebSocketクライアントを書く その2 - @ledsun blog でWebSocketコネクションが確立できました。 今回はWebSocketを使ってサーバーとメッセージを送受信します。

#!/usr/bin/env ruby
# frozen_string_literal: true

require "erb"
require "socket"
require_relative "protocol_websocket_server/web_socket"

URL = "ws://localhost:2345/"

module WANDS
  class HTTPResponse
    attr_reader :header

    def parse(stream)
      @response_string = read(stream)
      parse_response_string(@response_string)
    end

    def to_s
      @response_string
    end

    private

    def read(stream)
      response_string = ""
      while (line = stream.gets) != "\r\n"
        response_string += line
      end

      response_string
    end

    def parse_response_string(response_string)
      headers, _body = response_string.split("\r\n\r\n", 2)
      headers_lines = headers.split("\r\n")
      _status_line = headers_lines.shift
      @header = headers_lines.map do |line|
        key, value = line.split(': ', 2)
        [key.downcase, [value]]
      end.to_h
    end
  end

  class UpgradeWebSocketRequest
    include ::Protocol::WebSocket::Headers

    TEMPLATE = <<~REQUEST
      GET / HTTP/1.1
      Host: <%= @host %>:<%= @port %>
      Connection: Upgrade
      Upgrade: websocket
      Sec-WebSocket-Version: 13
      Sec-WebSocket-Key: <%= @key %>

    REQUEST

    def initialize(host, port)
      @erb = ERB.new(TEMPLATE)
      @host = host
      @port = port
      @key = Nounce.generate_key
    end

    def to_s
      @erb.result(binding).gsub(/\r?\n/, "\r\n")
    end

    def verify(response)
      accept_digest = response.header[SEC_WEBSOCKET_ACCEPT].first
      accept_digest == Nounce.accept_digest(@key) || raise("Invalid accept digest")
    end
  end
end

request = WANDS::UpgradeWebSocketRequest.new('localhost', 2345)
puts request.to_s

socket = TCPSocket.new('localhost', 2345)
socket.write(request.to_s)
socket.flush

puts 'Request sent'

response = WANDS::HTTPResponse.new
response.parse(socket)

puts "Response: #{response.to_s}"

request.verify response

web_socket = WebSocket.new(socket)
web_socket.write("Hello World!")

puts web_socket.gets

socket.close

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その6 - @ledsun blog で作成したWebSocketクラスがそのままつかえました。 これでPure RubyなTCPSocketライクなインターフェースを持つWebSocket用のクラスが作成できそうです。

Lunar Lakeの性能計測記録

比較のためのコマンド。Windows 11上のWSLで実行しました。

time rake npm:ruby-head-wasm-wasi

比較対象

  • システムモデル    Prestige 15 A11SB
  • プロセッサ    11th Gen Intel(R) Core(TM) i7-1185G7 @ 3.00GHz、2995 Mhz、4 個のコア、8 個のロジカル プロセッサ
 Executed in  810.32 secs    fish           external
    usr time   34.22 mins  209.00 micros   34.22 mins
    sys time   15.46 mins  205.00 micros   15.46 mins

Lunar Lake

  • システムモデル    XPS 13 9350
  • プロセッサ    Intel(R) Core(TM) Ultra 7 258V、2200 Mhz、8 個のコア、8 個のロジカル プロセッサ
Executed in  690.63 secs    fish           external
    usr time   26.89 mins  170.00 micros   26.89 mins
    sys time   20.60 mins  185.00 micros   20.60 mins

結果

実行時間 810.32/690.63 = 1.17 倍速くなりました。 思ったより速くなっていません。

CPU使用率

  • 2980 / (810.30 * 4) = 92%
  • 2849 / (690.63 * 8) = 52%

コアは増えていますが、CPU使用率が下がっています。 Ultra 7 258Vはヘテロコアで、4つの高性能コアと4つの低消費電力コアが入っています。

ruby.wasmのビルドはCPU並列が効きやすい処理ですが、ヘテロコアではあまり性能が出ないようです。 聞くところによると、CPU負荷がまばらになりやすい並列処理はヘテロコアの方が向くそうです。 興味深い違いです。

RubyでWebSocketクライアントを書く その2

RubyでWebSocketクライアントを書く - @ledsun blog で、ソケットからレスポンス文字列を読み出すところまで実装しました。 今回はレスポンス文字列に含まれているaccept digestを検証します。

module WANDS
  class HTTPResponse
    attr_reader :header

    def parse(stream)
      @response_string = read(stream)
      parse_response_string(@response_string)
    end

    def to_s
      @response_string
    end

    private

    def read(stream)
      response_string = ""
      while (line = stream.gets) != "\r\n"
        response_string += line
      end

      response_string
    end

    def parse_response_string(response_string)
      headers, _body = response_string.split("\r\n\r\n", 2)
      headers_lines = headers.split("\r\n")
      _status_line = headers_lines.shift
      @header = headers_lines.map do |line|
        key, value = line.split(': ', 2)
        [key.downcase, [value]]
      end.to_h
    end
  end

  class UpgradeWebSocketRequest
    include ::Protocol::WebSocket::Headers

    TEMPLATE = <<~REQUEST
    GET / HTTP/1.1
    Host: <%= @host %>:<%= @port %>
    Connection: Upgrade
    Upgrade: websocket
    Sec-WebSocket-Version: 13
    Sec-WebSocket-Key: <%= @key %>

    REQUEST

    def initialize(host, port)
      @erb = ERB.new(TEMPLATE)
      @host = host
      @port = port
      @key = Nounce.generate_key
    end

    def to_s
      @erb.result(binding).gsub(/\r?\n/, "\r\n")
    end

    def verify(response)
      accept_digest = response.header[SEC_WEBSOCKET_ACCEPT].first
      accept_digest == Nounce.accept_digest(@key) || raise("Invalid accept digest")
    end
  end
end

request = WANDS::UpgradeWebSocketRequest.new('localhost', 2345)
puts request.to_s

socket = TCPSocket.new('localhost', 2345)
socket.write(request.to_s)
socket.flush

puts 'Request sent'

response = WANDS::HTTPResponse.new
response.parse(socket)

puts "Response: #{response.to_s}"

request.verify response

socket.close

HTTPレスポンス文字列を良い感じにパースしてくれるライブラリが見つからなかったので、パースするクラスを作りました。 WEBrick::HTTPRequestっぽいインターフェースにしてあります。 accept digestの検証は、サーバーと同一のアルゴリズムで計算して同じ値かどうかを見ています。

RubyでWebSocketクライアントを書く

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その6 - @ledsun blog でWebSocketサーバーが作れました。 今度はWebSocketクライアントを作成します。

#!/usr/bin/env ruby
# frozen_string_literal: true

require "erb"
require "async/websocket"

class UpgradeWebSocketRequest
  include ::Protocol::WebSocket::Headers

  TEMPLATE = <<~REQUEST
      GET / HTTP/1.1
      Host: <%= @host %>:<%= @port %>
      Connection: Upgrade
      Upgrade: websocket
      Sec-WebSocket-Version: 13
      Sec-WebSocket-Key: <%= @key %>
  
    REQUEST

  def initialize(host, port)
    @erb = ERB.new(TEMPLATE)
    @host = host
    @port = port
    @key = Nounce.generate_key
  end

  def to_s
    @erb.result(binding).gsub(/\r?\n/, "\r\n")
  end
end

request = UpgradeWebSocketRequest.new('localhost', 2345)
puts request.to_s

socket = TCPSocket.new('localhost', 2345)
socket.write(request.to_s)
socket.flush

puts 'Request sent'

response = ""
while (line = socket.gets) != "\r\n"
  response += line
end

puts "Response: #{response}"

socket.close

こんな感じでWebSocketコネクションの接続までは行けているはずです。 つぎはメッセージを送るところです。

意外とHTTPリクエスト文字列を作ったり、HTTPレスポンス文字列を解析したりする簡単な方法がなくて苦戦しました。

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その6

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その5 - @ledsun blog で機能するようになりました。 リファクタリングします。 TCPServerっぽいインターフェースにします。

まずはWebSocketクラス。 確立されたWebSocketコネクションを表すクラスです。

require 'protocol/websocket/headers'
require 'protocol/websocket/framer'
require 'protocol/websocket/text_frame'

class WebSocket
  include Protocol::WebSocket::Headers

  def initialize(socket)
    @socket = socket
  end

  def gets
    framer = Protocol::WebSocket::Framer.new(@socket)
    frame = framer.read_frame
    raise 'frame is not a text' unless frame.is_a? Protocol::WebSocket::TextFrame
    frame.unpack
  end

  def write(message)
    frame = Protocol::WebSocket::TextFrame.new(true, message)
    frame.write(@socket)
  end

  def close
    @socket.close
  end
end

次にWebSocketServerクラス WebSocketの接続を待ち受けるクラスです。

require 'socket'
require 'protocol/websocket/headers'
require 'webrick/httprequest'
require 'webrick/httpresponse'
require 'webrick/config'

class WebSocketServer
  include Protocol::WebSocket::Headers

  def initialize(hostname, port)
    @server = TCPServer.new hostname, port
  end

  def accept
    socket = @server.accept

    headers = read_headers_from socket
    unless headers["upgrade"].include? PROTOCOL
      socket.close
      raise "Not a websocket request"
    end

    response_key = calculate_accept_nonce_from headers
    response = WEBrick::HTTPResponse.new(WEBrick::Config::HTTP)
    response.status = 101
    response.upgrade! PROTOCOL
    response[SEC_WEBSOCKET_ACCEPT] = response_key

    response.send_response socket

    WebSocket.new socket
  end

  private

  def read_headers_from(socket)
    request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
    request.parse(socket)
    request.header
  end

  def calculate_accept_nonce_from(headers)
    key = headers[SEC_WEBSOCKET_KEY].first
    Nounce.accept_digest(key)
  end
end

最後にアプリケーション

require_relative 'web_socket.rb'
require_relative 'web_socket_server.rb'

server = WebSocketServer.new 'localhost', 2345

loop do
  socket = server.accept
  puts "Received: #{socket.gets}"

  socket.write "Loud and clear!"
  puts "Sent message"
  socket.close
end

どうです?かっこいいでしょう。

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その5

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その4 - @ledsun blog で、HTTPリクエスト文字列、HTTPレスポンス文字列の処理をWEBrickで行うことにしました。 今回はWebSocketのFrameの読み書きをprotocol-websocketを使って行います。

require 'socket'
require 'webrick'
require 'protocol/websocket/headers'
require 'protocol/websocket/framer'
require 'protocol/websocket/text_frame'

def read_headers_from(socket)
  request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
  request.parse(socket)
  request.header
end

def calculate_accept_nonce_from(headers)
  key = headers[Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY].first
  Protocol::WebSocket::Headers::Nounce.accept_digest(key)
end

server = TCPServer.new 'localhost',
                       2345

loop do
  # ここら辺はecho_server.rbと同じ。純粋なTCP通信
  socket = server.accept

  # HTTPリクエストheaderを読み込む
  headers = read_headers_from socket

  # WebSocketリクエストかどうかを判定する
  unless headers["upgrade"] = Protocol::WebSocket::Headers::PROTOCOL
    puts "Not a websocket request"
    socket.close
    next
  end

  response_key = calculate_accept_nonce_from headers
  puts "response_key: #{response_key}"

  response = WEBrick::HTTPResponse.new(WEBrick::Config::HTTP)
  response.status = 101
  response.upgrade! Protocol::WebSocket::Headers::PROTOCOL
  response['Sec-WebSocket-Accept'] = response_key

  # ソケットそのものはHTTP通信で使われているものと同じ
  response.send_response socket
  puts 'Handshake response sent'

  # ここからはWebSocket通信
  framer = Protocol::WebSocket::Framer.new(socket)
  request_frame = framer.read_frame
  raise 'frame is not a text' unless request_frame.is_a? Protocol::WebSocket::TextFrame
  puts "Received: #{request_frame.unpack}"

  # クライアントにデータを返す
  response_message = "Loud and clear!"
  response_frame = Protocol::WebSocket::TextFrame.new(true, response_message)
  response_frame.write(socket)

  socket.close
end

バイナリ読んでOPCODEを特定して、マスクを剥がしてPackする一連の面倒な処理が、シュッと書けました。 protocol-websocketすごい!便利。 このあとは、リファクタリングして、Async::WebSocketを使ったクライアントと通信できるかなど、試したいとおもいます。

Protocol::WebSocket::FramerProtocol::WebSocket::Framerの設計がかっこいい。 入出力の型をStreamにしてあるおかげで、普通のSocketも読み書きできるし、async-websocketで使っているAsync::IO::Streamも使える。

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その4

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その3 - @ledsun blog ではWEBrickを使ってHTTPレスポンス文字列を作成しました。 今度は protocol-websocket を使ってHTTPレスポンス文字列を作成します。

require 'socket'
require 'webrick'
require 'protocol/websocket/headers'
require 'protocol/http/headers'
require 'protocol/http/response'

def read_headers_from(socket)
  request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
  request.parse(socket)
  request.header
end

def calculate_accept_nonce_from(headers)
  key = headers[Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY].first
  Protocol::WebSocket::Headers::Nounce.accept_digest(key)
end

server = TCPServer.new 'localhost',
                       2345

loop do
  # ここら辺はecho_server.rbと同じ。純粋なTCP通信
  socket = server.accept

  # HTTPリクエストheaderを読み込む
  headers = read_headers_from socket

  # WebSocketリクエストかどうかを判定する
  unless headers["upgrade"] = Protocol::WebSocket::Headers::PROTOCOL
    puts "Not a websocket request"
    socket.close
    next
  end

  response_key = calculate_accept_nonce_from headers
  puts "response_key: #{response_key}"

  response_headers = Protocol::HTTP::Headers.new
  response_headers.add Protocol::WebSocket::Headers::SEC_WEBSOCKET_ACCEPT, response_key
  response = Protocol::HTTP::Response.new("HTTP/1.1",
                                          101,
                                          response_headers,
                                          nil,
                                          Protocol::WebSocket::Headers::PROTOCOL)
  response_string = <<~HTTP
    #{response.version} #{response.status} Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    #{response.headers.to_h.map { |k, v| "#{k}: #{v.join}" }.join("\n")}
  
  HTTP

  puts response_string
  # ソケットそのものはHTTP通信で使われているものと同じ
  socket.write response_string
  puts 'Handshake response sent'

  # ここからはWebSocket通信
  first_byte = socket.getbyte
  fin = first_byte & 0b10000000
  opcode = first_byte & 0b00001111

  raise 'fin bit is not set' unless fin
  raise 'opcode is not a text' unless opcode == 0x1

  second_byte = socket.getbyte
  is_masked = second_byte & 0b10000000
  payload_size = second_byte & 0b01111111

  raise 'mask bit is not set' unless is_masked
  raise 'payload size > 125 is not supported' unless payload_size <= 125

  puts "Payload size: #{payload_size}"

  mask = 4.times.map { socket.getbyte }
  puts "Mask: #{mask}"

  data = payload_size.times.map.with_index { socket.getbyte ^ mask[_2 % 4] }
  puts "Data: #{data.pack('C*')}"

  # クライアントにデータを返す
  response_message = "Loud and clear!"
  response = [0b10000001,
              response_message.size,
              response_message
            ].pack("CCA#{response_message.size}")
  socket.write response

  socket.close
end

レスポンス文字列作成部分だけを抜き出します。 protocol-websocketでは

response_headers = Protocol::HTTP::Headers.new
response_headers.add Protocol::WebSocket::Headers::SEC_WEBSOCKET_ACCEPT, response_key
response = Protocol::HTTP::Response.new("HTTP/1.1",
                                        101,
                                        response_headers,
                                        nil,
                                        Protocol::WebSocket::Headers::PROTOCOL)

response_string = <<~HTTP
  #{response.version} #{response.status} Switching Protocols
  Upgrade: websocket
  Connection: Upgrade
  #{response.headers.to_h.map { |k, v| "#{k}: #{v.join}" }.join("\n")}

HTTP

socket.write response_string

Protocol::HTTP::Responseにはレスポンス文字列を返すメソッドがなかったので、なかなか面倒になってしまいました。

WebSocketでは

response = WEBrick::HTTPResponse.new(WEBrick::Config::HTTP)
response.status = 101
response.upgrade! Protocol::WebSocket::Headers::PROTOCOL
response['Sec-WebSocket-Accept'] = response_key
response.send_response socket

でした。 どうやらHTTPリクエストやHTTPレスポンスを素朴に扱いたい場合は、WEBrickを使う方が便利そうです。

次回はいよいよWebSocketのFrameを作る部分でs。う

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その3

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その2 - @ledsun blog でリクエスト文字列の解析をWEBrickに置き換えました。 今度はレスポンス文字列の作成にWEBrick::HTTPResponseを使います。

require 'socket'
require 'webrick'
require 'protocol/websocket/headers'

def read_headers_from(socket)
  request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
  request.parse(socket)
  request.header
end

def calculate_accept_nonce_from(headers)
  key = headers[::Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY].first
  ::Protocol::WebSocket::Headers::Nounce.accept_digest(key)
end

server = TCPServer.new 'localhost',
                       2345

loop do
  # ここら辺はecho_server.rbと同じ。純粋なTCP通信
  socket = server.accept

  # HTTPリクエストheaderを読み込む
  headears = read_headers_from socket

  # WebSocketリクエストかどうかを判定する
  unless headears["upgrade"] = ::Protocol::WebSocket::Headers::PROTOCOL
    puts "Not a websocket request"
    socket.close
    next
  end

  response_key = calculate_accept_nonce_from headears
  puts "response_key: #{response_key}"

  response = WEBrick::HTTPResponse.new(WEBrick::Config::HTTP)
  response.status = 101
  response.upgrade! ::Protocol::WebSocket::Headers::PROTOCOL
  response['Sec-WebSocket-Accept'] = response_key

  # ソケットそのものはHTTP通信で使われているものと同じ
  response.send_response socket
  puts 'Handshake response sent'

  # ここからはWebSocket通信
  first_byte = socket.getbyte
  fin = first_byte & 0b10000000
  opcode = first_byte & 0b00001111

  raise 'fin bit is not set' unless fin
  raise 'opcode is not a text' unless opcode == 0x1

  second_byte = socket.getbyte
  is_masked = second_byte & 0b10000000
  payload_size = second_byte & 0b01111111

  raise 'mask bit is not set' unless is_masked
  raise 'payload size > 125 is not supported' unless payload_size <= 125

  puts "Payload size: #{payload_size}"

  mask = 4.times.map { socket.getbyte }
  puts "Mask: #{mask}"

  data = payload_size.times.map.with_index { socket.getbyte ^ mask[_2 % 4] }
  puts "Data: #{data.pack('C*')}"

  # クライアントにデータを返す
  response_message = "Loud and clear!"
  response = [0b10000001,
              response_message.size,
              response_message
            ].pack("CCA#{response_message.size}")
  socket.write response

  socket.close
end

今の時点ではWEBrick::HTTPResponseProtocol::HTTP::Responseの違いを理解できていません。

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その2

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その1 - @ledsun blog ではaccept_nonceの計算にprotocol-websocketを利用しました。 今度はヘッダの解析に使います。 Async::WebSocket gemの素振り その8 - @ledsun blogでは、HTTPリクエストのパースにRackを使いました。 今回はRackを使わずにWEBrickを使います。

require 'socket'
require 'webrick'
require 'protocol/websocket/headers'

def headers_of(socket)
  request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
  request.parse(socket)
  request.header
end

server = TCPServer.new 'localhost',
                       2345

loop do
  # ここら辺はecho_server.rbと同じ。純粋なTCP通信
  socket = server.accept

  # HTTPリクエストheaderを読み込む
  headears = headers_of(socket)

  # WebSocketリクエストかどうかを判定する
  unless headears["upgrade"] = ::Protocol::WebSocket::Headers::PROTOCOL
    puts "Not a websocket request"
    socket.close
    next
  end

  ws_key = headears[::Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY].first
  puts "ws_key: #{ws_key}"

  response_key = ::Protocol::WebSocket::Headers::Nounce.accept_digest(ws_key)
  puts "response_key: #{response_key}"

  handshake_response = <<~EOS
    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: #{response_key}

  EOS

  # ソケットそのものはHTTP通信で使われているものと同じ
  socket.write handshake_response
  puts 'Handshake response sent'

  # ここからはWebSocket通信
  first_byte = socket.getbyte
  fin = first_byte & 0b10000000
  opcode = first_byte & 0b00001111

  raise 'fin bit is not set' unless fin
  raise 'opcode is not a text' unless opcode == 0x1

  second_byte = socket.getbyte
  is_masked = second_byte & 0b10000000
  payload_size = second_byte & 0b01111111

  raise 'mask bit is not set' unless is_masked
  raise 'payload size > 125 is not supported' unless payload_size <= 125

  puts "Payload size: #{payload_size}"

  mask = 4.times.map { socket.getbyte }
  puts "Mask: #{mask}"

  data = payload_size.times.map.with_index { socket.getbyte ^ mask[_2 % 4] }
  puts "Data: #{data.pack('C*')}"

  # クライアントにデータを返す
  response_message = "Loud and clear!"
  response = [0b10000001,
              response_message.size,
              response_message
            ].pack("CCA#{response_message.size}")
  socket.write response

  socket.close
end

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その1

Async::WebSocket gemの素振り その8 - @ledsun blogにてRubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れると良さそうなことがわかりました。 やってみましょう。

require 'socket'
require 'protocol/websocket/headers'

server = TCPServer.new 'localhost',
                       2345

loop do
  # ここら辺はecho_server.rbと同じ。純粋なTCP通信
  socket = server.accept

  # HTTPリクエストを読み込む
  http_request = ""
  while (line = socket.gets) && (line != "\r\n")
    http_request += line
  end
  puts http_request

  # WebSocketリクエストかどうかを判定する
  unless match = http_request.match(/^Sec-WebSocket-Key: (\S+)/)
    puts "Not a websocket request"
    socket.close
    next
  end

  ws_key = match[1]
  puts "ws_key: #{ws_key}"

  response_key = ::Protocol::WebSocket::Headers::Nounce.accept_digest(ws_key)
  puts "response_key: #{response_key}"

  handshake_response = <<~EOS
    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: #{response_key}

  EOS

  # ソケットそのものはHTTP通信で使われているものと同じ
  socket.write handshake_response
  puts 'Handshake response sent'

  # ここからはWebSocket通信
  first_byte = socket.getbyte
  fin = first_byte & 0b10000000
  opcode = first_byte & 0b00001111

  raise 'fin bit is not set' unless fin
  raise 'opcode is not a text' unless opcode == 0x1

  second_byte = socket.getbyte
  is_masked = second_byte & 0b10000000
  payload_size = second_byte & 0b01111111

  raise 'mask bit is not set' unless is_masked
  raise 'payload size > 125 is not supported' unless payload_size <= 125

  puts "Payload size: #{payload_size}"

  mask = 4.times.map { socket.getbyte }
  puts "Mask: #{mask}"

  data = payload_size.times.map.with_index { socket.getbyte ^ mask[_2 % 4] }
  puts "Data: #{data.pack('C*')}"

  # クライアントにデータを返す
  response_message = "Loud and clear!"
  response = [0b10000001,
              response_message.size,
              response_message
            ].pack("CCA#{response_message.size}")
  socket.write response

  socket.close
end

最も置き換えが簡単そうなaccept_nonceの計算を::Protocol::WebSocket::Headers::Nounce.accept_digestに置き換えました。 なお、このサーバーでは、前回まで使っていたclint.rbはうごきません。 代わりにブラウザからWebSocketでメッセージを送るindex.htmlを使って動作確認します。

<html>
<head>
    <title>WebSocket Client</title>
    <script src="https://cdn.jsdelivr.net/npm/@ruby/[email protected]/dist/browser.script.iife.js"></script>
</head>
<body>
  <script type="text/ruby">
    require "js"

    ws = JS.global[:WebSocket].new("ws://localhost:2345")
    ws[:onopen] = -> (event) {
      ws.send("Hello, Server")
    }
    ws[:onmessage] = ->(event) {
      p event[:data]
    }
  </script>
</body>
</html>

Async::WebSocket gemの素振り その8

Async::WebSocket gemの素振り その7 - @ledsun blog で残された最後のAsync::WebSocket依存::Async::WebSocket::Connection.callを取り除きます。

require "protocol/http/body/readable"
require 'protocol/websocket/framer'
require 'protocol/websocket/extensions'
require 'protocol/websocket/connection'
require "protocol/rack/request"
require "protocol/rack/adapter"

class Body < ::Protocol::HTTP::Body::Readable
  # We prefer streaming directly as it's the lowest overhead.
  def stream?
    true
  end

  def call(stream)
    framer = ::Protocol::WebSocket::Framer.new(stream)
    connection = ::Protocol::WebSocket::Connection.new(framer)
    ::Protocol::WebSocket::Extensions::Server.default.apply(connection)

    # Echo back!
    message = connection.read
    connection.write message
  end
end

class App
  def self.call(env)
    req = ::Protocol::Rack::Request[env]
    res = ::Protocol::HTTP::Response.new req.version,
                                         101,
                                         response_headers_for(req),
                                         Body.new,
                                         ::Protocol::WebSocket::Headers::PROTOCOL
    Protocol::Rack::Adapter.make_response(env, res)
  end

  def self.response_headers_for(req)
    headers = ::Protocol::HTTP::Headers.new
    headers.add ::Protocol::WebSocket::Headers::SEC_WEBSOCKET_ACCEPT,
                nonce_for(req.headers)
    headers
  end

  def self.nonce_for(headers)
    accept_nounce = headers[::Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY]&.first
    ::Protocol::WebSocket::Headers::Nounce.accept_digest(accept_nounce)
  end
end

run App

元々はRubyでWebSocketサーバー - @ledsun blogのように WebSocketのコネクション確立とメッセージ処理を分離したかったです。 Async::WebSocket依存を取り除いて見えたのは、Rack依存の部分でした。 方針を間違えていた事に気がつきました。

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れると良さそうです。

Async::WebSocket gemの素振り その7

Async::WebSocket gemの素振り その6 - @ledsun blogAsync::HTTP::Body::Hijack.wrapを使うようになりました。 今回はAsync::HTTP::Body::Hijack.wrapを展開します。

require "protocol/http/body/readable"
require "protocol/http/body/writable"
require "protocol/http/body/stream"
require 'async/websocket/connection'
require 'protocol/websocket/framer'
require 'protocol/websocket/extensions'
require "protocol/rack/request"
require "protocol/rack/adapter"

class MyReadable < ::Protocol::HTTP::Body::Readable
  def initialize(block, input)
    @block = block
    @input = input

    @task = nil
    @stream = nil
    @output = nil
  end

  # We prefer streaming directly as it's the lowest overhead.
  def stream?
    true
  end

  def call(stream)
    @block.call(stream)
  end

  attr :input

  # Has the producer called #finish and has the reader consumed the nil token?
  def empty?
    @output&.empty?
  end

  def ready?
    @output&.ready?
  end

  # Read the next available chunk.
  def read
    @output = ::Protocol::HTTP::Body::Writable.new
    @stream = ::Protocol::HTTP::Body::Stream.new(@input, @output)
    @block.call(@stream)
    @output.read
  end

  def inspect
    "\#<#{self.class} #{@block.inspect}>"
  end

  def to_s
    "<Hijack #{@block.class}>"
  end
end

class App
  def self.call(env)
    req = ::Protocol::Rack::Request[env]

    # Set a handler for received messages
    handler = lambda do |stream|
      framer = ::Protocol::WebSocket::Framer.new(stream)
      conn = ::Async::WebSocket::Connection.call framer,
                                                 nil,
                                                 ::Protocol::WebSocket::Extensions::Server.default

      # Echo back!
      while message = conn.read
        conn.write message
      end
    end
    body = MyReadable.new(handler, req.body)

    # Prepare response headers
    response_headers = ::Protocol::HTTP::Headers.new
    accept_nounce = req.headers[::Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY]&.first
    response_headers.add ::Protocol::WebSocket::Headers::SEC_WEBSOCKET_ACCEPT,
                         ::Protocol::WebSocket::Headers::Nounce.accept_digest(accept_nounce)

    # Return response
    res = ::Protocol::HTTP::Response.new(req.version, 101, response_headers, body, ::Protocol::WebSocket::Headers::PROTOCOL)
    Protocol::Rack::Adapter.make_response(env, res)
  end
end

run App

Readableクラスが増えて長くなりました。 Async::WebSocketへの依存が1つ減りました。 残るは::Async::WebSocket::Connection.callです。 ::Async::WebSocket::Connection.callを展開したら、もう少しシンプルにできそうです。

Async::WebSocket gemの素振り その6

Async::WebSocket gemの素振り その5 - @ledsun blog::Async::WebSocket::Response.forが気になりました。

::Async::WebSocket::Response.forソースコードです。

def self.for(request, headers = nil, **options, &body)
    if request.version =~ /http\/1/i
        return UpgradeResponse.new(request, headers, **options, &body)
    elsif request.version =~ /http\/2/i
        return ConnectResponse.new(request, headers, **options, &body)
    end
    
    raise UnsupportedVersionError, "Unsupported HTTP version: #{request.version}!"
end

実態は::Async::WebSocket::UpgradeResponseインスタンスを返すだけです。 現在の実験環境はHTTP1です。 ::Async::WebSocket::UpgradeResponseまで展開してみます。

require 'async/http/body/hijack'
require 'async/websocket/connection'
require 'protocol/websocket/framer'
require 'protocol/websocket/extensions'
require "protocol/rack/request"
require "protocol/rack/adapter"

class App
  def self.response(request, &block)
    headers = ::Protocol::HTTP::Headers.new
    accept_nounce = request.headers[::Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY]&.first
    headers.add ::Protocol::WebSocket::Headers::SEC_WEBSOCKET_ACCEPT,
                ::Protocol::WebSocket::Headers::Nounce.accept_digest(accept_nounce)

    body = Async::HTTP::Body::Hijack.wrap(request, &block)

    ::Protocol::HTTP::Response.new(request.version, 101, headers, body, ::Protocol::WebSocket::Headers::PROTOCOL)
  end

  def self.call(env)
    req = ::Protocol::Rack::Request[env]
    res = self.response(req) do |stream|
      framer = ::Protocol::WebSocket::Framer.new(stream)
      conn = ::Async::WebSocket::Connection.call framer,
                                                 nil,
                                                 ::Protocol::WebSocket::Extensions::Server.default

      # Echo back!
      while message = conn.read
        conn.write message
      end

      res
    end

    Protocol::Rack::Adapter.make_response(env, res)
  end
end

run App

リクエストのSec-WebSocket-Keyヘッダーの値のダイジェストをレスポンスのSec-WebSocket-Acceptヘッダーで返します。 RubyでWebSocketサーバー - @ledsun blogでやったことを思い出しました。

WebSocketメッセージを処理するブロックはAsync::HTTP::Body::Hijack.wrapで動くようです。

Async::WebSocket gemの素振り その5

Async::WebSocket gemの素振り その4 - @ledsun blog でブロック呼び出しの入れ子が増えてしまったので、再度展開します。 理解しやすくするため、エラー処理も消します。

require 'async/websocket/response'
require 'async/websocket/connection'
require 'protocol/websocket/framer'
require 'protocol/websocket/extensions'
require "protocol/rack/request"
require "protocol/rack/adapter"

class App
  def self.call(env)
    req = ::Protocol::Rack::Request[env]
    res = ::Async::WebSocket::Response.for(req, protocol: nil) do |stream|
      framer = ::Protocol::WebSocket::Framer.new(stream)
      conn = ::Async::WebSocket::Connection.call framer,
                                                 nil,
                                                 ::Protocol::WebSocket::Extensions::Server.default

      # Echo back!
      while message = conn.read
        conn.write message
      end

      res
    end

    Protocol::Rack::Adapter.make_response(env, res)
  end
end

run App

コネクションを開く処理と、メッセージを受け取る処理が一箇所にある点に違和感があります。 一見すると、WebSocket経由のメッセージを全部受け取ってから、コネクション確立のレスポンスを返しそう見えます。

実際に動かすと、レスポンスは即座に返してコネクション確立して、メッセージを待ちます。 ::Async::WebSocket::Response.forの辺りに秘密がありそうです。 次はそこを見ていきます。

Async::WebSocket gemの素振り その4

Async::WebSocket gemの素振り その3 - @ledsun blog にて、ブロック呼び出しが入れ子になっていたのを解消しました。 次にAsync::WebSocket::Adapters::HTTP.openを展開します。

require 'async/websocket/response'
require 'async/websocket/connection'
require 'protocol/websocket/framer'
require 'protocol/websocket/extensions'
require "protocol/rack/request"
require "protocol/rack/adapter"

class App
  def self.websocket?(request)
    Array(request.protocol).any? { |e| e.casecmp?(::Protocol::WebSocket::Headers::PROTOCOL) }
  end

  def self.open(request)
    if websocket?(request)

      response = ::Async::WebSocket::Response.for(request, protocol: nil) do |stream|
        framer = ::Protocol::WebSocket::Framer.new(stream)
        connection = ::Async::WebSocket::Connection.call(framer, nil, ::Protocol::WebSocket::Extensions::Server.default)

        yield connection
      ensure
        connection&.close
        stream.close
      end

      # Once we get to this point, we no longer need to hold on to the request:
      request = nil

      return response
    end
  end

  def self.call(env)
    req = ::Protocol::Rack::Request[env]

    res = open(req) do |conn|
      # Echo back!
      while message = conn.read
        conn.write message
      end
    end

    Protocol::Rack::Adapter.make_response(env, res)
  end
end

run App

また、ブロック呼び出しの入れ子が増えてしまいました。