23 unstable releases (11 breaking)

new 0.15.0 Dec 6, 2024
0.14.0 Nov 11, 2024
0.13.0 Oct 29, 2024
0.12.0 Mar 10, 2021
0.1.2 Nov 2, 2015

#96 in Memory management

Download history 204/week @ 2024-08-22 277/week @ 2024-08-29 197/week @ 2024-09-05 191/week @ 2024-09-12 229/week @ 2024-09-19 219/week @ 2024-09-26 168/week @ 2024-10-03 240/week @ 2024-10-10 237/week @ 2024-10-17 790/week @ 2024-10-24 2612/week @ 2024-10-31 3359/week @ 2024-11-07 2941/week @ 2024-11-14 2652/week @ 2024-11-21 2347/week @ 2024-11-28 2348/week @ 2024-12-05

11,028 downloads per month
Used in 14 crates (via timely)

MIT license

125KB
2K SLoC

A simple communication infrastructure providing typed exchange channels.

This crate is part of the timely dataflow system, used primarily for its inter-worker communication. It may be independently useful, but it is separated out mostly to make clear boundaries in the project.

Threads are spawned with an allocator::Generic, whose allocate method returns a pair of several send endpoints and one receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker, if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.

To be communicated, a type must implement the Bytesable trait.

Channel endpoints also implement a lower-level push and pull interface (through the Push and Pull traits), which is used for more precise control of resources.

Examples

use timely_communication::{Allocate, Bytesable};

/// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
pub struct Message {
    /// Text contents.
    pub payload: String,
}

impl Bytesable for Message {
    fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
        Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
    }

    fn length_in_bytes(&self) -> usize {
        self.payload.len()
    }

    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
        writer.write_all(self.payload.as_bytes()).unwrap();
    }
}

fn main() {

    // extract the configuration from user-supplied arguments, initialize the computation.
    let config = timely_communication::Config::from_args(std::env::args()).unwrap();
    let guards = timely_communication::initialize(config, |mut allocator| {

        println!("worker {} of {} started", allocator.index(), allocator.peers());

        // allocates a pair of senders list and one receiver.
        let (mut senders, mut receiver) = allocator.allocate(0);

        // send typed data along each channel
        for i in 0 .. allocator.peers() {
            senders[i].send(Message { payload: format!("hello, {}", i)});
            senders[i].done();
        }

        // no support for termination notification,
        // we have to count down ourselves.
        let mut received = 0;
        while received < allocator.peers() {

            allocator.receive();

            if let Some(message) = receiver.recv() {
                println!("worker {}: received: <{}>", allocator.index(), message.payload);
                received += 1;
            }

            allocator.release();
        }

        allocator.index()
    });

    // computation runs until guards are joined or dropped.
    if let Ok(guards) = guards {
        for guard in guards.join() {
            println!("result: {:?}", guard);
        }
    }
    else { println!("error in computation"); }
}

This should produce output like:

worker 0 started
worker 1 started
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
result: Ok(0)
result: Ok(1)

Dependencies

~1.9–2.8MB
~50K SLoC