Showing posts with label asio. Show all posts

Tuesday, April 06, 2010

Bind illustrated

Asynchronous operations in Asio all expect a function object argument, the completion handler, which they invoke when the asynchronous operation completes. The signature of the handler depends on the type of operation. For example, a handler posted using io_service::post() must have the signature:

void handler();

while an asynchronous wait operation expects:

void handler(error_code ec);

and asynchronous read/write operations want:

void handler(error_code ec, size_t length);

Non-trivial applications will need to pass some context to the completion handler, such as a this pointer. One way to do this is to use a function object adapter like boost::bind, std::tr1::bind or (as of C++0x) std::bind.

Unfortunately, for many C++ programmers, bind represents a little bit of magic. This is not helped by the impenetrable compiler errors that confront you when you use it incorrectly. And, in my experience, the underlying concept (where some function arguments are bound up-front, while others are delayed until the point of call) can present quite a steep learning curve.

I have put together some diagrams to help explain how bind works. For clarity, I have taken a few liberties with C++ syntax (e.g. omitting the parameter types on the function call operator) and (over-)simplified bind's implementation. Finally, the examples are limited to those likely to be useful with Asio. Comments and suggestions welcome.


bind can be used to adapt a user-supplied function expecting one argument into a function object that takes zero arguments. The bound value (123 in this example) is stored in a function object and is automatically passed to the user-supplied function as required:
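As a minimal sketch of this first case, the snippet below uses std::bind from C++11 (boost::bind behaves the same way here). The names record_value, recorded_value and make_zero_arg are invented for illustration.

```cpp
#include <functional>

int recorded_value = 0; // remembers what the target function received

// The user-supplied function expecting one argument.
void record_value(int i) { recorded_value = i; }

// Adapts record_value into a function object that takes zero arguments.
std::function<void()> make_zero_arg()
{
  // 123 is copied into the function object now and is passed to
  // record_value later, each time the function object is called.
  return std::bind(&record_value, 123);
}
```

Calling make_zero_arg()() has the same effect as calling record_value(123).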



Binding an argument can be used to turn a class member function into a zero-argument function object. As you know, non-static member functions have an implicit this parameter. This means that an appropriate pointer needs to be bound into the function object:


Alternatively, the implicit this can be made explicit by adapting a member function into a function object taking one argument:
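Both member function forms can be sketched with std::bind as follows. This is only an illustration under assumed names: counter_sketch, bind_this and bind_explicit_this do not come from Asio.

```cpp
#include <functional>

// Hypothetical class with a member function to adapt.
struct counter_sketch
{
  int total;
  counter_sketch() : total(0) {}
  void bump() { ++total; }
};

// The object pointer is bound up-front, so the result takes no arguments.
std::function<void()> bind_this(counter_sketch* c)
{
  return std::bind(&counter_sketch::bump, c);
}

// The implicit this is left open as _1, so the caller supplies the object.
std::function<void(counter_sketch*)> bind_explicit_this()
{
  using namespace std::placeholders;
  return std::bind(&counter_sketch::bump, _1);
}
```

In the first form the function object remembers which object to call; in the second it is told at the point of use.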


Function objects will often use both bound arguments and arguments supplied at the point of use. This can be done using member functions:


or non-member functions:
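Here is a rough sketch of both variants with the handler signature used by read and write operations. To stay self-contained it uses a plain int in place of error_code, and session_sketch, on_read_free and the two factory functions are hypothetical names.

```cpp
#include <cstddef>
#include <functional>

// Hypothetical session that counts bytes and errors.
struct session_sketch
{
  int errors;
  std::size_t bytes;
  session_sketch() : errors(0), bytes(0) {}
  void on_read(int ec, std::size_t length)
  {
    if (ec) ++errors; else bytes += length;
  }
};

// Non-member equivalent: the session is an ordinary first parameter.
void on_read_free(session_sketch* s, int ec, std::size_t length)
{
  s->on_read(ec, length);
}

typedef std::function<void(int, std::size_t)> read_handler;

// Member function: the object is bound, ec and length arrive as _1 and _2.
read_handler member_handler(session_sketch* s)
{
  using namespace std::placeholders;
  return std::bind(&session_sketch::on_read, s, _1, _2);
}

// Non-member function: same shape, the context is just another bound value.
read_handler free_handler(session_sketch* s)
{
  using namespace std::placeholders;
  return std::bind(&on_read_free, s, _1, _2);
}
```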


Sometimes the function object's point of use will supply arguments which are not required to call the target function. bind will automatically discard these surplus arguments:


The surplus argument(s) need not be at the end of the function object signature:
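A small sketch of both situations, again with std::bind and an int standing in for error_code. The target functions and the names drop_trailing and drop_leading are made up for this example.

```cpp
#include <cstddef>
#include <functional>

int seen_ec = -1;
std::size_t seen_length = 0;

// Targets that each want only one of the two arguments on offer.
void only_wants_ec(int ec) { seen_ec = ec; }
void only_wants_length(std::size_t length) { seen_length = length; }

typedef std::function<void(int, std::size_t)> rw_handler;

// Uses _1 only: the trailing length argument is discarded.
rw_handler drop_trailing()
{
  using namespace std::placeholders;
  return std::bind(&only_wants_ec, _1);
}

// Uses _2 only: the leading ec argument is discarded.
rw_handler drop_leading()
{
  using namespace std::placeholders;
  return std::bind(&only_wants_length, _2);
}
```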


Finally, bind allows you to reorder arguments to adapt the target function to the necessary function object signature:
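Reordering comes down to the order in which the placeholders are written. In this sketch, length_first and reordered are hypothetical names and int again stands in for error_code.

```cpp
#include <cstddef>
#include <functional>

std::size_t got_length = 0;
int got_ec = -1;

// Target function whose parameters are in the "wrong" order.
void length_first(std::size_t length, int ec)
{
  got_length = length;
  got_ec = ec;
}

std::function<void(int, std::size_t)> reordered()
{
  using namespace std::placeholders;
  // The caller passes (ec, length); the target receives (length, ec).
  return std::bind(&length_first, _2, _1);
}
```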

Monday, April 05, 2010

Timeouts by analogy

Most networking-enabled applications have to deal with timeouts. Read or write operations may continue indefinitely, and programs need a way to determine when to tear down connections, resend requests, or take whatever other measures are necessary.

Asio includes the deadline_timer class for managing timeouts. This class aims to provide a minimal interface for scheduling events. Of course, minimalism gives little in the way of design guidance, so some users struggle to find an elegant way to incorporate timers and timeouts into their programs.

From the minimalist perspective of Asio, there's no one true right way to do it. (Perhaps there's no better proof of that than my design preferences having changed over the years.) Yet that answer doesn't get programs written, so in this post I will try to present a simple mental model for managing timers.

Parking meters

High-traffic, commercial areas near where I live have limited on-street parking. The street parking that is available is metered. It's the usual drill:


  • Park your vehicle.

  • Feed some coins into the parking meter (or, as is more likely these days, swipe your credit card or send an SMS).

  • Go do whatever you came to do.

  • Make sure you return to your vehicle before the meter expires.

If you don't get back in time, you'd better hope your vehicle hasn't had a visit from the parking inspector. A visit means a ticket under the wipers and a nasty fine due.

Parking meters are a good analogy for reasoning about timeouts because it's easy to identify the two actors:


  • The driver of the vehicle.

  • The parking inspector.

The driver performs the following steps:


  1. Feeds the meter.

  2. Leaves the vehicle to run some errands.

  3. Returns to the vehicle.

  4. If no ticket has been issued, repeats from step 1.

  5. If a fine has been issued, goes home.

The parking inspector's job is simple:


  1. Checks whether the meter has expired.

  2. If the meter has expired, writes up a ticket.

  3. If the meter has not expired, notes how much time is remaining.

  4. Goes off for a walk until the remaining time has elapsed.

Using the analogy to inform program design

Hopefully you've already guessed how these actors map to networked applications:


  • The driver represents your protocol handling code.

  • The parking inspector corresponds to your timeout management logic.

Let's take a look at how this works in a very simple use case.

// The "driver" actor.
void session::handle_read(error_code ec, size_t length)
{
// On entering this function we have returned to the vehicle.

if (!ec)
{
// Phew, no ticket. Feed the meter.
my_timer.expires_from_now(seconds(5));

// Process incoming data.
// ...

// Run some more errands.
my_socket.async_read_some(buffer(my_buffer),
bind(&session::handle_read, this, _1, _2));
}
else
{
// We got a ticket. Go home.
}
}

// The "parking inspector" actor.
void session::handle_timeout(error_code ec)
{
// On entering this function we are checking the meter.

// Has the meter expired?
if (my_timer.expires_from_now() < seconds(0))
{
// Write up a ticket.
my_socket.close();
}
else
{
// Note remaining time and go for a walk.
my_timer.async_wait(
bind(&session::handle_timeout, this, _1));
}
}

It's important to remember that the driver may need to run multiple errands each time they leave the vehicle. In protocol terms, you might have a fixed-length header followed by a variable-length body. You only want to "feed the meter" once you have received a complete message:

// First part of the "driver" actor.
void session::handle_read_header(error_code ec)
{
// We're not back at the vehicle yet.

if (!ec)
{
// Process header.
// ...

// Run some more errands.
async_read(my_socket, buffer(my_body),
bind(&session::handle_read_body, this, _1));
}
}

// Second part of the "driver" actor.
void session::handle_read_body(error_code ec)
{
// On entering this function we have returned to the vehicle.

if (!ec)
{
// Phew, no ticket. Feed the meter.
my_timer.expires_from_now(seconds(5));

// Process complete message.
// ...

// Run some more errands.
async_read(my_socket, buffer(my_header),
bind(&session::handle_read_header, this, _1));
}
else
{
// We got a ticket. Go home.
}
}

There are many variations on this theme. For example, you may feed the meter between consecutive errands, varying the amount of money inserted (i.e. setting different length timeouts) depending on which errand comes next. In protocol terms, that might mean allowing up to 30 seconds between messages, but only a further 5 seconds is permitted once the message header has been received.
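The 30-second and 5-second variation can be modelled without any Asio types. The sketch below is only a toy: times are plain integers in seconds, and next_errand, timeout_for and meter_sketch are names invented here. In a real program the feed step would be a call to expires_from_now() on the timer.

```cpp
// What the driver is about to wait for.
enum next_errand { await_header, await_body };

// Assumed policy taken from the text: up to 30 seconds between messages,
// but only 5 more seconds once the header has been received.
int timeout_for(next_errand e)
{
  return e == await_header ? 30 : 5;
}

// Toy parking meter driven by an explicit "now" value.
struct meter_sketch
{
  int expiry;
  meter_sketch() : expiry(0) {}

  // The driver feeds the meter before each errand.
  void feed(int now, next_errand e) { expiry = now + timeout_for(e); }

  // The inspector checks whether the paid time has run out.
  bool expired(int now) const { return now > expiry; }
};
```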

As I indicated earlier, there's no single right way to manage timeouts. In fact, there are many different facets to this problem that are probably worth exploring in their own right. However, I think that the approach shown here is probably suited to most applications and I would recommend it as a starting point when designing your timeout handling.

Wednesday, March 31, 2010

A potted guide to stackless coroutines

Keen-eyed Asio users may have noticed that Boost 1.42 includes a new example, HTTP Server 4, that shows how to use stackless coroutines in conjunction with asynchronous operations. This follows on from the coroutines I explored in the previous three posts, but with a few improvements. In particular:

  • the pesky entry pseudo-keyword is gone; and
  • a new fork pseudo-keyword has been added.

The result bears a passing resemblance to C#'s yield and friends. This post aims to document my stackless coroutine API, but before launching into a long and wordy explanation, here's a little picture to liven things up:

[ Click image for full size. This image was generated from this source code using asioviz. ]

class coroutine

Every coroutine needs to store its current state somewhere. For that we have a class called coroutine:

class coroutine
{
public:
coroutine();
bool is_child() const;
bool is_parent() const;
bool is_complete() const;
};

Coroutines are copy-constructible and assignable, and the space overhead is a single int. They can be used as a base class:

class session : coroutine
{
...
};

or as a data member:

class session
{
...
coroutine coro_;
};

or even bound in as a function argument using bind() (see previous post). It doesn't really matter as long as you maintain a copy of the object for as long as you want to keep the coroutine alive.

reenter

The reenter macro is used to define the body of a coroutine. It takes a single argument: a pointer or reference to a coroutine object. For example, if the base class is a coroutine object you may write:

reenter (this)
{
... coroutine body ...
}

and if a data member or other variable you can write:

reenter (coro_)
{
... coroutine body ...
}

When reenter is executed at runtime, control jumps to the location of the last yield or fork.

The coroutine body may also be a single statement. This lets you save a few keystrokes by writing things like:

reenter (this) for (;;)
{
...
}

Limitation: The reenter macro is implemented using a switch. This means that you must take care when using local variables within the coroutine body. A local variable may not be defined in a position where reentering the coroutine could jump past its definition.

yield statement

This form of the yield keyword is often used with asynchronous operations:

yield socket_->async_read_some(buffer(*buffer_), *this);

This divides into four logical steps:

  1. yield saves the current state of the coroutine.
  2. The statement initiates the asynchronous operation.
  3. The resume point is defined immediately following the statement.
  4. Control is transferred to the end of the coroutine body.

When the asynchronous operation completes, the function object is invoked and reenter causes control to transfer to the resume point. It is important to remember to carry the coroutine state forward with the asynchronous operation. In the above snippet, the current class is a function object with a coroutine object as base class or data member.

The statement may also be a compound statement, and this permits us to define local variables with limited scope:

yield
{
mutable_buffers_1 b = buffer(*buffer_);
socket_->async_read_some(b, *this);
}

yield return expression ;

This form of yield is often used in generators or coroutine-based parsers. For example, the function object:

struct interleave : coroutine
{
istream& is1;
istream& is2;
char operator()(char c)
{
reenter (this) for (;;)
{
yield return is1.get();
yield return is2.get();
}
}
};

defines a trivial coroutine that interleaves the characters from two input streams.

This type of yield divides into three logical steps:

  1. yield saves the current state of the coroutine.
  2. The resume point is defined immediately following the semicolon.
  3. The value of the expression is returned from the function.
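To make these steps concrete, here is a hand-written approximation of such a generator that uses an explicit switch and an int state. It is not the expansion of the reenter and yield macros. The name interleave_sketch and the '\0' end marker are invented for illustration, and it reads from two strings rather than two streams so that it terminates.

```cpp
#include <cstddef>
#include <string>

// Hypothetical hand-written generator; not the real macro expansion.
struct interleave_sketch
{
  std::string first, second;
  std::size_t index;
  int state; // 0 = not started, 1 and 2 = resume points, -1 = terminated

  interleave_sketch(const std::string& a, const std::string& b)
    : first(a), second(b), index(0), state(0) {}

  // Returns the next character, or '\0' once either string is exhausted.
  char operator()()
  {
    switch (state)
    {
    case 0: // first entry: start at the top of the body
      for (;; ++index)
      {
        if (index >= first.size() || index >= second.size())
        {
          state = -1; // explicit termination
          return '\0';
        }
        state = 1;             // save the state...
        return first[index];   // ...and return the value
    case 1:                    // resume point after the first yield
        state = 2;
        return second[index];
    case 2:;                   // resume point after the second yield
      }
    default: // terminated: the body is never reentered
      return '\0';
    }
  }
};
```

The case labels inside the loop are what allow a later call to jump straight back to the point just after the previous return.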

yield ;

This form of yield is equivalent to the following steps:

  1. yield saves the current state of the coroutine.
  2. The resume point is defined immediately following the semicolon.
  3. Control is transferred to the end of the coroutine body.

This form might be applied when coroutines are used for cooperative threading and scheduling is explicitly managed. For example:

struct task : coroutine
{
...
void operator()()
{
reenter (this)
{
while (... not finished ...)
{
... do something ...
yield;
... do some more ...
yield;
}
}
}
...
};
...
task t1, t2;
for (;;)
{
t1();
t2();
}

yield break ;

The final form of yield is adopted from C# and is used to explicitly terminate the coroutine. This form consists of two steps:

  1. yield sets the coroutine state to indicate termination.
  2. Control is transferred to the end of the coroutine body.

Once terminated, calls to is_complete() return true and the coroutine cannot be reentered.

Note that a coroutine may also be implicitly terminated if the coroutine body is exited without a yield, e.g. by return, throw or by running to the end of the body.

fork statement ;

The fork pseudo-keyword is used when "forking" a coroutine, i.e. splitting it into two (or more) copies. One use of fork is in a server, where a new coroutine is created to handle each client connection:

reenter (this)
{
do
{
socket_.reset(new tcp::socket(io_service_));
yield acceptor->async_accept(*socket_, *this);
fork server(*this)();
} while (is_parent());
... client-specific handling follows ...
}

The logical steps involved in a fork are:

  1. fork saves the current state of the coroutine.
  2. The statement creates a copy of the coroutine and either executes it immediately or schedules it for later execution.
  3. The resume point is defined immediately following the semicolon.
  4. For the "parent", control immediately continues from the next line.

The functions is_parent() and is_child() can be used to differentiate between parent and child. You would use these functions to alter subsequent control flow.

Note that fork doesn't do the actual forking by itself. It is your responsibility to write the statement so that it creates a clone of the coroutine and calls it. The clone can be called immediately, as above, or scheduled for delayed execution using something like io_service::post().

Wednesday, August 12, 2009

Composed operations, coroutines and code makeover

In the previous two posts, I showed some nifty macros for doing clean and simple stackless coroutines with asio. Hold on to your hats, because in this post we'll see what these coroutines can really do for your asio programs.

A design goal of asio is to provide a basis for further levels of abstraction. One of the ways to develop abstractions on top of asio is to create what I like to call composed operations. These are simply operations that are made up of calls to other lower-level operations. Asio already includes some composed operations to address common network programming problems: async_read and async_write to deal with short reads and writes; and async_read_until to perform delimited reads.

As an example, let's say we want to write a composed operation that echoes all data received on a socket until an error occurs. The way I have done this in the past (and the way composed operations like async_read_until are written) is to implement the operation as a set of function objects:

template <typename Handler>
struct echo_read_handler
{
tcp::socket& socket;
mutable_buffer working_buffer;
Handler handler;
void operator()(error_code ec, size_t length);
};

template <typename Handler>
struct echo_write_handler
{
tcp::socket& socket;
mutable_buffer working_buffer;
Handler handler;
void operator()(error_code ec, size_t length);
};

template <typename Handler>
void echo_read_handler<Handler>::operator()(
error_code ec, size_t length)
{
if (!ec)
{
echo_write_handler<Handler> write_handler =
{ socket, working_buffer, handler };

async_write(socket,
buffer(working_buffer, length),
write_handler);
}
else
handler(ec);
}

template <typename Handler>
void echo_write_handler<Handler>::operator()(
error_code ec, size_t /*length*/)
{
if (!ec)
{
echo_read_handler<Handler> read_handler =
{ socket, working_buffer, handler };

socket.async_read_some(
buffer(working_buffer),
read_handler);
}
else
handler(ec);
}

and a convenience function which acts as the public interface for the abstraction:

template <typename Handler>
void async_echo(
tcp::socket& socket,
mutable_buffer working_buffer,
Handler handler)
{
echo_read_handler<Handler> read_handler =
{ socket, working_buffer, handler };

socket.async_read_some(
buffer(working_buffer),
read_handler);
}

Not very encouraging if you want to write your own abstractions, is it? Now imagine you've been asked to develop a composed operation to send an email using SMTP. That would involve about a dozen lower level operations, so even I probably wouldn't bother if I had to use a function object approach.

Coroutines to the rescue

In the previous two posts we already saw how to combine stackless coroutines with asio's asynchronous operations, simply by prepending the yield "keyword". I'm sure you know where this is going... We can also use a coroutine to implement a composed operation.

Let's rewrite async_echo as a coroutine:

template <typename Handler>
struct echo_coro
{
tcp::socket& socket;
mutable_buffer working_buffer;
Handler handler;
coroutine coro;
void operator()(
error_code ec = error_code(),
size_t length = 0)
{
reenter (coro)
{
entry:
while (!ec)
{
yield socket.async_read_some(
buffer(working_buffer), *this);

if (ec) break;

yield async_write(socket,
buffer(working_buffer, length), *this);
}

handler(ec);
}
}
};

template <typename Handler>
void async_echo(
tcp::socket& socket,
mutable_buffer working_buffer,
Handler handler)
{
echo_coro<Handler> coro = { socket, working_buffer, handler };
coro();
}

The code is much shorter and clearer than the function object version. For an SMTP email operation the savings would be so much more, but I'll leave that as an exercise for the reader.

Now you might think this next point is obvious, but I'm going to say it anyway and put it in bold because it's important:

The fact that the composed operation is implemented as a coroutine is entirely transparent to the caller.

What does this mean? It means:

  • You can write your composed operations as coroutines, or not, as you choose.
  • You can combine those composed operations still further using coroutines (or not).

And so on and so on, up through as many layers of abstraction as you think you can reasonably fit into your program.

An alternative approach

One aspect of the implementation above still bothers me a little: repetition. Specifically, the repetition of the operation's template parameter list and the arguments (socket, working_buffer and handler) when defining the coroutine's function object.

Here's an alternative design that implements the composed operation in a single function:

template <typename Handler>
void async_echo(
tcp::socket& socket,
mutable_buffer working_buffer,
Handler handler,
// coroutine state:
coroutine coro = coroutine(),
error_code ec = error_code(),
size_t length = 0)
{
reenter (coro)
{
entry:
while (!ec)
{
yield socket.async_read_some(
buffer(working_buffer),
bind(&async_echo<Handler>,
ref(socket), working_buffer,
box(handler), coro, _1, _2));

if (ec) break;

yield async_write(socket,
buffer(working_buffer, length),
bind(&async_echo<Handler>,
ref(socket), working_buffer,
box(handler), coro, _1, _2));
}

handler(ec);
}
}

(N.B. box() wraps the handler with another function object to prevent evaluation of the handler as a nested bind expression.)

Of course, we've just traded one type of repetition for another: the bind expressions to create the completion handlers. At this point, I think it's a matter of taste which approach you use.

Lambdas == code liposuction

It's left to C++0x lambdas to make the coroutine-in-one-function approach the clear winner in brevity, and perhaps not in the way you first expect. Combined with auto, you can use lambdas as local functions to eliminate repeated code:

template <typename Handler>
void async_echo(
tcp::socket& socket,
mutable_buffer working_buffer,
Handler handler,
// coroutine state:
coroutine coro = coroutine(),
error_code ec = error_code(),
size_t length = 0)
{
auto resume = [&]()
{
return bind(&async_echo<Handler>,
ref(socket), working_buffer,
box(handler), coro, _1, _2);
};

reenter (coro)
{
entry:
while (!ec)
{
yield socket.async_read_some(
buffer(working_buffer),
resume());

if (ec) break;

yield async_write(socket,
buffer(working_buffer, length),
resume());
}

handler(ec);
}
}

What we end up with is a composed operation in one function, a concise coroutine to specify the asynchronous control flow, and a single definition of how to reenter the coroutine.

Phew. I think we're done.

Monday, August 10, 2009

Secret sauce revealed

In my previous post, I showed a little program using stackless coroutines with asio. Obviously there's no yield keyword in C++, so without further ado here's the preprocessor magic that makes it possible:

class coroutine
{
public:
coroutine() : value_(0) {}
private:
friend class coroutine_ref;
int value_;
};

class coroutine_ref
{
public:
coroutine_ref(coroutine& c) : value_(c.value_) {}
coroutine_ref(coroutine* c) : value_(c->value_) {}
operator int() const { return value_; }
int operator=(int v) { return value_ = v; }
private:
int& value_;
};

#define reenter(c) \
switch (coroutine_ref _coro_value = c)

#define entry \
extern void you_forgot_to_add_the_entry_label(); \
bail_out_of_coroutine: break; \
case 0

#define yield \
if ((_coro_value = __LINE__) == 0) \
{ \
case __LINE__: ; \
(void)&you_forgot_to_add_the_entry_label; \
} \
else \
for (bool _coro_bool = false;; \
_coro_bool = !_coro_bool) \
if (_coro_bool) \
goto bail_out_of_coroutine; \
else

Unlike the preprocessor-based coroutines presented here, the above macros let you yield from a coroutine without having to return from a function. Instead, the yield simply breaks you out of the reenter block. That trick is what allows us to write a server in one function.

Yes, yes, I know. An echo server in a single function is a bit of a gimmick. The following snippet may give a better idea of how the coroutines can be used:

class session : coroutine
{
public:
session(tcp::acceptor& acceptor)
: acceptor_(acceptor),
socket_(new tcp::socket(acceptor.get_io_service())),
data_(new array<char, 1024>)
{
}

void operator()(
error_code ec = error_code(),
size_t length = 0)
{
reenter (this)
{
entry:
for (;;)
{
yield acceptor_.async_accept(
*socket_, *this);

while (!ec)
{
yield socket_->async_read_some(
buffer(*data_), *this);

if (ec) break;

yield async_write(*socket_,
buffer(*data_, length), *this);
}

socket_->close();
}
}
}

private:
tcp::acceptor& acceptor_;
shared_ptr<tcp::socket> socket_;
shared_ptr<array<char, 1024> > data_;
};

Compared to the usual boost::bind-based approach of doing callbacks, the control flow is all in one place and easy to follow.

But wait, there's more... In the next post I'll reveal the real power you get when you combine stackless coroutines and asio.

Wednesday, July 29, 2009

Wife says: "I can't believe it works"

Just a teaser:

int main()
{
try
{
using asio::ip::tcp;
using namespace boost::lambda;

asio::io_service io_service;
tcp::acceptor acceptor(io_service,
tcp::endpoint(tcp::v4(), 54321));

const int max_clients = 100;
coroutine coro[max_clients];
std::auto_ptr<tcp::socket> socket[max_clients];
asio::error_code ec[max_clients];
std::size_t length[max_clients];
boost::array<char, 1024> data[max_clients];

// Kick off all the coroutines.
int n = -1;
for (int i = 0; i < max_clients; ++i)
{
socket[i].reset(new tcp::socket(io_service));
io_service.post(
unlambda((
var(n) = i
)));
}

for (; io_service.run_one() > 0; n = -1)
{
if (n != -1)
{
reenter (coro[n])
{
entry:
for (;;)
{
// Wait for a client to connect.
yield acceptor.async_accept(
*socket[n],
unlambda((
var(n) = n,
var(ec[n]) = _1
)));

// Echo at will.
while (!ec[n])
{
yield socket[n]->async_read_some(
asio::buffer(data[n]),
unlambda((
var(n) = n,
var(ec[n]) = _1,
var(length[n]) = _2
)));

if (!ec[n])
{
yield asio::async_write(
*socket[n],
asio::buffer(data[n], length[n]),
unlambda((
var(n) = n,
var(ec[n]) = _1
)));
}
}

// Clean up before accepting next client.
socket[n]->close();
}
}
}
}
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
}

One function. One fully asynchronous server. Bog standard C++.

Monday, October 06, 2008

Asynchronous Fork/Join using Asio

As most Asio users will no doubt be aware, multiple threads can call io_service::run() to set up a pool of threads from which the completion handlers will be executed. This can be used in conjunction with io_service::post() to execute arbitrary tasks in the thread pool.

In some rare spare moments I have used this facility to dabble in parallel algorithms (mainly to do with sorting large data sets). However, I was not completely satisfied with its ease-of-use when it came to implementing the algorithms.

Recently I came across the new Fork/Join framework that's going to be included in Java 7. I was particularly struck by the simplicity of the coinvoke() function, and was inspired to implement something similar on top of Asio. Of course, what with Asio and my preferred mode of thinking, I wanted to implement an asynchronous version.

And the result...

I created a function template that may be used to initiate two or more tasks to run in parallel:

template <typename TaskCont0, ..., typename TaskContN,
typename Task0, ..., typename TaskN, typename Cont>
void coinvoke(asio::io_service& io_service,
Task0 task0, ..., TaskN taskN, Cont cont);

Each task must be a function object with a single argument, where the argument is the continuation function object for the task. The TaskContN template parameters explicitly specify the function signatures used for each of the task continuations. For example:

coinvoke<void(int), void(int)>(io_service, task0, task1, cont);

says that both tasks have a continuation with the signature void(int). The combining continuation has a signature that is the concatenation of all of the task continuations' arguments. In the above example, that's void(int, int).
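The join behaviour can be sketched without Asio. The code below is a deliberately simplified stand-in, not the real coinvoke(): it handles exactly two tasks with void(int) continuations, runs them inline on the calling thread instead of posting them to an io_service, and is not thread-safe. All names in it (coinvoke2_sketch, join_state_sketch, int_cont, int_task) are invented for this example.

```cpp
#include <functional>
#include <memory>

typedef std::function<void(int)> int_cont;      // a task's continuation
typedef std::function<void(int_cont)> int_task; // a task takes its continuation

// Shared bookkeeping for one join.
struct join_state_sketch
{
  int results[2];
  int pending;
  std::function<void(int, int)> combine;
};

void coinvoke2_sketch(int_task task0, int_task task1,
    std::function<void(int, int)> combine)
{
  std::shared_ptr<join_state_sketch> state(new join_state_sketch());
  state->pending = 2;
  state->combine = combine;

  // Each task gets a continuation that records its result in its own slot
  // and fires the combining continuation once both results are in.
  auto arrive = [state](int slot, int value)
  {
    state->results[slot] = value;
    if (--state->pending == 0)
      state->combine(state->results[0], state->results[1]);
  };

  task0([arrive](int v) { arrive(0, v); });
  task1([arrive](int v) { arrive(1, v); });
}
```

The combining continuation receives the first task's result as its first argument and the second task's result as its second, which is the concatenation of signatures described above.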

The operation of coinvoke() works as follows (click image for full size view):

You can get the implementation here. Don't expect the source to be readable; it contains hairy uses of template metaprogramming and preprocessor magic.

Why continuations?

You might be wondering why each task passes its result to a continuation function rather than simply returning it. The answer to that is that a task need not be a single calculation; it could instead be the start of a chain of asynchronous operations. This means that coinvoke() could be used to simplify management of parallel operations, such as writing data to multiple sockets, and not having the handler called until all operations have finished. I plan to explore those and other related ideas further in the near future, but for now let's just look at parallel algorithms.

Fibonacci revisited

The equivalent implementation of the Fibonacci example from the Java Fork/Join paper looks like:

void combine_fib(
int a, int b,
function<void(int)> h)
{
h(a + b);
}

void calc_fib(
asio::io_service& io_service,
int n,
function<void(int)> f)
{
if (n <= threshold)
{
f(seq_fib(n));
}
else
{
coinvoke<void(int), void(int)>(io_service,
bind(calc_fib, ref(io_service), n - 1, _1),
bind(calc_fib, ref(io_service), n - 2, _1),
bind(combine_fib, _1, _2, f));
}
}

Can I have C++0x lambdas now, please?

The need to define a separate combine_fib is not ideal, since a key part of the algorithm is off in another spot. Fortunately, C++0x's new monomorphic lambdas come to the rescue:

void calc_fib(
asio::io_service& io_service,
int n,
function<void(int)> f)
{
if (n <= threshold)
{
f(seq_fib(n));
}
else
{
coinvoke<void(int), void(int)>(io_service,
[&io_service, n](function<void(int)> c)
{
calc_fib(io_service, n - 1, c);
},
[&io_service, n](function<void(int)> c)
{
calc_fib(io_service, n - 2, c);
},
[f](int a, int b)
{
f(a + b);
});
}
}

A useful example

Since that has to come close to being the most convoluted way of calculating a Fibonacci value, here is an example where coinvoke() is used for a parallel merge sort:

template <typename Iterator>
void merge(
Iterator begin,
Iterator middle,
Iterator end,
function<void()> f)
{
std::inplace_merge(begin, middle, end);
f();
}

template <typename Iterator>
void sort(
asio::io_service& io_service,
Iterator begin,
Iterator end,
function<void()> f)
{
std::size_t n = end - begin;
if (n <= 16384)
{
std::sort(begin, end);
io_service.post(f);
}
else
{
coinvoke<void(), void()>(io_service,

// First task sorts the initial half of the range.
bind(&sort<Iterator>,
ref(io_service),
begin, begin + n / 2, _1),

// Second task sorts the latter half of the range.
bind(&sort<Iterator>,
ref(io_service),
begin + n / 2, end, _1),

// Continuation function merges the two sorted ranges.
bind(&merge<Iterator>,
begin, begin + n / 2, end, f)

);
}
}

On my 8-core machine, this gives a little more than a threefold speedup in sorting large vectors.

Friday, June 27, 2008

Mention in Stroustrup Interview

The A-Z of Programming Languages: C++, top of page 5:

Do you feel that resources like the boost libraries will provide this functionality/accessibility for C++?

Some of the boost libraries - especially the networking library - are a good beginning. The C++0x standard threads look a lot like boost threads. If at all possible, a C++ programmer should begin with an existing library (and/or tool), rather than building directly on fundamental language features and/or system threads.

Not by name, but hey! The rest of the interview is worth reading too. ;)

Friday, May 23, 2008

Boost.Asio vs Asio

Sometimes I am asked what the difference is between the (non-Boost) Asio and Boost.Asio packages I provide. Here is the definitive word on the subject, presented as a series of questions and answers.

What are the differences in the source code?

— Asio is in a namespace called asio::, whereas Boost.Asio puts everything under boost::asio::.

— The main Asio header file is called asio.hpp. The corresponding header in Boost.Asio is boost/asio.hpp. All other headers are similarly changed.

— Any macros used by or defined in Asio are prefixed with ASIO_. In Boost.Asio they are prefixed with BOOST_ASIO_.

— Asio includes a class for launching threads, asio::thread. Boost.Asio does not include this class, to avoid overlap with the Boost.Thread library.

— Boost.Asio uses the Boost.System library to provide support for error codes (boost::system::error_code and boost::system::system_error). Asio includes these under its own namespace (asio::error_code and asio::system_error). The Boost.System version of these classes currently supports better extensibility for user-defined error codes.

— Asio is header-file-only and for most uses does not require linking against any Boost library. Boost.Asio always requires that you link against the Boost.System library, and also against Boost.Thread if you want to launch threads using boost::thread.

Where do I get a release package?

Asio is available for download from sourceforge, in a package named asio-X.Y.Z.tar.gz (or .tar.bz2 or .zip).

Boost.Asio is included in the Boost 1.35 distribution. It is also available as a separate package on sourceforge, named boost_asio_X_Y_Z.tar.gz. The latter is intended to be copied over the top of an existing Boost source code distribution.

Where are the source code repositories?

Asio uses a sourceforge-hosted CVS repository. Details of how to access it may be found here. It may also be browsed via the web.

Boost.Asio is checked into Boost's subversion repository.

How do you maintain both versions?

All development is done in the Asio CVS repository. I periodically convert the source into Boost format using a script called boostify.pl, and merge the changes into the Boost subversion repository.

Will Asio be discontinued now that Boost.Asio is included with Boost?

No. There are projects using Asio and they will continue to be supported. I also prefer to use Asio over Boost.Asio in my own projects, for the convenience of header-file-only and shorter namespaces.

Should I use Asio or Boost.Asio?

It depends. Here are some things to consider:

— If you (like me) prefer the convenience of header-file-only libraries then I'd suggest using Asio over Boost.Asio.

— If you must use a version of Boost older than 1.35 then Boost.Asio is not included. You can use Boost.Asio by copying it over the top of your Boost distribution (see above), but not everyone is comfortable doing this. In that case, I would suggest using Asio over Boost.Asio.

— I will be creating new versions of both the Asio and Boost.Asio packages on a faster release cycle than that followed by Boost. If you want to use the latest features you can still use Boost.Asio as long as you are happy to copy it over the top of your Boost distribution. If you don't want to do this, use Asio rather than Boost.Asio.

Can Asio and Boost.Asio coexist in the same program?

Yes. Since they use different namespaces there should be no conflicts, although obviously the types themselves are not interchangeable. (In case you're wondering why you might want to do this, consider a situation where a program is using third party libraries that are also using Asio internally.)

Sunday, March 30, 2008

739 days ago ...

... asio was accepted into Boost. Today you can find it as part of a Boost release. Woohoo!

Wednesday, August 08, 2007

Time Travel

Many event-driven programs involve state changes that are triggered according to the system clock. You might be coding for:

  • A share market that opens at 10:00am and closes at 4:00pm.

  • An off-peak phone billing rate that starts after 7:00pm.

  • An interest calculation that is run on the last day of every month.

The asio::deadline_timer class lets you handle this easily. For example:
using namespace boost::posix_time;
typedef boost::date_time::c_local_adjustor<ptime> local_adj;

...

asio::deadline_timer timer(io_service);

ptime open_time(second_clock::local_time().date(), hours(10));
timer.expires_at(local_adj::local_to_utc(open_time));
timer.async_wait(open_market);
There's a catch: to test that your timer events work correctly, you have to run your program at the right time of day. It usually isn't practical to sit around all day (or, worse, all year) waiting for the timers to expire.

Time Traits


You may have noticed that the asio::deadline_timer class is actually a typedef:
typedef basic_deadline_timer<boost::posix_time::ptime>
deadline_timer;
where the basic_deadline_timer class template is declared as follows:
template <
    typename Time,
    typename TimeTraits
      = asio::time_traits<Time>,
    typename TimerService
      = deadline_timer_service<Time, TimeTraits> >
class basic_deadline_timer;
In the context of our problem, the most interesting template parameter is the second one: TimeTraits. An implementation of TimeTraits lets us customise the treatment of the template's Time parameter, and consequently the behaviour of the timer itself.

A TimeTraits class must implement an interface that matches the following:
class TimeTraits
{
public:
  // The type used to represent an absolute time, i.e. the same
  // as the Time template parameter to basic_deadline_timer.
  typedef ... time_type;

  // The type used to represent the difference between two
  // absolute times.
  typedef ... duration_type;

  // Returns the current time.
  static time_type now();

  // Returns a new absolute time resulting from adding the
  // duration d to the absolute time t.
  static time_type add(time_type t, duration_type d);

  // Returns the duration resulting from subtracting t2 from t1.
  static duration_type subtract(time_type t1, time_type t2);

  // Returns whether t1 is to be treated as less than t2.
  static bool less_than(time_type t1, time_type t2);

  // Returns a "posix" duration corresponding to the duration d.
  static boost::posix_time::time_duration to_posix_duration(
      duration_type d);
};
As you can see from the declaration of the basic_deadline_timer class template, Asio provides a default TimeTraits implementation called asio::time_traits<>.

Offsetting Now


To test our timer events at any time of our choosing, we simply need to change the definition of "now" using a custom TimeTraits class.

Since we want to use the same time types as the regular deadline_timer class, we'll start by reusing the default traits implementation:
class offset_time_traits
  : public asio::deadline_timer::traits_type
{
};
The value returned by the now() function will be offset from the system clock by a specified duration:
class offset_time_traits
  : public asio::deadline_timer::traits_type
{
private:
  static duration_type offset_;
};
which is simply added to the system clock:
class offset_time_traits
  : public asio::deadline_timer::traits_type
{
public:
  static time_type now()
  {
    return add(asio::deadline_timer::traits_type::now(), offset_);
  }

private:
  static duration_type offset_;
};
Of course, we will also need to provide a way to set the offset, which can be done by setting an initial value for "now":
class offset_time_traits
  : public asio::deadline_timer::traits_type
{
public:
  static time_type now()
  {
    return add(asio::deadline_timer::traits_type::now(), offset_);
  }

  static void set_now(time_type t)
  {
    offset_ =
      subtract(t, asio::deadline_timer::traits_type::now());
  }

private:
  static duration_type offset_;
};
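The arithmetic behind now() and set_now() can be tried out without Asio. The following is a minimal sketch, assuming std::chrono as a stand-in for Boost.Date_Time. The names system_time_traits and offset_clock_traits are hypothetical and not part of Asio, and the class cannot be plugged into basic_deadline_timer as written. It also spells out a step that the listings above do not show: a static data member such as offset_ needs a definition outside the class.

```cpp
#include <chrono>

// Hypothetical stand-in for the default traits: reads the real system clock.
struct system_time_traits
{
  typedef std::chrono::system_clock::time_point time_type;
  typedef std::chrono::system_clock::duration duration_type;

  static time_type now() { return std::chrono::system_clock::now(); }
  static time_type add(time_type t, duration_type d) { return t + d; }
  static duration_type subtract(time_type t1, time_type t2) { return t1 - t2; }
};

// Same shape as offset_time_traits, built on the stand-in above.
class offset_clock_traits
  : public system_time_traits
{
public:
  // The system clock shifted by the stored offset.
  static time_type now()
  {
    return add(system_time_traits::now(), offset_);
  }

  // Chooses the offset so that now() returns t at this instant.
  static void set_now(time_type t)
  {
    offset_ = subtract(t, system_time_traits::now());
  }

private:
  static duration_type offset_;
};

// A static data member declared in a class still needs one definition at
// namespace scope; without it the program fails to link.
offset_clock_traits::duration_type offset_clock_traits::offset_ =
    offset_clock_traits::duration_type::zero();
```

After offset_clock_traits::set_now(t), a call to offset_clock_traits::now() returns t plus however much real time has passed since. A matching definition would presumably be needed for offset_time_traits::offset_ in one source file.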

Creating a Timer


To use our custom traits type with the basic_deadline_timer template, we simply need to add the following typedef:
typedef asio::basic_deadline_timer<
boost::posix_time::ptime, offset_time_traits> offset_timer;
To see the offset timer in action, let's create a timer to fire precisely at the start of the coming new year. So we don't have to wait until then, we'll set "now" to be just ten seconds prior to midnight:
offset_time_traits::set_now(
boost::posix_time::from_iso_string("20071231T235950"));

offset_timer timer(io_service);
timer.expires_at(
boost::posix_time::from_iso_string("20080101T000000"));
timer.async_wait(handle_timeout);

io_service.run();
When the program is run, it will take just ten seconds to complete.

Jumping Through Time


One feature not supported by the above solution is the ability to change the definition of "now" after the timers have been started. However, if your timer events are spread across a long period of time, then this is likely to be something you would want.

Let's say that the next timer does not expire for several hours, but in an attempt to speed things up we call set_now() to move to just seconds before. The problem with the above traits class is that the existing asynchronous wait operation does not know about the change to "now", and so will continue to run for the remaining hours.

Fortunately, Asio provides a way around this: by customising the to_posix_duration() function in our traits class.

The to_posix_duration() function is normally used to convert from a user-defined duration type to a type that Asio knows about (namely boost::posix_time::time_duration). The key point here is that this converted duration value is used by Asio to determine how long to wait until the timer expires. Furthermore, it doesn't matter if this function returns a duration that is smaller (even substantially so) than the actual duration. The timer won't fire early, because Asio guarantees that it won't expire until the following condition holds true:
!TimeTraits::less_than(TimeTraits::now(), timer.expires_at())
So, by adding the to_posix_duration() function to our traits class:
class offset_time_traits
  : public asio::deadline_timer::traits_type
{
public:
  static time_type now()
  {
    return add(asio::deadline_timer::traits_type::now(), offset_);
  }

  static void set_now(time_type t)
  {
    offset_ =
      subtract(t, asio::deadline_timer::traits_type::now());
  }

  static boost::posix_time::time_duration to_posix_duration(
      duration_type d)
  {
    return d < boost::posix_time::seconds(5)
      ? d : boost::posix_time::seconds(5);
  }

private:
  static duration_type offset_;
};
we can ensure that Asio detects changes to the offset within seconds.
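To see why capping the reported duration is enough, here is a small simulation of the idea. It is only a sketch and not Asio's implementation: simulated_clock, wait_result and wait_until are hypothetical names, time advances only when sleep() is called, and the on_wakeup callback stands in for whatever part of the program calls set_now().

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <functional>

typedef std::chrono::seconds seconds;

// Hypothetical simulated clock: "real" time only advances when sleep() is
// called, and "now" is real time plus an adjustable offset.
struct simulated_clock
{
  seconds real{0};
  seconds offset{0};

  seconds now() const { return real + offset; }
  void set_now(seconds t) { offset = t - real; }
  void sleep(seconds d) { real += d; }
};

struct wait_result
{
  seconds real_elapsed; // simulated real time spent waiting
  int wakeups;          // number of times the wait was re-evaluated
};

// Waits until clock.now() reaches expiry, sleeping at most max_sleep at a
// time. on_wakeup runs after every sleep and may call set_now().
wait_result wait_until(simulated_clock& clock, seconds expiry,
    seconds max_sleep, const std::function<void(int)>& on_wakeup)
{
  assert(max_sleep > seconds(0));
  const seconds start = clock.real;
  int wakeups = 0;
  while (clock.now() < expiry) // plays the role of less_than(now(), expiry)
  {
    clock.sleep(std::min(expiry - clock.now(), max_sleep));
    ++wakeups;
    on_wakeup(wakeups);
  }
  return wait_result{clock.real - start, wakeups};
}
```

With an expiry three hours away, a five second cap, and a callback that moves "now" to ten seconds before the expiry on the first wake-up, the wait finishes after three wake-ups and 15 simulated seconds. The loop condition is what prevents an early expiry: a short sleep only causes the condition to be checked again.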

Thursday, April 26, 2007

New home heating solution

For quite some time I have wanted to take a really good look at improving Boost.Asio's scalability across multiple processors. Unfortunately, getting unlimited access to this sort of hardware has been somewhat problematic. What I really needed was a decent multiprocessor box at home :)

The arrival of Intel's Clovertown quad core CPUs gave me the opportunity. The primary goal was to maximise parallelism while minimising the cost, and the lowest spec quad cores were cheap enough for me to justify spending the money. So, after months of thinking (and weeks of waiting for parts), last week I finally completed my home-built server.

Here are the headline specs:

  • Two Intel Xeon E5310 quad core processors (1.6 GHz)

  • Tyan Tempest i5000XL motherboard

  • 2GB DDR2-667 fully buffered ECC DIMMs

  • OCZ GameXStream 700W power supply (OK, OK, it's a little overpowered, but it was in stock!)

  • Other stuff like case, hard disk, DVD drive, video card and wireless LAN card

  • Lots of fans to provide soothing ambient noise

So far I have installed CentOS Linux 5 on it, but also plan to try Solaris 10 and FreeBSD. It seems pretty snappy.

Monday, January 15, 2007

Unbuffered socket iostreams

Boost.Asio includes an iostreams-based interface to TCP sockets, ip::tcp::iostream, for simple use cases. However, like the file iostreams provided by the standard library, ip::tcp::iostream buffers input and output data. This can lead to problems if you forget to explicitly flush the stream. For example, consider the following code to perform an HTTP request:
ip::tcp::iostream stream("www.boost.org", "http");
stream << "GET / HTTP/1.0\r\n"
<< "Host: www.boost.org\r\n"
<< "\r\n";

std::string response_line;
std::getline(stream, response_line);
...
The code will be stuck on the getline() call waiting for the response, because the request will still be sitting in the stream's output buffer. The correct code looks like this:
ip::tcp::iostream stream("www.boost.org", "http");
stream << "GET / HTTP/1.0\r\n"
<< "Host: www.boost.org\r\n"
<< "\r\n"
<< std::flush;
The std::flush will cause the stream to send the entire contents of its output buffer at that point.

Boost.Asio now supports an alternative solution: turn off the stream's output buffering. This is accomplished as follows:
ip::tcp::iostream stream("www.boost.org", "http");
stream.rdbuf()->pubsetbuf(0, 0);
stream << "GET / HTTP/1.0\r\n"
<< "Host: www.boost.org\r\n"
<< "\r\n";
Now you can send and receive to your heart's content, without having to worry about whether your message is stuck in the output buffer. Be warned, though: an unbuffered stream is a lot less efficient in terms of system calls, so don't use this feature if you care about performance.
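The trade-off between the two approaches can be observed with nothing but the standard library. The sketch below assumes a hypothetical counting_buf class as a stand-in for a socket stream buffer: each call to its private send() function represents one system call. It is not how ip::tcp::iostream is implemented, and send_request is likewise a made-up helper.

```cpp
#include <cassert>
#include <cstddef>
#include <ostream>
#include <streambuf>
#include <string>

// Hypothetical stand-in for a socket stream buffer. `sent` records what
// has reached the "network" and `send_calls` counts the simulated sends.
class counting_buf : public std::streambuf
{
public:
  explicit counting_buf(bool buffered)
  {
    // Without a put area, every character goes through overflow().
    if (buffered)
      setp(storage_, storage_ + sizeof(storage_));
  }

  std::string sent;
  std::size_t send_calls = 0;

protected:
  int_type overflow(int_type c) override
  {
    sync(); // make room by sending whatever is buffered
    if (traits_type::eq_int_type(c, traits_type::eof()))
      return traits_type::not_eof(c);
    const char ch = traits_type::to_char_type(c);
    if (pptr() != epptr())
    {
      *pptr() = ch; // buffered: keep the character for later
      pbump(1);
    }
    else
    {
      send(&ch, 1); // unbuffered: one send per character
    }
    return c;
  }

  int sync() override
  {
    if (pptr() != pbase())
    {
      send(pbase(), static_cast<std::size_t>(pptr() - pbase()));
      setp(pbase(), epptr()); // reset the put area to empty
    }
    return 0;
  }

private:
  void send(const char* data, std::size_t n)
  {
    assert(n > 0);
    sent.append(data, n);
    ++send_calls;
  }

  char storage_[512];
};

// Writes the example request and reports how many sends have happened.
std::size_t send_request(counting_buf& buf, bool flush)
{
  std::ostream stream(&buf);
  stream << "GET / HTTP/1.0\r\n"
         << "Host: www.boost.org\r\n"
         << "\r\n";
  if (flush)
    stream << std::flush;
  return buf.send_calls;
}
```

With a buffered counting_buf and no flush, send_request() reports zero sends, which is the situation that leaves getline() waiting. With a flush the whole request goes out in a single send. With an unbuffered counting_buf nothing is ever held back, but the number of sends equals the number of characters written.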

Friday, November 10, 2006

Buffer debugging

Some standard library implementations, such as the one that ships with MSVC 8.0, provide a nifty feature called iterator debugging. What this means is that the validity of your iterators is checked at runtime. If you try to use an iterator that has been invalidated, you'll get an assertion. For example:
std::vector<int> v(1);
std::vector<int>::iterator i = v.begin();
v.clear(); // invalidates iterators
*i = 0; // assertion!
Boost.Asio now takes advantage of this feature to add buffer debugging. Consider the following code:
void dont_do_this()
{
  std::string msg = "Hello, world!";
  asio::async_write(sock, asio::buffer(msg), my_handler);
}
When you call an asynchronous read or write you need to ensure that the buffers for the operation are valid until the completion handler is called. In the above example, the buffer is the std::string variable msg. This variable is on the stack, and so it goes out of scope before the asynchronous operation completes. If you're lucky, your application will crash. Often you will get random failures.

With the new buffer debug checking, however, Boost.Asio stores an iterator into the string until the asynchronous operation completes, and then dereferences it to check its validity. In the above example you get an assertion failure just before Boost.Asio tries to call the completion handler.

This feature has only been tested with MSVC 8.0 so far, but it should work with any other implementation that supports iterator debugging. Obviously there's a performance cost to this checking, so it's only enabled in debug builds. You can also explicitly disable it by defining BOOST_ASIO_DISABLE_BUFFER_DEBUGGING (or ASIO_DISABLE_BUFFER_DEBUGGING if you're using standalone asio).
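One common way to satisfy the lifetime requirement is to keep the message on the heap and let the completion handler hold a shared_ptr to it. The sketch below illustrates the ownership pattern only. It assumes a hypothetical fake_async_write() and run_pending() in place of Asio, and the handler signature is simplified to take just a length.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for an asynchronous write. Like a buffer, it refers
// to the caller's memory without owning it.
struct pending_write
{
  const char* data;
  std::size_t size;
  std::function<void(std::size_t)> handler;
};

std::queue<pending_write> pending; // operations waiting to complete

void fake_async_write(const char* data, std::size_t size,
    std::function<void(std::size_t)> handler)
{
  pending.push(pending_write{data, size, std::move(handler)});
}

// Completes all queued operations and returns the bytes "transmitted".
std::string run_pending()
{
  std::string wire;
  while (!pending.empty())
  {
    pending_write op = std::move(pending.front());
    pending.pop();
    wire.append(op.data, op.size); // the memory is read only now
    op.handler(op.size);
  }
  return wire;
}

// The handler owns a reference to the message, so the string outlives this
// function and is released once the handler itself is destroyed.
void do_this_instead()
{
  std::shared_ptr<std::string> msg =
      std::make_shared<std::string>("Hello, world!");
  fake_async_write(msg->data(), msg->size(),
      [msg](std::size_t length) { assert(length == msg->size()); });
}
```

Calling do_this_instead() followed by run_pending() yields the full message, even though do_this_instead() returned before the bytes were read. Making the message a member of a longer-lived object is another option when one such object exists per connection.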

Saturday, October 07, 2006

FreeBSD support

Asio has now been tested successfully on FreeBSD 6.0, and should support FreeBSD 5.5 and later. FreeBSD 5.4 and earlier are not supported since, according to the man pages, getaddrinfo is not thread-safe.

Tuesday, September 26, 2006

SSL password callbacks

On the weekend I made some changes to asio to support password callbacks for SSL. There is a new function on the asio::ssl::context class called set_password_callback(), which takes a function object with the following signature:
std::string password_callback(
std::size_t max_length,
ssl::context::password_purpose purpose);
The callback must return the password as a string. The max_length argument indicates the maximum allowable length of the password, and if the returned string is longer it will be truncated. The context::password_purpose type is an enum with values for_reading and for_writing. In most cases you won't need to use the max_length or purpose arguments, and if you use boost::bind() to create the function object you can just leave them off. For example, the SSL server sample included with asio now has the following:
context_.set_password_callback(
boost::bind(&server::get_password, this));

...

std::string get_password() const
{
  return "test";
}
The final thing to note is that the password callback needs to be set before calling any ssl::context functions that load keys, such as use_private_key_file().
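The trick of leaving the arguments off works because a bind expression ignores any call arguments it has no placeholder for. Here is a minimal sketch of that behaviour using std::bind rather than boost::bind, so that it is self-contained. The password_purpose enum, password_callback_type, make_password_callback() and ask_for_password() are hypothetical stand-ins and not the asio::ssl types.

```cpp
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical stand-ins for the types described above.
enum password_purpose { for_reading, for_writing };
typedef std::function<std::string(std::size_t, password_purpose)>
    password_callback_type;

class server
{
public:
  // Neither max_length nor purpose is mentioned in the bind expression, so
  // both are discarded when the callback is invoked.
  password_callback_type make_password_callback() const
  {
    return std::bind(&server::get_password, this);
  }

  std::string get_password() const
  {
    return "test";
  }
};

// Models the caller's side: invokes the callback with both arguments and
// truncates the result to max_length.
std::string ask_for_password(const password_callback_type& callback,
    std::size_t max_length)
{
  return callback(max_length, for_reading).substr(0, max_length);
}
```

The callback returned by make_password_callback() can be called with both arguments and returns "test" either way. A callback that does care about the arguments would use the placeholders _1 and _2 in the bind expression instead.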