Reactive Programming for Real-Time Applications

The main idea of event-driven programming is that the application's execution flow is driven by events occurring around it that can affect its behavior, such as user actions (a mouse click, a key press, etc.) or messages from other programs, including the operating system. Such applications detect these events and handle them with specific handler functions.

Reactive applications are a subgroup of event-driven applications. The key aspect of reactive programming is executing code asynchronously and in a non-blocking way.

This brings us closer to the definition of "Reactive Streams", which complement reactive programming with a producer/consumer model represented by a publisher and a subscriber.
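
As a minimal illustration of that model, here is a subscriber sketch based on the org.reactivestreams interfaces (the class name and the one-element-at-a-time request size are our own choices, not part of the project described below); it shows how a subscriber signals demand, which is what makes backpressure possible:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical subscriber that processes one element at a time,
// requesting the next one only after the current one is handled.
public class OneAtATimeSubscriber implements Subscriber<String> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // ask the publisher for a single element
    }

    @Override
    public void onNext(String element) {
        System.out.println("Processing: " + element);
        subscription.request(1); // signal demand for the next element
    }

    @Override
    public void onError(Throwable error) {
        error.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Stream finished");
    }
}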

Typical benefits and use cases of reactive programs are:

  • Applications with user interaction - while waiting for user input, an application can execute other tasks.
  • Managing other data streams - staying alive while processing thousands of small messages per second, or solving data-stream issues such as "backpressure", is much easier with this approach.
  • High load - some frameworks claim to process up to 50 million messages per second on a single machine.
  • Distributed systems - implemented this way, your application is architecturally ready to be deployed to the cloud and scaled horizontally; with the right framework it is easy from a DevOps perspective as well.
  • Long-running processes - these tasks require a lot of time due to CPU usage, resources or access to other blocking processes, and could block the system during execution. In these cases the task is programmed as a separate function that runs on another thread and does not interrupt the main thread. When it has to interact with the application, either to signal completion or to request more information, the process sends an event to the main thread so it can learn the current status and execute the next actions (see the sketch after this list).
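
As a minimal, framework-free sketch of that last point (the class and method names are hypothetical), a long-running task can be offloaded to another thread with plain Java 8, and the main flow reacts when it completes:

import java.util.concurrent.CompletableFuture;

public class LongTaskExample {
    public static void main(String[] args) {
        // Offload the heavy work to another thread; the main thread is not blocked.
        CompletableFuture<String> result =
                CompletableFuture.supplyAsync(LongTaskExample::heavyComputation);

        // React to the "completion event" instead of waiting for it.
        result.thenAccept(value -> System.out.println("Task finished with: " + value));

        System.out.println("Main thread keeps doing other work...");
        result.join(); // only here to keep the demo JVM alive until the task ends
    }

    private static String heavyComputation() {
        // stand-in for a CPU- or I/O-intensive operation
        return "42";
    }
}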

We put only one constraint on our research - compatibility with Java 8. Even with that constraint, there are several tools that, in one way or another, let you try this approach:

  • RxJava 2
  • Spring 5
  • Akka Streams
  • MongoDB
  • Reactive Rabbit
  • Vert.x

Being limited in resources, we shortened our list to Akka and Vert.x. We read the docs and started implementing demo applications. In the end we decided to go with Vert.x, because we could get much more benefit from the tools it provides besides the EventBus.

Vert.x

As its website says, Vert.x is event-driven and non-blocking. It is also lightweight, fast, modular and simple. Using this library doesn't force you into a specific way of developing applications; instead, it gives you different tools to improve your applications with reactive programming and improve the user experience.

We were specifically interested in building an application with intensive I/O, in this case not over HTTP but over plain TCP connections passing information at both edges of the application, and we found that with Vert.x it is really easy to create a simple TCP server and TCP client (a client sketch is shown below; the server appears in the next section).
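
For example, here is a minimal TCP client sketch using Vert.x's NetClient (the host, port and message are placeholders of our own, not taken from the project below):

import io.vertx.core.Vertx;
import io.vertx.core.net.NetSocket;

public class TcpClientExample {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();

        // Connect asynchronously; the handler is invoked when the connection attempt finishes.
        vertx.createNetClient().connect(9999, "localhost", ar -> {
            if (ar.succeeded()) {
                NetSocket socket = ar.result();
                socket.handler(buffer -> System.out.println("Got reply: " + buffer.toString()));
                socket.write("ping\r\n");
            } else {
                ar.cause().printStackTrace();
            }
        });
    }
}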

Hands-on!

We created a small proof-of-concept project to check how this all looks in real life. Source Code.

The application provides two entities:

  • Connection: this entity listens on a TCP port, and every request received is forwarded to one of the configured Providers.
  • Provider: this entity opens a TCP connection to an external Data Provider. Each time it receives a request from a Connection, it forwards it through its open TCP socket and applies some transformations to the response it receives.

As you may notice, this application acts as a "broker" that makes intensive use of TCP connections.

That being said, in the first version we just used the potential of Vert.x's NetServer.

This is our first version of the Connection:

public class ServerConnection implements ConnectionService {
   private Vertx vertx;

   public ServerConnection(Vertx vertx) {
       this.vertx = vertx;
   }

   @Override
   public boolean enable(Connection configuration) {
      // Create the TCP server and attach our buffer handler to every incoming socket.
      vertx.createNetServer()
           .connectHandler(socket -> socket.handler(onConnect(configuration, socket)))
           .listen(configuration.getPort(), configuration.getHost());

      return configuration.isEnabled();
   }

   private Handler<Buffer> onConnect(Connection configuration, NetSocket socket) {
      return buffer -> {
          // Ask the provider asynchronously and write the answer back when it arrives.
          configuration.getProviderService().getInfo(buffer.getString(0, buffer.length())).compose(s -> {
              socket.write(s);
              socket.write("\r\n");
              return Future.succeededFuture();
          });
      };
   }

}
Connection.java

As you can see, we create a TCP server with vertx.createNetServer(options) very easily and bind a callback that specifies what needs to be done when we receive a request. And this works fine, in a properly non-blocking way, because:

  • Vert.x's NetServer handles connections asynchronously, invoking our inline callback.
  • We invoke getProviderService().getInfo() using the Future mechanism, which lets us make a non-blocking call. If we didn't do it this way, the process would block until we had a response from the external Data Provider. Here is the provider's getInfo:
@Override
public Future<String> getInfo(String input) {
   logger.info("Providing for: " + input);
   // Forward the request to the external data provider over the open TCP socket.
   socket.write(input);
   // Complete the returned Future when the provider's response arrives.
   return Future.future(promise -> {
       socket.handler(buffer -> {
           promise.complete(buffer.getString(0, buffer.length()));
       });
   });
}
IpProvider.java

Improving things

Easy so far, and using just a small part of Vert.x - not really intrusive, is it? But we felt that Vert.x could offer much more than we had been using: the Connection needs to know the Provider's interface in order to use it, scalability is completely tied to the Connection side, and so on.

So Vert.x provides mechanisms to make this more flexible and asynchronous: Verticles and the Event Bus.

The Event Bus

Verticles are chunks of code that get deployed and run by Vert.x. Verticles can also be run in different languages! It seems exciting! And of course, verticles are purely asynchronous.

The Event Bus, on the other hand, is the mechanism Verticles use to communicate with each other, giving us addressing, publish/subscribe and point-to-point communication. A small sketch of those messaging styles follows.
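
As a minimal sketch of those styles (the addresses and payloads are placeholders of our own), point-to-point request/reply delivers a message to a single consumer, while publish goes to every consumer registered on an address:

import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;

public class EventBusStyles {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        EventBus eventBus = vertx.eventBus();

        // Consumers register on an address.
        eventBus.consumer("news", message -> System.out.println("News: " + message.body()));
        eventBus.consumer("greetings", message -> message.reply("Hello, " + message.body()));

        // Publish/subscribe: every consumer on "news" receives the message.
        eventBus.publish("news", "Vert.x release announced");

        // Point-to-point request/reply: one consumer handles it and we react to the reply.
        eventBus.request("greetings", "world", ar -> {
            if (ar.succeeded()) {
                System.out.println(ar.result().body());
            }
        });
    }
}

Back in our application, the Connection now becomes a Verticle: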

@Override
public void start(Future<Void> future) throws Exception {
   super.start();
   NetServerOptions options = new NetServerOptions().setLogActivity(true);
   NetServer server = vertx.createNetServer(options);

   // Complete (or fail) the start-up future once the server is actually listening.
   server.connectHandler(socket -> onInput(configuration, socket))
         .listen(configuration.getPort(), configuration.getHost(), connectionStartedHandler(future, configuration));
}

...

private void onInput(Connection configuration, NetSocket socket) {
   socket.handler(buffer -> {
       String message = buffer.getString(0, buffer.length());
       logger.info("Got input: " + message);
       EventBus eventBus = vertx.eventBus();

       logger.info("Sending message to " + configuration.getProviderAddress() + " : " + message);
       eventBus.request(configuration.getProviderAddress(), message, ar -> {
           String result = ar.succeeded() ? ar.result().body().toString()  : "Error";
           logger.info("Received reply: " + result);
           socket.write(result);
       });
   });
}
Connection.java

As you can see, it's not a big change in code, but now the Connection only needs the Provider's address to send it a message with the expected payload; we don't know the Provider's methods anymore, and we don't need to care about the Provider's asynchronous behaviour, since that is given by the Event Bus mechanism automatically. We just bind some code to be invoked when we receive a reply.

On the provider side, things remain very similar. We just register the verticle as a consumer on the event bus under its own id, and wait for messages:

private void registerEventListener(String id) {
   EventBus eventBus = vertx.eventBus();

   logger.info("Registering new event handler " + id);
   eventBus.consumer(id, message -> {
       logger.info(String.valueOf(message.body()));
       this.getInfo(String.valueOf(message.body()))
           .setHandler(s -> {
               // Reply only if the provider call succeeded, otherwise fail the message.
               if (s.succeeded()) {
                   message.reply(s.result());
               } else {
                   message.fail(1, s.cause().getMessage());
               }
           });
   });
}
IpProvider.java

Nice, but how do we tackle scalability?

We need more workers!

That's an easy one: we just take care of it in the deployment options, so when we deploy a verticle we can do:

DeploymentOptions options = new DeploymentOptions().setInstances(20);
// Deploying multiple instances requires a verticle name or supplier rather than a single instance.
vertx.deployVerticle(() -> new IpProvider(dataProviderConf), options);
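
For completeness, here is a hedged sketch of how a deployment with a completion handler and a worker flag might look (the verticle and configuration names are the ones used above; the handler logic and the worker option are our own additions, not part of the original project):

DeploymentOptions options = new DeploymentOptions()
        .setInstances(20)   // 20 instances of the verticle
        .setWorker(false);  // set to true if the verticle does blocking work

vertx.deployVerticle(() -> new IpProvider(dataProviderConf), options, ar -> {
    if (ar.succeeded()) {
        System.out.println("Deployed with id " + ar.result());
    } else {
        ar.cause().printStackTrace();
    }
});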

Final thoughts

Vert.x is a unique toolkit. It could be called massive because of the great number of its features: TCP and HTTP servers, its own JSON library, the Event Bus, asynchronous coordination utilities, a DNS client, streams, metrics and a lot more. But at the same time it is light (around 650 kB) and modular, so you can take only the parts you need. It is loud about its non-blocking philosophy, but it doesn't make you write an application against some predefined template as some frameworks do. It just gives you tools, gives you ideas and wishes you good luck wherever you go.

We found Vert.x to be a good higher-level alternative to Akka in some respects. When solving a task with Akka, we felt we had to investigate the right way to do that kind of thing in Akka; when solving the same task with Vert.x there were two options: either there was already a fine tool provided by Vert.x (like the TCP server), or in the worst case you look for the right way to do it with an event-driven approach. Nothing framework-specific.

Positives:

  • If the event-based paradigm is applicable to your project, Vert.x gives you a good toolkit for it.
  • Besides the event bus, it provides a big list of modules that simplify life.
  • High-load ready, thanks both to the event-driven approach and to easy deployment management.
  • It is actively used. Community is important: it means more libraries, places to share thoughts, and people who can help you at some point.
  • Available in several JVM languages and JavaScript.

Negatives:

  • It is easy to lose control: you start writing things with the proposed "fluent" approach and end up with spaghetti code. Java is not a functional language in the first place, so this, together with a big amount of callbacks, doesn't always look natural.
  • We did not manage to find a good approach to unit testing. The official documentation and people on the internet propose tests that actually deploy HTTP servers just to test some small part of the code. The vertx-unit library gives a small API for testing asynchronous calls (sketched below), but there is no way to easily mock things. For example, you cannot mock the Vert.x HTTP server to test the onSuccess and onFail callbacks that are called when the server starts or fails to start.
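
To illustrate the vertx-unit style mentioned above, here is a hypothetical test (the address, class name and assertions are our own, not taken from the project); the Async object keeps the test alive until the asynchronous callback completes it:

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
public class EventBusReplyTest {

    @Test
    public void repliesOnTheEventBus(TestContext context) {
        Vertx vertx = Vertx.vertx();
        Async async = context.async();

        // A stand-in consumer playing the role of a provider.
        vertx.eventBus().consumer("provider.test", message -> message.reply("pong"));

        vertx.eventBus().request("provider.test", "ping", ar -> {
            context.assertTrue(ar.succeeded());
            context.assertEquals("pong", ar.result().body());
            async.complete(); // marks the asynchronous test as finished
        });
    }
}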

About Akka

Akka concentrates all of its focus on concurrency, and it does it incredibly well. Actors are light, fast and thread-safe.

Pros:

  • It includes a complete actor-based engine to implement the non-blocking feature.
  • Akka has a high performance. It can manage up to 50 million messages per second on a single machine. It also has a low memory need, with ~2.5 million actors per heap GB.
  • It is implemented with careful attention to thread safety, to avoid erroneous behaviour or unpredictable results.
  • As it is built as an actor hierarchy, when an actor detects a failure it suspends itself and all of its subordinates and sends a message to its supervisor, as a self-healing mechanism to recover from failures automatically.
  • Akka supports Reactive Streams via Akka Streams. Akka's stream operators (via the Scala DSL) are very concise and clean.
  • The code is written more in a "Java way" compared with Vert.x. Message handlers and actor configuration look more natural than Vert.x callbacks and the way you configure a Verticle, although you do that via JSON objects (see the actor sketch after this list).
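
For illustration, here is a minimal classic Akka actor in Java showing the message-handler style referred to above (the class, system and message names are our own, not taken from the project):

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class EchoActor extends AbstractActor {

    @Override
    public Receive createReceive() {
        // The message handler: pattern-match on the message type and react to it.
        return receiveBuilder()
                .match(String.class, msg -> getSender().tell("echo: " + msg, getSelf()))
                .build();
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo");
        ActorRef echo = system.actorOf(Props.create(EchoActor.class), "echo");
        echo.tell("hello", ActorRef.noSender());
    }
}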

Cons:

  • With Vert.x you just start doing it; Akka is harder to learn and design for. Behind every positive point there is a lot to learn, both in theory at the conceptual level and in practice, to be able to implement your complex business logic and keep your system fast and furious. Luckily, the official documentation, a bunch of good books and conference video recordings will help you with that. So just budget some extra time for learning.

Conclusion

So, we tried another programming paradigm and reviewed some well-known frameworks used with it in Java. In our opinion, this is a promising and, importantly, effective way of building applications, and we will certainly use it in our next projects.

Vert.x proved itself very well, and despite the fact that there are better "reactive" model providers, like Akka, we will no doubt keep using some of its modules as well.
