Meditations on data processing pipelines - Part 2
Published on Aug 4, 2025
8 min read

Prelude
How should we write our data processing pipelines?
We talked in the previous blogpost about why this is important and experimented with OOP. Do take a look there for more context.
We now take a look at a purely functional approach.
Functions
Everything is a function. Reading from a file, fetching from an API, processing, writing, etc.
Is this going to lead to more robust pipelines?
Let’s give it a shot.
import polars as pl

def load_sales():
    df = pl.read_csv("sales.csv")
    df = df.filter(pl.col("foo") > 1)
    df.write_parquet("path.parquet")
So concise, right? That’s all! No classes, no abstractions, nothing.
But this is not the point.
We start with the premise that we need to ingest many things.
And this is where it will become obvious.
Say we now need to ingest purchases in the exact same way.
In the OOP approach, it would be a single line, something like Purchases("purchases.csv").
Using functions?
def load_purchases():
    df = pl.read_csv("purchases.csv")
    df = df.filter(pl.col("foo") > 1)
    df.write_parquet("path.parquet")
It’s basically a copy-paste, just changing the path. You sure you want to do this 100s of times?
OK, but you can generalize!
def load_table(source_path: str):
    df = pl.read_csv(source_path)
    df = df.filter(pl.col("foo") > 1)
    df.write_parquet("path.parquet")

load_table("sales.csv")
load_table("purchases.csv")
This is all, innit?
Yes :) There is indeed beauty in simplicity.
But rarely are pipelines so easy. Let’s add complexity.
Small differences
Say purchases actually come from a db. Everything else stays the same.
Notice how our generalized function already broke?
We can no longer just use load_table.
Easy, add a new one. You would do something similar with OOP, no?
def load_table_db_conn(conn, table_name):
    # read_database expects a SQL query, so select the whole table
    df = pl.read_database(f"SELECT * FROM {table_name}", conn)
    df = df.filter(pl.col("foo") > 1)
    df.write_parquet("path.parquet")

def load_table_db(table_name):
    conn = create_connection(...)
    return load_table_db_conn(conn, table_name)

load_table_db("purchases")
Notice the dev flow.
We created a new load_table_db function for this other read source, but then realized we need to create a database connection first. So we renamed it to load_table_db_conn and wrapped it with the actual load_table_db, which now also creates the db connection.
We also simply copied the other steps over.
Makes sense, right?
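The split into load_table_db_conn and load_table_db has a practical side effect: the inner function can be exercised with any connection we hand it. Here is a minimal sketch of that idea, assuming SQLite from the standard library and plain lists of dicts in place of polars DataFrames (both are my substitutions, so the snippet runs without extra dependencies). The names read_rows_conn and read_rows and the in-memory table are hypothetical.

```python
import sqlite3


def read_rows_conn(conn: sqlite3.Connection, table_name: str) -> list[dict]:
    """Read a table through an existing connection and keep rows with foo > 1."""
    # Table names cannot be bound as SQL parameters, so the name is interpolated;
    # only pass trusted, hard-coded table names here.
    cursor = conn.execute(f"SELECT * FROM {table_name}")
    columns = [col[0] for col in cursor.description]
    rows = [dict(zip(columns, values)) for values in cursor.fetchall()]
    return [row for row in rows if row["foo"] > 1]


def read_rows(db_path: str, table_name: str) -> list[dict]:
    """Thin wrapper: create the connection, delegate, and always close it."""
    conn = sqlite3.connect(db_path)
    try:
        return read_rows_conn(conn, table_name)
    finally:
        conn.close()


# Usage with an in-memory database, so no real server is needed.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE purchases (id INTEGER, foo INTEGER)")
conn.executemany("INSERT INTO purchases VALUES (?, ?)", [(1, 1), (2, 5), (3, 9)])
purchases = read_rows_conn(conn, "purchases")
conn.close()
```

The wrapper owns the connection lifecycle, while the inner function only needs something that behaves like a connection.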
Well, we should generalize further!
Indeed, those processing steps could also be extracted in a common function.
 def process_data(df: pl.DataFrame):
     df = df.filter(pl.col("foo") > 1)
     df.write_parquet("path.parquet")

 def load_table(source_path: str):
     df = pl.read_csv(source_path)
-    df = df.filter(pl.col("foo") > 1)
-    df.write_parquet("path.parquet")
+    process_data(df)

 def load_table_db_conn(conn, table_name):
     df = pl.read_database(f"SELECT * FROM {table_name}", conn)
-    df = df.filter(pl.col("foo") > 1)
-    df.write_parquet("path.parquet")
+    process_data(df)
Nice!
But now I need to ingest another dataset and that one needs to output in csv format.
We took a shortcut before, didn’t we?
That process_data function should really just process the data, not also write it.
Let’s fix it.
 def process_data(df: pl.DataFrame):
     ...
-    df.write_parquet("path.parquet")
+    return df

+def write_data(df: pl.DataFrame):
+    df.write_parquet("path.parquet")

 def load_table(source_path: str):
     ...
-    process_data(df)
+    df = process_data(df)
+    write_data(df)

 def load_table_db_conn(conn, table_name):
     ...
-    process_data(df)
+    df = process_data(df)
+    write_data(df)
Notice how we keep making the same change multiple times?
Both of those load_table functions had to be changed.
And didn’t we just say we want to ingest another dataset but write it in csv?
No problem, we can parametrize write_data. These changes are perfectly normal.
def write_data(df: pl.DataFrame, output_format: str):
    if output_format == "parquet":
        df.write_parquet("path.parquet")
    elif output_format == "csv":
        df.write_csv("path.csv")
    else:
        raise ValueError("Unexpected output format!")
Deja vu? Was that maybe similar to what our AI overlord recommended in the previous post?
Wait, just noticed a bug. We always write to the same path!
Indeed, this would also need to be an argument to the function. But maybe we should instead have two different functions for writing? Let’s fix both issues in one go.
def write_parquet(df: pl.DataFrame, output_filename: str):
    df.write_parquet(f"{output_filename}.parquet")

def write_csv(df: pl.DataFrame, output_filename: str):
    df.write_csv(f"{output_filename}.csv")
And yet again, we need to modify our original functions and finally introduce the new function for the new ingestion that writes to a csv file.
 def load_table(source_path: str):
     ...
-    write_data(df)
+    write_parquet(df, source_path.removesuffix(".csv"))

 def load_table_db_conn(conn, table_name):
     ...
-    write_data(df)
+    write_parquet(df, table_name)

+def load_table_db_csv(table_name):
+    conn = create_connection(...)
+    df = pl.read_database(f"SELECT * FROM {table_name}", conn)
+    df = process_data(df)
+    write_csv(df, table_name)

+load_table_db_csv("balance")
So, now we have 3 different functions that do almost the same stuff, minus some differences?
Precisely! This is exactly the point here. There is a lot of duplication. It is becoming increasingly complicated to keep some structure. It is hard to make single changes that apply to multiple flows and it becomes naturally easy to just make another copy to make things work. And we are still at only 3 datasets!
More datasets
Let’s continue with adding more datasets. We have 3 datasets loaded in different ways, and now we simply need to add another one. We need to also write balances to parquet.
load_table("sales.csv")
load_table_db("purchases")
load_table_db_csv("balance")
+load_table_db("balance")
No problem. But we notice duplicates, so we need to enforce a primary key on this table. What do we do?
We could deduplicate purchases too, out of convenience, because why not? Except that its data volume is large, so it would be very costly. Let’s just add it to this new one.
-def process_data(df: pl.DataFrame):
+def process_data(df: pl.DataFrame, drop_duplicates: bool):
     df = df.filter(pl.col("foo") > 1)
+    if drop_duplicates:
+        df = df.unique()
     return df

-def load_table_db_conn(conn, table_name):
+def load_table_db_conn(conn, table_name, drop_duplicates):
     df = pl.read_database(f"SELECT * FROM {table_name}", conn)
-    df = process_data(df)
+    df = process_data(df, drop_duplicates)
     write_parquet(df, table_name)

-def load_table_db(table_name):
+def load_table_db(table_name, drop_duplicates):
     conn = create_connection(...)
-    return load_table_db_conn(conn, table_name)
+    return load_table_db_conn(conn, table_name, drop_duplicates)

-load_table_db("purchases")
-load_table_db("balance")
+load_table_db("purchases", drop_duplicates=False)
+load_table_db("balance", drop_duplicates=True)
Notice that we again had to update everything so that this configuration trickles down to where it is actually applied. We already need to ctrl+click through quite a few functions to understand where it is used. Have we maybe optimized prematurely?
Also, don’t you have another deja vu regarding configuration? Having to configure these processing details upfront does not seem to work well. So what do we do instead?
Duplication vs generalization
Well, to improve on the above, the most obvious solution is simply to allow duplication.
We no longer keep a single load_table_db function, but have a different one per dataset.
So instead of configuring the processing details upfront, such as deduplication, we simply accept that the same code might exist multiple times.
def load_table_purchases():
    conn = create_connection(...)
    df = pl.read_database("SELECT * FROM purchases", conn)
    df = df.filter(pl.col("foo") > 1)
    write_parquet(df, "purchases")

def load_table_balance():
    conn = create_connection(...)
    df = pl.read_database("SELECT * FROM balance", conn)
    df = df.filter(pl.col("foo") > 1)
    df = df.unique()
    write_parquet(df, "balance")

load_table_purchases()
load_table_balance()
So we essentially regressed to the beginning by removing the generalizations we did before. We accept that some functions are simply re-used, such as creating the db connection, reading, writing…
Wait, reading and writing, so the IO, seems to be the most stable part, doesn’t it?
Indeed! If we take the IO away, our functions look much simpler!
def process_table_purchases(df) -> pl.DataFrame:
    df = df.filter(pl.col("foo") > 1)
    return df

def process_table_balance(df) -> pl.DataFrame:
    df = df.filter(pl.col("foo") > 1)
    df = df.unique()
    return df
And we could still generalize the common steps, to try to keep things a bit more structured. Keep in mind that each of these steps could be more complicated; one may apply mappings, joins, filters on many columns, etc.
def filter_foo_over_one(df) -> pl.DataFrame:
    return df.filter(pl.col("foo") > 1)

def process_table_purchases(df) -> pl.DataFrame:
    df = filter_foo_over_one(df)
    return df

def process_table_balance(df) -> pl.DataFrame:
    df = filter_foo_over_one(df)
    df = df.unique()
    return df
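With the IO stripped out, each processing function is just data in, data out, which makes it easy to check in isolation. Below is a minimal sketch of that idea, using plain lists of dicts as a stand-in for polars DataFrames (my substitution, to keep the snippet dependency-free). The drop_duplicates helper and the sample rows are made up for illustration.

```python
Row = dict[str, int]


def filter_foo_over_one(rows: list[Row]) -> list[Row]:
    """Keep only rows whose "foo" value is greater than 1."""
    return [row for row in rows if row["foo"] > 1]


def drop_duplicates(rows: list[Row]) -> list[Row]:
    """Remove exact duplicate rows, keeping the first occurrence."""
    seen = set()
    unique_rows = []
    for row in rows:
        key = tuple(sorted(row.items()))  # hashable fingerprint of the row
        if key not in seen:
            seen.add(key)
            unique_rows.append(row)
    return unique_rows


def process_table_purchases(rows: list[Row]) -> list[Row]:
    return filter_foo_over_one(rows)


def process_table_balance(rows: list[Row]) -> list[Row]:
    return drop_duplicates(filter_foo_over_one(rows))


# No files, no database: the steps can be checked on a handful of rows.
sample = [{"id": 1, "foo": 1}, {"id": 2, "foo": 3}, {"id": 2, "foo": 3}]
purchases = process_table_purchases(sample)
balance = process_table_balance(sample)
```

Because nothing here touches a file or a connection, a few hand-written rows are enough to exercise each step.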
Yet how long until we reach the same cascade of functions as before, but now with plenty of duplicate code? Remember that we set the IO aside, but with functions alone, that IO code will be duplicated too. Spoiler: maybe it does not have to be :)
Introducing changes everywhere
Let’s stress these functions one last time. We now need to log each ingestion. Assume we use the latest format of our code, so a function per dataset. To add logging, we could simply add a line to each of those functions; or, write it as a decorator.
+@log
 def load_table_purchases():
     ...

+@log
 def load_table_balance():
     ...
I say simply, but of course I mean tediously. If we had 100 datasets, we would add the same line 100 times. Who knows how complicated these pipelines could be?
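The log decorator used above is not defined in this post. One way it might look, as a sketch built on the standard logging and functools modules; the logger name, the messages, and the placeholder function body are my assumptions:

```python
import functools
import logging

logger = logging.getLogger("ingestion")  # hypothetical logger name


def log(func):
    """Log the start, end, and failure of the wrapped ingestion function."""

    @functools.wraps(func)  # keep the original function name and docstring
    def wrapper(*args, **kwargs):
        logger.info("Starting %s", func.__name__)
        try:
            result = func(*args, **kwargs)
        except Exception:
            # Record the traceback, then let the error propagate unchanged.
            logger.exception("Failed %s", func.__name__)
            raise
        logger.info("Finished %s", func.__name__)
        return result

    return wrapper


@log
def load_table_purchases():
    # Placeholder body; the real function would read, process, and write.
    return "purchases loaded"


outcome = load_table_purchases()
```

Even with a decorator, the @log line still has to be repeated on every dataset function, which is exactly the tedium in question.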
Is there no better way? This seemed like a solved problem before.
Yet here is where our functions-only attempt ends. We will try to combine these approaches in a final blogpost.
Summary
What we have shown is that functions are very simple to use, particularly for processing steps. However, they innately lead to a lot of duplicated code and arguably too much flexibility. Structuring the code becomes entirely the job of the developers, and of all of them: the OGs as well as those maintaining the codebase 5 years down the line.
Indeed, there is a fine line between (over)generalization and duplication. The OOP approach required thinking about the overall structure and enforcing it, while functions simply allow… anything, really. It’s a shift of responsibilities from the original developers to all the developers. They have complete freedom to implement things whichever way they see fit, even when the result achieves exactly the same goal as something already implemented elsewhere. Nevertheless, things can be much easier to implement with functions, and, what the business especially likes to hear, faster.
Addendum
What about the more complex scenarios, such as multiple outputs and multiple inputs?
You caught me; I did not mention these as functions would then seem too good.
They would indeed be better suited for these scenarios. We simply add the functions where needed, while in the OOP approach, recall that we were rather stuck because our abstractions simply did not allow it.
I will leave the actual implementation for you to try, though I am confident you would reach the same conclusion.
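As a starting point only, here is a rough sketch of what a function with two inputs and two outputs could look like. It uses plain lists of dicts instead of polars DataFrames (my substitution, to avoid dependencies), and the function name, the kept/rejected split, and the sample rows are all hypothetical.

```python
Row = dict[str, int]


def process_sales_and_purchases(
    sales: list[Row], purchases: list[Row]
) -> tuple[list[Row], list[Row]]:
    """Take two inputs and return two outputs: kept rows and rejected rows."""
    combined = sales + purchases
    kept = [row for row in combined if row["foo"] > 1]
    rejected = [row for row in combined if row["foo"] <= 1]
    return kept, rejected


sales = [{"id": 1, "foo": 0}, {"id": 2, "foo": 4}]
purchases = [{"id": 3, "foo": 2}]
kept, rejected = process_sales_and_purchases(sales, purchases)
```

Nothing about the signature had to be planned in advance; the function just takes and returns what this one flow needs.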
What does AI tell us? #vibecode
It actually returns about the same suggestion as in OOP but replacing the classes with functions. Give it a try:
write a template of a data processing pipeline in Python; be concise, without explaining it; the template should allow different I/O options and also different processing steps, when configuring various pipelines and datasets; do not use classes