Asynchronous Programming
In C++, two important libraries for this type of programming are Boost.Asio and Microsoft's Parallel Patterns Library (PPL) Task Library. Boost.Asio provides asynchronous operations with callback handlers. You can learn more about Boost.Asio here. The PPL Task Library provides asynchronous operations using continuations. You can learn more about PPL here.
Async/Await
There is even a proposal to add this to C++. You can see the proposal here.
Async/Await using Boost.Coroutine
Motivating example
Welcome back. Boost.Asio is very powerful, but the callbacks make the logic hard to follow. In contrast here is our version of the same code. You can find the full code at
https://github.com/jbandela/cpp_async_await/blob/master/Example2.cpp
void get_http(boost::asio::io_service& io,std::string server, std::string path){
using namespace asio_helper::handlers;
// This allows us to do await
asio_helper::do_async(io,[=,&io](asio_helper::async_helper helper){
using boost::asio::ip::tcp;
// This allows us to use the predefined handlers
// such as read_handler, write_handler, etc
using namespace asio_helper::handlers;
tcp::resolver resolver_(io);
tcp::socket socket_(io);
boost::asio::streambuf request_;
boost::asio::streambuf response_;
// Form the request. We specify the "Connection: close" header so that the
// server will close the socket after transmitting the response. This will
// allow us to treat all data up until the EOF as the content.
std::ostream request_stream(&request_);
request_stream << "GET " << path << " HTTP/1.0\r\n";
request_stream << "Host: " << server << "\r\n";
request_stream << "Accept: */*\r\n";
request_stream << "Connection: close\r\n\r\n";
// Start an asynchronous resolve to translate the server and service names
// into a list of endpoints.
tcp::resolver::query query(server, "http");
// Do async resolve
tcp::resolver::iterator endpoint_iterator;
boost::system::error_code ec;
std::tie(ec,endpoint_iterator) = helper.await<resolve_handler>(
[&](resolve_handler::callback_type cb){
resolver_.async_resolve(query,cb);
});
if(ec) {throw boost::system::system_error(ec);}
// Do async connect
std::tie(ec,std::ignore) = helper.await<composed_connect_handler>(
[&](composed_connect_handler::callback_type cb){
boost::asio::async_connect(socket_,endpoint_iterator,cb);
});
if(ec){throw boost::system::system_error(ec);}
// Connection was successful, send request
std::tie(ec,std::ignore) = helper.await<write_handler>(
[&](write_handler::callback_type cb){
boost::asio::async_write(socket_,request_,cb);
});
if(ec){throw boost::system::system_error(ec);}
// Read the response status line
std::tie(ec,std::ignore) = helper.await<read_handler>(
[&](read_handler::callback_type cb){
boost::asio::async_read_until(socket_,response_,"\r\n",cb);
});
if(ec){throw boost::system::system_error(ec);}
// Check that the response is OK
std::istream response_stream(&response_);
std::string http_version;
response_stream >> http_version;
unsigned int status_code;
response_stream >> status_code;
std::string status_message;
std::getline(response_stream, status_message);
if (!response_stream || http_version.substr(0, 5) != "HTTP/")
{
std::cout << "Invalid response\n";
return;
}
if (status_code != 200)
{
std::cout << "Response returned with status code ";
std::cout << status_code << "\n";
return;
}
// Read the response headers, which are terminated by a blank line.
std::tie(ec,std::ignore) = helper.await<read_handler>(
[&](read_handler::callback_type cb){
boost::asio::async_read_until(socket_, response_, "\r\n\r\n",cb);
});
if(ec){throw boost::system::system_error(ec);}
// Process the response headers.
std::istream response_stream2(&response_);
std::string header;
while (std::getline(response_stream2, header) && header != "\r")
std::cout << header << "\n";
std::cout << "\n";
// Write whatever content we already have to output.
if (response_.size() > 0)
std::cout << &response_;
// Continue reading remaining data until EOF.
bool done = false;
while(!done){
std::tie(ec,std::ignore) = helper.await<read_handler>(
[&](read_handler::callback_type cb){
boost::asio::async_read(socket_, response_,
boost::asio::transfer_at_least(1), cb);
});
if(ec && ec != boost::asio::error::eof){
throw boost::system::system_error(ec);
}
done = (ec == boost::asio::error::eof);
// Write all of the data so far
std::cout << &response_;
}
});
}
Discussion
Notice how we can have the code all in one function instead of spreading it out, and can read it with a single scan instead of jumping to the handler then back. The magic happens in await. Let's look at a single call// Connection was successful, send request
std::tie(ec,std::ignore) = helper.await<write_handler>(
[&](write_handler::callback_type cb){
boost::asio::async_write(socket_,request_,cb);
});
helper.await takes a template parameter to specify what handler to use. Handlers are defined in namespace asio_helper::handlers. Await takes a single function parameter that consists of a lambda. The lambda takes a parameter of write_handler::callback_type. If we were using a read_handler, it would be read_handler::callback_type and so on.helper.await returns whatever parameters were passed into the callback handler as a single value, pair,or tuple depending on the number of parameters in the handler. A read_handler has boost::system::error_code ec and std::size_t bytes_transferred as parameters so it returns an std::pair. We then can use std::tie to get the error code and ignore the bytes transferred.
The await function calls the asynchronous Boost.Asio function and then uses Boost.Coroutine to suspend our function and "return" to the calling function. Meanwhile the callback_type is a special function object that when called by Boost.Asio uses Boost.Coroutine to resume our function.
The cpp_async_await library defines handlers for the following Boost.Asio handler types in namespace asio_helper::handlers:
- read_handler for ReadHandler
- write_handler for WriteHandler
- completion_handler for CompletionHandler
- accept_handler for AcceptHandler
- composed_connect_handler for ComposedConnectHandler
- connect_handler for ConnectHandler
- resolve_handler for ResolveHandler
- wait_handler for WaitHandler
- signal_handler for SignalHandler
- ssl_handshake_handler for HandshakeHandler
- ssl_shutdown_handler for ShutdownHandler
The handlers allow async_helper::await to return as a value,pair, or tuple whatever values are passed to the callback function.
The code is at https://github.com/jbandela/cpp_async_await/
It is a header only library. For Boost.Asio you need to include asio_helper.hpp. You will need to link to boost_system and boost_context libraries. The code will compile on Windows with MSVC 2012 and on Linux with gcc 4.7.2. You need Boost version 1.53 as that is the version that has Coroutine.
There is also support for Microsoft PPL and PPLx. Include ppl_helper.hpp and pplx_helper.hpp. Due to PPL and PPLx being different from Boost.Asio, there are a few minor changes in how you use the library with PPL and PPLx.
Thanks for taking the time to read this. Download the code and take a look at it and play around with it. Let me know what you think. Next time we will talk about using this library with PPL and PPLx
-John Bandela
Great post. Addition of coroutine/context in BOOST is a big deal imho but somehow still goes largely unnoticed.
ReplyDeleteThanks for your comment
DeleteThanks for the awesome post
ReplyDeleteTo create your programing assignments and learn programing just visit this website. http://clanguageprogrames.blogspot.com/
ReplyDeleteNice post very helpful
ReplyDeleteDBAKings