C++ Actor Framework — Dev Blog

An Open Source implementation of the actor model in C++

Spotlight: Bro

For our second article in the spotlight series, we want to highlight the popular open source network monitor Bro and talk to the core development team in Berkeley, California.

Background: Bro(ker)

Official Bro Logo


Bro is an open source network analysis framework, well grounded in 15 years of research. While focusing on network security monitoring, Bro also provides a comprehensive platform for more general network traffic analysis. Bro’s user community includes major universities, research labs, supercomputing centers, and open-science communities, and the system has an established foothold in industry. In addition to organizing user conferences and providing online resources for learning and using Bro, the creators of Bro also offer enterprise-level commercial support.

With version 2.4, the Bro development team integrated a new communication library based on CAF called Broker (not to be confused with brokers in CAF). Currently, this new communication layer ships as a beta release and needs to be enabled explicitly during build. Broker will become a mandatory dependency in future Bro versions and replace the current communication and serialization system.

In this interview, we have the pleasure of talking to the core development and research team, primarily located at the International Computer Science Institute (ICSI) in Berkeley, California.

The Interview

CAF Team: Dear Bro team, thank you for giving us the opportunity for this interview. The original paper describing Bro was released in 1999. This is a very long time for a software code base and clearly speaks for its quality. What is your secret?

Bro Team: When Vern Paxson created the initial version of Bro, he designed the system out of practical experience and—as we can say in hindsight—came up with the right abstractions in the system architecture, which at its core hasn’t changed since.

We also carefully try to avoid feature creep and only add new functionality when we find convincing use cases, which often involves discussion. While this sometimes comes at the cost of development speed, it often helps us make strategically viable decisions in the long term.

CAF Team: Bro is a security-critical software and network operators around the world rely on it. Changing a key component like the communication system is not something that you do without having a good reason. What was your motivation to start developing a new communication backend from scratch?

Bro Team: To keep monitoring the ever-growing network uplinks at line rate today (10, 40, or even 100 Gbps), the traffic needs to be distributed across several physical systems. Logs are sent back to a single system, where they need to be quickly and reliably written out. In addition, certain types of analysis are distributed across all of the nodes (e.g. scan detection) and stress the communications framework.

Our existing communication framework is starting to show its age: there are race conditions in the synchronization of high-level state, it lacks a persistence component, the pub/sub data flow cannot be controlled in a fine-grained way, the implementation is complex and inefficient, and we have to maintain two independent implementations, since a separate C library (Broccoli) exists to allow external applications to interact with Bro.

CAF Team: What goals did you set for the new communication system? What do you want to achieve with the system in the long run?

Bro Team: Ultimately we want to “crack open” the boundaries of what constitutes Bro and who can communicate with it. We also want to consolidate our existing implementation into a single library, which both Bro and external applications can use. In the past, Bro spawned a separate child process for its communication. Now we’re just linking against a shared library—Broker—which provides a uniform API to communication in the Bro ecosystem.

We want to create a platform where it’s easy to communicate with Bro instances, reconfigure them on the fly, and change their state not just from within Bro, but from anywhere. Moreover, we aim to provide a more flexible pub/sub event communication infrastructure.

CAF Team: Many challenges in software development arise in the implementation step and are not obvious in the initial design phase. What were the core issues and challenges you faced when implementing Broker and how did CAF contribute to your solution?

Bro Team: From a high level, ripping out a communication infrastructure and replacing it with a new one, without users even noticing, is a big challenge. This means we have to be able to deploy Broker incrementally. We already shipped Broker with Bro 2.4, but it’s not yet enabled by default. However, our master branch already relies on Broker, which means that 2.5 will come with Broker enabled by default after we get enough feedback from our users and put out release candidates. So far, the transition to C++11 has required the most attention.

From the implementation side, we wanted to converge on a tight, minimal API that proves powerful enough to cover our use cases. CAF gives us the flexibility we need to hide a powerful asynchronous backend behind a blocking C API, but also to expose the asynchrony in Broker’s C++ API. Then there’s the messaging aspect: Broker is essentially a distributed pub/sub engine, which naturally maps to the abstractions CAF provides.

One of our biggest challenges involves bringing together asynchronous lookups and type erasure in Bro’s statically typed scripting language. For example, when asking a remote data store for a value, the return type is not known a priori. Fortunately, we already have a proposal for adding pattern matching (note to the security community: not regular expressions) for type-safe message access.

CAF Team: Did you consider other frameworks? If yes, what convinced you to use CAF instead?

Bro Team: We looked at other, more low-level messaging libraries. In particular, we compared CAF to nanomsg. With CAF we found that we could express the same idea in significantly less code, because it comes with everything we need out of the box. CAF simply doesn’t require any boilerplate and enabled us to focus on the problem domain, as opposed to grappling with the implementation details of a messaging framework. After all, avoiding complexity and being able to reason about both code and protocol was the primary motivator for rewriting our communication layer.

The type-safe abstractions CAF provides on top of bare messaging made a huge difference in productivity. One of our team members summarized it as “[..] it didn’t feel like I was writing code that attempts to implement a protocol as it felt like writing code that is the protocol.”

We also found that nanomsg’s communication patterns, which represent one of its key selling points, can be easily implemented in a few lines of code with CAF. Orthogonal to that, CAF comes with nice failure semantics (monitors & links) which we haven’t found in other frameworks.

Finally, if you hadn’t released CAF under a BSD (or equivalently permissive) license, we couldn’t even have considered it in the first place.

CAF Team: How were the reactions to CAF in your team so far?

Bro Team: Internally, we already had one CAF fan who brought the actor model to the attention of the team. After our comparative evaluation, we were all excited to see such a clear winner and are now looking forward to seeing CAF perform in more than just toy examples.

CAF Team: Can you tell us about your performance and scaling requirements? Did CAF meet your expectations?

Bro Team: Broker enables us to reach a new horizon in terms of scaling. Bro is one of the few open systems capable of monitoring 100 Gbps—a workload that only a cluster of machines can handle. The worker nodes must send their logs somewhere via Broker, and the messaging layer must not induce high overhead, so that the nodes can focus on their traffic analysis.

In addition to the standard cluster for monitoring a single link, we have an ongoing “deep cluster” project where multiple sensor nodes deep in the network augment the traditional border-gateway monitoring. This entails deploying hundreds or thousands of nodes at global scale.

Finally, Bro can instruct OpenFlow controllers to reconfigure network hardware, e.g., to insert blocking rules or shunt flows. These use cases have tight latency requirements, which Broker must meet.

We are still in the process of evaluating performance; things look good so far but we expect to do more tuning as our users start to rely on it for operations.

CAF Team: The official announcement of Broker was at BroCon 2015. How did your community react to your plans for the communication system? Did they see the benefits immediately or did they object to changing critical components and introducing CAF as a new long-term dependency?

Bro Team: Our users like that Broker will enable new ways of communicating with Bro in the future. We’re already receiving the first questions about Broker on the mailing list.

Fortunately, our community trusts us developers to make the right strategic decisions about Bro’s future. So we haven’t heard any critical voices about introducing CAF as a new dependency. In fact, moving to C++11 will also allow us to modernize Bro’s code base itself.

CAF Team: Final question: if you were to decide what the next feature of CAF would be, what would you have in mind?

Bro Team: In the near term, we want to migrate to typed actors so that we can get a compile-time proof that our messaging protocol implements the spec correctly. But CAF can do that already. In the short term, we want to use the direct connection optimization for when distant siblings in a tree hierarchy communicate. Further down the road, we’re excited about the planned introspection features, where CAF can report CPU and I/O utilization, which we would like to use as a basis for decisions to adapt our topology dynamically at runtime. CAF’s stateful and migratable actors seem to lay the foundation for this capability.

CAF Team: Thank you very much for this interview, and we wish you great success for the next 15 years of Bro!

Special thanks go to Matthias Vallentin for introducing the teams to each other. Matthias is a member of the Bro team as well as a long-time contributor to CAF.

Testing Brokers

A major benefit of actors is that they are very easy to test: no complex mocking, no simulation, simply sending and receiving messages. Unfortunately, this does not fully apply to brokers, a special type of actor performing I/O. Sending messages still works, of course, but testing brokers also involves the I/O part. Does your actor parse its input correctly? Does it close connections appropriately? This blog post describes how CAF supports unit testing of the I/O aspect outside the message passing realm.

A key concern of unit testing is reproducibility. When writing a test, you want to make sure to be in charge of each step individually. This is generally not the case when performing I/O operations, since this involves components of the operating system, e.g., sockets and the TCP stack. Fortunately, brokers in CAF never talk to the OS directly, which allows us to trick them into a fake environment.

Brokers

Before we talk about how to take control of I/O, we first need to understand how the machinery behind brokers works.

UML diagram of a broker and its I/O-related classes

The UML diagram above shows the relations a broker has with I/O-related classes in CAF. The middleman is a singleton in CAF that provides access to various I/O-related functionality. The three classes responsible for doing I/O are multiplexer, scribe, and doorman.

Scribes

A scribe decouples a broker from the underlying networking APIs. To the broker, it provides access to an output buffer, and it creates new_data_msg messages for the broker whenever it has received enough data according to the read policy set by the broker (via configure_read). Note that the user-facing API of the broker does not expose the scribe to the programmer. All the broker needs to worry about is a connection_handle; this handle simply identifies the corresponding scribe, and operations on the handle are forwarded to it.

For example, if you call self->configure_read(hdl, receive_policy::at_most(1024)); on a broker, it looks up which scribe manages hdl and forwards the second argument to that scribe.

It is also worth mentioning that the new_data_msg received by the broker always contains the same buffer; the scribe merely rewrites its content over and over again.
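
To make the interplay of configure_read, new_data_msg, and the output buffer concrete, here is a minimal echo broker sketch (not part of the original example; it uses only the broker API discussed in this post):

// hypothetical echo worker: whatever arrives on `hdl` is written back
behavior echo_worker(broker* self, connection_handle hdl) {
  // deliver a new_data_msg as soon as 1 to 1024 bytes are available
  self->configure_read(hdl, receive_policy::at_most(1024));
  return {
    [=](const new_data_msg& msg) {
      // msg.buf is the scribe's (reused) input buffer
      auto& out = self->wr_buf(msg.handle);
      out.insert(out.end(), msg.buf.begin(), msg.buf.end());
      self->flush(msg.handle); // ask the scribe to write out the buffer
    },
    [=](const connection_closed_msg&) {
      self->quit();
    }
  };
}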

Doormen

A doorman, just like a scribe, decouples the broker from the underlying networking APIs. Whenever a new network connection has been established, the doorman generates a new_connection_msg for the broker.
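
Handling these events requires nothing more than a message handler for new_connection_msg (a bare-bones sketch; the HTTP server shown later in this post does the same and additionally forks a worker per connection):

// hypothetical acceptor that merely reports connections accepted by its doorman
behavior acceptor(broker* self) {
  return {
    [=](const new_connection_msg&) {
      aout(self) << "doorman accepted a new connection" << endl;
    }
  };
}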

Multiplexer

The multiplexer is an I/O loop and a factory for scribes and doormen. If you want to change which networking API CAF uses, this is the (abstract) class you need to implement. The UML diagram omits its member functions for brevity, but here are the important ones we need to know:

class multiplexer {
public:
  /// Assigns an unbound scribe identified by `hdl` to `ptr`.
  /// @warning Do not call from outside the multiplexer's event loop.
  virtual void assign_tcp_scribe(abstract_broker* ptr,
                                 connection_handle hdl) = 0;

  /// Assigns an unbound doorman identified by `hdl` to `ptr`.
  /// @warning Do not call from outside the multiplexer's event loop.
  virtual void assign_tcp_doorman(abstract_broker* ptr, accept_handle hdl) = 0;

  /// Simple wrapper for runnables
  struct runnable;

  using runnable_ptr = intrusive_ptr<runnable>;

  /// Runs the multiplexer's event loop.
  virtual void run() = 0;

  /// Invokes @p fun in the multiplexer's event loop, calling `fun()`
  /// immediately when called from inside the event loop.
  /// @threadsafe
  template <class F>
  void dispatch(F fun);

  /// Invokes @p fun in the multiplexer's event loop, forcing
  /// execution to be delayed when called from inside the event loop.
  /// @threadsafe
  template <class F>
  void post(F fun);

  // ...
};

Calling multiplexer::assign_tcp_scribe creates a new scribe (for TCP-like communication) and assigns it to the given broker. The function assign_tcp_doorman does the same thing for doormen. If you have ever used ASIO, run, dispatch, and post will remind you of ASIO’s io_service, and rightly so. In fact, CAF’s asio_multiplexer is implemented on top of an io_service. The function run is called in a thread created by the middleman on startup. Whenever a broker receives a message from other actors, it calls post to schedule handling of the message for later by creating a runnable for this task.
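
To illustrate the threading contract, the following sketch (not part of CAF itself) hands work to a running multiplexer through the interface shown above:

// `mpx` is assumed to point to the multiplexer owned by the middleman
void schedule_work(multiplexer* mpx) {
  // dispatch runs the lambda right away when called from inside the
  // event loop and enqueues it as a runnable otherwise
  mpx->dispatch([] {
    // safe place to call assign_tcp_scribe or assign_tcp_doorman
  });
  // post always enqueues, even from inside the event loop, which is
  // useful for breaking up long-running work
  mpx->post([] {
    // deferred work goes here
  });
}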

Faking I/O

For testing brokers without actually doing I/O, CAF has a multiplexer implementation that is solely meant for testing (since 0.14.1):

class test_multiplexer : public multiplexer {
public:
  /// A buffer storing bytes.
  using buffer_type = std::vector<char>;

  /// Models pending data on the network, i.e., the network 
  /// input buffer usually managed by the operating system.
  buffer_type& virtual_network_buffer(connection_handle hdl);

  /// Returns the output buffer of the scribe identified by `hdl`.
  buffer_type& output_buffer(connection_handle hdl);

  /// Returns the input buffer of the scribe identified by `hdl`.
  buffer_type& input_buffer(connection_handle hdl);

  /// Returns `true` if this handle has been closed
  /// for reading, `false` otherwise.
  bool& stopped_reading(connection_handle hdl);

  /// Returns `true` if this handle has been closed
  /// for reading, `false` otherwise.
  bool& stopped_reading(accept_handle hdl);

  /// Stores `hdl` as a pending connection for `src`.
  void add_pending_connect(accept_handle src, connection_handle hdl);

  /// Accepts a pending connect on `hdl`.
  void accept_connection(accept_handle hdl);

  /// Reads data from the external input buffer until
  /// the configured read policy no longer allows receiving.
  void read_data(connection_handle hdl);

  /// Appends `buf` to the virtual network buffer of `hdl`
  /// and calls `read_data(hdl)` afterwards.
  void virtual_send(connection_handle hdl, const buffer_type& buf);

  /// Waits until a `runnable` is available and executes it.
  void exec_runnable();

  /// Returns `true` if a `runnable` was available, `false` otherwise.
  bool try_exec_runnable();

  /// Executes all pending `runnable` objects.
  void flush_runnables();

  // ...
};

The test_multiplexer has several member functions that “fake” network events and allow you to manipulate the buffers of scribes directly. In particular, virtual_send allows you to fake incoming data on a connection_handle. This causes the scribe to generate one or more new_data_msg messages (depending on the configured receive policy). Those messages are handled by the broker immediately.
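
For example, a test can push a few bytes into the virtual network buffer and inspect the broker’s reply right afterwards, since everything runs synchronously (a sketch using only the members shown above; mpx and conn are assumed to be the installed test_multiplexer and a scribe’s connection handle):

test_multiplexer::buffer_type bytes{'p', 'i', 'n', 'g'};
mpx->virtual_send(conn, bytes);         // scribe delivers new_data_msg immediately
auto& reply = mpx->output_buffer(conn); // whatever the broker wrote in response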

To simulate a remote connection, one needs to create a pending connection using add_pending_connect and cause the corresponding doorman to accept it via accept_connection.

Using the test multiplexer requires calling set_middleman(new network::test_multiplexer) in main, before calling any I/O-related function in CAF. Before showing the test multiplexer in action, we first implement a broker we want to test.

Example Application

The application we are going to test is a simplistic HTTP server. We do not want to bother with actually parsing HTTP, so we only consider the first line of an HTTP header and check whether it is equal to "GET / HTTP/1.1". If we receive anything else, we send a 404 response.

However, we are going to deal with chunked input and we (pedantically) require "\r\n" for line breaks. We are not going to generate HTTP dynamically. Instead, we use the following constants:

constexpr char http_valid_get[] = "GET / HTTP/1.1";

constexpr char http_get[] = "GET / HTTP/1.1\r\n"
                            "Host: localhost\r\n"
                            "Connection: close\r\n"
                            "Accept: text/plain\r\n"
                            "User-Agent: CAF/0.14\r\n"
                            "Accept-Language: en-US\r\n"
                            "\r\n";

constexpr char http_ok[] = "HTTP/1.1 200 OK\r\n"
                           "Content-Type: text/plain\r\n"
                           "Connection: close\r\n"
                           "Transfer-Encoding: chunked\r\n"
                           "\r\n"
                           "d\r\n"
                           "Hi there! :)\r\n"
                           "\r\n"
                           "0\r\n"
                           "\r\n"
                           "\r\n";

constexpr char http_error[] = "HTTP/1.1 404 Not Found\r\n"
                              "Connection: close\r\n"
                              "\r\n";

constexpr char newline[2] = {'\r', '\n'};

When receiving input chunks, we need to keep track of the state of our parser. We will also store each received line in a vector of strings and make use of CAF’s stateful actors, as shown below.

enum parser_state {
  receive_new_line,
  receive_continued_line,
  receive_second_newline_half
};

struct http_state {
  http_state(abstract_broker* self) : self_(self) {
    // nop
  }

  ~http_state() {
    aout(self_) << "http worker finished with exit reason: "
                << self_->planned_exit_reason()
                << endl;
  }

  std::vector<std::string> lines;
  parser_state ps = receive_new_line;
  abstract_broker* self_;
};

using http_broker = caf::experimental::stateful_actor<http_state, broker>;

To make bookkeeping easier, we use one broker (our server) that accepts incoming connections and spawns a new HTTP worker per connection. The server does not need any additional state and is quite simple.

behavior server(broker* self) {
  aout(self) << "server up and running" << endl;
  return {
    [=](const new_connection_msg& ncm) {
      aout(self) << "fork on new connection" << endl;
      auto worker = self->fork(http_worker, ncm.handle);
    },
    others >> [=] {
      aout(self) << "unexpected: "
                 << to_string(self->current_message()) << endl;
    }
  };
}

And here is the implementation of our http_worker using the state from above.

behavior http_worker(http_broker* self, connection_handle hdl) {
  // tell network backend to receive any number of bytes between 1 and 128
  self->configure_read(hdl, receive_policy::at_most(128));
  return {
    [=](const new_data_msg& msg) {
      assert(! msg.buf.empty());
      assert(msg.handle == hdl);
      // extract lines from received buffer
      auto& lines = self->state.lines;
      auto i = msg.buf.begin();
      auto e = msg.buf.end();
      // search position of first newline in data chunk
      auto nl = std::search(i, e, std::begin(newline), std::end(newline));
      // store whether we are continuing a previously started line
      auto append_to_last_line = self->state.ps == receive_continued_line;
      // check whether our last chunk ended between \r and \n
      if (self->state.ps == receive_second_newline_half) {
        if (msg.buf.front() == '\n') {
          // simply skip this character
          ++i;
        }
      }
      // read line by line from our data chunk
      do {
        if (append_to_last_line) {
          append_to_last_line = false;
          auto& back = lines.back();
          back.insert(back.end(), i, nl);
        } else {
          lines.emplace_back(i, nl);
        }
        // if our last search didn't find a newline, we're done
        if (nl != e) {
          // skip newline and seek the next one
          i = nl + sizeof(newline);
          nl = std::search(i, e, std::begin(newline), std::end(newline));
        }
      } while (nl != e);
      // store current state of our parser
      if (msg.buf.back() == '\r') {
        self->state.ps = receive_second_newline_half;
        self->state.lines.back().pop_back(); // drop the trailing '\r' from our last read line
      } else if (msg.buf.back() == '\n') {
        self->state.ps = receive_new_line; // we've got a clean cut
      } else {
        self->state.ps = receive_continued_line; // interrupted in the middle
      }
      // we don't need to check for completion in any intermediate state
      if (self->state.ps != receive_new_line)
        return;
      // we have received the HTTP header if we have an empty line at the end
      if (lines.size() > 1 && lines.back().empty()) {
        auto& out = self->wr_buf(hdl);
        // we only look at the first line in our example and reply with our
        // OK message if we receive exactly "GET / HTTP/1.1", otherwise
        // we send a 404 HTTP response
        if (lines.front() == http_valid_get)
          out.insert(out.end(), std::begin(http_ok), std::end(http_ok));
        else
          out.insert(out.end(), std::begin(http_error), std::end(http_error));
        // write data and close connection
        self->flush(hdl);
        self->quit();
      }
    },
    [=](const connection_closed_msg&) {
      self->quit();
    },
    others >> [=] {
      aout(self) << "unexpected: "
                 << to_string(self->current_message()) << endl;
    }
  };
}

Our HTTP worker receives chunks of up to 128 bytes. Once it has detected the end of the HTTP header (a blank line), it looks at the first line to see whether it is equal to http_valid_get. If so, it sends the OK message; otherwise it sends the 404.

A minimal application using our brokers that always tries to open port 8080 is a three-liner:

int main() {
  spawn_io_server(server, 8080);
  await_all_actors_done();
  shutdown();
}

Testing the HTTP Broker

Without further ado, here is our complete unit test for the example application.

namespace {

class fixture {
public:
  fixture() {
    // note: the middleman will take ownership of mpx_, but using
    //       this pointer is safe at any point before calling `shutdown`
    mpx_ = new network::test_multiplexer;
    set_middleman(mpx_);
    // spawn the actor-under-test
    aut_ = spawn_io(server);
    // assign the acceptor handle to the AUT
    aut_ptr_ = static_cast<abstract_broker*>(actor_cast<abstract_actor*>(aut_));
    mpx_->assign_tcp_doorman(aut_ptr_, acceptor_);
    // "open" a new connection to our server
    mpx_->add_pending_connect(acceptor_, connection_);
    mpx_->assign_tcp_scribe(aut_ptr_, connection_);
    mpx_->accept_connection(acceptor_);
  }

  ~fixture() {
    anon_send_exit(aut_, exit_reason::kill);
    // run the exit message and other pending messages explicitly,
    // since we do not invoke any "I/O" from this point on that would
    // trigger those messages implicitly
    mpx_->flush_runnables();
    await_all_actors_done();
    shutdown();
  }

  // helper class for a nice-and-easy "mock(...).expect(...)" syntax
  class mock_t {
  public:
    mock_t(fixture* thisptr) : this_(thisptr) {
      // nop
    }

    mock_t(const mock_t&) = default;

    mock_t& expect(const std::string& what) {
      auto& buf = this_->mpx_->output_buffer(this_->connection_);
      CAF_REQUIRE((buf.size() >= what.size()));
      CAF_REQUIRE((std::equal(buf.begin(), buf.begin() + what.size(),
                              what.begin())));
      buf.erase(buf.begin(), buf.begin() + what.size());
      return *this;
    }

    fixture* this_;
  };

  // mocks some input for our AUT and allows us to
  // check the output for this operation
  mock_t mock(const char* what) {
    std::vector<char> buf;
    for (char c = *what++; c != '\0'; c = *what++)
      buf.push_back(c);
    mpx_->virtual_send(connection_, std::move(buf));
    return {this};
  }

  actor aut_;
  abstract_broker* aut_ptr_;
  network::test_multiplexer* mpx_;
  accept_handle acceptor_ = accept_handle::from_int(1);
  connection_handle connection_ = connection_handle::from_int(1);
};

} // namespace <anonymous>

CAF_TEST_FIXTURE_SCOPE(http_tests, fixture)

CAF_TEST(valid_response) {
  // write a GET message and expect an OK message as result
  mock(http_get).expect(http_ok);
}

CAF_TEST(invalid_response) {
  // write a GET with invalid path and expect a 404 message as result
  mock("GET /kitten.gif HTTP/1.1\r\n\r\n").expect(http_error);
}

CAF_TEST_FIXTURE_SCOPE_END()

The constructor of our fixture spawns the server that we are going to test. Since we need an actual pointer to the broker, we use actor_cast to get a pointer and downcast it afterwards. Using the pointer, we can set up a connection through the test multiplexer. These steps create a new_connection_msg for the server, which spawns an http_worker in response.

With the mock member function, we create a simple API that allows us to correlate inputs and outputs. The mock function is a simple wrapper around virtual_send and expect compares what output a broker has written in its output_buffer with the output we expect.

The test uses CAF’s own unit test framework. Using a different test framework, e.g., Boost.Test, is straightforward. The complete source code can be found on GitHub.

Version 0.14 Released!

Version 0.14 of CAF has just been released. This release improves several CAF internals, fixes bugs (primarily in type-safe actors), adds an optional ASIO backend, and makes three changes to the API:

  • timed_sync_send has been removed (it offered an alternative way of defining message handlers, which is inconsistent with the rest of the API)
  • the policy classes broadcast, random, and round_robin in actor_pool were removed and replaced by factory functions using the same name (see the sketch after this list)
  • all functions that were deprecated with 0.10 were finally removed
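
For the actor_pool change, creating a round-robin pool with the new factory functions looks roughly like this (a sketch, not taken from the release notes; my_worker is a hypothetical worker actor and the exact actor_pool::make overload may differ):

// the former policy class round_robin is now created through the
// factory function actor_pool::round_robin()
auto worker_factory = [] {
  return spawn(my_worker); // my_worker: some worker actor (hypothetical)
};
auto pool = actor_pool::make(4, worker_factory, actor_pool::round_robin());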

We have also refined our policy regarding supporting compilers and improved our testing setup:

  • CAF has always required a recent compiler, and we are eager to adopt new language features and to catch up with C++14 (and C++17). On the other hand, we are aware that changing the compiler every few months is not acceptable for most users. As a compromise between staying current and providing stability, we support versions of GCC and Clang that are up to two years old. Consequently, support for GCC 4.7 has been dropped with 0.14.

  • We are continuously working on increasing the number of distribution channels for CAF. At the moment, we provide a FreeBSD port, a Homebrew package, Biicode integration, and Linux packages via the OpenSuSE build service. You can find a full list with links etc. in the Get CAF Section of the README.

  • We have integrated several tools to constantly improve the code quality of CAF. Each commit pushed to develop is automatically tested via Jenkins on several operating systems and compilers. Our current setup includes:

    • Ubuntu (32 and 64 bit)
    • Fedora 22
    • Mac OS 10.10
    • Mac OS 10.9
    • FreeBSD 10

    Further, we test our code on GCC 4.8, GCC 4.9, GCC 5.1, Clang 3.5, and Clang 3.6. For static code analysis, we use Coverity and cppcheck. These tools are complemented with ASAN and LeakSanitizer.

  • In case you are waiting for MSVC support: we are currently testing on the latest RC from Microsoft. It appears that MSVC is not able to compile advanced match cases; libcaf_io does compile, but its unit tests fail. Any feedback or help to get the I/O parts working (or all parts of CAF compiling) is welcome. To check on the current progress, please visit Issue #302 on GitHub.