Xavier Bruhiere

Reactive Python - Asynchronous programming

October 5, 2016

On the Confluent website, one can find this bold title:

Stream data changes everything

Coming from the creators of Kafka, a real-time messaging system, this is not a surprising assertion. Still, data streaming infrastructures have gained in popularity, and many projects require data to be processed as soon as it shows up. This has contributed to the development of well-known technologies like Spark Streaming, Apache Storm and, more broadly, websockets.

The latter in particular brought real-time data feeds to web applications, aiming to provide low-latency connections. Coupled with the asynchronous Node.js, one can build a powerful event-based reactive system. But what about Python? Given the popularity of the language in data science, would it be possible to bring it the benefits of this kind of data ingestion? As this article will show, it turns out that modern Python (read: Python 3.4 or later) supports asynchronous data streaming apps.

Introducing asyncio

Python 3.4 introduced the asyncio module into the standard library to equip the language with:

Asynchronous I/O, event loop, coroutines and tasks

While Python treats functions as first-class objects (meaning you can assign them to variables and pass them as arguments), most developers follow an imperative programming style. It seems to be on purpose:

It requires super human discipline to write readable code in callbacks and if you don’t believe me look at any piece of JavaScript code.

  • Guido van Rossum

So asyncio is the Pythonic answer to asynchronous programming. This paradigm makes a lot of sense for otherwise costly I/O operations, or when we need events to trigger code.

Scenario

For fun and profit, let's build such a project. We will simulate a dummy electrical circuit composed of three components:

  • A clock regularly ticking
  • A board I/O pin randomly choosing to toggle its binary state on clock events
  • A buzzer buzzing when the I/O pin flips to one

This sets us up with an interesting machine-to-machine communication problem to solve.
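
Before bringing websockets in, the three components can be sketched inside a single process. The snippet below is only an illustration of the data flow: it swaps the network for asyncio.Queue objects, the names (circuit, DONE, the queues) are made up for this sketch, and it relies on asyncio.run, which only exists from Python 3.7 onwards.

```python
import asyncio
import random

DONE = None  # sentinel telling the next stage that the stream is over


async def clock(ticks, out):
    """Emit `ticks` tick events, then the end-of-stream sentinel."""
    for tick in range(ticks):
        await out.put(tick)
        await asyncio.sleep(0)  # give the other coroutines a turn
    await out.put(DONE)


async def io_pin(inbox, out, chance, rng):
    """On each tick, draw a pin state: 0 with probability `chance`, else 1."""
    while True:
        tick = await inbox.get()
        if tick is DONE:
            await out.put(DONE)
            return
        await out.put(0 if rng.random() < chance else 1)


async def buzzer(inbox, sounds):
    """Buzz on 1, stay silent on 0."""
    while True:
        state = await inbox.get()
        if state is DONE:
            return
        sounds.append('buzz' if state == 1 else '...')


async def circuit(ticks, chance=0.5, seed=None):
    """Wire the three components together and return what the buzzer did."""
    ticks_queue, states_queue, sounds = asyncio.Queue(), asyncio.Queue(), []
    rng = random.Random(seed)
    await asyncio.gather(
        clock(ticks, ticks_queue),
        io_pin(ticks_queue, states_queue, chance, rng),
        buzzer(states_queue, sounds),
    )
    return sounds


if __name__ == '__main__':
    print(asyncio.run(circuit(ticks=5)))
```

Each stage only knows about its inbox and its outbox, which is the property the websocket version keeps once the components live in separate processes.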

Note that the code snippets in this post make use of features like async and await, introduced in Python 3.5. While it would be possible to backport them to Python 3.4, I highly recommend that the reader follow along with the same version or newer. Anaconda or Pyenv can ease the installation process if necessary.

$ python --version
Python 3.5.1
 
$ pip --version
pip 8.1.2

Asynchronous websocket Client/Server

The client : Clock

Our first step, the clock, will introduce both asyncio and websocket basics. We need a straightforward method that fires tick signals through a websocket and waits for acknowledgement.

# filename: sketchs.py
 
async def clock(socket, port, tacks=3, delay=1):
    ...  # body detailed below

The async keyword is syntactic sugar introduced in Python 3.5 to replace the previous @asyncio.coroutine decorator. The official PEP 492 explains it all, but tl;dr: API quality.
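
To make this concrete, here is a minimal sketch of what async def changes: calling the function only builds a coroutine object, and nothing runs until an event loop drives it. The function name double is made up for the illustration, and asyncio.run assumes Python 3.7 or newer (on 3.5, loop.run_until_complete plays the same role).

```python
import asyncio
import inspect


async def double(value):
    """A native coroutine function, declared with `async def`."""
    return value * 2


pending = double(21)   # creates a coroutine object; the body has not run yet
is_coroutine = inspect.iscoroutine(pending)
result = asyncio.run(pending)  # the event loop drives it to completion
print(is_coroutine, result)    # True 42
```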

To simplify the websocket connection plumbing, we can take advantage of the eponymous package: pip install websockets==3.5.1. It hides the protocol's complexity behind an elegant context manager.

# filename: sketchs.py
 
import asyncio
 
import websockets
 
 
async def clock(socket, port, tacks=3, delay=1):
    # the path "datafeed" in this uri will be a parameter available on the
    # other side but we won't use it for this example
    uri = 'ws://{socket}:{port}/datafeed'.format(socket=socket, port=port)
 
    # manage the connection asynchronously
    async with websockets.connect(uri) as ws:
        for payload in range(tacks):
            print('[ clock ] > {}'.format(payload))
            # send payload and wait for acknowledgement
            await ws.send(str(payload))
            print('[ clock ] < {}'.format(await ws.recv()))
            # non-blocking pause: time.sleep would stall the event loop
            await asyncio.sleep(delay)

The await keyword was introduced along with async and replaces the old yield from to read values from asynchronous functions. Inside the context manager, the connection stays open and we can stream data to the server we contacted.
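
One consequence of await is worth a small experiment: a coroutine only gives way to others at an await point, so a blocking call such as time.sleep holds the whole event loop, whereas await asyncio.sleep suspends just the current coroutine. The sketch below uses made-up helper names (worker, pair) and assumes Python 3.7+ for asyncio.run.

```python
import asyncio
import time


async def cooperative_pause():
    await asyncio.sleep(0.01)   # suspends this coroutine only


async def blocking_pause():
    time.sleep(0.01)            # freezes the whole event loop


async def worker(name, pause, log):
    log.append(name + ' start')
    await pause()
    log.append(name + ' end')


async def pair(pause):
    """Run two workers concurrently and return the order of events."""
    log = []
    await asyncio.gather(worker('a', pause, log), worker('b', pause, log))
    return log


cooperative = asyncio.run(pair(cooperative_pause))
blocking = asyncio.run(pair(blocking_pause))
print(cooperative[:2])  # ['a start', 'b start']: b starts while a is paused
print(blocking)         # ['a start', 'a end', 'b start', 'b end']
```

With the blocking pause the two workers simply run one after the other, which defeats the purpose of an event loop.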

The server : IOPin

The core of our application is made of entities capable of speaking to each other directly. To make things fun, we will expose the same API as Arduino sketches, i.e. a setup method run once at startup and a loop method called when new data is available.

# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# filename: factory.py
 
import abc
import asyncio
 
import websockets
 
 
# abstract base class: subclasses must implement `setup` and `loop`
class FactoryLoop(metaclass=abc.ABCMeta):
    """ Glue components to manage the evented-loop model. """
 
    def __init__(self, *args, **kwargs):
        # call user-defined initialization
        self.setup(*args, **kwargs)
 
    def out(self, text):
        print('[ {} ] {}'.format(type(self).__name__, text))
 
    @abc.abstractmethod
    def setup(self, *args, **kwargs):
        pass
 
    @abc.abstractmethod
    async def loop(self, channel, data):
        pass
 
    def run(self, host, port):
        try:
            server = websockets.serve(self.loop, host, port)
            self.out('serving on {}:{}'.format(host, port))
            asyncio.get_event_loop().run_until_complete(server)
            asyncio.get_event_loop().run_forever()
        except OSError:
            self.out('Cannot bind to this port! Is the server already running?')
        except KeyboardInterrupt:
            self.out('Keyboard interruption, aborting.')
            asyncio.get_event_loop().stop()
        finally:
            asyncio.get_event_loop().close()

Child classes are required to implement setup and loop, while this class takes care of:

  • Initializing the sketch
  • Registering a websocket server based on an asynchronous callback (loop)
  • Telling the event loop to poll for... events.

The websockets documentation states that the server callback is expected to have the signature on_connection(websocket, path). This is too low-level for our purpose. Instead, we can write a decorator to manage asyncio details, message passing, error handling, etc., and only call self.loop with application-level information: the actual message and the websocket path.

# filename: factory.py
 
import functools
 
import websockets
 
 
def reactive(fn):
 
    @functools.wraps(fn)
    async def on_connection(klass, websocket, path):
        """Dispatch events and wrap execution."""
        klass.out('** new client connected, path={}'.format(path))
        # process messages as long as the connection is open, i.e. until
        # the peer closes it and `ConnectionClosed` is raised
 
        while True:
            try:
                message = await websocket.recv()
                acknowledgement = await fn(klass, path, message)
                await websocket.send(acknowledgement or 'n/a')
            except websockets.exceptions.ConnectionClosed as e:
                klass.out('done processing messages: {}\n'.format(e))
                break
    return on_connection
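
The dispatch loop above can be tried without any network. The following sketch re-creates the decorator against stand-ins: FakeSocket and the local ConnectionClosed exception are hypothetical replacements for a real websocket connection and for websockets.exceptions.ConnectionClosed, and Echo is a throwaway handler. asyncio.run assumes Python 3.7+.

```python
import asyncio
import functools


class ConnectionClosed(Exception):
    """Stand-in for websockets.exceptions.ConnectionClosed (assumption)."""


class FakeSocket:
    """Hypothetical in-memory replacement for a websocket connection."""

    def __init__(self, incoming):
        self._incoming = list(incoming)
        self.sent = []

    async def recv(self):
        if not self._incoming:
            raise ConnectionClosed('no more messages')
        return self._incoming.pop(0)

    async def send(self, message):
        self.sent.append(message)


def reactive(fn):
    @functools.wraps(fn)
    async def on_connection(self, websocket, path):
        while True:
            try:
                message = await websocket.recv()
                reply = await fn(self, path, message)
                await websocket.send(reply or 'n/a')
            except ConnectionClosed:
                break
    return on_connection


class Echo:
    @reactive
    async def loop(self, channel, msg):
        # application code only sees the path and the message
        return '{}:{}'.format(channel, msg.upper())


socket = FakeSocket(['tick', 'tock'])
asyncio.run(Echo().loop(socket, '/datafeed'))
print(socket.sent)  # ['/datafeed:TICK', '/datafeed:TOCK']
```

The handler never touches recv or send itself, which is exactly the separation the decorator is meant to provide.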

Now we can develop a readable IOPin object.

# filename: sketchs.py
 
import random
 
import factory
 
 
class IOPin(factory.FactoryLoop):
    """Set an IO pin to 0 or 1 randomly."""
 
    def setup(self, chance=0.5, sequence=3):
        # probability of drawing a 0, and number of bits drawn per tick
        self.chance = chance
        self.sequence = sequence
 
    def state(self):
        """Toggle state, sometimes."""
        return 0 if random.random() < self.chance else 1
 
    @factory.reactive
    async def loop(self, channel, msg):
        """Callback on new data."""
        self.out('new tick triggered on {}: {}'.format(channel, msg))
        bits_stream = [self.state() for _ in range(self.sequence)]
        self.out('toggling pin state: {}'.format(bits_stream))
        # ...
        # ... toggle pin state here
        # ...
        return 'acknowledged'
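
A quick way to get a feel for the state method: it draws a uniform number in [0, 1) and returns 0 when that number falls below chance, so chance is the probability of a low bit. The helper bit_stream below is a made-up, standalone version of the list comprehension used in loop, with an arbitrary seed for reproducibility.

```python
import random


def bit_stream(length, chance=0.5, rng=random):
    """Return `length` bits; each bit is 0 with probability `chance`."""
    return [0 if rng.random() < chance else 1 for _ in range(length)]


rng = random.Random(2016)  # arbitrary seed, only to make runs repeatable
print(bit_stream(3, chance=0.6, rng=rng))
print(bit_stream(3, chance=1.0))  # always [0, 0, 0]: random() stays below 1.0
print(bit_stream(3, chance=0.0))  # always [1, 1, 1]: random() is never below 0.0
```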

We finally need some glue to run both clock and IOPin, and to test whether the latter toggles its state when the former fires new ticks. The following snippet uses a convenient library, click 6.6, to parse command-line arguments.

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# filename: arduino.py
 
import sys
import asyncio
 
import click
 
import sketchs
 
 
@click.command()
@click.argument('sketch')
@click.option('-s', '--socket', default='localhost', help='Websocket to bind to')
@click.option('-p', '--port', default=8765, help='Websocket port to bind to')
@click.option('-t', '--tacks', default=5, help='Number of clock ticks')
@click.option('-d', '--delay', default=1, help='Clock intervals')
def main(sketch, **flags):
    if sketch == 'clock':
        # delegate the asynchronous execution to the event loop
        asyncio.get_event_loop().run_until_complete(sketchs.clock(**flags))
    elif sketch == 'iopin':
        # arguments in the constructor go as is to our `setup` method
        sketchs.IOPin(chance=0.6).run(flags['socket'], flags['port'])
    else:
        print('unknown sketch, please choose clock, iopin or buzzer')
        return 1
 
    return 0
 
 
if __name__ == '__main__':
    sys.exit(main())

Don't forget to chmod +x the script, then start the server in a first terminal with ./arduino.py iopin. Once it is listening for connections, start the clock with ./arduino.py clock and watch them communicate! Note that both commands rely on the same default host and port, so they can find each other.

Peer to peer communication

So far, we have established a websocket connection to process clock events asynchronously. Now that one pin swings between 1's and 0's, let's wire a buzzer and pretend it buzzes on a high state (1) and remains silent on a low one (0). We can rephrase that in Python like so:

# filename: sketchs.py
 
import factory
 
 
class Buzzer(factory.FactoryLoop):
    """Buzz on light changes."""
 
    def setup(self, sound):
        # customize buzz sound
        self.sound = sound
 
    @factory.reactive
    async def loop(self, channel, signal):
        """Buzzing."""
        behavior = self.sound if signal == '1' else '...'
        self.out('signal {} received -> {}'.format(signal, behavior))
        return behavior

So now, how do we make them communicate? Since they share a common parent class, we can implement a stream method there to send arbitrary data and acknowledge reception with, also, arbitrary data. To sum up, we want IOPin to use this API:

class IOPin(factory.FactoryLoop):
 
    # [ ... ]
 
    @factory.reactive
    async def loop(self, channel, msg):
        # [ ... ]
        await self.stream('buzzer', bits_stream)
        return 'acknowledged'

Service Discovery

The first challenge to solve is service discovery. We need to target specific nodes within a fleet of reactive workers.

This topic, however, is beyond the scope of this article. The shortcut below will do the job (i.e. hardcode the nodes we will start) while keeping us focused on reactive messaging.

# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# filename: mesh.py
 
"""Provide nodes network knowledge."""
 
import websockets
 
 
class Node(object):
 
    def __init__(self, name, socket, port):
        print('[ mesh ] registering new node: {}'.format(name))
        self.name = name
        self._socket = socket
        self._port = port
 
    def uri(self, path):
        return 'ws://{socket}:{port}/{path}'.format(socket=self._socket,
                                                    port=self._port,
                                                    path=path)
 
    def connection(self, path=''):
        # instantiate the same connection as the `clock` function
        return websockets.connect(self.uri(path))
 
 
# TODO service discovery
def grid():
    """Discover and build nodes network."""
    # of course a proper service discovery should be used here
    # see consul or zookeeper for example
 
    # note: clock is not a server so it doesn't need a port
    return [
        Node('clock', 'localhost', None),
        Node('blink', 'localhost', 8765),
        Node('buzzer', 'localhost', 8765 + 1)
    ]

Streaming Machine-to-Machine chat

Let's provide FactoryLoop with the knowledge of the grid and implement an asynchronous communication channel.

# filename: factory.py (continued)
 
import mesh
 
class FactoryLoop(object):
 
    def __init__(self, *args, **kwargs):
        # now every instance will know about the other ones
        self.grid = mesh.grid()
 
        # ...
 
    def node(self, name):
        """Search for the given node in the grid."""
        return next(filter(lambda x: x.name == name, self.grid))
 
    # `channel` defaults to the root path, like `Node.connection`
    async def stream(self, target, data, channel=''):
        self.out('starting to stream message to {}'.format(target))
 
        # use the node websocket connection defined in mesh.py
        # the method is exactly the same as the clock
        async with self.node(target).connection(channel) as ws:
            for partial in data:
                self.out('> sending payload: {}'.format(partial))
                # websockets requires bytes or strings
                await ws.send(str(partial))
                self.out('< {}'.format(await ws.recv()))

We added a few debugging lines to better understand how the data flows through the network. Every implementation of FactoryLoop can both react to events and communicate with the other nodes it is aware of.
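
One rough edge in the node lookup: next(filter(...)) raises a bare StopIteration when no node carries the requested name, which is awkward to diagnose from inside a coroutine. A possible variant, sketched here with a simplified namedtuple standing in for mesh.Node and a hypothetical find_node helper, fails with an explicit error instead.

```python
import collections

# simplified stand-in for mesh.Node, only for this illustration
Node = collections.namedtuple('Node', ['name', 'socket', 'port'])

GRID = [
    Node('blink', 'localhost', 8765),
    Node('buzzer', 'localhost', 8766),
]


def find_node(grid, name):
    """Return the node called `name`, or raise a descriptive LookupError."""
    node = next((n for n in grid if n.name == name), None)
    if node is None:
        known = ', '.join(n.name for n in grid)
        raise LookupError('unknown node {!r} (known: {})'.format(name, known))
    return node


print(find_node(GRID, 'buzzer').port)  # 8766
```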

Wrapping up

Time to update arduino.py and run our cluster of three reactive workers in three terminals.

@click.command()
# [ ... ]
def main(sketch, **flags):
    # [ ... ]
    elif sketch == 'buzzer':
        sketchs.Buzzer(sound='buzz buzz buzz').run(flags['socket'], flags['port'])

Launch three terminals or use a tool like foreman to spawn multiple processes. Either way, keep in mind you will need to track each script's output.

$ # start IOPin and Buzzer on the same ports we hardcoded in mesh.py
$ ./arduino.py buzzer --port 8766
$ ./arduino.py iopin --port 8765
 
$ # now that they listen, trigger actions with the clock (targeting IOPin port)
$ ./arduino.py clock --port 8765
[ ... ]
 
$ # Profit !

We just saw one worker reacting to a clock, and another one reacting to randomly generated events. The websocket protocol allowed us to exchange streaming data and receive arbitrary responses, unlocking sophisticated fleet orchestration. While we limited this example to two nodes, a powerful service discovery mechanism could bring to life a distributed network of microservices.