Data Orchestration via Events

Published on Oct 7, 2024 · 20 min read


Prelude

Picture a simple data pipeline copying a few tables from a database into a central place where the data is stored, cleaned, and enhanced for the business. Call it a Data Lake, Mesh, Warehouse, Lakehouse… it does not matter. These are simply datasets building on each other, with each update propagating downstream to the other datasets. This could be visualized as data flowing from node to node in a DAG.

Now scale that 100x.

There are now multiple systems, various technologies, different data & domains, several teams responsible, all with ever-changing overlaps. There is no central data governance - but keep trying, Databricks! - nor a platform or tool to visualize everything. Maybe one team uses Dagster, others rely on Azure Data Factory, and the rest have some in-house solutions.

How do we ensure that data still flows all the way without interruptions or unnecessary delays? Can we generalize without being too restrictive? Is there some sort of interface we could design?

Problem Statement

Let us look at an example.

DAG Example

We have team alpha building datasets A & B, team beta doing C, and team gamma using all of them to create D. We are now in team gamma. When do we update our dataset D?

Easy, let’s just update it on a schedule, say, daily at midnight?

What if it needs to be more frequent than that? Or what if there are some critical fixes upstream that we need to process ASAP?

Well… then let’s tell the upstream team to do the triggering for us!

What if there is a bug and they don’t trigger? Or we have to release changes on our end and block any new triggers? But wait, they are all on vacation and cannot help us!

Proposal

Admittedly, the options above are valid approaches, particularly in tight-knit teams or where the tech landscape is not as complex. Yet there are situations where tackling these issues head-on makes sense (hint: corporate).

So the proposal here is some sort of interface between the systems and/or the teams. Taking inspiration from Event-Driven Architectures, let’s decouple things via an intermediary table of events. Think of this table simply as a log of all the data updates happening within a single system or team, so an event would be in the form of “this table got updated at time X”.

This table then becomes the interface between systems and teams, such that whoever wants to consume data only needs to look at this table to figure out when new things have arrived; in a sense, they subscribe to this data.

See where I’m going with this?

The producer of the data publishes details about each data update happening in its domain. Indeed, this is pretty much the Publish-subscribe pattern, but without using a whole platform such as Kafka. We do not need a distributed system, nor such low latency, for our use case. As long as we follow some principles, which we’ll get to shortly, it is fine to rely on something less complex.

Where do we store these events?

The easy answer is wherever it can be accessed by others. It does not need to be centralized, even though that could simplify things. Each team or system should have complete control over where they publish this dataset. So it could be a table in a database, a file on HDFS, maybe even a topic on Kafka; they are all ok!

PubSub

This is exactly the point here; we are looking for an overall pattern to replicate, not yet another platform for this specific use case.

We do need to pay attention to a few things though. Before we get to the actual implementation and more specific table format details, let’s consider the following.

Distributed issues

Clock drift is an important topic to keep in mind, particularly when dealing with events. There are quite a few solutions available; however, do we really want to pay attention to this?

One could instead aim for the single-writer principle, or perhaps for some sort of queue or server in-between the writer and the events table. However, do we really care that much?

And this is the beauty of it! What matters most is that each update comes with an event. As long as the events are added consistently, i.e. each table's updates appear one after another according to their timestamps, it would still work.

This is exemplified below, where table B is seemingly updated 2 hours before table A, yet the event for table A came first. Note that for each table, all their events do make sense chronologically, i.e. top to bottom.

Logs
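For illustration, such a log might look roughly as follows (table names and timestamps are made up): the event for table A arrives first even though its data timestamp is two hours later than B's, yet each table's own events still increase from top to bottom.

table_name,timestamp
A,2024-01-01T02:00:00
B,2024-01-01T00:00:00
A,2024-01-02T02:00:00
B,2024-01-02T00:00:00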

We could even bypass the problem completely by dropping timestamps and providing a version number with each update. For example, this is natively supported in Delta Lake, which uses such version numbers for time travel (even though, once we get to the implementation part of our design, it can be a bit tricky to obtain the correct version numbers from Delta Lake).

So the important part here is to be consistent, or otherwise put: the events should make sense chronologically.

Log

We mentioned how these events would function as a log of data changes. While it should not replace the actual system logs, it would make sense to also provide additional metadata about the data changes. We could store code version numbers, system details, maybe even information about the data itself. This highly depends on how the processing is done, and after all, each system owns its events and can add whatever is useful.

It could even be used to trigger warnings. How many here are either swamped by countless warnings and alerts, or, at the opposite extreme, have none whatsoever? One could have a reader/subscriber simply poll the table for missing events on a schedule and trigger an alert if something is missing. This is particularly relevant when downstream readers rely on those events for orchestration. One could go even further and analyze how long it takes data to flow from A to B (within the system), spot trends, or treat it as regression analysis across code changes.
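As a small illustration of the alerting idea, a sketch of such a poll could look like this (assuming each event carries a table_name and a timestamp; the staleness threshold and function name are illustrative):

from datetime import datetime, timedelta

import polars as pl


def find_stale_tables(
    events: pl.DataFrame,
    dependencies: list[str],
    max_staleness: timedelta = timedelta(hours=24),
) -> list[str]:
    # latest event per table we depend on
    latest = (
        events
        .filter(pl.col("table_name").is_in(dependencies))
        .group_by("table_name")
        .agg(pl.col("timestamp").max())
    )
    last_seen = dict(zip(latest["table_name"], latest["timestamp"]))
    cutoff = datetime.now() - max_staleness
    # dependencies with no event at all, or whose latest event is too old
    return [t for t in dependencies if t not in last_seen or last_seen[t] < cutoff]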

The list could go on.

Code

OK no more sales. Let’s see if we could even implement something like this.

We start by implementing the two sides of this event table. We use Python for example purposes. First the writer:

def writer():
    # read, process, write
    send_event()

It is important to send the event after updating the data, so that the only failure of this kind is the event-sending itself failing after a successful write. In such a scenario, one of course needs to be able to recover, e.g. by re-sending just the event, or by re-triggering the whole processing step (while making sure the writing step won’t create duplicated data). Wrapping these 2 steps in a transaction would perhaps be even better, but more often than not it will not be possible.
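To make this a bit more concrete, a sketch of the writer side could look like the following; process_and_write is just a placeholder for the actual read/process/write step, and where the event row ends up (database table, file on HDFS, …) is system-specific.

from datetime import datetime

import polars as pl


def process_and_write(table_name: str) -> None:
    # placeholder for the actual read, process, write step
    pass


def send_event(table_name: str) -> pl.DataFrame:
    # build the event row; appending it to the shared events table is omitted here
    return pl.DataFrame({
        "table_name": [table_name],
        "timestamp": [datetime.now()],
    })


def writer() -> None:
    process_and_write("purchases")  # 1. update the data first
    send_event("purchases")         # 2. only then publish the event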

Now the reader:

def reader():
    read_latest_markers()
    # read, process, write
    update_latest_markers()

The same pattern applies here: the ‘bookkeeping’ step is done after the actual processing, such that if the writing succeeded but updating the markers did not, one can still easily recover.

Notice how this is really only the processing step. We have externalized the orchestration part, i.e. the component which checks whether we need to update at all. We will get to that shortly.

Wait, what are these markers though?

Well we should not process the same events multiple times. Once we know we have updated our table up to the events at marker X, keep track of that marker such that we can filter only for events after it. The classic option is a timestamp, such as updated_at, or one could use version numbers, or perhaps other metadata to track these changes. Recall these events should make sense chronologically.

The markers are also highly relevant when processing increments or deltas. Implementation may vary, however the assumption here is that data flows into the same table with each update and one can filter for each update via some arguments; a data slice if you will.

The system doing the reading can choose to store this wherever they see fit. Recall we are decoupled from the writing system!

So these markers need to coincide with the timestamps or versions used in the events, right?

Yes, well spotted! They do need to correspond. Let’s revisit the earlier example.

If we are in team gamma owning table D, we care about the updates in tables A and C. We need slices of data based on some markers; where do we get that? From the events themselves! Teams alpha and beta publish events showing the timestamps when A and C are updated, and we use exactly those timestamps to keep track of the latest slices we processed while updating table D.
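A sketch of this bookkeeping from team gamma's perspective; how the markers dict is persisted (and the helper names) is up to the reading system and only illustrative here.

from datetime import datetime

import polars as pl


def read_latest_markers() -> dict[str, datetime]:
    # e.g. load from a small state table owned by team gamma
    return {"A": datetime(2024, 1, 1), "C": datetime(2024, 1, 1)}


def new_events(events: pl.DataFrame, markers: dict[str, datetime]) -> pl.DataFrame:
    # keep only events strictly newer than what we already processed, per table
    return events.filter(
        pl.any_horizontal(
            [
                (pl.col("table_name") == table) & (pl.col("timestamp") > ts)
                for table, ts in markers.items()
            ]
        )
    )


def update_latest_markers(
    markers: dict[str, datetime], processed: pl.DataFrame
) -> dict[str, datetime]:
    # move each marker forward to the newest event timestamp we just consumed
    latest = processed.group_by("table_name").agg(pl.col("timestamp").max())
    return {**markers, **dict(zip(latest["table_name"], latest["timestamp"]))}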

Is it time to trigger?

Now for the fun part. How do we check if we should trigger a downstream job?

This could be implemented on a schedule, as an always-running job, or even via a webhook. It depends on factors such as the frequency of updates, the number of tables, etc. The part we focus on here, though, is how to deduce from the events table that something should be triggered.

Naive Solution

Recall each table update should produce an event. The bare minimum would be to attach a timestamp to it, such that we end up with something like:

table_name,timestamp
purchases,2024-01-01T00:00:00
num_purchases_per_day,2024-01-01T00:02:00

Let’s assume these 2 tables are produced by the same system, where num_purchases_per_day logically depends on purchases, i.e. the former is derived from the latter. We now have another system which needs to use both of these tables to create a report for business, say daily_summary containing the number of purchases along with the most expensive purchase per day.

Use your favorite analogy here, for example this could be a medallion architecture, where one system or team handles data up to silver, and another one builds the downstream gold tables.

To deduce whether we need to trigger an update on this downstream table, we need to know 2 things:

  • which tables it depends on, here both purchases and num_purchases_per_day, and
  • when we last updated our own table.

We use Polars in these code snippets due to its concise syntax and other cool features. The full (runnable) code will be available here.

from datetime import datetime
from typing import Iterable

import polars as pl

def is_time_to_trigger(
    events: pl.DataFrame,
    last_processed_ts: datetime,
    dependencies: Iterable[str],
) -> bool:
    df = events.filter(pl.col("table_name").is_in(dependencies))
    df = df.filter(pl.col("timestamp") > last_processed_ts)

    return not df.is_empty()

So all we do is filter these events for new events, i.e. > last_processed_ts, and for the tables (dependencies) that we care about.
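A quick check with the two example events from above (reusing the imports from the snippet):

events = pl.DataFrame({
    "table_name": ["purchases", "num_purchases_per_day"],
    "timestamp": [datetime(2024, 1, 1, 0, 0), datetime(2024, 1, 1, 0, 2)],
})

print(is_time_to_trigger(
    events,
    last_processed_ts=datetime(2023, 12, 31),
    dependencies=["purchases", "num_purchases_per_day"],
))
# True -> there are new events for our dependencies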

So why is this naive? LGTM!

It might be enough :) However, recall these tables are logically connected. What happens if we trigger after the first one but before the second one is updated? Is it really ok to trigger twice? It is pretty wasteful! Depending on the computation, reading these two tables at different points in time might even lead to incorrect results.

Let’s try to fix that.

Propagation

The problem is twofold:

  1. the trigger should only happen after the two tables have been updated, but also
  2. the processing of this update should not be reading anything additional that might have arrived in the meantime.

Point 2 can be easily solved by adding an upper bound when reading, e.g. select * where updated_at <= {timestamp}. Of course, this means the timestamp in the event should be greater than or equal to the updated_at in the data itself, but also less than the updated_at of any subsequent data that might come in.
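A minimal sketch of point 2, assuming the upstream table exposes an updated_at column and the bounds come from our markers and the events:

def read_slice(data: pl.DataFrame, lower: datetime, upper: datetime) -> pl.DataFrame:
    # lower bound from our last processed marker, upper bound from the event
    return data.filter(
        (pl.col("updated_at") > lower) & (pl.col("updated_at") <= upper)
    )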

So let’s focus on point 1. Here we essentially need a way to tell whether a data change has reached its downstream tables; even if nothing new was written! This last part is important, otherwise we can’t tell whether the data change reached the downstream tables or failed somewhere in the middle.

Didn’t you say each event represents “this table got updated at time X”? This would no longer be true!

Oops, indeed. If we still followed that approach, then there would be no event. This means our events need to in fact represent “this table might have new data since time X”. Alternatively, one could send an event with some extra metadata saying “this table has no new data”.

Let’s choose the first option. Consider situations where data is upserted; one doesn’t really know whether anything was written unless one queries the data after the write, or pre-computes the delta before it. Let’s not be wasteful!

We shall call this data flow propagation.

So we need to extend our events a little. Software engineers here are surely aware of correlation ids and their usage to track the context of a request in a larger system. Here we are going to track the data change as it flows through data sets.

Let’s add this information to our events:

table_name,timestamp,correlation_id
purchases,2024-01-01T00:00:00,68cdb6c2-fc90-4a4f-b3d0-e2a06f9b9a94
num_purchases_per_day,2024-01-01T00:02:00,68cdb6c2-fc90-4a4f-b3d0-e2a06f9b9a94
purchases,2024-02-01T00:00:00,40eab005-6d5b-4400-b5c2-62fd7e02c858

Note the partially propagated update for purchases that just came in, shown by the last event. We then update our is_time_to_trigger function:

+ df = keep_only_complete_propagation(df)
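One possible placement of that line, keeping the rest of the function as before:

def is_time_to_trigger(
    events: pl.DataFrame,
    last_processed_ts: datetime,
    dependencies: Iterable[str],
) -> bool:
    df = events.filter(pl.col("table_name").is_in(dependencies))
    df = df.filter(pl.col("timestamp") > last_processed_ts)
    df = keep_only_complete_propagation(df)

    return not df.is_empty()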

And implement (naively) the new function:

def keep_only_complete_propagation(events: pl.DataFrame) -> pl.DataFrame:
    df = (
        events
        .with_columns(pl.len().over(pl.col("correlation_id")).alias("count"))
        .filter(pl.col("count") == pl.max("count"))
        .drop("count")
    )

    return df

Wait, naively? Again?

Let’s see if this is enough by going through some scenarios.

Scenarios

What if the update did not write anything new to the second table?

This is ok, since we’d still have an event, and we know there might be new data.

What if the update actually failed to propagate to all the tables?

Well this means there was no trigger. This is again correct. As soon as the issue is fixed, the event should come in which would lead to the trigger happening. Hopefully there is proper monitoring in place to alert the teams in such scenarios. Recall these alerts could even be derived from this events table.

What if an update failed to propagate, however another subsequent update came in which succeeded?

Perhaps this should not be permitted at all? Hmm sounds tricky to solve.

Is our downstream system capable of handling backfills during processing? If yes, then there is no problem, since recall we are dealing with data slices and we could backfill the missing slice later, when the source fixes the issue (and sends the events).

However, if not, then our is_time_to_trigger function should return False whenever it detects any incomplete propagation. This information would be easily obtainable by flipping the condition of our filter. For the purpose of this blogpost, we will not implement this here, as there are other more pressing issues to discuss.

What if the source system now updates more tables for the same correlation id?

So the source system now produces a different number of tables in the same flow. Two sub-problems here: more tables, or fewer.

GoT - fewer

If there are more, that’s no problem; we still only filter the events for the tables we care about.

If there are fewer, then there would be no trigger anymore, as the update will never fully propagate. The only real solution here is to inform the downstream system about it, such that it updates its dependencies. Due to the implications of such a change, this seems like a perfectly valid approach. Automating it does not make much sense, since the downstream system now likely needs to fetch the same data elsewhere, fetch it differently, or even decommission its own table too.

What if the downstream system changes its upstream dependencies?

Again, either more tables, or fewer. In both cases however, there is no problem. Our code would still simply filter for the updated list of table dependencies from now on.

Whew, this was a lot.

So many things can change. Maybe this could serve as an example of the challenges faced by data engineers in comparison to other specializations, such as software engineering or data analytics. But I digress.

More complex systems

Notice something we missed?

Umm does that correlation id really pass through all tables?

Precisely. In an ideal software engineering world, perhaps it would. It is kind of the idea behind microservices, that each system would be small enough to tackle only specific use cases. This however does not really fit here. There are many data pipelines, likely going through different flows in the same system.

Looking back at our is_time_to_trigger function, we can clearly see how we assume that all datasets follow the same pipeline, meaning the correlation id will reach all the tables in our events. But what if that’s not the case?

Can’t we just have a different events table per pipeline?

Yes, that is one option. An alternative is for the source system to expose these pipeline ‘definitions’ and make our function a bit more clever.

Let’s try this second approach. Shouldn’t be hard to implement, right?

These pipelines could be similarly exposed via some externally-accessible table, or for our example, via code:

pipeline_to_tables = {
    "purchases": ["purchases", "num_purchases_per_day", "purchases_per_buyer"],
    "exchange_rate": ["exchange_rate"]
}
table_to_pipeline = {t: k for k, v in pipeline_to_tables.items() for t in v}

We now have an additional table in our purchases pipeline and an additional exchange_rate pipeline updating a single table.

In any case, what is clear is that this kind of data lineage needs to be available somewhere. It could easily be auto-generated too; no need to manually maintain yet another table. Callback to the previous blogposts anyone?

We are now going to use this mapping by updating our keep_only_complete_propagation function:

table_to_pipeline_df = pl.DataFrame({
    "table_name": list(table_to_pipeline.keys()),
    "pipeline": list(table_to_pipeline.values()),
})

def keep_only_complete_propagation(events: pl.DataFrame) -> pl.DataFrame:
    df = (
        events
        .join(table_to_pipeline_df, on="table_name", how="left")
        .with_columns(pl.len().over([pl.col("correlation_id"), pl.col("pipeline")]).alias("count"))
        .with_columns(pl.max("count").over(pl.col("pipeline")).alias("max"))
        .filter(pl.col("count") == pl.col("max"))
        .drop("pipeline", "count", "max")
    )

    return df

We have now effectively split our triggering mechanism, or partitioned if you will, per-pipeline.
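Trying it out with a complete purchases run, a partial one that just arrived, and an exchange_rate update (correlation ids shortened for readability):

events = pl.DataFrame({
    "table_name": [
        "purchases", "num_purchases_per_day", "purchases_per_buyer",  # complete run
        "purchases",                                                  # partial run
        "exchange_rate",
    ],
    "timestamp": [
        datetime(2024, 3, 1, 0, 0),
        datetime(2024, 3, 1, 0, 2),
        datetime(2024, 3, 1, 0, 4),
        datetime(2024, 3, 2, 0, 0),
        datetime(2024, 3, 1, 6, 0),
    ],
    "correlation_id": ["aaa", "aaa", "aaa", "bbb", "ccc"],
})

print(sorted(keep_only_complete_propagation(events)["table_name"].to_list()))
# ['exchange_rate', 'num_purchases_per_day', 'purchases', 'purchases_per_buyer']
# -> the partial 'bbb' run is dropped, both complete pipeline runs remain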

Let’s go through some more scenarios. I promise, these are the last ones.

Scenarios

What if we have different trigger types?

Say, if exchange_rate updates, we need to reload, but if purchases updates, we only process the increment as an upsert?

This is clearly a valid scenario, e.g. assume that the downstream table needs to join purchases with exchange_rate; or perhaps one needs to figure out what size of cluster to use. In this case, I would argue this is a problem for the actual processing function, reader in our example. We are now talking about “how”, not “when”, to process. The reader function can apply the same filtering steps, look at the remaining events, and then, based on some rules, infer “how” to process the data. This kind of logic can vary greatly depending on what kind of processing the readers need to do.

Wait, once we started processing we cannot change the cluster size, can we?

Indeed, this is a bit of a catch-22 situation, since once the processing started, the cluster size is likely already fixed. One cannot always rely on autoscaling to work.

I would argue that in this scenario, the reader is still the one that needs to solve it. One could introduce another very simple layer in-between the reader and the is_time_to_trigger which can deduce, given the events, what type of processing is required and trigger the reader with the appropriate arguments.

But what if the upstream data also has different processing styles?

For example, data might typically arrive in increments, but can sometimes also constitute a full reload; this can easily happen when fixing an upstream data issue, for example. In this case, the events should contain some additional information. I would argue that the type of write operation suffices, such as OVERWRITE, INSERT, and UPDATE, with perhaps the last two merged into UPSERT. This is definitely a good starting point, yet again, it depends on each system and what makes sense there.
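As a sketch of how a reader could use such a column (the rule itself, any OVERWRITE meaning a full reload, is just one possible policy):

def infer_processing_mode(events: pl.DataFrame) -> str:
    # if any upstream write was a full overwrite, reload; otherwise process the increment
    has_overwrite = not events.filter(pl.col("write_operation") == "OVERWRITE").is_empty()
    return "reload" if has_overwrite else "upsert"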

Depends

No need to force a generalization if it doesn’t make sense here. The design is meant to allow certain differences per-system when it comes to the “how”.

What if we want to wait for multiple of these unrelated tables to update before propagating this update downstream?

With our current logic, if there are sufficient events for either the purchases or the exchange_rate pipeline, our is_time_to_trigger function returns True. Suppose we don’t want that, and instead want to wait for both of these pipelines to update before triggering.

One can definitely update the function. It could be as simple as adding an if-clause to check that, for the remaining events, len(distinct(table_name)) == len(dependencies), else return False. However, does it really make sense to implement this kind of logic? Up to you to decide!
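For completeness, that if-clause could look roughly like this, applied to the events remaining after the previous filters:

def all_dependencies_updated(events: pl.DataFrame, dependencies: list[str]) -> bool:
    # every dependency should have at least one remaining event
    return events["table_name"].n_unique() == len(dependencies)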

Summary

We are done!

Finally

What did we achieve though?

We introduced a design pattern that could serve as both the separator and the glue between different systems processing the same data. We prototyped it in Python with Polars and even proved our initial assumption wrong. We have identified some - heck - a lot of challenges to make this design work and proposed solutions to each of them.

We have seen that there is no clear best option to make data flow smoothly, yet it is achievable.

I would argue that this approach is quite flexible while still providing some structure and, most importantly, solving the main issue we wanted to solve: propagating data updates ASAP.

Do you think this design is worthwhile? Notice any flaws? Perhaps any specifics in your use case that we have completely missed here?

Do tell!

tl;dr Core Setup

Given system foo updating table A and system bar updating table B based on table A, we need:

  • a place to store the events produced by foo,
  • a place to store the event marker last read by bar,
  • foo to update the events table whenever there’s an update to A,
  • bar to keep track of the last event marker used to update B,
  • table lineage information about the source pipelines,
  • a webhook or job to poll the events table.

The event table schema would then be:

table_name = obvious
marker = timestamp or version number of data change
event_marker = timestamp of event
correlation_id = unique per pipeline run
write_operation = overwrite, upsert, ...
<other context>
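Expressed as a Polars schema, it could look like this (the dtypes are only a suggestion; marker could just as well be an integer version number):

import polars as pl

events_schema = {
    "table_name": pl.Utf8,
    "marker": pl.Datetime,        # timestamp (or version number) of the data change
    "event_marker": pl.Datetime,  # timestamp of the event itself
    "correlation_id": pl.Utf8,
    "write_operation": pl.Utf8,   # OVERWRITE, UPSERT, ...
}
events = pl.DataFrame(schema=events_schema)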

Addendum

Haven’t you forgotten something? What about deletes?

Guilty

Guilty as charged. This is indeed a tricky issue to solve.

One could for example declare a “delete” write operation in the same events table and inform the reader how to interpret it; this is needed since the reader would now need to behave differently for this specific write type.

Perhaps a more common alternative is handling deletes separately, such as via a second table, following a similar flow. Its schema could be as simple as primary key and deletion timestamp. This way, the reader can choose to handle it how and whenever they choose.
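Such a deletes table could be as minimal as (illustrative values):

primary_key,deleted_at
42,2024-01-05T10:00:00
43,2024-01-06T08:30:00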

What if there is a bug in the data that we need to fix? Can we bypass these events?

Absolutely! The reader/subscriber just needs to isolate the processing steps from the bookkeeping/markers and trigger the processing for whichever slice of data one desires. This “fix it” flow does not pose any issue to downstream users as long as the flow of updating its own events is kept, i.e. each write still publishes a new event.

Shouldn’t events have a timestamp of their own?

That could simplify things indeed, particularly when dealing with timezones or events not published in time. So the events table will now have two marker columns: the table “updated_at” marker, plus the event “timestamp”. Downstream systems then need to keep track of the last event timestamp they processed, such that they do not reprocess the same events, and additionally the table updated_at, if processing deltas.

Hmm but how do we handle processing deltas from multiple tables?

This is particularly relevant when dealing with real big data.

Suppose we have two upstream tables, each receiving deltas, and we only want to process the deltas.

We talked about bookkeeping the “updated_at”, but that only really works for a single table. If table A is updated at time X, it does not mean table B updated at time X too; even if the events somehow manage to arrive at the same time!

So the solution here is to keep track of the “updated_at” for each table we require data slices from. The triggering logic remains the same; it is the “reader” function which deals with this complexity.

What if a system handles a LOT of different pipelines? Isn’t this event table a bottleneck?

Valid question indeed. Here we can apply well-known patterns, such as partitioning our event table or even splitting it into multiple tables, per source table or source pipeline. Anything that can aid the readers in processing these events faster or the writers in not queueing one after another.

Notice something wrong? Have an additional tip?

Contribute to the discussion here