@ledsun blog

無味の味は佳境に入らざればすなわち知れず

DRbWebSocket::DRbMessageを作って捨てた

dRubyをWebSocketに載せるために、dRubyの1つの情報をWebSocketの1つのテキストフレームとして送信する方針で実装してみました。 でも、よく考えたら非効率っぽいのでやめました。

  • WebSocketのフレームのヘッダーは6バイト
  • dRubyはシンプルなLength-Prefixed Protocolでヘッダーは4バイト
  • dRubyのsend_requestで少なくとも5つの情報を送る

テキストフレームを情報ごとに5分割するとフレームヘッダーが6x5=30バイト必要です。 一つのテキストフレームに5つの情報をまとめて送ればフレームヘッダー 6 + dRubyヘッダー 4x5=26バイトで済みます。

前者の方が「WebSocketを使いこなしているっぽい」と思ってはしゃいで実装しました。 残念です。 供養のためにコードをここに残しておきます。

# frozen_string_literal: true

require "drb/drb"

module DRbWebSocket
  # Send and receive messages for DRb over WebSocket.
  # Use TextFrame for WebSocket.
  class DRbMessage < DRb::DRbMessage
    def send_request(stream, ref, msg_id, arg, b)
      stream.write(dump(ref.__drbref))
      stream.write(dump(msg_id.id2name))
      stream.write(dump(arg.length))
      arg.each do |e|
        stream.write(dump(e))
      end
      stream.write(dump(b))
    rescue StandardError
      raise(DRbConnError, $ERROR_INFO.message, $ERROR_INFO.backtrace)
    end

    def recv_request(stream)
      ref = load(stream)
      ro = DRb.to_obj(ref)
      msg = load(stream)

      argc = load(stream)
      raise(DRbConnError, "too many arguments") if @argc_limit < argc

      argv = Array.new(argc, nil)
      argc.times do |n|
        argv[n] = load(stream)
      end

      block = load(stream)

      [ro, msg, argv, block]
    end

    def send_reply(stream, succ, result)
      stream.write(dump(succ))
      stream.write(dump(result, !succ))
    rescue StandardError
      raise(DRbConnError, $ERROR_INFO.message, $ERROR_INFO.backtrace)
    end

    def load(soc)
      begin
        str = soc.gets
      rescue StandardError
        raise(DRb::DRbConnError, $ERROR_INFO.message, $ERROR_INFO.backtrace)
      end
      raise(DRb::DRbConnError, "connection closed") if str.nil?

      DRb.mutex.synchronize do
        Marshal.load(str)
      rescue NameError, ArgumentError
        DRbUnknown.new($ERROR_INFO, str)
      end
    end

    def dump(obj, error = false)
      case obj
      when DRbUndumped
        obj = make_proxy(obj, error)
      when Object
        # nothing
      else
        obj = make_proxy(obj, error)
      end

      begin
        str = Marshal.dump(obj)
      rescue StandardError
        str = Marshal.dump(make_proxy(obj, error))
      end

      str
    end
  end
end

心理的安全性

心理的安全性」という言葉の使われ方にモヤモヤすることがあります。 例えば

  • 心理的安全性があるので、攻撃的な言葉を使って他人を批判しても良い
  • 経営上の理由があっても解雇されるのは心理的安全性を損なう

このような使い方をしたら、ある個人の求める「心理的安全性」が他社の「心理的安全性」を損ないそうです。 「心理的安全性」とは、このような矛盾した概念なのでしょうか? そこで次の文献に当たりました。

この本は「心理的安全性」の提案者の本です。 「心理的安全性」が何かを理解するのに最適な雰囲気がします。 実際に読んでみると「心理的安全性」が何かを知るには、本文より解説が役に立ちました。 自分の理解を書いてみます。

心理的安全性」とはグループの状態を表す言葉です。 グループに所属する個人が感じるものではありません。 ですので「私の心理的安全性を損なう」という言い方は、「心理的安全性」の本来の定義からすると、矛盾した表現です。

また「心理的安全性」とは、イノベーションが起きるグループの性質を表した言葉です。 イノベーションが起きるグループは

を持っています。 多様性から今までにないアイデアの組み合わせが生まれイノベーションが起こります。 多様性のあるグループでも「他人と違う意見を言う」ことを恐れるグループでは多様な意見が出ません。 「心理的安全性」とはイノベーションが起きるグループの「他人と違う意見を言っても良い」性質を表す言葉です。

解雇される場合も、異なる意見を封殺するための解雇であれば「心理的安全性」を損ないます。 一方で、経営資源を集約するためなど別の理由のための解雇であれば「心理的安全性」は損ないません。 もちろん解雇することで多様性は損なわれます。 一時的にイノベーションは遠のくと思います。 それは経営上の失敗の結果です。 グループの多様性が市場に合っていなかったから、起きたイノベーションも市場に合わなかったのかもしれません。 一時的にグループを小さくして多様性のあり方を変更するのは、それほどおかしい話ではないように感じます。

WebSocketライブラリー: Wandsの誕生

Wandsと言う名前のgemを作りました。WebSocket通信用のライブラリーです。Web and Socketを略してWandsです。 魔法の杖のワンドとも掛けてあります。*1

Rubyにはfaye-websocket-rubyasync-websocketなどのライブラリーがあります。 これらのライブラリーはRack上で使う前提です。 つまりHTTPサーバーに組み込む想定です。 そりゃそうですよね。WebSocketはHTTPプロトコルの一部ですから。

dRubyに組み込む場合にはRackを使いません。 そこでTCPSocketをちょっとだけラップしたgemを作りました。

当初、ちょっとしたライブラリーなのでgemにするつもりはありませんでした。 Ruby 3.4のリリースパーティーに行って色々おしゃべりしているうちにgemにしたくなりました。 せっかく年末で時間があるのでgemにしました。 3年ぶりにgemを作りました。

3年も経つとジェネレーターが整備され、gemを作るのが簡単になっていました。 rbsも作りました。 テストにはasyncを使っています。 スレッドを立てなくてもクライアントとサーバーの接続テストができるのが面白いです。

次はこのgemを使ってdRubyのWebSocketプロトコルを作っていきます。

自作WebSocktライブラリーのテストがGitHub Actionsでだけ失敗する

失敗しているGitHub Actionsです。 https://github.com/ledsun/wands/actions/runs/12547524257/job/34985157639#step:4:48

次のようなエラーが起きています。

/opt/hostedtoolcache/Ruby/3.4.1/x64/lib/ruby/3.4.0/uri/generic.rb:601:in 'URI::Generic#check_host': bad component(expected host component): ::1 (URI::InvalidComponentError)
    from /opt/hostedtoolcache/Ruby/3.4.1/x64/lib/ruby/3.4.0/uri/generic.rb:640:in 'URI::Generic#host='
    from /home/runner/work/wands/wands/vendor/bundle/ruby/3.4.0/gems/webrick-1.9.1/lib/webrick/httprequest.rb:520:in 'WEBrick::HTTPRequest#parse_uri'
    from /home/runner/work/wands/wands/vendor/bundle/ruby/3.4.0/gems/webrick-1.9.1/lib/webrick/httprequest.rb:218:in 'WEBrick::HTTPRequest#parse'
    from /home/runner/work/wands/wands/lib/wands/web_socket_server.rb:72:in 'Wands::WebSocketServer#read_headers_from'
    from /home/runner/work/wands/wands/lib/wands/web_socket_server.rb:52:in 'Wands::WebSocketServer#accept'
    from /home/runner/work/wands/wands/test/test_wands.rb:15:in 'block (2 levels) in TestWands#test_open_connection'
    from /home/runner/work/wands/wands/vendor/bundle/ruby/3.4.0/gems/async-2.21.1/lib/async/task.rb:197:in 'block in Async::Task#run'
    from /home/runner/work/wands/wands/vendor/bundle/ruby/3.4.0/gems/async-2.21.1/lib/async/task.rb:435:in 'block in Async::Task#schedule'

こんな感じのコードが実行されているようです。

irb(main):007> uri = URI.parse('/')
=> #<URI::Generic />
irb(main):008> uri.host = '::1'

WEBRickソースコードでは https://github.com/ruby/webrick/blob/v1.9.1/lib/webrick/httprequest.rb#L512-L520 です。

      elsif self["host"]
        host, port = parse_host_request_line(self["host"])
      elsif @addr.size > 0
        host, port = @addr[2], @addr[1]
      else
        host, port = @config[:ServerName], @config[:Port]
      end
      uri.scheme = @forwarded_proto || scheme
      uri.host = host

HOSTヘッダーがなかったのでIPアドレスを取得しているようです。 こんな感じの動作だと思います。

irb(main):019> socket = TCPServer.new(11111)
=> #<TCPServer:fd 5, AF_INET, 0.0.0.0, 11111>
irb(main):020> socket.addr[2]
=> "0.0.0.0"

ローカル環境でテストを実行すると、IPv6アドレスの::1がとれて居るみたいです。 IPv6アドレスの時はuri.host = '[::1]'のようにアドレスを[]で囲ってあげる必要があります。

HTTPリクエストでHOSTヘッダーを設定すれば、このテストは通るようになりそうです。

しかし、他のIPv6環境でWEBrickを動かしても起きそうな気がします。 こんなイメージです。

irb(main):023> socket = TCPServer.new('::1', 11111)
=> #<TCPServer:fd 8, AF_INET6, ::1, 11111>
irb(main):025> uri.host = socket.addr[2]
/home/ledsun/.rbenv/versions/3.4.1/lib/ruby/3.4.0/uri/generic.rb:601:in 'URI::Generic#check_host': bad component(expected host component): ::1 (URI::InvalidComponentError)

DRb Websocket protocolの準備をする

RubyでWebSocketライブラリ - @ledsun blogで、RubyのWebSocketクラスができました。 これをdRubyに組み込みたいのです。 が、足りないAPIがあります。

例えば

https://github.com/ruby/drb/blob/69c2ef531f08a0874908a4306c014b325070e1fe/lib/drb/drb.rb#L987

readables, = IO.select([@socket, @shutdown_pipe_r])

です。 dRubyサーバーとして動いたときに、クライアントからの接続とCtrl + Cによる強制終了を両方待てるようにしています。 このときIO.selectの引数になります。 このためto_ioメソッドを実装しなくてはいけません。 to_ioメソッドはTCPSocketクラスが実装しています。 委譲すれば終わりです。

require 'socket'
require 'forwardable'
require 'protocol/websocket/headers'
require 'protocol/websocket/framer'
require 'protocol/websocket/text_frame'
require_relative 'upgrade_request'
require_relative 'http_response'

module WANDS
  # This is a class that represents WebSocket, which has the same interface as TCPSocket.
  #
  # The WebSocket class is responsible for reading and writing messages to the server.
  #
  # Example usage:
  #
  # web_socket = WebSocket.open('localhost', 2345)
  # web_socket.write("Hello World!")
  #
  # puts web_socket.gets
  #
  # web_socket.close
  #
  class WebSocket
    include Protocol::WebSocket::Headers
    extend Forwardable

    attr_reader :remote_address
    def_delegators :@socket, :close, :to_io

    def self.open(host, port)
      socket = TCPSocket.new('localhost', 2345)
      request = UpgradeRequest.new
      socket.write(request.to_s)
      socket.flush

      response = HTTPResponse.new
      response.parse(socket)

      request.verify response

      self.new(socket)
    end

    def initialize(socket)
      @socket = socket
      @remote_address = socket.remote_address
    end

    # @return [String]
    def gets
      framer = Protocol::WebSocket::Framer.new(@socket)
      frame = framer.read_frame
      raise 'frame is not a text' unless frame.is_a? Protocol::WebSocket::TextFrame
      frame.unpack
    end

    def write(message)
      frame = Protocol::WebSocket::TextFrame.new(true, message)
      frame.write(@socket)
    end
  end
end

RubyでWebSocketライブラリ

RubyでWebSocketクライアントを書く その3 - @ledsun blog までの諸諸の調査をまとめます。 このライブラリの目的はdRubyTCPトランスポートを置き換えるために、TCPSocket、TCPServerのインターフェースに近い形で、WebSocketを扱う事です。

require 'socket'
require 'protocol/websocket/headers'
require 'protocol/websocket/framer'
require 'protocol/websocket/text_frame'
require_relative 'upgrade_request'
require_relative 'http_response'

module WANDS
  # This is a class that represents WebSocket, which has the same interface as TCPSocket.
  #
  # The WebSocket class is responsible for reading and writing messages to the server.
  #
  # Example usage:
  #
  # web_socket = WebSocket.open('localhost', 2345)
  # web_socket.write("Hello World!")
  #
  # puts web_socket.gets
  #
  # web_socket.close
  #
  class WebSocket
    include Protocol::WebSocket::Headers
    attr_reader :remote_address

    def self.open(host, port)
      socket = TCPSocket.new('localhost', 2345)
      request = UpgradeRequest.new
      socket.write(request.to_s)
      socket.flush

      response = HTTPResponse.new
      response.parse(socket)

      request.verify response

      self.new(socket)
    end

    def initialize(socket)
      @socket = socket
      @remote_address = socket.remote_address
    end

    # @return [String]
    def gets
      framer = Protocol::WebSocket::Framer.new(@socket)
      frame = framer.read_frame
      raise 'frame is not a text' unless frame.is_a? Protocol::WebSocket::TextFrame
      frame.unpack
    end

    def write(message)
      frame = Protocol::WebSocket::TextFrame.new(true, message)
      frame.write(@socket)
    end

    def close = @socket&.close
  end
end
require 'socket'
require 'protocol/websocket/headers'
require 'webrick/httprequest'
require 'webrick/httpresponse'
require 'webrick/config'
require_relative 'web_socket'

module WANDS
  # The WebSocketServer class is responsible for accepting WebSocket connections.
  # This class has the same interface as TCPServer.
  #
  # Example usage:
  #
  # server = WebSocketServer.new('localhost', 2345)
  # loop do
  #  begin
  #   socket = server.accept
  #   next unless socket
  #   puts "Accepted connection from #{socket.remote_address.ip_address} #{socket.remote_address.ip_port}"
  #
  #   received_message = socket.gets
  #   puts "Received: #{received_message}"
  #
  #   socket.write received_message
  #   socket.close
  #  rescue WEBrick::HTTPStatus::EOFError => e
  #   STDERR.puts e.message
  #  rescue Errno::ECONNRESET => e
  #   STDERR.puts "#{e.message} #{socket.remote_address.ip_address} #{socket.remote_address.ip_port}"
  #  rescue EOFError => e
  #   STDERR.puts "#{e.message} #{socket.remote_address.ip_address} #{socket.remote_address.ip_port}"
  #  end
  # end
  #
  class WebSocketServer
    include Protocol::WebSocket::Headers

    def initialize(hostname, port)
      @tcp_server = TCPServer.new hostname, port
    end

    def accept
      socket = @tcp_server.accept

      headers = read_headers_from socket
      unless headers["upgrade"].include? PROTOCOL
        socket.close
        raise "Not a websocket request"
      end

      response = response_to headers
      response.send_response socket

      WebSocket.new socket
    rescue WEBrick::HTTPStatus::BadRequest => e
      STDERR.puts e.message
      socket.write "HTTP/1.1 400 Bad Request\r\n\r\n"
      socket.close
    end

    private

    def read_headers_from(socket)
      request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
      request.parse(socket)
      request.header
    end

    def response_to(headers)
      response_key = calculate_accept_nonce_from headers
      response = WEBrick::HTTPResponse.new(WEBrick::Config::HTTP)
      response.status = 101
      response.upgrade! PROTOCOL
      response[SEC_WEBSOCKET_ACCEPT] = response_key

      response
    end

    def calculate_accept_nonce_from(headers)
      key = headers[SEC_WEBSOCKET_KEY].first
      Nounce.accept_digest(key)
    end
  end
end
require 'erb'
require 'protocol/websocket/headers'

module WANDS
  # The request is used to upgrade the HTTP connection to a WebSocket connection.
  class UpgradeRequest
    include ::Protocol::WebSocket::Headers

    TEMPLATE = <<~REQUEST
      GET / HTTP/1.1
      Connection: Upgrade
      Upgrade: websocket
      Sec-WebSocket-Version: 13
      Sec-WebSocket-Key: <%= @key %>

    REQUEST

    ERB = ERB.new(TEMPLATE).freeze

    def initialize
      @key = Nounce.generate_key
    end

    def to_s
      ERB.result(binding).gsub(/\r?\n/, "\r\n")
    end

    def verify(response)
      accept_digest = response.header[SEC_WEBSOCKET_ACCEPT].first
      accept_digest == Nounce.accept_digest(@key) || raise("Invalid accept digest")
    end
  end
end
module WANDS
  # This is a class that parses the response from the server and stores the headers in a hash.
  # The parse and header methods in this class are modeled on WEBrick::HTTPRequest.
  #
  # The expected HTTP response string is:
  #
  # HTTP/1.1 101 Switching Protocols
  # Upgrade: websocket
  # Connection: Upgrade
  # Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
  # Sec-WebSocket-Protocol: chat
  # Sec-WebSocket-Version: 13
  #

  # Example usage:
  #
  # response = HTTPResponse.new
  # response.parse(socket)
  # response.header["upgrade"] # => ["websocket"]
  # response.header["connection"] # => ["Upgrade"]
  # response.header["sec-websocket-accept"] # => ["s3pPLMBiTxaQ9kYGzzhZRbK+xOo="]
  #
  class HTTPResponse
    attr_reader :header

    def parse(stream)
      @response = read_from stream
      @header = headers_of @response
    end

    def to_s
      @response
    end

    private

    def read_from(stream)
      response_string = ""
      while (line = stream.gets) != "\r\n"
        response_string += line
      end

      response_string
    end

    # Parse the headers from the HTTP response string.
    def headers_of(response_string)
      # Split the response string into headers and body.
      headers, _body = response_string.split("\r\n\r\n", 2)

      # Split the headers into lines.
      headers_lines = headers.split("\r\n")

      # The first line is the status line.
      # We don't need it, so we remove it from the headers.
      _status_line = headers_lines.shift

      # Parse the headers into a hash.
      headers_lines.map do |line|
        # Split the line into header name and value.
        header_name, value = line.split(': ', 2)
        [header_name.downcase, [value.strip]]
      end.to_h
    end
  end
end

RubyでWebSocketクライアントを書く その3

RubyでWebSocketクライアントを書く その2 - @ledsun blog でWebSocketコネクションが確立できました。 今回はWebSocketを使ってサーバーとメッセージを送受信します。

#!/usr/bin/env ruby
# frozen_string_literal: true

require "erb"
require "socket"
require_relative "protocol_websocket_server/web_socket"

URL = "ws://localhost:2345/"

module WANDS
  class HTTPResponse
    attr_reader :header

    def parse(stream)
      @response_string = read(stream)
      parse_response_string(@response_string)
    end

    def to_s
      @response_string
    end

    private

    def read(stream)
      response_string = ""
      while (line = stream.gets) != "\r\n"
        response_string += line
      end

      response_string
    end

    def parse_response_string(response_string)
      headers, _body = response_string.split("\r\n\r\n", 2)
      headers_lines = headers.split("\r\n")
      _status_line = headers_lines.shift
      @header = headers_lines.map do |line|
        key, value = line.split(': ', 2)
        [key.downcase, [value]]
      end.to_h
    end
  end

  class UpgradeWebSocketRequest
    include ::Protocol::WebSocket::Headers

    TEMPLATE = <<~REQUEST
      GET / HTTP/1.1
      Host: <%= @host %>:<%= @port %>
      Connection: Upgrade
      Upgrade: websocket
      Sec-WebSocket-Version: 13
      Sec-WebSocket-Key: <%= @key %>

    REQUEST

    def initialize(host, port)
      @erb = ERB.new(TEMPLATE)
      @host = host
      @port = port
      @key = Nounce.generate_key
    end

    def to_s
      @erb.result(binding).gsub(/\r?\n/, "\r\n")
    end

    def verify(response)
      accept_digest = response.header[SEC_WEBSOCKET_ACCEPT].first
      accept_digest == Nounce.accept_digest(@key) || raise("Invalid accept digest")
    end
  end
end

request = WANDS::UpgradeWebSocketRequest.new('localhost', 2345)
puts request.to_s

socket = TCPSocket.new('localhost', 2345)
socket.write(request.to_s)
socket.flush

puts 'Request sent'

response = WANDS::HTTPResponse.new
response.parse(socket)

puts "Response: #{response.to_s}"

request.verify response

web_socket = WebSocket.new(socket)
web_socket.write("Hello World!")

puts web_socket.gets

socket.close

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その6 - @ledsun blog で作成したWebSocketクラスがそのままつかえました。 これでPure RubyなTCPSocketライクなインターフェースを持つWebSocket用のクラスが作成できそうです。

Lunar Lakeの性能計測記録

比較のためのコマンド。Windows 11上のWSLで実行しました。

time rake npm:ruby-head-wasm-wasi

比較対象

  • システムモデル    Prestige 15 A11SB
  • プロセッサ    11th Gen Intel(R) Core(TM) i7-1185G7 @ 3.00GHz、2995 Mhz、4 個のコア、8 個のロジカル プロセッサ
 Executed in  810.32 secs    fish           external
    usr time   34.22 mins  209.00 micros   34.22 mins
    sys time   15.46 mins  205.00 micros   15.46 mins

Lunar Lake

  • システムモデル    XPS 13 9350
  • プロセッサ    Intel(R) Core(TM) Ultra 7 258V、2200 Mhz、8 個のコア、8 個のロジカル プロセッサ
Executed in  690.63 secs    fish           external
    usr time   26.89 mins  170.00 micros   26.89 mins
    sys time   20.60 mins  185.00 micros   20.60 mins

結果

実行時間 810.32/690.63 = 1.17 倍速くなりました。 思ったより速くなっていません。

CPU使用率

  • 2980 / (810.30 * 4) = 92%
  • 2849 / (690.63 * 8) = 52%

コアは増えていますが、CPU使用率が下がっています。 Ultra 7 258Vはヘテロコアで、4つの高性能コアと4つの低消費電力コアが入っています。

ruby.wasmのビルドはCPU並列が効きやすい処理ですが、ヘテロコアではあまり性能が出ないようです。 聞くところによると、CPU負荷がまばらになりやすい並列処理はヘテロコアの方が向くそうです。 興味深い違いです。

RubyでWebSocketクライアントを書く その2

RubyでWebSocketクライアントを書く - @ledsun blog で、ソケットからレスポンス文字列を読み出すところまで実装しました。 今回はレスポンス文字列に含まれているaccept digestを検証します。

module WANDS
  class HTTPResponse
    attr_reader :header

    def parse(stream)
      @response_string = read(stream)
      parse_response_string(@response_string)
    end

    def to_s
      @response_string
    end

    private

    def read(stream)
      response_string = ""
      while (line = stream.gets) != "\r\n"
        response_string += line
      end

      response_string
    end

    def parse_response_string(response_string)
      headers, _body = response_string.split("\r\n\r\n", 2)
      headers_lines = headers.split("\r\n")
      _status_line = headers_lines.shift
      @header = headers_lines.map do |line|
        key, value = line.split(': ', 2)
        [key.downcase, [value]]
      end.to_h
    end
  end

  class UpgradeWebSocketRequest
    include ::Protocol::WebSocket::Headers

    TEMPLATE = <<~REQUEST
    GET / HTTP/1.1
    Host: <%= @host %>:<%= @port %>
    Connection: Upgrade
    Upgrade: websocket
    Sec-WebSocket-Version: 13
    Sec-WebSocket-Key: <%= @key %>

    REQUEST

    def initialize(host, port)
      @erb = ERB.new(TEMPLATE)
      @host = host
      @port = port
      @key = Nounce.generate_key
    end

    def to_s
      @erb.result(binding).gsub(/\r?\n/, "\r\n")
    end

    def verify(response)
      accept_digest = response.header[SEC_WEBSOCKET_ACCEPT].first
      accept_digest == Nounce.accept_digest(@key) || raise("Invalid accept digest")
    end
  end
end

request = WANDS::UpgradeWebSocketRequest.new('localhost', 2345)
puts request.to_s

socket = TCPSocket.new('localhost', 2345)
socket.write(request.to_s)
socket.flush

puts 'Request sent'

response = WANDS::HTTPResponse.new
response.parse(socket)

puts "Response: #{response.to_s}"

request.verify response

socket.close

HTTPレスポンス文字列を良い感じにパースしてくれるライブラリが見つからなかったので、パースするクラスを作りました。 WEBrick::HTTPRequestっぽいインターフェースにしてあります。 accept digestの検証は、サーバーと同一のアルゴリズムで計算して同じ値かどうかを見ています。

RubyでWebSocketクライアントを書く

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その6 - @ledsun blog でWebSocketサーバーが作れました。 今度はWebSocketクライアントを作成します。

#!/usr/bin/env ruby
# frozen_string_literal: true

require "erb"
require "async/websocket"

class UpgradeWebSocketRequest
  include ::Protocol::WebSocket::Headers

  TEMPLATE = <<~REQUEST
      GET / HTTP/1.1
      Host: <%= @host %>:<%= @port %>
      Connection: Upgrade
      Upgrade: websocket
      Sec-WebSocket-Version: 13
      Sec-WebSocket-Key: <%= @key %>
  
    REQUEST

  def initialize(host, port)
    @erb = ERB.new(TEMPLATE)
    @host = host
    @port = port
    @key = Nounce.generate_key
  end

  def to_s
    @erb.result(binding).gsub(/\r?\n/, "\r\n")
  end
end

request = UpgradeWebSocketRequest.new('localhost', 2345)
puts request.to_s

socket = TCPSocket.new('localhost', 2345)
socket.write(request.to_s)
socket.flush

puts 'Request sent'

response = ""
while (line = socket.gets) != "\r\n"
  response += line
end

puts "Response: #{response}"

socket.close

こんな感じでWebSocketコネクションの接続までは行けているはずです。 つぎはメッセージを送るところです。

意外とHTTPリクエスト文字列を作ったり、HTTPレスポンス文字列を解析したりする簡単な方法がなくて苦戦しました。

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その6

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その5 - @ledsun blog で機能するようになりました。 リファクタリングします。 TCPServerっぽいインターフェースにします。

まずはWebSocketクラス。 確立されたWebSocketコネクションを表すクラスです。

require 'protocol/websocket/headers'
require 'protocol/websocket/framer'
require 'protocol/websocket/text_frame'

class WebSocket
  include Protocol::WebSocket::Headers

  def initialize(socket)
    @socket = socket
  end

  def gets
    framer = Protocol::WebSocket::Framer.new(@socket)
    frame = framer.read_frame
    raise 'frame is not a text' unless frame.is_a? Protocol::WebSocket::TextFrame
    frame.unpack
  end

  def write(message)
    frame = Protocol::WebSocket::TextFrame.new(true, message)
    frame.write(@socket)
  end

  def close
    @socket.close
  end
end

次にWebSocketServerクラス WebSocketの接続を待ち受けるクラスです。

require 'socket'
require 'protocol/websocket/headers'
require 'webrick/httprequest'
require 'webrick/httpresponse'
require 'webrick/config'

class WebSocketServer
  include Protocol::WebSocket::Headers

  def initialize(hostname, port)
    @server = TCPServer.new hostname, port
  end

  def accept
    socket = @server.accept

    headers = read_headers_from socket
    unless headers["upgrade"].include? PROTOCOL
      socket.close
      raise "Not a websocket request"
    end

    response_key = calculate_accept_nonce_from headers
    response = WEBrick::HTTPResponse.new(WEBrick::Config::HTTP)
    response.status = 101
    response.upgrade! PROTOCOL
    response[SEC_WEBSOCKET_ACCEPT] = response_key

    response.send_response socket

    WebSocket.new socket
  end

  private

  def read_headers_from(socket)
    request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
    request.parse(socket)
    request.header
  end

  def calculate_accept_nonce_from(headers)
    key = headers[SEC_WEBSOCKET_KEY].first
    Nounce.accept_digest(key)
  end
end

最後にアプリケーション

require_relative 'web_socket.rb'
require_relative 'web_socket_server.rb'

server = WebSocketServer.new 'localhost', 2345

loop do
  socket = server.accept
  puts "Received: #{socket.gets}"

  socket.write "Loud and clear!"
  puts "Sent message"
  socket.close
end

どうです?かっこいいでしょう。

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その5

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その4 - @ledsun blog で、HTTPリクエスト文字列、HTTPレスポンス文字列の処理をWEBrickで行うことにしました。 今回はWebSocketのFrameの読み書きをprotocol-websocketを使って行います。

require 'socket'
require 'webrick'
require 'protocol/websocket/headers'
require 'protocol/websocket/framer'
require 'protocol/websocket/text_frame'

def read_headers_from(socket)
  request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
  request.parse(socket)
  request.header
end

def calculate_accept_nonce_from(headers)
  key = headers[Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY].first
  Protocol::WebSocket::Headers::Nounce.accept_digest(key)
end

server = TCPServer.new 'localhost',
                       2345

loop do
  # ここら辺はecho_server.rbと同じ。純粋なTCP通信
  socket = server.accept

  # HTTPリクエストheaderを読み込む
  headers = read_headers_from socket

  # WebSocketリクエストかどうかを判定する
  unless headers["upgrade"] = Protocol::WebSocket::Headers::PROTOCOL
    puts "Not a websocket request"
    socket.close
    next
  end

  response_key = calculate_accept_nonce_from headers
  puts "response_key: #{response_key}"

  response = WEBrick::HTTPResponse.new(WEBrick::Config::HTTP)
  response.status = 101
  response.upgrade! Protocol::WebSocket::Headers::PROTOCOL
  response['Sec-WebSocket-Accept'] = response_key

  # ソケットそのものはHTTP通信で使われているものと同じ
  response.send_response socket
  puts 'Handshake response sent'

  # ここからはWebSocket通信
  framer = Protocol::WebSocket::Framer.new(socket)
  request_frame = framer.read_frame
  raise 'frame is not a text' unless request_frame.is_a? Protocol::WebSocket::TextFrame
  puts "Received: #{request_frame.unpack}"

  # クライアントにデータを返す
  response_message = "Loud and clear!"
  response_frame = Protocol::WebSocket::TextFrame.new(true, response_message)
  response_frame.write(socket)

  socket.close
end

バイナリ読んでOPCODEを特定して、マスクを剥がしてPackする一連の面倒な処理が、シュッと書けました。 protocol-websocketすごい!便利。 このあとは、リファクタリングして、Async::WebSocketを使ったクライアントと通信できるかなど、試したいとおもいます。

Protocol::WebSocket::FramerProtocol::WebSocket::Framerの設計がかっこいい。 入出力の型をStreamにしてあるおかげで、普通のSocketも読み書きできるし、async-websocketで使っているAsync::IO::Streamも使える。

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その4

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その3 - @ledsun blog ではWEBrickを使ってHTTPレスポンス文字列を作成しました。 今度は protocol-websocket を使ってHTTPレスポンス文字列を作成します。

require 'socket'
require 'webrick'
require 'protocol/websocket/headers'
require 'protocol/http/headers'
require 'protocol/http/response'

def read_headers_from(socket)
  request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
  request.parse(socket)
  request.header
end

def calculate_accept_nonce_from(headers)
  key = headers[Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY].first
  Protocol::WebSocket::Headers::Nounce.accept_digest(key)
end

server = TCPServer.new 'localhost',
                       2345

loop do
  # ここら辺はecho_server.rbと同じ。純粋なTCP通信
  socket = server.accept

  # HTTPリクエストheaderを読み込む
  headers = read_headers_from socket

  # WebSocketリクエストかどうかを判定する
  unless headers["upgrade"] = Protocol::WebSocket::Headers::PROTOCOL
    puts "Not a websocket request"
    socket.close
    next
  end

  response_key = calculate_accept_nonce_from headers
  puts "response_key: #{response_key}"

  response_headers = Protocol::HTTP::Headers.new
  response_headers.add Protocol::WebSocket::Headers::SEC_WEBSOCKET_ACCEPT, response_key
  response = Protocol::HTTP::Response.new("HTTP/1.1",
                                          101,
                                          response_headers,
                                          nil,
                                          Protocol::WebSocket::Headers::PROTOCOL)
  response_string = <<~HTTP
    #{response.version} #{response.status} Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    #{response.headers.to_h.map { |k, v| "#{k}: #{v.join}" }.join("\n")}
  
  HTTP

  puts response_string
  # ソケットそのものはHTTP通信で使われているものと同じ
  socket.write response_string
  puts 'Handshake response sent'

  # ここからはWebSocket通信
  first_byte = socket.getbyte
  fin = first_byte & 0b10000000
  opcode = first_byte & 0b00001111

  raise 'fin bit is not set' unless fin
  raise 'opcode is not a text' unless opcode == 0x1

  second_byte = socket.getbyte
  is_masked = second_byte & 0b10000000
  payload_size = second_byte & 0b01111111

  raise 'mask bit is not set' unless is_masked
  raise 'payload size > 125 is not supported' unless payload_size <= 125

  puts "Payload size: #{payload_size}"

  mask = 4.times.map { socket.getbyte }
  puts "Mask: #{mask}"

  data = payload_size.times.map.with_index { socket.getbyte ^ mask[_2 % 4] }
  puts "Data: #{data.pack('C*')}"

  # クライアントにデータを返す
  response_message = "Loud and clear!"
  response = [0b10000001,
              response_message.size,
              response_message
            ].pack("CCA#{response_message.size}")
  socket.write response

  socket.close
end

レスポンス文字列作成部分だけを抜き出します。 protocol-websocketでは

response_headers = Protocol::HTTP::Headers.new
response_headers.add Protocol::WebSocket::Headers::SEC_WEBSOCKET_ACCEPT, response_key
response = Protocol::HTTP::Response.new("HTTP/1.1",
                                        101,
                                        response_headers,
                                        nil,
                                        Protocol::WebSocket::Headers::PROTOCOL)

response_string = <<~HTTP
  #{response.version} #{response.status} Switching Protocols
  Upgrade: websocket
  Connection: Upgrade
  #{response.headers.to_h.map { |k, v| "#{k}: #{v.join}" }.join("\n")}

HTTP

socket.write response_string

Protocol::HTTP::Responseにはレスポンス文字列を返すメソッドがなかったので、なかなか面倒になってしまいました。

WebSocketでは

response = WEBrick::HTTPResponse.new(WEBrick::Config::HTTP)
response.status = 101
response.upgrade! Protocol::WebSocket::Headers::PROTOCOL
response['Sec-WebSocket-Accept'] = response_key
response.send_response socket

でした。 どうやらHTTPリクエストやHTTPレスポンスを素朴に扱いたい場合は、WEBrickを使う方が便利そうです。

次回はいよいよWebSocketのFrameを作る部分でs。う

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その3

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その2 - @ledsun blog でリクエスト文字列の解析をWEBrickに置き換えました。 今度はレスポンス文字列の作成にWEBrick::HTTPResponseを使います。

require 'socket'
require 'webrick'
require 'protocol/websocket/headers'

def read_headers_from(socket)
  request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
  request.parse(socket)
  request.header
end

def calculate_accept_nonce_from(headers)
  key = headers[::Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY].first
  ::Protocol::WebSocket::Headers::Nounce.accept_digest(key)
end

server = TCPServer.new 'localhost',
                       2345

loop do
  # ここら辺はecho_server.rbと同じ。純粋なTCP通信
  socket = server.accept

  # HTTPリクエストheaderを読み込む
  headears = read_headers_from socket

  # WebSocketリクエストかどうかを判定する
  unless headears["upgrade"] = ::Protocol::WebSocket::Headers::PROTOCOL
    puts "Not a websocket request"
    socket.close
    next
  end

  response_key = calculate_accept_nonce_from headears
  puts "response_key: #{response_key}"

  response = WEBrick::HTTPResponse.new(WEBrick::Config::HTTP)
  response.status = 101
  response.upgrade! ::Protocol::WebSocket::Headers::PROTOCOL
  response['Sec-WebSocket-Accept'] = response_key

  # ソケットそのものはHTTP通信で使われているものと同じ
  response.send_response socket
  puts 'Handshake response sent'

  # ここからはWebSocket通信
  first_byte = socket.getbyte
  fin = first_byte & 0b10000000
  opcode = first_byte & 0b00001111

  raise 'fin bit is not set' unless fin
  raise 'opcode is not a text' unless opcode == 0x1

  second_byte = socket.getbyte
  is_masked = second_byte & 0b10000000
  payload_size = second_byte & 0b01111111

  raise 'mask bit is not set' unless is_masked
  raise 'payload size > 125 is not supported' unless payload_size <= 125

  puts "Payload size: #{payload_size}"

  mask = 4.times.map { socket.getbyte }
  puts "Mask: #{mask}"

  data = payload_size.times.map.with_index { socket.getbyte ^ mask[_2 % 4] }
  puts "Data: #{data.pack('C*')}"

  # クライアントにデータを返す
  response_message = "Loud and clear!"
  response = [0b10000001,
              response_message.size,
              response_message
            ].pack("CCA#{response_message.size}")
  socket.write response

  socket.close
end

今の時点ではWEBrick::HTTPResponseProtocol::HTTP::Responseの違いを理解できていません。

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その2

RubyでつくるWebSocketサーバーにprotocol-websocket gemを取り入れる その1 - @ledsun blog ではaccept_nonceの計算にprotocol-websocketを利用しました。 今度はヘッダの解析に使います。 Async::WebSocket gemの素振り その8 - @ledsun blogでは、HTTPリクエストのパースにRackを使いました。 今回はRackを使わずにWEBrickを使います。

require 'socket'
require 'webrick'
require 'protocol/websocket/headers'

def headers_of(socket)
  request = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
  request.parse(socket)
  request.header
end

server = TCPServer.new 'localhost',
                       2345

loop do
  # ここら辺はecho_server.rbと同じ。純粋なTCP通信
  socket = server.accept

  # HTTPリクエストheaderを読み込む
  headears = headers_of(socket)

  # WebSocketリクエストかどうかを判定する
  unless headears["upgrade"] = ::Protocol::WebSocket::Headers::PROTOCOL
    puts "Not a websocket request"
    socket.close
    next
  end

  ws_key = headears[::Protocol::WebSocket::Headers::SEC_WEBSOCKET_KEY].first
  puts "ws_key: #{ws_key}"

  response_key = ::Protocol::WebSocket::Headers::Nounce.accept_digest(ws_key)
  puts "response_key: #{response_key}"

  handshake_response = <<~EOS
    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: #{response_key}

  EOS

  # ソケットそのものはHTTP通信で使われているものと同じ
  socket.write handshake_response
  puts 'Handshake response sent'

  # ここからはWebSocket通信
  first_byte = socket.getbyte
  fin = first_byte & 0b10000000
  opcode = first_byte & 0b00001111

  raise 'fin bit is not set' unless fin
  raise 'opcode is not a text' unless opcode == 0x1

  second_byte = socket.getbyte
  is_masked = second_byte & 0b10000000
  payload_size = second_byte & 0b01111111

  raise 'mask bit is not set' unless is_masked
  raise 'payload size > 125 is not supported' unless payload_size <= 125

  puts "Payload size: #{payload_size}"

  mask = 4.times.map { socket.getbyte }
  puts "Mask: #{mask}"

  data = payload_size.times.map.with_index { socket.getbyte ^ mask[_2 % 4] }
  puts "Data: #{data.pack('C*')}"

  # クライアントにデータを返す
  response_message = "Loud and clear!"
  response = [0b10000001,
              response_message.size,
              response_message
            ].pack("CCA#{response_message.size}")
  socket.write response

  socket.close
end