C++ Actor Framework — Dev Blog

An Open Source implementation of the actor model in C++

Testing Brokers

A major benefit of actors is that they are very easy to test. No complex mocking, no simulation, simply sending and receiving messages. Unfortunately this does not apply to brokers, a special type of actors performing I/O. Sending messages still works, of course, but testing brokers involves the I/O part as opposed to sending messages. Does your actor parse its input correctly? Does it close connections appropriately? This blog post describes how CAF supports unit testing of the I/O aspect outside the message passing realm.

A key concern of unit testing is reproducibility. When writing a test, you want to make sure to be in charge of each step individually. This is generally not the case when performing I/O operations, since this involves components of the operating system, e.g., sockets and the TCP stack. Fortunately, brokers in CAF never talk to the OS directly, which allows us to trick them into a fake environment.

Brokers

Before we talk about how to take control of I/O, we first need to understand how the machinery behind brokers works.

brokers

The UML diagram above shows the relations a brokers has with I/O-related classes in CAF. The middleman is a singleton in CAF that provides access to various I/O-related functionality. The three classes that are responsible for doing I/O are multiplexer, scribe, and doorman.

Scribes

A scribe decouples a broker from the underlying networking APIs. To the broker, it provides access to an output buffer and the scribe will create new_data_msg messages for the broker whenever the scribe has received enough data according to the read policy set by the broker (via configure_read). Note that the user-facing API of the broker does not expose the scribe to the programmer. All the broker needs to worry about is a connection_handle. This handle simply identifies the corresponding scribe and forwards the operation.

For example, if you call self->configure_read(hdl, receive_policy::at_most(1024)); on a broker, it looks which scribe manages hdl and forwards the second argument to this scribe.

It is also worth mentioning that the new_data_msg received by the broker will always contain the same buffer. The scribe merely re-writes its content over and over again.

Doormen

A doorman, just like a scribe, decouples the broker from underlying networking APIs. Whenever a new network connection has been established, the doorman generates a new_connection_msg for the broker.

Multiplexer

The multiplexer is an I/O loop and a factory for scribes and doormen. If you want to change which networking API CAF is using, this is the (abstract) class you need to implement. It has no member functions in the UML diagram for brevity, but here are the important ones we need to know:

class multiplexer {
public:
  /// Assigns an unbound scribe identified by `hdl` to `ptr`.
  /// @warning Do not call from outside the multiplexer's event loop.
  virtual void assign_tcp_scribe(abstract_broker* ptr,
                                 connection_handle hdl) = 0;

  /// Assigns an unbound doorman identified by `hdl` to `ptr`.
  /// @warning Do not call from outside the multiplexer's event loop.
  virtual void assign_tcp_doorman(abstract_broker* ptr, accept_handle hdl) = 0;

  /// Simple wrapper for runnables
  struct runnable;

  using runnable_ptr = intrusive_ptr<runnable>;

  /// Runs the multiplexers event loop.
  virtual void run() = 0;

  /// Invokes @p fun in the multiplexer's event loop, calling `fun()`
  /// immediately when called from inside the event loop.
  /// @threadsafe
  template <class F>
  void dispatch(F fun);

  /// Invokes @p fun in the multiplexer's event loop, forcing
  /// execution to be delayed when called from inside the event loop.
  /// @threadsafe
  template <class F>
  void post(F fun);

  // ...
};

Calling multiplexer::assign_tcp_scribe will create a new scribe (for TCP-like communication) and assign this scribe to the given broker. The function assign_tcp_doorman does the same thing for doormen. If you have ever used ASIO, run, dispatch, and post will remind you of ASIO's io_service. And you are right. In fact, CAF's asio_multiplexer is simply implemented using an io_service. The function run is called in a thread crated by the middleman on startup. Whenever a broker receives a message from other actors, it calls post to schedule handling the message for later by creating a runnable for this task.

Faking I/O

For testing brokers without actually doing I/O, CAF has a multiplexer implementation that is solely meant for testing (since 0.14.1):

class test_multiplexer : public multiplexer {
public:
  /// A buffer storing bytes.
  using buffer_type = std::vector<char>;

  /// Models pending data on the network, i.e., the network 
  /// input buffer usually managed by the operating system.
  buffer_type& virtual_network_buffer(connection_handle hdl);

  /// Returns the output buffer of the scribe identified by `hdl`.
  buffer_type& output_buffer(connection_handle hdl);

  /// Returns the input buffer of the scribe identified by `hdl`.
  buffer_type& input_buffer(connection_handle hdl);

  /// Returns `true` if this handle has been closed
  /// for reading, `false` otherwise.
  bool& stopped_reading(connection_handle hdl);

  /// Returns `true` if this handle has been closed
  /// for reading, `false` otherwise.
  bool& stopped_reading(accept_handle hdl);

  /// Stores `hdl` as a pending connection for `src`.
  void add_pending_connect(accept_handle src, connection_handle hdl);

  /// Accepts a pending connect on `hdl`.
  void accept_connection(accept_handle hdl);

  /// Reads data from the external input buffer until
  /// the configured read policy no longer allows receiving.
  void read_data(connection_handle hdl);

  /// Appends `buf` to the virtual network buffer of `hdl`
  /// and calls `read_data(hdl)` afterwards.
  void virtual_send(connection_handle hdl, const buffer_type& buf);

  /// Waits until a `runnable` is available and executes it.
  void exec_runnable();

  /// Returns `true` if a `runnable` was available, `false` otherwise.
  bool try_exec_runnable();

  /// Executes all pending `runnable` objects.
  void flush_runnables();

  // ...
};

The test_multiplexer has several member function that "fake" network events and allow you to manipulate the buffers of scribes directly. In particular, virtual_send allows you to fake incoming data on a connection_handle. This will cause the scribe to generate one or more new_data_msg messages (depending on the configured receive policy). Those messages are handled by the broker immediately.

To simulate a remote connection, one needs to create a pending connection using add_pending_connect and cause the corresponding doorman to accept it via accept_connection.

Using the test multiplexer requires calling set_middleman(new network::test_multiplexer) in main, before calling any I/O-related function in CAF. Before showing the test multiplexer in action, we first implement implement a broker we want to test.

Example Application

The application we are going to test is a simplistic HTTP server. We do not want to bother with actually parsing HTTP, so we always only consider the first line of a HTTP header and check if it is equal to "GET / HTTP/1.1". If we receive anything else, we will send a 404 as response.

However, we are going to deal with chunked input and we (pedantically) require "\n\r" for line breaks. We are not going to generate HTTP dynamically. Instead, we use the following constants:

constexpr char http_valid_get[] = "GET / HTTP/1.1";

constexpr char http_get[] = "GET / HTTP/1.1\r\n"
                            "Host: localhost\r\n"
                            "Connection: close\r\n"
                            "Accept: text/plain\r\n"
                            "User-Agent: CAF/0.14\r\n"
                            "Accept-Language: en-US\r\n"
                            "\r\n";

constexpr char http_ok[] = "HTTP/1.1 200 OK\r\n"
                           "Content-Type: text/plain\r\n"
                           "Connection: close\r\n"
                           "Transfer-Encoding: chunked\r\n"
                           "\r\n"
                           "d\r\n"
                           "Hi there! :)\r\n"
                           "\r\n"
                           "0\r\n"
                           "\r\n"
                           "\r\n";

constexpr char http_error[] = "HTTP/1.1 404 Not Found\r\n"
                              "Connection: close\r\n"
                              "\r\n";

constexpr char newline[2] = {'\r', '\n'};

When receiving input chunks, we need to keep track of the state of our parser. We will also store each received line in a vector of strings and make use of CAF's stateful actors, as shown below.

enum parser_state {
  receive_new_line,
  receive_continued_line,
  receive_second_newline_half
};

struct http_state {
  http_state(abstract_broker* self) : self_(self) {
    // nop
  }

  ~http_state() {
    aout(self_) << "http worker finished with exit reason: "
                << self_->planned_exit_reason()
                << endl;
  }

  std::vector<std::string> lines;
  parser_state ps = receive_new_line;
  abstract_broker* self_;
};

using http_broker = caf::experimental::stateful_actor<http_state, broker>;

To make bookkeeping easier, we use one broker (our server) that accepts incoming connections and spawns a new HTTP worker per connection. The server does not need any additional state and is quite simple.

behavior server(broker* self) {
  aout(self) << "server up and running" << endl;
  return {
    [=](const new_connection_msg& ncm) {
      aout(self) << "fork on new connection" << endl;
      auto worker = self->fork(http_worker, ncm.handle);
    },
    others >> [=] {
      aout(self) << "unexpected: "
                 << to_string(self->current_message()) << endl;
    }
  };
}

And here is the implementation of our http_worker using the state from above.

behavior http_worker(http_broker* self, connection_handle hdl) {
  // tell network backend to receive any number of bytes between 1 and 128
  self->configure_read(hdl, receive_policy::at_most(128));
  return {
    [=](const new_data_msg& msg) {
      assert(! msg.buf.empty());
      assert(msg.handle == hdl);
      // extract lines from received buffer
      auto& lines = self->state.lines;
      auto i = msg.buf.begin();
      auto e = msg.buf.end();
      // search position of first newline in data chunk
      auto nl = std::search(i, e, std::begin(newline), std::end(newline));
      // store whether we are continuing a previously started line
      auto append_to_last_line = self->state.ps == receive_continued_line;
      // check whether our last chunk ended between \r and \n
      if (self->state.ps == receive_second_newline_half) {
        if (msg.buf.front() == '\n') {
          // simply skip this character
          ++i;
        }
      }
      // read line by line from our data chunk
      do {
        if (append_to_last_line) {
          append_to_last_line = false;
          auto& back = lines.back();
          back.insert(back.end(), i, nl);
        } else {
          lines.emplace_back(i, nl);
        }
        // if our last search didn't found a newline, we're done
        if (nl != e) {
          // skip newline and seek the next one
          i = nl + sizeof(newline);
          nl = std::search(i, e, std::begin(newline), std::end(newline));
        }
      } while (nl != e);
      // store current state of our parser
      if (msg.buf.back() == '\r') {
        self->state.ps = receive_second_newline_half;
        self->state.lines.pop_back(); // drop '\r' from our last read line
      } else if (msg.buf.back() == '\n') {
        self->state.ps = receive_new_line; // we've got a clean cut
      } else {
        self->state.ps = receive_continued_line; // interrupted in the middle
      }
      // we don't need to check for completion in any intermediate state
      if (self->state.ps != receive_new_line)
        return;
      // we have received the HTTP header if we have an empty line at the end
      if (lines.size() > 1 && lines.back().empty()) {
        auto& out = self->wr_buf(hdl);
        // we only look at the first line in our example and reply with our
        // OK message if we receive exactly "GET / HTTP/1.1", otherwise
        // we send a 404 HTTP response
        if (lines.front() == http_valid_get)
          out.insert(out.end(), std::begin(http_ok), std::end(http_ok));
        else
          out.insert(out.end(), std::begin(http_error), std::end(http_error));
        // write data and close connection
        self->flush(hdl);
        self->quit();
      }
    },
    [=](const connection_closed_msg&) {
      self->quit();
    },
    others >> [=] {
      aout(self) << "unexpected: "
                 << to_string(self->current_message()) << endl;
    }
  };
}

Our HTTP worker receives chunks of 128 bytes. Once it detected the end of the HTTP header (a blank line), it looks at the first line to see if it is equal to http_valid_get. If so, it sends the OK message, otherwise it sends the 404.

A minimal application using our brokers that always tries to open port 8080 is a three-liner:

int main() {
  spawn_io_server(server, 8080);
  await_all_actors_done();
  shutdown();
}

Testing the HTTP Broker

Without further ado, here is our complete unit test for the example application.

namespace {

class fixture {
public:
  fixture() {
    // note: the middleman will take ownership of mpx_, but using
    //       this pointer is safe at any point before calling `shutdown`
    mpx_ = new network::test_multiplexer;
    set_middleman(mpx_);
    // spawn the actor-under-test
    aut_ = spawn_io(server);
    // assign the acceptor handle to the AUT
    aut_ptr_ = static_cast<abstract_broker*>(actor_cast<abstract_actor*>(aut_));
    mpx_->assign_tcp_doorman(aut_ptr_, acceptor_);
    // "open" a new connection to our server
    mpx_->add_pending_connect(acceptor_, connection_);
    mpx_->assign_tcp_scribe(aut_ptr_, connection_);
    mpx_->accept_connection(acceptor_);
  }

  ~fixture() {
    anon_send_exit(aut_, exit_reason::kill);
    // run the exit message and other pending messages explicitly,
    // since we do not invoke any "I/O" from this point on that would
    // trigger those messages implicitly
    mpx_->flush_runnables();
    await_all_actors_done();
    shutdown();
  }

  // helper class for a nice-and-easy "mock(...).expect(...)" syntax
  class mock_t {
  public:
    mock_t(fixture* thisptr) : this_(thisptr) {
      // nop
    }

    mock_t(const mock_t&) = default;

    mock_t& expect(const std::string& what) {
      auto& buf = this_->mpx_->output_buffer(this_->connection_);
      CAF_REQUIRE((buf.size() >= what.size()));
      CAF_REQUIRE((std::equal(buf.begin(), buf.begin() + what.size(),
                              what.begin()));
      buf.erase(buf.begin(), buf.begin() + what.size()));
      return *this;
    }

    fixture* this_;
  };

  // mocks some input for our AUT and allows to
  // check the output for this operation
  mock_t mock(const char* what) {
    std::vector<char> buf;
    for (char c = *what++; c != '\0'; c = *what++)
      buf.push_back(c);
    mpx_->virtual_send(connection_, std::move(buf));
    return {this};
  }

  actor aut_;
  abstract_broker* aut_ptr_;
  network::test_multiplexer* mpx_;
  accept_handle acceptor_ = accept_handle::from_int(1);
  connection_handle connection_ = connection_handle::from_int(1);
};

} // namespace <anonymous>

CAF_TEST_FIXTURE_SCOPE(http_tests, fixture)

CAF_TEST(valid_response) {
  // write a GET message and expect an OK message as result
  mock(http_get).expect(http_ok);
}

CAF_TEST(invalid_response) {
  // write a GET with invalid path and expect a 404 message as result
  mock("GET /kitten.gif HTTP/1.1\r\n\r\n").expect(http_error);
}

CAF_TEST_FIXTURE_SCOPE_END()

The constructor of our fixture spawns the server that we are going to test. Since we need an actual pointer to the broker, we need to use actor_cast to get a pointer and downcast it afterwards. Using the pointer, we can setup a connection using the test multiplexer. These steps will create a new_connection_msg for the server that spawns a http_worker in response.

With the mock member function, we create a simple API that allows us to correlate inputs and outputs. The mock function is a simple wrapper around virtual_send and expect compares what output a broker has written in its output_buffer with the output we expect.

The test uses the unit test framework from CAF. Using a different test framework, e.g., Boost.Test, is straightforward. The complete source code can be found at GitHub.

Version 0.14 Released!

Version 0.14 of CAF has just been released. This release improves several CAF internals, fixes bugs (primarily of type-safe actors), adds an optional ASIO backend, and makes three changes to the API:

  • timed_sync_send has been removed (it offered an alternative way of defining message handlers, which is inconsistent with the rest of the API)
  • the policy classes broadcast, random, and round_robin in actor_pool were removed and replaced by factory functions using the same name
  • all functions that were deprecated with 0.10 were finally removed

We have also refined our policy regarding supporting compilers and improved our testing setup:

  • CAF always required a recent compiler and we are eager to adopt new language features and to catch up with C++14 (and C++17). On the other hand, we are aware that changing the compiler every few months is not acceptable for most users. As a compromise between progressivity and stability, we support versions of GCC and Clang that are up to two years old. Consequently, support for GCC 4.7 has been dropped with 0.14.

  • We are continuously working on increasing the number of distribution channels for CAF. At the moment, we provide a FreeBSD port, a Homebrew package, Biicode integration, and Linux packages via the OpenSuSE build service. You can find a full list with links etc. in the Get CAF Section of the README.

  • We have integrated several tools to constantly improve the code quality of CAF. Each pushed commit to develop is automatically tested via Jenkins on several operating systems and compilers. Our current setup includes:

    • Ubuntu (32 and 64 bit)
    • Fedora 22
    • Mac OS 10.10
    • Mac OS 10.9
    • FreeBSD 10

    Further, we test our code on GCC 4.8, GCC 4.9, GCC 5.1, Clang 3.5, and Clang 3.6. For static code analysis, we use Coverity and cppcheck. These tools are complemented with ASAN and LeakSanitizer.

  • In case you are waiting for MSVC support: we are currently testing on the latest RC from Microsoft. It appears that MSVC is not able to compile advanced match cases and libcaf_io does compile but its unit tests fail. Any feedback or help to get the IO parts working (or all parts of CAF compiling) is welcome. To check on the current progress please visit Issue #302 on GitHub.

Version 0.13 Released!

Version 0.13 of CAF has just been released. This release contains mostly improvements to core components, but also deprecates some parts of the API and removes previously deprecated parts.

No cppa Headers

The cppa headers were deprecated with 0.9 and are now finally removed. In case you were using the old cppa::options_description API, you can migrate to the new API based on message::extract (see Section 18.4 of the Manual).

IPv6 Support

You can now use IPv6 addresses for remote_actor.

Deprecate last_dequeued and last_sender

Version 0.13 slightly changes the semantics of last_dequeued and last_sender. Both functions will now cause undefined behavior (dereferencing a nullptr) instead of returning dummy values when accessed from outside a callback or after forwarding the current message. Besides, the function names were not a good choice in the first place, since "last" implies accessing data received in the past. As a result, both functions are now deprecated. Their replacements are named current_message and current_sender.

Pattern Matching Revised

The pattern matching engine no longer accesses the RTTI system of CAF. This means that you no longer have to announce your messages as long as you use only the core component of CAF for in-process messaging and not use to_string. The main reason for this change was to reduce the amount of template instantiations and to make CAF more debugger-friendly. For example, consider the following simple CAF program:

Prior to 0.13, this produces the following three stack traces (using c++filt and removing unwanted parts such as binary name):

If you ever ran into a message handler with your debugger of choice, you had to filter through a lot of templates and CAF internals.

Starting with 0.13 (and applying the same transformations), you will get these stack traces instead:

There are still a few templates left, but overall it is much easier to understand what is going on. Whenever you pass a lambda without any prefix, you will get a trivial_match_case. Using the "on(...) >> ..." notation results in an advanced_match_case. Lastly, "others() >> ..." will produce a catch_all_match_case. The function apply_args takes a function or function object, a (compile-time) list of indexes, and a tuple. It then simply gets all values using the given indexes from the tuple and calls the function object. The lfinvoker is a simple helper that returns unit whenever your message handler would return void. Of course you will see a bit more CAF internals when using event-based actors, but the old match_case madness is gone for good.