Quick Links: GitHub | Documentation

A few weeks ago we open sourced Faust, a Python stream processing library that we built at Robinhood to make it easy to build and deploy traditionally complex streaming architectures.

As Robinhood has grown and we have added more functionality to our product, our infrastructure has evolved with it. We have added numerous internal services and technologies to solve different problems, so a typical application now often needs to interact with one or more other services.

Typical streaming frameworks such as Spark require external dependencies to be packaged with the app in specific ways and submitted to the YARN/Mesos cluster that runs the application. This is usually a detour from how Python applications typically manage dependencies, namely with virtualenv and pip.

We built Faust as a library so that it can be used with whatever tools you already use. Simply install Faust and use it to develop Python applications as you normally would. We use Python's asyncio to achieve high-performance I/O.

In this blog post we walk through some examples of how we use Faust to interact with different services using off-the-shelf libraries.

Faust + Redis

Redis has established itself as an in-memory data store of choice owing to its data structures, fast queries and simplicity. We use Redis on Robinhood’s Data team across a variety of use cases. The following example shows how we use Redis to cache messages on the Robinhood Feed.
We can install aredis and Faust using pip:

    pip install aredis
    pip install faust

With the dependencies installed, let’s first define our Faust application, Kafka topic and models:

    import datetime

    import faust


    class Activity(faust.Record, isodates=True):
        user: str
        message: str
        timestamp: datetime.datetime


    app = faust.App("redis_example", broker="kafka://localhost:9092")
    activities_topic = app.topic("feed_activities", value_type=Activity)

We can now create an agent which reads the feed activities coming in through this topic and adds each message to the user’s Redis sorted set:

    import aredis


    @app.agent(activities_topic)
    async def save_activities(activities):
        # Create the client once and reuse it for every message.
        client = aredis.StrictRedis(host="localhost", port=6379)
        async for activity in activities:
            # Sorted-set scores must be numeric, so the datetime is
            # converted to a Unix timestamp before it is stored.
            score = activity.timestamp.timestamp()
            await client.zadd(activity.user, score, activity.message)

As shown above, we use Redis as you would in any other app. Faust doesn’t require any special drivers or modes for Redis. All it needs is a Redis library that is compatible with Python's asyncio.

Faust + HTTP

We often build streaming apps that need to talk to other services over HTTP. Below is an example of how we use the Python aiohttp library from a Faust streaming app for one of our use cases at Robinhood.

First, let us install the Python library we will use for HTTP requests:

    pip install aiohttp

We skip the app and model definitions, which are similar to the previous example, and go straight to how we would design our agent.
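For readers who want to follow along, the skipped order model can be pictured roughly as below. This is only a sketch: the field names are inferred from how the agent in this section uses the order (`user_id`, `quantity`, `symbol`), the field types are assumptions, and a plain dataclass stands in for the Faust record so that the snippet runs without Faust or Kafka. The `confirmation_body` helper is hypothetical and simply mirrors the email body built in the agent.

```python
from dataclasses import dataclass


@dataclass
class Order:
    # Stand-in only: in the real app this would presumably subclass
    # faust.Record, the same way Activity does in the Redis example.
    user_id: str   # assumed type
    symbol: str    # assumed type
    quantity: int  # assumed type


def confirmation_body(order: Order) -> str:
    """Hypothetical helper: format the email body for one order."""
    return f"Order: {order.quantity} shares of {order.symbol}"


sample = Order(user_id="user-1", symbol="AAPL", quantity=10)
print(confirmation_body(sample))
```

In an actual Faust app the model would be attached to a topic with `app.topic(..., value_type=Order)`, just as `activities_topic` was defined earlier; the name of the orders topic is not given in this post.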
We create an agent which processes orders and uses a third-party HTTP API to send order confirmation emails to our customers:

    import aiohttp


    async def send_confirmation(order):
        url = "https://emailer.robinhood.com/"
        data = {
            "user": order.user_id,
            "subject": "Order Confirmation",
            "body": f"Order: {order.quantity} shares of {order.symbol}",
        }
        # POST the confirmation as JSON to the email service.
        async with aiohttp.ClientSession() as session:
            await session.post(url, json=data)


    @app.agent(orders_topic)
    async def add_symbol(orders):
        async for order in orders:
            await send_confirmation(order)

A lot of our internal services expose REST APIs. Being able to integrate aiohttp with Faust apps this easily lets us break down a variety of our backend systems into simple, isolated streaming apps.

Faust + InfluxDB

Robinhood operates on large amounts of time series data, such as tick-by-tick price data for each stock symbol. We use InfluxDB to store some of these time series. Below is an example of how we query InfluxDB from a Faust application.

As before, let us first install the Python library we will use to query InfluxDB:

    pip install aioinflux

We now create an agent that reads the orders topic from above and queries the time series in InfluxDB for the stock in question, to check that the price at which the order executed matched the market price at that time. We do this to ensure that we are giving our customers the best quality of execution.

    import aioinflux


    @app.agent(orders_topic)
    async def add_symbol(orders):
        async for order in orders:
            client = aioinflux.InfluxDBClient()
            query = f"SELECT price FROM marketdata WHERE symbol = {order.symbol} AND timestamp














