Xavier Bruhiere

Reactive Python - Real-time events processing

September 26, 2016 (8y ago)11 views

There is a recent trend in programming literature promoting functionnal programming, as a sensible alternative to object-oriented programs for many use cases. This subject feeds many discussions and highlights how important is program design with our applications becoming more and more complex. Although there might be here some seductive intellectual challenge (because yeah, we love to juggle with elegant abstractions), there are also real business values :

  • building sustainable, maintainble programs
  • decoupling architecture components for proper team work
  • limiting bug exposure
  • better product iteration

And so on. When developers spot an interesting approach to solve a recurrent issue in our industry, they formalize it as a Design pattern. Today I want to discuss a powerful member of this family : the pattern Observer. We won't dive into the strict rhetorical details (sorry not sorry). Instead, we will develop how reactive programming can level up the quality of our work.

The scene

That was a bold statement, let's illustrate with a real-world scenario. Say we were tasked to build a monitoring system. We need some way to collect data, analyze it and take actions when things go unexpected. Anomaly detection is an exciting yet challenging problem. We don't want our data scientists to be bothered by infrastructure failures. And in the same spirit we need other engineers to only focus on how to react to specific disaster scenarii.

The core of our approach consists of two components. A monitoring module firing and forgetting its discoveries on channels. And an other processing brick intercepting those events with an appropriate response. The UNIX philosophy at its best : do one thing and do it well. We split the infrastructure by concerns and the workers by event types. Assuming our team defines well-documented interfaces, this is a promising design.

The rest of the article will discuss the technical implementation but keep in mind that i/o documentation and proper processing load estimation are also fundamental.

Strategy

Our local lab is composed of three elements :

  • The alert module we will emulate with a simple cli tool which publish alert messages.

  • The actual processing unit subscribing to events it knows how to react to.

  • A message broker supporting Publish / Subscribe (or PUBSUB) pattern. For this purpose Redis offers a popular, efficient and rock-solid solution. Highly recommended. But the database isn't designed for this case. NATS, however, presents itself like so:

NATS acts as a central nervous system for distributed systems such as mobile devices, IoT networks, enterprise microservices and cloud native infrastructure. Unlike traditional enterprise messaging systems, NATS provides an always on ‘dial-tone’.

Sounds like promising ! Client libraries are available for major languages and Apcera, the company sponsoring the technology, has a solid reputation for building reliable distributed systems.

Again, we won't develop how processing actually happens, only the orchestration of this three moving parts.

Setup

NATS being a message broker, we need to run a server locally (version 0.8.0 as of today). Gnatsd is the official and scalable first choice. It is written in Go so we get performances and drop-in binary out of the box. For microservices fans (as I am), an official Docker image is available for pulling. And for lazy ones (as I am), a demo server is already running at nats://demo.nats.io:4222.

Services will use Python 3.5.1 but 2.7.10 should do the job with minimal changes. Our scenario is mostly about data analysis and system administration on the backend, and Python has a wide range of tools for both areas. So let's install the requirements.

$ pip --version
pip 8.1.1
 
$ pip install \
  -e git+https://github.com/mcuadros/pynats@6851e84eb4b244d22ffae65e9fbf79bd9872a5b3#egg=pynats \
  click==6.6  # for cli integration

And thats'all, we are ready to write services.

Publishing events

Let's warm up by sending some alerts to the cloud. First, we need to connect to the NATS server.

# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# filename: broker.py
 
import pynats
 
 
def nats_conn(conf):
    """Connect to nats server from environment variables.
 
    The point is to allow easy switching without to change the code. You can
    read more on this approach stolen from 12 factors apps.
    """
    # the default value comes from docker-compose
    (https://docs.docker.com/compose/) services link behavior
    host = conf.get('__BROKER_HOST__', 'nats')
    port = conf.get('__BROKER_PORT__', 4222)
    opts = {
        'url': conf.get('url', 'nats://{host}:{port}'.format(host=host, port=port)),
        'verbose': conf.get('verbose', False)
    }
 
    print('connecting to broker ({opts})'.format(opts=opts))
    conn = pynats.Connection(**opts)
    conn.connect()
    return conn

This should be enough to start our client.

# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# filename: observer.py
 
import os
import broker
 
 
def send(channel, msg):
    # use environment variables for configuration
    nats = broker.nats_conn(os.environ)
    nats.publish(channel, msg)
    nats.close()

And right after that, a few lines of code to shape a cli tool.

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# filename: __main__.py
 
import click
 
 
@click.command()
@click.argument('command')
@click.option('--on', default='some_event', help='messages topic name')
def main(command, on):
    if command == 'send':
        click.echo('publishing message')
        observer.send(on, 'Terminator just dropped in our space-time')
 
if __name__ == '__main__':
    main()

chmod +x ./__main__.py gives it execution permission so we can test how our first bytes are doing.

$ # `click` package gives us a productive cli interface
$ ./__main__.py --help
Usage: __main__.py [OPTIONS] COMMAND
 
Options:
  --on TEXT  messages topic name
  --help     Show this message and exit.
 
$ __BROKER_HOST__="demo.nats.io" ./__main__.py send --on=click connecting to broker ({'verbose': False, 'url': 'nats://demo.nats.io:4222'})
publishing message ...

This is quite poor in feedback indeed but no exception means we did connect to the server and published a message.

Reacting to events

We're done with the heavy lifting! Now that interesting events are flying through the internet, we can catch them and actually provide business values. Don't forget the point : let the team write reactive programs without worrying how it will be triggered. I found the following snippet to be a readable syntax for such goal.

# filename: __main__.py
 
import observer
 
 
@observer.On('terminator_detected')
def alert_sarah_connor(msg):
    print(msg.data)

As the capitalize letter of On suggests, this is a Python class, wrapping a NATS connection. It aims to call the decorated function whenever a new message goes through the given channel.

Below is a naive implementation shamefully ignoring any reasonable error handling and safe connection termination (broker.nats_conn would be much more production-ready as a context manger, but hey : we do things that don't scale, move fast and break things).

# filename: observer.py
 
class On(object):
 
    def __init__(self, event_name, **kwargs):
        self._count = kwargs.pop('count', None)
        self._event = event_name
        self._opts = kwargs or os.environ
 
    def __call__(self, fn):
        nats = broker.nats_conn(self._opts)
        subscription = nats.subscribe(self._event, fn)
        def inner():
            print('waiting for incoming messages')
            nats.wait(self._count)
            # we are done
            nats.unsubscribe(subscription)
            return nats.close()
        return inner

Instil some life to this file from the __main__.py.

# filename: __main__.py
 
@click.command()
@click.argument('command')
@click.option('--on', default='some_event', help='messages topic name')
def main(command, on):
    if command == 'send':
        click.echo('publishing message')
        observer.send(on, 'bad robot detected')
    elif command == 'listen':
        try:
            alert_sarah_connor():
        except KeyboardInterrupt:
            click.echo('caught CTRL-C, cleaning after ourselves...')

Your linter might complain about the injection of the msg argument in alert_sarah_connor but no offense, it should just work (tm) :

$ In a first terminal, listen to messages
$ __BROKER_HOST__="demo.nats.io" ./__main__.py listen
connecting to broker ({'url': 'nats://demo.nats.io:4222', 'verbose':
False})
waiting for incoming messages
 
$ And fire up alerts in a second terminal
__BROKER_HOST__="demo.nats.io" --on='terminator_detected'

The data appears in the first terminal, celebrate !

Conclusion

Reactive programming implemented with Publish/Subscribe pattern brings a lot of benefits for events-oriented products. Modular development, decoupled components, scalable distributed infrastructure, single-responsability principle. One can think about how data flows into the system before diving into the technical details.

This kind of approach also gains traction from real-time data processing pipelines (Riemann, Spark, Kafka). NATS performances, indeed, allows ultra low-latency architectures development without too much of deployment overhead.

We covered in a few lines of Python the basics of a reactive programming design, with lot of improvement opportunities : events filtering, built-in instrumentation, infrastructure-wide error tracing. I hope you found in this article the building block to develop upon !