Expand description
A simple communication infrastructure providing typed exchange channels.
This crate is part of the timely dataflow system, used primarily for its inter-worker communication. It may be independently useful, but it is separated out mostly to make clear boundaries in the project.
Threads are spawned with an allocator::Generic
, whose
allocate
method returns a pair of several send endpoints and one
receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker,
if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
To be communicated, a type must implement the Bytesable
trait.
Channel endpoints also implement a lower-level push
and pull
interface (through the Push
and Pull
traits), which is used for more precise control of resources.
§Examples
use timely_communication::{Allocate, Bytesable};
/// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
pub struct Message {
/// Text contents.
pub payload: String,
}
impl Bytesable for Message {
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
}
fn length_in_bytes(&self) -> usize {
self.payload.len()
}
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
writer.write_all(self.payload.as_bytes()).unwrap();
}
}
fn main() {
// extract the configuration from user-supplied arguments, initialize the computation.
let config = timely_communication::Config::from_args(std::env::args()).unwrap();
let guards = timely_communication::initialize(config, |mut allocator| {
println!("worker {} of {} started", allocator.index(), allocator.peers());
// allocates a pair of senders list and one receiver.
let (mut senders, mut receiver) = allocator.allocate(0);
// send typed data along each channel
for i in 0 .. allocator.peers() {
senders[i].send(Message { payload: format!("hello, {}", i)});
senders[i].done();
}
// no support for termination notification,
// we have to count down ourselves.
let mut received = 0;
while received < allocator.peers() {
allocator.receive();
if let Some(message) = receiver.recv() {
println!("worker {}: received: <{}>", allocator.index(), message.payload);
received += 1;
}
allocator.release();
}
allocator.index()
});
// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
for guard in guards.join() {
println!("result: {:?}", guard);
}
}
else { println!("error in computation"); }
}
This should produce output like:
worker 0 started
worker 1 started
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
result: Ok(0)
result: Ok(1)
Re-exports§
pub use allocator::Generic as Allocator;
pub use allocator::Allocate;
pub use allocator::Exchangeable;
pub use initialize::initialize;
pub use initialize::initialize_from;
pub use initialize::Config;
pub use initialize::WorkerGuards;
Modules§
- Types and traits for the allocation of channels.
- A type that can unpark specific threads.
- Initialization logic for a generic instance of the
Allocate
channel allocation trait. - Configuration and events for communication logging.
- Networking code for sending and receiving fixed size
Vec<u8>
between machines.
Traits§
- A type that can be serialized and deserialized through
Bytes
. - Pulling elements of type
T
. - Pushing elements of type
T
.