Implementing a Streaming Data Pipeline
Trustly has seen a lot of growth over the last few years, and this has affected how we do data and analytics. Our challenges are probably not unique, and we do not yet know whether the choices we made were the right ones. Only time will tell. I have been with the company for almost six years and have seen our dedicated data and analytics resources grow from zero to perhaps 40 people, depending on how you count us. One year from now that number will have grown even further.
Background
I originally intended this blog post to be quite technical and to focus only on a few specific engineering challenges we faced and how we solved them. But I soon realized that it would not make much sense unless I described the context a bit more. One thing I have noticed about being part of a bigger organization is that it becomes even more important to stay focused on which problems you are trying to solve. As an engineer, I feel best when I can spend a whole day trying to solve an interesting technical problem (and I am probably not the only one). As a human being, I tend to gravitate towards doing things that make me feel good (don’t we all?). Unfortunately, what makes me feel good is not always what is most valuable to my employer. In a small organization where everybody knows everybody and all employees have lunch together on Mondays, this misalignment quickly becomes obvious and can be sorted out, but the larger the organization becomes, the longer I can go on solving problems that perhaps were not that important to begin with. It is harder - for me as well as for others - to disprove my belief that I am working on something really important, especially when it sounds like something a data engineer should be doing, such as building a streaming data pipeline.
Let’s first take a look at one of our existing batch data platforms, which runs on Google Cloud Platform. We have a number of data sources, the most important of which is our payment product, but they also include CRM systems, financial systems, Jira, etc. Data is mostly pulled from these systems by jobs written in Python and triggered by Airflow. We ingest all this data and store it in a fairly raw format in BigQuery, our data lake if you will. The transformations downstream from there are written in SQL and executed by dbt. A few years ago we started out orchestrating all these SQL transformations with Airflow operators, but about a year ago we moved everything to dbt, and we haven’t regretted it for a minute. dbt works really well and enables us to decouple the transformation of data from its ingestion.
When moving from a small to a larger organization, you need to start thinking about decoupling processes and how to scale your teams. When the entire data organization is only three people, everybody will know a bit of everything - and that’s ok. If you want to scale that tenfold, the best way is probably not to require every new team member to know Java, Python, SQL and cloud architecture, and also to understand in detail which product metrics matter, how to define them, in which source system to find the raw data and how to work with business stakeholders. People who know all that (or can even learn it) are a scarce resource. Instead, we have found that letting Analysts and Analytics Engineers develop and maintain pipelines in SQL, without having to worry about how the raw data gets into the platform in the first place (a job best left to the Data Engineers), removes bottlenecks and lets us distribute the workload over many teams.
Why streaming?
So, back to streaming - why do we need it? The obvious answer would be that we need data in real time, and while that is certainly something to strive for, I would not say it is what matters most to us at the moment. All else being equal, having data (i.e. knowing something) earlier is better than later. But all else isn’t equal when it comes to batch versus streaming. I would rather rank our current priorities as follows:
- Capture change
- Reduce load times
- Decrease latency
By decreasing latency, I mean going from one day to an hour (rather than from one second to a millisecond). Let us look at each of these in turn.
Capture change
To a large degree, analytics is about finding underlying patterns that will help you understand the world. Understanding the world around you helps you make decisions - hopefully decisions that will help your business grow. There is a lot to be said about how (and how not) to turn data into insights (or, even better, decisions), and I happily leave those challenges to our analysts and data scientists. Having said that, I think it is quite uncontroversial to state that time is an important dimension in much analytics work. Time as in: “Yesterday we sold for 5 SEK, today we sold for 10 SEK, how much will we sell for tomorrow?”
Jay Kreps has written a classic blog post titled “The Log: What every software engineer should know about real-time data's unifying abstraction”, which does a much better job than I could of explaining the benefits of capturing change over time as a log of events. The bottom line for our use case is that our payment system generally stores information about the world as it is now, and only to a limited degree what it looked like yesterday. Or half a second ago. It would be impractical to use an OLTP database to persist all changes over long periods, but if the payment system can simply fire off events as they happen and someone collects them at the other end of a message broker, those events can be used to “replay” and reconstruct the state of the world at any point in time you may want to study in retrospect.
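A minimal sketch of the replay idea, assuming a hypothetical event shape (a timestamp, a key, an upsert/delete operation and the changed fields) that is not our actual event format. Folding the log up to a chosen timestamp yields the state at that moment, which a table holding only current values cannot give you.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class ChangeEvent:
    """One entry in an append-only event log (hypothetical shape, for illustration only)."""
    occurred_at: datetime
    key: str                # e.g. a payment ID
    op: str                 # "upsert" or "delete"
    fields: dict[str, Any]  # changed fields; ignored for deletes


def state_as_of(log: list[ChangeEvent], as_of: datetime) -> dict[str, dict[str, Any]]:
    """Reconstruct the state of every key by replaying all events up to and including `as_of`."""
    state: dict[str, dict[str, Any]] = {}
    for event in sorted(log, key=lambda e: e.occurred_at):
        if event.occurred_at > as_of:
            break  # the log is sorted, so no later event can affect the result
        if event.op == "delete":
            state.pop(event.key, None)
        else:
            # Merge the changed fields into what we already knew about this key.
            state[event.key] = {**state.get(event.key, {}), **event.fields}
    return state


def ts(hour: int) -> datetime:
    return datetime(2021, 9, 1, hour, tzinfo=timezone.utc)


log = [
    ChangeEvent(ts(9), "p1", "upsert", {"status": "PENDING", "amount": 5}),
    ChangeEvent(ts(10), "p1", "upsert", {"status": "SETTLED"}),
    ChangeEvent(ts(11), "p2", "upsert", {"status": "PENDING", "amount": 10}),
]
print(state_as_of(log, ts(9)))   # the world as it looked at 09:00
print(state_as_of(log, ts(12)))  # the world as it looks "now"
```

The same log answers both questions; the OLTP table would only ever answer the second.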
Reduce load times
Unfortunately, the database of our payment product does not have a “last modified” timestamp on all tables. This means there is no way of telling which rows in a table have changed between yesterday and today - or between one second ago and now. To have consistent data on the data platform, we therefore need to export whole tables from the production system every night. And as Trustly’s transaction volumes have exploded, so have our nightly data exports. You could argue that the database should have been designed better to begin with, but when the system was built many years ago, the focus was on getting a working product out the door that we could sell, not on adapting to an analytics platform we might build several years into the future (if we were still in business by then).
Getting a stream of events, i.e. just the new stuff, drastically reduces the compute and bandwidth spent on updating the data on our platform to the latest state.
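To make the difference concrete, here is a small in-memory sketch in which plain Python dictionaries stand in for a production table and its copy on the data platform; the `(op, key, row)` change-event shape is an assumption. Without a last-modified column, keeping the copy current means re-exporting every row; with change events, only the rows that actually changed are processed.

```python
from typing import Any, Iterable

Row = dict[str, Any]
Change = tuple[str, str, Row]  # (op, key, row), op is "upsert" or "delete"


def full_export(source: dict[str, Row]) -> dict[str, Row]:
    """Nightly batch without a last-modified column: every row is copied, changed or not."""
    return {key: dict(row) for key, row in source.items()}


def apply_changes(replica: dict[str, Row], changes: Iterable[Change]) -> int:
    """Event-driven update: only the changed rows are touched. Returns how many were touched."""
    touched = 0
    for op, key, row in changes:
        if op == "delete":
            replica.pop(key, None)
        else:
            replica[key] = dict(row)
        touched += 1
    return touched


source = {f"p{i}": {"amount": i} for i in range(100_000)}
replica = full_export(source)  # 100,000 rows copied

# During the day, the production table changes a little...
source["p1"] = {"amount": -1}
del source["p2"]
source["p100000"] = {"amount": 42}
changes = [
    ("upsert", "p1", {"amount": -1}),
    ("delete", "p2", {}),
    ("upsert", "p100000", {"amount": 42}),
]

# ...and only those three changes need to be processed to catch up.
touched = apply_changes(replica, changes)
print(touched, replica == source)
```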
Decrease latency
This is probably what you first think of when you hear the word “streaming”, and of course it matters to us as well. Not being limited to fresh data only once a day enables new uses of the data. What you should keep in mind, though, is that streaming is hard. A real-world comparison could be the difference between doing plumbing work on your water supply lines (streaming) and watering your plants (batch). The consequences of an error are a lot more severe in the former (flooded house) than in the latter (wet window sill). If you have a greenhouse, the plumbing work to get water there might be worth it, but if you just have five plants in your apartment, you might want to restrain your ambitions of being at the technological forefront. I would argue the same thinking applies to data platforms.
Our solution
With the above in mind, we set out about a year ago to build a framework that would improve the ingestion of data into our data platform. The work is by no means done (and probably never will be), but this is what we have come up with so far.
Producers -> Pub/Sub -> Beam (Dataflow) -> Google Cloud Storage -> Airflow (Cloud Composer) -> BigQuery -> dbt -> BigQuery -> Consumers
This solution has been live in production for a few subcomponents of our payment system since late August, and we are still evaluating how to improve it. We have made a few observations so far.
Strict schemas are key for structured data
We have settled on Avro for encoding the messages sent from producers. After experimenting last year with BigQuery’s schema auto-detection on plain JSON, we knew we needed something stricter to avoid ending up in crash-and-maintenance hell for the data team. Together with one of the product development teams (and eventual producers of data), we also looked at Protobuf and at JSON with schemas, but Avro seemed like the choice with the fewest downsides.
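As an illustration of what “strict” buys you, here is a deliberately small, hand-rolled check of a record against an Avro record schema. It covers only primitive types and nullable unions, the `PaymentEvent` schema is hypothetical, and a real pipeline would rely on an Avro library (such as fastavro or the Apache Avro Python package) rather than this code. The point is that a value like the string `"500"` in a `long` field is rejected at the producer instead of being silently inferred as something else downstream.

```python
from typing import Any

# Python-side checks for the Avro primitive types this sketch supports.
_PRIMITIVES = {
    "null": lambda v: v is None,
    "boolean": lambda v: isinstance(v, bool),
    "int": lambda v: type(v) is int and -2**31 <= v < 2**31,   # excludes bool
    "long": lambda v: type(v) is int and -2**63 <= v < 2**63,
    "float": lambda v: isinstance(v, float),
    "double": lambda v: isinstance(v, float),
    "bytes": lambda v: isinstance(v, bytes),
    "string": lambda v: isinstance(v, str),
}


def matches(avro_type: Any, value: Any) -> bool:
    """Check a value against a primitive type or a union such as ["null", "string"]."""
    if isinstance(avro_type, list):
        return any(matches(member, value) for member in avro_type)
    if not isinstance(avro_type, str) or avro_type not in _PRIMITIVES:
        raise ValueError(f"type not covered by this sketch: {avro_type!r}")
    return _PRIMITIVES[avro_type](value)


def validate_record(schema: dict[str, Any], record: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the record fits the schema."""
    problems = []
    for field in schema["fields"]:
        name = field["name"]
        if name not in record:
            if "default" not in field:
                problems.append(f"missing field: {name}")
        elif not matches(field["type"], record[name]):
            problems.append(f"wrong type for {name}: {record[name]!r}")
    return problems


# Hypothetical schema for illustration; not one of our actual schemas.
PAYMENT_EVENT = {
    "type": "record",
    "name": "PaymentEvent",
    "fields": [
        {"name": "payment_id", "type": "string"},
        {"name": "amount_minor", "type": "long"},
        {"name": "currency", "type": "string"},
        {"name": "note", "type": ["null", "string"], "default": None},
    ],
}

print(validate_record(PAYMENT_EVENT, {"payment_id": "p1", "amount_minor": 500, "currency": "SEK"}))
print(validate_record(PAYMENT_EVENT, {"payment_id": "p1", "amount_minor": "500"}))
```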
While there is some rudimentary support for schemas on GCP (e.g. you can assign Avro schemas to Pub/Sub topics), our experience has been that it is far less mature than what, for example, Kafka has to offer. GCP is improving all the time, so perhaps a year from now things will look different. For now, we have a schema store in a GCS bucket where producers put their schemas and from which the Beam ingestion job can read them.
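A minimal sketch of such a schema store, with a local directory standing in for the GCS bucket and a hypothetical `<topic>/<version>.avsc` naming convention; in production the same reads and writes would go through a Cloud Storage client rather than `pathlib`. It ignores concerns such as two producers publishing a new version at the same moment.

```python
import json
from pathlib import Path
from typing import Any, Optional


def schema_versions(store: Path, topic: str) -> list[int]:
    """Versions published for a topic, assuming files named <version>.avsc under <store>/<topic>/."""
    folder = store / topic
    if not folder.is_dir():
        return []
    # Sort numerically so that version 10 comes after version 9.
    return sorted(int(path.stem) for path in folder.glob("*.avsc"))


def publish_schema(store: Path, topic: str, schema: dict[str, Any]) -> int:
    """Producer side: store the schema as the next version and return that version number."""
    versions = schema_versions(store, topic)
    version = versions[-1] + 1 if versions else 1
    path = store / topic / f"{version}.avsc"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(schema, indent=2))
    return version


def load_schema(store: Path, topic: str, version: Optional[int] = None) -> dict[str, Any]:
    """Ingestion side: read a given version, or the latest one when no version is specified."""
    if version is None:
        versions = schema_versions(store, topic)
        if not versions:
            raise FileNotFoundError(f"no schema published for topic {topic!r}")
        version = versions[-1]
    return json.loads((store / topic / f"{version}.avsc").read_text())
```

Keeping old versions around, rather than overwriting a single file, lets the ingestion job still decode events written with an earlier schema.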
Use cloud services when possible
Unless you have very specific needs or already have a lot of competence in some area, we have found that using cloud services is an easy way to get something scalable into production in the least amount of time. We actually started out building the pipeline on Kafka but later switched to Pub/Sub when it became apparent that the Data Services team would have to do a fair share of the operation and maintenance of the event delivery component (Kafka or Pub/Sub). One reason not to go with a cloud service is the risk of lock-in, but if you take some care with the implementation, e.g. only use the service for its core purpose in a decoupled way and don’t start (ab)using it for all sorts of things, it should be replaceable. Even better, of course, is if the cloud service is based on an open source framework whose logic can be moved to another cloud vendor’s hosted solution should the need arise, e.g. Airflow (Cloud Composer) or Beam (Dataflow).
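One way to keep the broker replaceable, sketched here as an illustration rather than as our production code, is to hide it behind a one-method interface so that application code never imports a broker client directly. A Pub/Sub implementation would wrap google-cloud-pubsub’s `PublisherClient` and a Kafka one would wrap a Kafka producer; only an in-memory test double is shown, and the `payment-events` topic name is hypothetical.

```python
from typing import Protocol


class EventPublisher(Protocol):
    """All that producers need from the message broker: send bytes to a named topic."""

    def publish(self, topic: str, payload: bytes) -> None: ...


class InMemoryPublisher:
    """Test double; a Pub/Sub- or Kafka-backed class would expose the same single method."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, bytes]] = []

    def publish(self, topic: str, payload: bytes) -> None:
        self.sent.append((topic, payload))


def emit_payment_event(publisher: EventPublisher, payload: bytes) -> None:
    """Application code depends only on the interface, not on a specific broker client."""
    publisher.publish("payment-events", payload)  # hypothetical topic name


publisher = InMemoryPublisher()
emit_payment_event(publisher, b"avro-encoded bytes")
print(publisher.sent)
```

Swapping Kafka for Pub/Sub (or back) then means writing one new class that satisfies `EventPublisher`, while everything that calls `emit_payment_event` stays untouched.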
Beam’s Python API is immature compared to Java
Our team’s competence is mostly within Python and SQL, so a natural choice for us was to develop the Beam code that does the ingestion from Pub/Sub to BigQuery in Python. However, after spending a fair amount of time on it, we started realizing that Java would have given us better support and more options. To be fair, the Beam documentation does not hide this fact, but we, perhaps a bit naively, did not give it much consideration beforehand. For example, Python’s lack of strict typing might make it quicker to get started, but when you want to make sure you can handle all type conversions between an Avro schema and a BigQuery table, Java is more reliable.
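In Python, a wrong type conversion typically only surfaces when the pipeline fails at runtime. One mitigation, sketched here rather than taken from our pipeline, is to make the Avro-to-BigQuery type mapping explicit and fail loudly on anything it does not recognize. The mapping tables are an assumption modelled on BigQuery’s documented conversions for Avro loads and cover only a subset of types; the output dicts follow the name/type/mode shape of a BigQuery schema field.

```python
from typing import Any

# Avro primitive type -> BigQuery column type (subset; an assumption to verify against the docs).
AVRO_TO_BQ = {
    "boolean": "BOOLEAN",
    "int": "INTEGER",
    "long": "INTEGER",
    "float": "FLOAT",
    "double": "FLOAT",
    "bytes": "BYTES",
    "string": "STRING",
}

# Logical types that override the BigQuery type of their underlying primitive.
AVRO_LOGICAL_TO_BQ = {
    "date": "DATE",
    "timestamp-millis": "TIMESTAMP",
    "timestamp-micros": "TIMESTAMP",
    "decimal": "NUMERIC",  # large precision/scale may need BIGNUMERIC instead
}


def bq_field(avro_field: dict[str, Any]) -> dict[str, str]:
    """Convert one Avro record field into a BigQuery schema field (name, type, mode)."""
    name, avro_type, mode = avro_field["name"], avro_field["type"], "REQUIRED"
    if isinstance(avro_type, list):  # only ["null", T] unions are handled here
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) != 1:
            raise TypeError(f"{name}: unsupported union {avro_type!r}")
        if "null" in avro_type:
            mode = "NULLABLE"
        avro_type = non_null[0]
    if isinstance(avro_type, dict):  # e.g. {"type": "long", "logicalType": "timestamp-micros"}
        logical = avro_type.get("logicalType")
        if logical in AVRO_LOGICAL_TO_BQ:
            return {"name": name, "type": AVRO_LOGICAL_TO_BQ[logical], "mode": mode}
        avro_type = avro_type["type"]  # no (or unrecognized) logical type: use the base type
    if not isinstance(avro_type, str) or avro_type not in AVRO_TO_BQ:
        raise TypeError(f"{name}: unsupported Avro type {avro_type!r}")
    return {"name": name, "type": AVRO_TO_BQ[avro_type], "mode": mode}


print(bq_field({"name": "note", "type": ["null", "string"]}))
```

Raising on unknown types is a deliberate choice: a schema change that the mapping does not cover stops the deployment instead of producing a table with the wrong column types.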
The Java sources and sinks also provide more functionality out of the box; for example, in Java you can give the BigQueryIO connector a function, evaluated at runtime, that determines which table to write to. In Python, the table name has to be based on a field in the incoming event. Bottom line: if you choose Python, you will face more restrictions on how you can build your pipeline, and some of the features seem more or less experimental. You typically have to dig into the source code to figure out what something actually does and what its limitations are.
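On the Python side, routing therefore comes down to a function of the event itself. In Beam’s Python SDK, `WriteToBigQuery` accepts a callable for its `table` argument that receives each element and returns a table reference string; the sketch below shows only such a function, assuming a hypothetical `event_type` field and naming convention, and leaves out the Beam wiring.

```python
import re
from typing import Any

# BigQuery table IDs used by this sketch are restricted to letters, digits and underscores.
_INVALID_CHARS = re.compile(r"[^A-Za-z0-9_]")


def destination_table(event: dict[str, Any], project: str, dataset: str) -> str:
    """Route an event to 'project:dataset.table', deriving the table from its event_type field."""
    event_type = event.get("event_type")
    if not event_type:
        raise ValueError("event has no event_type; cannot choose a destination table")
    table = _INVALID_CHARS.sub("_", str(event_type)).lower()
    return f"{project}:{dataset}.{table}"


print(destination_table({"event_type": "payment.settled"}, "my-project", "raw"))
```

Inside a pipeline this could be passed as, for example, `table=lambda e: destination_table(e, "my-project", "raw")`, with the project and dataset names again being placeholders.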
Consider micro-batching for problems where you don’t need real-time
At Trustly, we do have use cases that require near real-time streaming data, but none of them are fully implemented on the platform yet. One reason we chose Beam for ingestion is that it would let us combine streaming and batch flows and - to some degree - switch between the two. Depending on the needs of the data consumer, we could offer a “data product” that is no more complex than needed. In our case, micro-batching means writing data as Avro files to GCS every 10 minutes. Once a day, we load those files into partitioned tables in BigQuery. During the day, the data is queryable (with up to 10 minutes of latency) through a BigQuery external table pointing to the GCS folder where today’s files are placed, very much like the speed and batch layers of a lambda architecture.
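The path arithmetic behind this can be sketched as follows. The bucket name, the `dt=YYYY-MM-DD` folder layout and the `HHMM.avro` file names are hypothetical. What the sketch shows is that every event maps deterministically to one 10-minute file, that a single wildcard URI covers the current day’s files (BigQuery external tables accept a `*` wildcard in Cloud Storage URIs), and that the daily load targets exactly one partition through a `table$YYYYMMDD` partition decorator.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=10)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime, window: timedelta = WINDOW) -> datetime:
    """Floor a timezone-aware timestamp to the start of its fixed-size window (in UTC)."""
    return EPOCH + ((ts - EPOCH) // window) * window


def object_path(bucket: str, topic: str, ts: datetime) -> str:
    """GCS object for the window containing ts: one folder per day, one file per window."""
    start = window_start(ts)
    return f"gs://{bucket}/{topic}/dt={start:%Y-%m-%d}/{start:%H%M}.avro"


def external_table_uri(bucket: str, topic: str, day: datetime) -> str:
    """Wildcard URI an external table could point at to expose a not-yet-loaded UTC day."""
    return f"gs://{bucket}/{topic}/dt={day:%Y-%m-%d}/*.avro"


def daily_load_target(dataset: str, table: str, day: datetime) -> str:
    """Partition decorator for loading one full day into a day-partitioned table."""
    return f"{dataset}.{table}${day:%Y%m%d}"


t = datetime(2021, 9, 1, 13, 47, 12, tzinfo=timezone.utc)
print(object_path("example-bucket", "payments", t))
print(external_table_uri("example-bucket", "payments", t))
print(daily_load_target("raw", "payments", t))
```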
The benefit of this is that we have more freedom in how we eventually structure the data in BigQuery. If data is constantly streaming into tables, it is hard to change things, e.g. to add partitioning to a table. (To go back to the plumbing analogy, it’s like trying to replace the dishwasher hose while the dishwasher is running and there is no way to turn off the water supply.) Also, since we ingest full days into day-partitioned tables, our jobs are idempotent, and if something fails somewhere in the pipeline, it is much easier to sort things out and get back to a known state.
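The idempotency argument can be illustrated with an in-memory stand-in for a day-partitioned table (a sketch, not BigQuery itself). Appending a day’s rows twice, as can happen when a failed job is retried, duplicates data, whereas replacing the whole partition leaves the table in the same state no matter how many times the job runs. In BigQuery, the replacing behaviour corresponds to a load job with the `WRITE_TRUNCATE` write disposition aimed at a partition decorator such as `payments$20210901`.

```python
from collections import defaultdict
from typing import Any

Row = dict[str, Any]


class DayPartitionedTable:
    """In-memory stand-in for a table partitioned by day (keys like '2021-09-01')."""

    def __init__(self) -> None:
        self.partitions: dict[str, list[Row]] = defaultdict(list)

    def append(self, day: str, rows: list[Row]) -> None:
        """Streaming-style insert: retrying after a failure duplicates rows."""
        self.partitions[day].extend(rows)

    def replace_partition(self, day: str, rows: list[Row]) -> None:
        """Daily batch load: overwrite the whole partition, so retrying is harmless."""
        self.partitions[day] = list(rows)

    def row_count(self) -> int:
        return sum(len(rows) for rows in self.partitions.values())


day_rows = [{"id": 1}, {"id": 2}]

appended = DayPartitionedTable()
for _ in range(2):  # the same job runs twice, e.g. after a retry
    appended.append("2021-09-01", day_rows)

replaced = DayPartitionedTable()
for _ in range(2):
    replaced.replace_partition("2021-09-01", day_rows)

print(appended.row_count(), replaced.row_count())
```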
Conclusion
You deserve some credit if you have made it this far, yet I feel like I have only scratched the surface of (one component of) our data setup. There is more I would like to share, for example our journey with Airflow/Cloud Composer and how we have scaled out the use of dbt across the organization. However, I hope this has at least given some insight into what we are working on and why we chose our particular solution. If you think there are better ways to do this, or if you just think these tasks sound like an interesting challenge, why not come join us? We are constantly looking for skilled data engineers to grow the team and improve our practice within the company.
Per Isacson
Head of Data Engineering