ローファイ日記

出てくるコード片、ぼくが書いたものは断りがない場合 MIT License としています http://udzura.mit-license.org/

Rustでsocketpairを使ってIPCしようとしてみたが...

タイトルの通りなんだけど、おそらくはTwitterで教えてもらった

github.com

このあたりを使って、Channelのようなインタフェースでプロセス間通信を行ったほうが良いのだろう、と思いつつ、なるべく生システムコールで実装してみるテスト。

Rustでソケットペア

UnixStream::pair() がある。

doc.rust-lang.org

以下のようにUNIX domain socketのペアを作成できるので、あとはforkして、sock1を親に、sock2を子供に残せばよろしい。

let (sock1, sock2) = UnixStream::pair()?;

が、ここで、親側のsock2の方と、子供側のsock1の方のfdは明示的に close(2) しておかないといけない。ところでRustではUnixStream(の持っているfd)に対して明示的なcloseを呼ぶのには工夫が必要である。

メソッドとして close(2) 相当を持っていないしそんなトレイトもないので、一旦File Descriptorの表現(RawFd 型)に変換してそれに対してnixの nix::unistd::close() を呼ぶ。

ここで、RawFd型への変換の際に UnixStream::into_raw_fd() を呼ぶことで所有権を奪っておく必要がある。そうしないと(たとえば as_raw_fd() を呼んだりした場合) UnixStream のインスタンス自体のdropが呼ばれた際にも、fdをクローズする処理が走るので、二重クローズになってしまうそう。

こういう感じで書いた。

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (sock1, sock2) = UnixStream::pair()?;
    let sock = match unsafe { fork()? } {
        ForkResult::Parent { child } => {
            println!("Added child. Pid: {}", child);
            close(sock2.into_raw_fd())?;
            sock1.into_raw_fd()
        }
        ForkResult::Child => {
            let cmd = CString::new("/proc/self/exe").unwrap();
            close(sock1.into_raw_fd())?;
            // ...ここでexecするので実際はこの下に至らない
            0
        }
    };
    //...
}

多分、他にも drop を明示的に呼ばせるとかできる気がする。

そして、ここで問題として、sock1の所有権が一度奪われてしまうので、後続の処理でsock1に対してデータを送ることができなくなってしまう...。

なので、一度 RawFd をsock変数で受け取り、後続の処理で UnixStream::from_raw_fd() を読んでもう一度 UnixStream に戻して読み書きができるようにする。なお、 UnixStream::from_raw_fd() 自体は unsafe 。

//...
    let mut sock1 = unsafe { UnixStream::from_raw_fd(sock) };
    for i in 0..5 {
        for _ in 0..=i {
            sock1.write(b"byte ")?;
        }
        sock1.write(b"\n")?;
        sleep(Duration::from_secs(1));
    }
    while let Ok(status) = waitpid(None, None) {
        println!("Reaped child. Status: {:?}", status);
    }
//...

ついでに FD_CLOEXEC を奪ってexecした後で使う

せっかくなのでもう少しややこしいことをした。子プロセスの fd を、execした後でも継承してそこからデータを取得できるようにする。

Rustのコードから作成するfdは普通 FD_CLOEXEC フラグがついてるので、 exec したら自動でクローズして消滅する。 close-on-exec については man open とかその辺で 。

FD_CLOEXEC はfdから明示的に奪うことができる。具体的には fcntl(2) を nix::fcntl::fcntl() から呼べるそうなので呼んだ。上記のfork周りのコードがこうなる。

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (sock1, sock2) = UnixStream::pair()?;
    let sock = match unsafe { fork()? } {
        ForkResult::Parent { child } => {
            println!("Added child. Pid: {}", child);
            close(sock2.into_raw_fd())?;
            sock1.into_raw_fd()
        }
        ForkResult::Child => {
            let cmd = CString::new("/proc/self/exe").unwrap();
            close(sock1.into_raw_fd())?;

            let newsock = sock2.into_raw_fd();
            let _r = fcntl(newsock, FcntlArg::F_SETFD(FdFlag::empty()))?;

            let args = vec![
                CString::new("child").unwrap(),
                CString::new(format!("{}", newsock)).unwrap(),
            ];
            execvp(&cmd, &args.as_ref())?;
            0
        }
    };

    let mut sock1 = unsafe { UnixStream::from_raw_fd(sock) };
    //...
}

コードの通り、自分自身のバイナリを execvp() している*1。プログラム側で自分の $0 を見て処理を分岐することにする。mainの冒頭に以下を追加。

fn main() -> Result<(), Box<dyn std::error::Error>> {
    if &args().nth(0).unwrap() == "child" {
        println!("I'm a new child process");
        let sock2: RawFd = args().nth(1).unwrap().parse()?;
        let mut sock2 = unsafe { UnixStream::from_raw_fd(sock2) };

        loop {
            let mut buf = [0u8; 1024];
            if sock2.read(&mut buf)? == 0 {
                break;
            }
            print!("{}", String::from_utf8((&buf).to_vec())?.trim_matches('\0'));
        }
        println!("All drained.");
        sleep(Duration::from_secs(3));

        return Ok(());
    }
    //...
}

fd番号をexecve(execvp)経由でコマンド引数に渡し、新しいプログラムから取得する。その番号を UnixStream::from_raw_fd() で元のソケットに変換している。

これで、親プロセスから子プロセスにデータを送りつけることができるようになった。

動作の様子はこんな感じです。なんかこのサンプルのPIDでかいね...。

$ ../target/debug/cloexec-example 
Added child. Pid: 164043
I'm a new child process
byte 
byte byte 
byte byte byte 
byte byte byte byte 
byte byte byte byte byte 
All drained.
Reaped child. Status: Exited(Pid(164043), 0)

コードの全体はこちら。

github.com


という感じで、ソケットペアで2つの別々のプログラム同士で通信するサンプルを書いてみたけれど、unsafe操作が多すぎるし、自分で書くようなところではなさそう...。servo/ipc-channel はおそらくservoプロジェクトの一部で信頼できるコードのような気がするので試してみたいところ。

*1:よく考えたらp不要か。