Friday, April 26, 2013

C# style async/await in C++ - Part 1: Introduction and use with Boost.Asio

Asynchronous Programming


Asynchronous programming has become more and more important recently as a way to efficiently use the resources available with multicore processors yet at the same time avoid dealing with locking primitives.
In C++, two important libraries for this type of programming are Boost.Asio and Microsoft's Parallel Patterns Library (PPL) Task Library. Boost.Asio provides asynchronous operations with callback handlers. You can learn more about Boost.Asio here. The PPL Task Library provides asynchronous operations using continuations. You can learn more about PPL here.

Async/Await


The problem with using these libraries is that they operate differently from synchronous programming. Your logic ends up being in either multiple callback handlers or in multiple lambda continuations. C# recently added async/await to make it easier to write asynchronous code. You can find out more about them here and watch a presentation here.
There is even a proposal to add this to C++. You can see the proposal here.

Async/Await using Boost.Coroutine


However, you don't want to wait for a language proposal to be approved and then get implemented my compilers to make your programming easier. In fact, you can have a lot of the benefit now. The key that you need is Boost.Coroutine. Boost.Coroutine is in the 1.53 release of Boost. You can read about Boost.Coroutine here. Using Boost.Coroutine, I wrote cpp_async_await which is an open source library with a Boost Software License that allows (as much as possible with a library only solution) async/await style programming in C++ with Boost.Asio and Microsoft PPL/PPLx.

Motivating example


Go take a look at a simple async http client using raw Boost.Asio http://www.boost.org/doc/libs/1_53_0/doc/html/boost_asio/example/http/client/async_client.cpp



Welcome back. Boost.Asio is very powerful, but the callbacks make the logic hard to follow. In contrast here is our version of the same code. You can find the full code at

https://github.com/jbandela/cpp_async_await/blob/master/Example2.cpp

void get_http(boost::asio::io_service& io,std::string server, std::string path){

    using namespace asio_helper::handlers;
    // This allows us to do await
    asio_helper::do_async(io,[=,&io](asio_helper::async_helper helper){

        using boost::asio::ip::tcp;

        // This allows us to use the predefined handlers
        // such as read_handler, write_handler, etc
        using namespace asio_helper::handlers;

        tcp::resolver resolver_(io);
        tcp::socket socket_(io);
        boost::asio::streambuf request_;
        boost::asio::streambuf response_;

        // Form the request. We specify the "Connection: close" header so that the
        // server will close the socket after transmitting the response. This will
        // allow us to treat all data up until the EOF as the content.
        std::ostream request_stream(&request_);
        request_stream << "GET " << path << " HTTP/1.0\r\n";
        request_stream << "Host: " << server << "\r\n";
        request_stream << "Accept: */*\r\n";
        request_stream << "Connection: close\r\n\r\n";

        // Start an asynchronous resolve to translate the server and service names
        // into a list of endpoints.
        tcp::resolver::query query(server, "http");

        // Do async resolve
        tcp::resolver::iterator endpoint_iterator;
        boost::system::error_code ec;
        std::tie(ec,endpoint_iterator) =  helper.await<resolve_handler>(
            [&](resolve_handler::callback_type cb){
                resolver_.async_resolve(query,cb);
        });
        if(ec) {throw boost::system::system_error(ec);}

        // Do async connect
        std::tie(ec,std::ignore) = helper.await<composed_connect_handler>(
            [&](composed_connect_handler::callback_type cb){
                boost::asio::async_connect(socket_,endpoint_iterator,cb);    
        });
        if(ec){throw boost::system::system_error(ec);}

        // Connection was successful, send request
        std::tie(ec,std::ignore) = helper.await<write_handler>(
            [&](write_handler::callback_type cb){
                boost::asio::async_write(socket_,request_,cb);
        });
        if(ec){throw boost::system::system_error(ec);}

        // Read the response status line
        std::tie(ec,std::ignore) = helper.await<read_handler>(
            [&](read_handler::callback_type cb){
                boost::asio::async_read_until(socket_,response_,"\r\n",cb);
        });
        if(ec){throw boost::system::system_error(ec);}

        // Check that the response is OK
        std::istream response_stream(&response_);
        std::string http_version;
        response_stream >> http_version;
        unsigned int status_code;
        response_stream >> status_code;
        std::string status_message;
        std::getline(response_stream, status_message);
        if (!response_stream || http_version.substr(0, 5) != "HTTP/")
        {
            std::cout << "Invalid response\n";
            return;
        }
        if (status_code != 200)
        {
            std::cout << "Response returned with status code ";
            std::cout << status_code << "\n";
            return;
        }

        // Read the response headers, which are terminated by a blank line.
        std::tie(ec,std::ignore) = helper.await<read_handler>(
           [&](read_handler::callback_type cb){
                boost::asio::async_read_until(socket_, response_, "\r\n\r\n",cb);
        });
        if(ec){throw boost::system::system_error(ec);}

        // Process the response headers.
        std::istream response_stream2(&response_);
        std::string header;
        while (std::getline(response_stream2, header) && header != "\r")
            std::cout << header << "\n";
        std::cout << "\n";

        // Write whatever content we already have to output.
        if (response_.size() > 0)
            std::cout << &response_;

        // Continue reading remaining data until EOF.
        bool done = false;
        while(!done){

            std::tie(ec,std::ignore) = helper.await<read_handler>(
                [&](read_handler::callback_type cb){ 
                    boost::asio::async_read(socket_, response_,
                        boost::asio::transfer_at_least(1), cb);         
            });
            if(ec && ec != boost::asio::error::eof){
                throw boost::system::system_error(ec);
            }
            done = (ec == boost::asio::error::eof);
            // Write all of the data so far
            std::cout << &response_;
        }
   });
}

Discussion

Notice how we can have the code all in one function instead of spreading it out, and can read it with a single scan instead of jumping to the handler then back. The magic happens in await. Let's look at a single call
// Connection was successful, send request
std::tie(ec,std::ignore) = helper.await<write_handler>(
        [&](write_handler::callback_type cb){
            boost::asio::async_write(socket_,request_,cb);
});

helper.await takes a template parameter to specify what handler to use. Handlers are defined in namespace asio_helper::handlers. Await takes a single function parameter that consists of a lambda. The lambda takes a parameter of write_handler::callback_type. If we were using a read_handler, it would be read_handler::callback_type and so on.

helper.await returns whatever parameters were passed into the callback handler as a single value, pair,or tuple depending on the number of parameters in the handler. A read_handler has boost::system::error_code ec and std::size_t bytes_transferred as parameters so it returns an std::pair. We then can use std::tie to get the error code and ignore the bytes transferred.

The await function calls the asynchronous Boost.Asio function and then uses Boost.Coroutine to suspend our function and "return" to the calling function. Meanwhile the callback_type is a special function object that when called by Boost.Asio uses Boost.Coroutine to resume our function.
The cpp_async_await library defines handlers for the following Boost.Asio handler types in namespace asio_helper::handlers:
  • read_handler for ReadHandler
  • write_handler for WriteHandler
  • completion_handler for CompletionHandler
  • accept_handler for AcceptHandler
  • composed_connect_handler for ComposedConnectHandler
  • connect_handler for ConnectHandler
  • resolve_handler for ResolveHandler
  • wait_handler for WaitHandler
  • signal_handler for SignalHandler
  • ssl_handshake_handler for HandshakeHandler
  • ssl_shutdown_handler for ShutdownHandler

The handlers allow async_helper::await to return as a value,pair, or tuple whatever values are passed to the callback function.

The code is at https://github.com/jbandela/cpp_async_await/

It is a header only library. For Boost.Asio you need to include asio_helper.hpp. You will need to link to boost_system and boost_context libraries. The code will compile on Windows with MSVC 2012 and on Linux with gcc 4.7.2. You need Boost version 1.53 as that is the version that has Coroutine.

There is also support for Microsoft PPL and PPLx. Include ppl_helper.hpp and pplx_helper.hpp. Due to PPL and PPLx being different from Boost.Asio, there are a few minor changes in how you use the library with PPL and PPLx.

Thanks for taking the time to read this. Download the code and take a look at it and play around with it. Let me know what you think. Next time we will talk about using this library with PPL and PPLx

-John Bandela

5 comments:

  1. Great post. Addition of coroutine/context in BOOST is a big deal imho but somehow still goes largely unnoticed.

    ReplyDelete
  2. To create your programing assignments and learn programing just visit this website. http://clanguageprogrames.blogspot.com/

    ReplyDelete