Showing posts with label CodeProject. Show all posts
Showing posts with label CodeProject. Show all posts

Tuesday, April 30, 2013

C# style async/await in C++ - Part 2 Using with Microsoft PPL/PPLX

Last time we talked a little about asynchrony and about the cpp_async_await project. The previous article is located at http://jrb-programming.blogspot.com/2013/04/c-style-asyncawait-in-c-part-1.html. All code for the project is located at https://github.com/jbandela/cpp_async_await/. We talked about how to use the library with Boost.Asio.

As mentioned before the other major C++ library is Microsoft PPL/PPLX (PPLX is the cross platform port of PPL by Microsoft Casablanca Project) You can obtain PPLX and the documentation at http://casablanca.codeplex.com/ along with a host of other really neat stuff such as an http client, json library, etc. From here on out, unless specified otherwise, you can take what I say about PPL and assume that it applies to PPLX.

While Boost.Asio uses a callback model, PPL/PPLX uses a continuation model. The key class is

template < typename _Type>
class task;

_Type specifies the type of value produced by the task and it can be void. Task is very similar to std::future with the addition of the .then method. Whereas std::future has a .get method which blocks until the future is complete, the .then method allows a lambda to be specified which will be called when the task is complete. You can read more about PPL tasks at http://msdn.microsoft.com/en-us/library/dd492427(v=vs.110).aspx.

Here is an example of how to use tasks and continuations taken from the above link

// basic-continuation.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    auto t = create_task([]() -> int
    {
        return 42;
    });

    t.then([](int result)
    {
        wcout << result << endl;
    }).wait();

    // Alternatively, you can chain the tasks directly and 
    // eliminate the local variable. 
    /*create_task([]() -> int
    {
        return 42;
    }).then([](int result)
    {
        wcout << result << endl;
    }).wait();*/
}

/* Output:
    42
*/

This is actually pretty neat and it is easier to chain tasks than in Boost.Asio.

There is currently a proposal to add .then to std::future. You can find the proposal at http://isocpp.org/files/papers/N3558.pdf

However, it gets hard to use once you need to do anything in a loop. Due to this, along with other reasons, there is a proposal to add resumable functions to the C++ standard. You can find the paper at http://isocpp.org/files/papers/N3564.pdf

Here is one of the motivating example from that paper. Note, they are using a future with the .then continuations just like a PPL task currently

auto write =
    [&buf](future<int> size) -> future<bool> 
{ 
    return streamW.write(size.get(), buf).then(
        [](future<int> op){ return op.get() > 0; });
};
auto flse = [](future<int> op){ return 
    future::make_ready_future(false);};
auto copy = do_while(
    [&buf]() -> future<bool> 
{ 
    return streamR.read(512, buf)
        .choice(
        [](future<int> op){ return op.get() > 0; }, write, flse);
});

The code asynchronously reads a stream 512 bytes at a time until no more bytes are read, while asynchronously writing what was read. Here is how the code looks with the proposed C++ language additions. Note that resumable marks a function as resumable and await suspends the function and then resumes the function when the awaited future(task) is complete returning the value generated by the task that was awaited.

int cnt = 0;
do 
{
cnt = await streamR.read(512, buf);
if ( cnt == 0 ) break;
cnt = await streamW.write(cnt, buf);
} while (cnt > 0);

Notice how much easier to follow the code is with the language additions. The downside is you will have to wait for the proposal to be approved, become part of a standard, and for your compiler to implement it.

The good news is you can have much of the same convenience using cpp_async_await now. First a motivating example. In the Casablanca REST SDK that provides PPLX, there is an example of asynchronously searching a file for lines which contain a some string and writing them asynchronously to another file. You can find the code at http://casablanca.codeplex.com/SourceControl/changeset/view/040c323727ca7747beb254ecf2b8eac73632f3be#Release/collateral/Samples/SearchFile/searchfile.cpp. We are using PPLX because it is a bit easier to have a real example with a commandline app. You would use PPL tasks in the same way as PPLX tasks.

#include <filestream.h>
#include <containerstream.h>
#include <producerconsumerstream.h>

using namespace utility;
using namespace concurrency::streams;

/// <summary>
/// A convenient helper function to loop asychronously until a condition is met.
/// </summary>
pplx::task<bool> _do_while_iteration(std::function<pplx::task<bool>(void)> func)
{
    pplx::task_completion_event<bool> ev;
    func().then([=](bool guard)
    {
        ev.set(guard);
    });
    return pplx::create_task(ev);
}
pplx::task<bool> _do_while_impl(std::function<pplx::task<bool>(void)> func)
{
    return _do_while_iteration(func).then([=](bool guard) -> pplx::task<bool>
    {
        if(guard)
        {
            return ::_do_while_impl(func);
        }
        else
        {
            return pplx::task_from_result(false);
        }
    });
}
pplx::task<void> do_while(std::function<pplx::task<bool>(void)> func)
{
    return _do_while_impl(func).then([](bool){});
}

/// <summary>
/// Structure used to store individual line results.
/// </summary>
typedef std::vector<std::string> matched_lines;
namespace Concurrency { namespace streams {
/// <summary>
/// Parser implementation for 'matched_lines' type.
/// </summary>
template <typename CharType>
class _type_parser<CharType, matched_lines>
{
public:
    static pplx::task<matched_lines> parse(streambuf<CharType> buffer)
    {
        basic_istream<CharType> in(buffer);
        auto lines = std::make_shared<matched_lines>();
        return do_while([=]()
        {
            container_buffer<std::string> line;
            return in.read_line(line).then([=](const size_t bytesRead)
            {
                if(bytesRead == 0 && in.is_eof())
                {
                    return false;
                }
                else
                {
                    lines->push_back(std::move(line.collection()));
                    return true;
                }
            });
        }).then([=]()
        {
            return matched_lines(std::move(*lines));
        });
    }
};
}}
/// <summary>
/// Function to create in data from a file and search for a given string writing all lines containing the string to memory_buffer.
/// </summary>
static pplx::task<void> find_matches_in_file(const string_t &fileName, const std::string &searchString, basic_ostream<char> results)
{
    return file_stream<char>::open_istream(fileName).then([=](basic_istream<char> inFile)
    {           
        auto lineNumber = std::make_shared<int>(1);
        return ::do_while([=]()
        {
            container_buffer<std::string> inLine;
            return inFile.read_line(inLine).then([=](size_t bytesRead)
            {
                if(bytesRead == 0 && inFile.is_eof())
                {
                    return pplx::task_from_result(false);
                }

                else if(inLine.collection().find(searchString) != std::string::npos)
                {
                    results.print("line ");
                    results.print((*lineNumber)++);
                    return results.print(":").then([=](size_t)
                    {
                        container_buffer<std::string> outLine(std::move(inLine.collection()));
                        return results.write(outLine, outLine.collection().size());
                    }).then([=](size_t)
                    {
                        return results.print("\r\n");
                    }).then([=](size_t)
                    {
                        return true;
                    });
                }

                else
                {
                    ++(*lineNumber);
                    return pplx::task_from_result(true);
                }
            });
        }).then([=]()
        {
            // Close the file and results stream.
            return inFile.close() && results.close();
        });
    })

    // Continution to erase the bool and return task of void.
    .then([](std::vector<bool>) {});
}

/// <summary>
/// Function to write out results from matched_lines type to file
/// </summary>
static pplx::task<void> write_matches_to_file(const string_t &fileName, matched_lines results)
{
    // Create a shared pointer to the matched_lines structure to copying repeatedly.
    auto sharedResults = std::make_shared<matched_lines>(std::move(results));

    return file_stream<char>::open_ostream(fileName, std::ios::trunc).then([=](basic_ostream<char> outFile)
    {
        auto currentIndex = std::make_shared<size_t>(0);
        return ::do_while([=]()
        {
            if(*currentIndex >= sharedResults->size())
            {
                return pplx::task_from_result(false);
            }

            container_buffer<std::string> lineData((*sharedResults)[(*currentIndex)++]);
            outFile.write(lineData, lineData.collection().size());
            return outFile.print("\r\n").then([](size_t)
            {
                return true;
            });
        }).then([=]()
        {
            return outFile.close();
        });
    })

    // Continution to erase the bool and return task of void.
    .then([](bool) {});
}

#ifdef _MS_WINDOWS
int wmain(int argc, wchar_t *args[])
#else
int main(int argc, char *args[])
#endif
{
    if(argc != 4)
    {
        printf("Usage: SearchFile.exe input_file search_string output_file\n");
        return -1;
    }
    const string_t inFileName = args[1];
    const std::string searchString = utility::conversions::to_utf8string(args[2]);
    const string_t outFileName = args[3];
    producer_consumer_buffer<char> lineResultsBuffer;

    // Find all matches in file.
    basic_ostream<char> outLineResults(lineResultsBuffer);
    find_matches_in_file(inFileName, searchString, outLineResults)

    // Write matches into custom data structure.
    .then([&]()
    {
        basic_istream<char> inLineResults(lineResultsBuffer);
        return inLineResults.extract<matched_lines>();
    })

    // Write out stored match data to a new file.
    .then([&](matched_lines lines)
    {
        return write_matches_to_file(outFileName, std::move(lines));
    })

    // Wait for everything to complete.
    .wait();

    return 0;
}

Notice how painful iteration is. Now here is the code using cpp_await_async pplx_helper. Just a quick note. The code above first copies the matching lines into a producer_consumer_buffer and then into a vector and then to the output file. My code copies into the producer_consumer_buffer and then uses that buffer to copy to output. I think, my code achieves the same level of concurrency as the example program. If I am incorrect in this, please let me know in the comments below. You can find the whole file at https://github.com/jbandela/cpp_async_await/blob/master/PplxExample2.cpp

#include "pplx_helper.hpp"
#include <filestream.h>
#include <containerstream.h>
#include <producerconsumerstream.h>

using namespace utility;
using namespace concurrency::streams;



#ifdef _MS_WINDOWS
int wmain(int argc, wchar_t *args[])
#else
int main(int argc, char *args[])
#endif
{
    if(argc != 4)
    {
        printf("Usage: PplxExample2 input_file search_string output_file\n");
        return -1;
    }
    const string_t inFileName = args[1];
    const std::string searchString = utility::conversions::to_utf8string(args[2]);
    const string_t outFileName = args[3];
    producer_consumer_buffer<char> lineResultsBuffer;

    // Find all matches in file.
    basic_ostream<char> outLineResults(lineResultsBuffer);

    auto reader = pplx_helper::do_async([&](pplx_helper::async_helper<void> helper){
        auto inFile = helper.await(file_stream<char>::open_istream(inFileName));
        int lineNumber = 1;
        bool done = false;
        while(!done){
            container_buffer<std::string> inLine;
            auto bytesRead = helper.await(inFile.read_line(inLine));
            if(bytesRead==0 && inFile.is_eof()){
                done = true;
            }
            else if(inLine.collection().find(searchString) != std::string::npos){
                helper.await(outLineResults.print("line "));
                helper.await(outLineResults.print(lineNumber++));
                helper.await(outLineResults.print(":"));
                container_buffer<std::string> outLine(std::move(inLine.collection()));
                helper.await(outLineResults.write(outLine,outLine.collection().size()));
                helper.await(outLineResults.print("\r\n"));
            }
            else{
                ++lineNumber;
            }

        }
        helper.await(inFile.close() && outLineResults.close());
    });

    auto writer = pplx_helper::do_async([&](pplx_helper::async_helper<void> helper){
        basic_istream<char> inLineResults(lineResultsBuffer);
        auto outFile = helper.await(file_stream<char>::open_ostream(outFileName,std::ios::trunc));
        auto currentIndex = 0;
        bool done = false;
        while(!done){
            container_buffer<std::string> lineData;
            auto bytesRead = helper.await(inLineResults.read_line(lineData));
            if(bytesRead==0 && inLineResults.is_eof()){
                done = true;
            }
            else{
                container_buffer<std::string> lineDataOut(std::move(lineData.collection()));
                helper.await(outFile.write(lineDataOut,lineDataOut.collection().size()));
                helper.await(outFile.print("\r\n"));
            }
        }
        helper.await(inLineResults.close() && outFile.close());

    });


    try{
    // Wait for everything to complete and catch any exceptions
    (reader && writer).wait();

    }
    catch(std::exception& e){
        std::cerr << e.what();
    }

    return 0;
}

Notice how we can easily do iteration. The library is pretty similar to what can be achieved with the language additions. Instead of of resumable to mark a function as resumable, we use pplx_helper::do_async which takes a lambda. The lambda takes a single parameter of pplx_helper::async_helper<void>. If the lambda were to return an int for example it would take pplx_helper::async_helper<int> . In general a lambda return type T takes pplx_helper::async_helper<T>. In the case of the example code the parameter is named helper. In the language proposal you use the unary await keyword to suspend the function until a task is complete and then resume the function returning the value generated by the task were were awaiting. In your code we call helper.await on the task you want to await. helper.await provides pretty much the same convenience as the language keyword await.

You can use the same syntax to work with PPL tasks by using namespace ppl_helper. In summary for PPLX(Project Casablanca) use namespace pplx_helper and for PPL (Shipped with Visual C++ on Windows) use ppl_helper

This functionality is packaged up for you at https://github.com/jbandela/cpp_async_await. It is licensed under the Boost Software License which allows usage for both open source and commercial applications. It is a header only library and does not need to be built, but it does depend on Boost.Coroutine and needs to be linked to the boost_context library. The library has been tested with Visual C++ 2012 on Windows, and G++ 4.7.2 on Fedora Linux.

I hope you have enjoyed this discussion. Download the code and try it out, and let me know what you think. If people are interested, I will talk in a future post about how the library actually works.

Thanks,

John Bandela

Thursday, December 20, 2012

Easy Binary Compatible Interfaces Across Compilers in C++ - Part 0 of n: Introduction and a Sneak Preview


The problem of using a C++ library compiled with Compiler A, from a program compiled with Compiler B has been a problem for a while. This is especially true on Windows where Visual C++ generally breaks binary compatibility from release to release. Shipping a library for Windows involves shipping several versions for Visual C++ as well now often for mingw gcc.
Some of the problems C++ has in regards to binary compatibility across different compilers are:name mangling,object layout, exception support.
There are several ways to get around this.
There are whole books written on COM, so I won’t try to go into too many details. A brief overview in regards to the binary interface is here.
The basic idea is that you define an interface like this
Interface Definition
  1. struct Interface;
  2. struct InterfaceVtable{
  3.     int (*Function1)(struct Interface*);
  4.     int (*Function2)(struct Interface*, int);
  5. };
  6.  
  7. struct Interface{
  8.     struct InterfaceVtable* pTable;
  9.  
  10. };
It can be used like this
Using an Interface
  1. struct Interface* pInterface = GetInterfaceSomehow();
  2. int a = pInterface->pTable->Function1(pInterface);
Implementing an interface like this is painful and will be left as an exercise to the reader Smile.
Fortunately, (and by design), Microsoft Visual C++ and most Windows C++ compilers will generate something compatible to the above with an abstract base class using pure virtual functions.
Inteface using C++ (MSVC)
  1. struct InterfaceCpp{
  2.     virtual int Function1() = 0;
  3.     virtual int Function2(int) = 0;
  4. };
You can implement and use like this
Code Snippet
  1. struct InterfaceImplementation:public InterfaceCpp{
  2.     virtual int Function1(){return 5;}
  3.     virtual int Function2(int i){return 5 + i;}
  4. };
  5.  
  6. InterfaceImplementation imp;
  7. InterfaceCpp* pInterfaceCpp = &imp;
  8. std::cout << pInterfaceCpp->Function2(5) << std::endl;
The reason for this, is that the version with function pointers was doing a vtable and a vptr by hand and this version is letting the compiler do it. For more information about vtable and vptr see the excellent article by Dan Saks in Dr. Dobbs.
While the above solution works on Windows (generally), this is not guaranteed to always work A more general cross-platform solution is presented in Matthew Wilson’s Imperfect C++ in chapters 7 and 8. He basically provides a way and macros that allow you to define the above structure manually (ie define your own vtables).
By using either COM style interfaces with compilers that have a compatible vtable layout or rolling your own, you can have cross-compiler binary compatible interfaces.However, you do not have
  • Exceptions
  • Due to not having exceptions, you often have to use error codes and thus do not have real return values.
  • Standard C++ types such as vector and string (use arrays and const char*)
In fact, in an article explaining why Microsoft created C++/CX Jim Springfield stated one of the problems with COM even with libraries such ATL was
“There is no way to automatically map interfaces from low-level to a higher level (modern) form that throws exceptions and has real return values.”
During this series of posts, I will discuss the development of a C++11 library that has the following benefits
  • Able to use std::string and std::vector as function parameters and return values
  • Use exceptions for error handling
  • Compatible across compilers – able to use MSVC to create.exe and g++ to create .dll on Windows, and g++ for executable and clang++ to create .so on Linux
  • Works on Linux and Windows
  • Written in Standard C++11
  • No Macro magic
  • Header only library
As we progress we will talk about some of the disadvantages and areas for improvements and possible alternatives
Here is how we would define an interface DemoInterface. Note jrb_interface is the namespace of the library.
Code Snippet
  1. using namespace jrb_interface;
  2.  
  3. template<bool b>
  4. struct DemoInterface
  5. :public define_interface<b,4>
  6. {
  7.     cross_function<DemoInterface,0,int(int)> plus_5;
  8.  
  9.     cross_function<DemoInterface,1,int(std::string)> count_characters;
  10.  
  11.     cross_function<DemoInterface,2,std::string(std::string)> say_hello;
  12.  
  13.     cross_function<DemoInterface,3,std::vector<std::string>(std::string)>
  14.         split_into_words;
  15.  
  16.     template<class T>
  17.     DemoInterface(T t):DemoInterface<b>::base_t(t),
  18.         plus_5(t), count_characters(t),say_hello(t),split_into_words(t){}
  19. };
.
In this library, all interfaces are actually templates that take a bool parameter. The reason for this will become clear as we discuss the implementation in later posts.
All interfaces inherit from define_interface which takes a bool parameter (just use the bool passed in to the template) and an int parameter specifying how many functions are in the interface.  If you pass in a too small number, you will get a static_assert telling you that the number is too small.
To define a function in the interface, use the cross_function template
The first parameter is the interface in this case DemoInterface. The second parameter is the 0 based position of the function. The first function is 0, the second is 1, the third 2, etc. The third and final parameter of cross_function is the signature of the function is the name style as std::function.
Finally all interfaces need a templated constructor that takes a value t and passes it on to the base class as well as each function. For convenience the define_interface template defines a typedef base_t that you can use in your constructor initializer.
To implement an interface you would do this
Code Snippet
  1. struct DemoInterfaceImplemention:
  2.     public implement_interface<DemoInterface>{
  3.  
  4.         DemoInterfaceImplemention(){
  5.  
  6.             plus_5 = [](int i){
  7.                 return i+5;
  8.             };
  9.  
  10.             say_hello = [](std::string name)->std::string{
  11.                 return "Hello " + name;
  12.             };
  13.  
  14.             count_characters = [](std::string s)->int{
  15.                 return s.length();
  16.             };
  17.  
  18.             split_into_words =
  19.                 [](std::string s)->std::vector<std::string>{
  20.                     std::vector<std::string> ret;
  21.                     auto wbegin = s.begin();
  22.                     auto wend = wbegin;
  23.                     for(;wbegin!= s.end();wend = std::find(wend,s.end(),' ')){
  24.                         if(wbegin==wend)continue;
  25.                         ret.push_back(std::string(wbegin,wend));
  26.                         wbegin = std::find_if(wend,s.end(),
  27.                             [](char c){return c != ' ';});
  28.                         wend = wbegin;
  29.                     }
  30.                     return ret;
  31.             };
  32.  
  33.         }
  34. };
To implement an interface, you derive from implement_interface specifying your Interface as the template parameter. Then in your constructor you assign a lambda with the same signature you specified in the definition of the interface to each of the cross_function variables.
To use an interface, you construct use_interface providing the Interface as the template parameter.
Code Snippet
  1. // Assume iDemo is defined as follows
  2. // use_interface<DemoInterface> iDemo = ...
  3. int i = iDemo.plus_5(5);
  4.  
  5. int count = iDemo.count_characters("Hello World");
  6.  
  7. std::string s =  iDemo.say_hello("John");
  8.  
  9. std::vector<std::string> words = iDemo.split_into_words("This is a test");
You then call the functions just as you would with any class object. Note the use of . instead of –>
Thank you taking the time to read this post. I hope this has piqued your interest. In future posts we will explore how we create this library, and how we can extend this library to do more. I hope you will join me.
You can find compilable code at
https://github.com/jbandela/cross_compiler_call
The code has been tested on
  • Windows with compiling the executable with MSVC 2012 Milan (Nov CTP) and the DLL with mingw g++ 4.7.2
  • Ubuntu 12.10 with compiling the executable with g++ 4.7.2 and the .so file with clang++ 3.1
Instructions on how to compile are included in the README.txt file.
Please let me know what you think in the comments section
- John Bandela