Meditations on data processing pipelines - Part 1

Published on Jul 7, 2025 · 16 min read


Prelude

Another data pipeline post? There is no one solution to fit them all!

That might be true. What is also true is that there are many ways to make your life harder when implementing such data processing pipelines.

And it does not take much. It is the small, seemingly innocent decisions made early on that are never revisited. Things then get built on top, treating those early decisions as canon and thus unchangeable.

Someone smarter than me made that choice. It must have been thought-through!

Yet how many times are you confronted with a trivial edge case? Something most PR reviewers would say does not matter; either way is fine.

It’s just an if-else, no need to write a different function!

Then months, years along the way, that if-else turned into an if-elif-else, then an if-elif-else surrounded by a try/raise, then each condition body in a separate function with its own if/else statements, etc.

Simply unmaintainable, excessively complicated, untested, and with redundant/obsolete code.

Isn’t this normal? Don’t all projects end like this, even outside data engineering?

Maybe. This, however, does not stop one from learning from the mistakes of others. Arguably most would shrug and say that it’s just code, or that data engineering is easy: it’s just moving some rows from A to B. And again, there’s truth in that! Yet there are so many subtle ways to do this badly, or, let’s say, in a more complicated way than needed.

So the point here is to highlight some of these situations and to provide a reasonable template to use when starting a new project.

Edit: this post turned out to be insanely long, so it became a series. This is part 1, with the others to follow.

In the wild

There are typically 2 approaches here: “old school” OOP, or “modern” function-oriented.

Or a mix. Spoiler: that’s what the proposal will look like!

I will use Python here, as it is the most common (and flexible) language for this. Do note, however, that other languages might not support some of the patterns below.

OOP

Python has multiple inheritance. This allows a lot of flexibility but also a lot of complexity.

On the extreme side, one may try to generalize everything. A pipeline, a processing step, a data connection, a table, a file, interfaces for everything, etc. On top of that, use a dependency injection framework and long config files for every single thing.

Sound familiar? A bit Java-esque?

This may lead to a beautiful design in theory, but in practice I argue that in most cases it fails. Small exceptions always pile up, and so does the cost of introducing a new one. A new pipeline requires a lot of code. And the cognitive demand of remembering how to configure all of these, plus the trade-offs and implications of doing so, becomes just too much. Even if it were documented, who has the patience to read books anymore?

Configs

Say everything is abstracted such that all you need is a few lines of config to ingest a new table. Then a new requirement comes in: we need to deduplicate rows.

Fine.

{ "tableName": "foo" }

becomes

{"tableName": "foo", "dropDuplicates": false}
{"tableName": "bar", "dropDuplicates": true}

Recall that we are adding new tables, which means the old config entries need updating too. You might think defaults would eliminate this problem, but that’s another PITA we will get into later.


Now another table actually requires dropping duplicates only based on some keys.

Sure, it now looks like:

{"tableName": "foo", "dropDuplicates": false, "keys": null}
{"tableName": "bar", "dropDuplicates": true, "keys": null}
{"tableName": "yo", "dropDuplicates": true, "keys": ["zulu"]}

OK, now another table needs primary key validation after dropping duplicates.

Uh-oh.

We already used keys. Do we modify the original keys or introduce a new one?

Meh, it’s just a name. Let’s deliver fast. Actually, let’s add those keys to all tables. Seems useful!

{"tableName": "foo", "dropDuplicates": false, "keys": null, "primaryKeys": ["pk1"]}
{"tableName": "bar", "dropDuplicates": true, "keys": null, "primaryKeys": ["pk1"]}
{"tableName": "yo", "dropDuplicates": true, "keys": ["zulu"], "primaryKeys": ["pk1"]}
{"tableName": "lo", "dropDuplicates": true, "keys": ["zulu"], "primaryKeys": ["pk1"]}

Wait, table foo first needs some string cleaning before dropping the duplicates for that to work. No problem!

{"tableName": "foo", "dropDuplicates": false, "keys": null, "primaryKeys": ["pk1"], "parseStrings": ["pk1"]}
...

Damn, but if we parse strings in bar the same way, it breaks the primary key. Hmm also in both yo and lo we kind of want to parse some string columns too, but only after checking the primary keys as we have a lot of duplicates. We also want to fail early if computing the primary keys fails.

{"tableName": "bar", "dropDuplicates": true, "keys": null, "primaryKeys": ["pk1"], "parseStrings": null, "parseStringsV2": ["pk1"], "parseStringsBeforePk": ["nope_column"]}
...

A new person joins the team. We have a new table, and its integer columns need to be parsed.

Oh, I see the pattern is to add a parse{Type} config. Sure, parseIntegers it is. My predecessors surely would agree. Though man, this is so much code to write. And ehm what is the difference between keys and primaryKeys?

I hope I made my point clear. This got out of hand very quickly, didn’t it? And we haven’t even looked at the code yet. Just picture implementing those if/elses for every single scenario so far!

Where did it go wrong?

But where did it go wrong?

I would say from the very beginning, with the assumption that all pipelines will be simple and generalizable. A related mistake was assuming no new features would be required or become "nice to have" later; think, for example, of the primary key check, which could be part of a new "validate-your-data" initiative.

But then there were a few decision points of seemingly low importance that could have put a stop to this.

The first was using keys instead of something clearer yet more verbose, such as dropDuplicatesKeys. That would have prevented the confusion later.
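
For example, the earlier snapshot could have looked like this (a hypothetical naming, purely to illustrate):

{"tableName": "foo", "dropDuplicates": false, "dropDuplicatesKeys": null}
{"tableName": "bar", "dropDuplicates": true, "dropDuplicatesKeys": null}
{"tableName": "yo", "dropDuplicates": true, "dropDuplicatesKeys": ["zulu"]}

A later primaryKeys field would then not clash with anything.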

The second was arguably letting this pattern continue after noticing that processing a column, even one of the same type, can differ depending on 1) the point along the pipeline and 2) what exactly needs processing. This is the point at which a refactor should have happened, or at least a review and a plan to change things if more such requests arrive.

The third was that these decisions were not documented anywhere, nor was the actual "how to add a new configuration" flow. And can you imagine the number of pages needed to document all this?

But hey, this is just what happens with short deadlines!

Maybe. Yet again, it is enough to play through these scenarios before implementing anything to realize you are heading down the wrong path. And seeing alternatives is exactly the point of blogposts such as this one.

Finally, the fourth point: in my experience this pattern of config never works. That is not to say any config is bad, but it should be restricted to what really is general. Anything about how a table is processed does not belong there. Details such as the source and the destination of the output, or which Writer to use (dependency-injection-like), do often work well.
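
As a rough sketch (the field names and values are made up), a config that stays at this general, IO-only level might look like:

{"tableName": "foo", "source": "s3://raw/foo.csv", "destination": "s3://clean/foo.parquet", "writer": "ParquetWriter"}

Everything about how foo is actually processed then lives in code, where it can be read, tested, and refactored.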

Mixins

Multiple-inheritance, remember?

Take the config we had above.

What if we write our pipelines such that they inherit the steps they should apply? Each class represents one step, e.g. FetchData, and each dataset simply inherits the steps it needs. This pattern is known as a mixin.

So we would have a sort of base class that is able to materialize a dataset, and then we configure how to do it via mixins. Let’s give it a try.

import abc

from typing import Any

class Fetcher(abc.ABC):
  @abc.abstractmethod
  def fetch_data(self) -> Any:
    ...


class Processor(abc.ABC):
  @abc.abstractmethod
  def process_data(self, data: Any) -> Any:
    ...


class Writer(abc.ABC):
  @abc.abstractmethod
  def write_data(self, data: Any) -> None:
    ...


class Dataset(Fetcher, Processor, Writer):
  def load(self):
    data = self.fetch_data()
    data = self.process_data(data)
    self.write_data(data)

Wait, so how would you use this?

Well, we would need to implement all those things. Then define our datasets. For example:

import polars as pl

class CsvFetcher(Fetcher):
  def fetch_data(self) -> pl.DataFrame:
    return pl.read_csv("path")


class NiceProcessor(Processor):
  def process_data(self, data: pl.DataFrame) -> pl.DataFrame:
    return data.filter(pl.col("foo") > 1)


class ParquetWriter(Writer):
  def write_data(self, data: pl.DataFrame) -> None:
    data.write_parquet("path")


class Sales(Dataset, CsvFetcher, NiceProcessor, ParquetWriter): ...

But all those things are hardcoded! Do I now need to write a different Fetcher for every single file?

No-no, let’s generalize a bit. We have two main options here: 1) class attributes or 2) object attributes. Let’s try them both.

1. Class attributes

class CsvFetcher(Fetcher):
+  path: str

  def fetch_data(self) -> pl.DataFrame:
-    return pl.read_csv("path")
+    return pl.read_csv(self.path)

class Sales(Dataset, CsvFetcher, NiceProcessor, ParquetWriter):
+  path = "source_path.csv"

Sales().load()

That… feels awkward.

Yup! There is perhaps that flashback from Programming 101 where your professor emphasized the difference between classes and objects, how classes are like templates and objects are, well, the objects built from a template. And here we are making a new template for each object! We are also, in effect, using instance methods and self only to reach class-level state, almost as if they were classmethods. Odd.

2. Object attributes

class CsvFetcher(Fetcher):
+  def __init__(self, path: str) -> None:
+    self.path = path
+    super().__init__(path)

  def fetch_data(self) -> pl.DataFrame:
-    return pl.read_csv("path")
+    return pl.read_csv(self.path)

class Sales(Dataset, CsvFetcher, NiceProcessor, ParquetWriter):
+  def __init__(self, path: str) -> None:
+    self.path = path
+    super().__init__(path)

Sales("source_path").load()

Seems a bit more normal, right?

But that… does not look like working code.

Yup! We would actually need to modify the upstream classes to accept *args & **kwargs in their __init__ methods. Quite a bit of code. And we now also need to pay attention to the inheritance order so that it matches the order of the args in the __init__!
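
Here is a minimal sketch of that plumbing, building on the classes defined earlier and forwarding everything as keyword arguments (which also sidesteps the ordering problem); the argument names are illustrative:

class Fetcher(abc.ABC):
  def __init__(self, **kwargs) -> None:
    super().__init__(**kwargs)

  @abc.abstractmethod
  def fetch_data(self) -> Any:
    ...


class CsvFetcher(Fetcher):
  def __init__(self, path: str, **kwargs) -> None:
    self.path = path
    super().__init__(**kwargs)  # pass the rest along the MRO

  def fetch_data(self) -> pl.DataFrame:
    return pl.read_csv(self.path)


class Sales(Dataset, CsvFetcher, NiceProcessor, ParquetWriter):
  ...  # no __init__ needed anymore


Sales(path="source_path.csv").load()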

Ugh.

But how do they compare, you say?

Well let’s first see what we need to do to define more datasets.

Option 1:

class Purchases(Dataset, CsvFetcher, NiceProcessor, ParquetWriter):
  path = "source_path2"

Option 2:

Sales("source_path2")

Option 2 seems more natural, doesn’t it? The template is the same, so we just use a different path for this other dataset. But the naming is still odd; we are now loading purchases using a Sales class.
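
If the naming bothers you (it should), one small middle ground under option 2 is a thin subclass that exists purely for the name, setting aside the __init__ plumbing discussed above:

class Purchases(Sales):
  ...


Purchases("source_path2").load()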

What about using different processing steps?

class OtherNiceProcessor(Processor):
  def process_data(self, data: pl.DataFrame) -> pl.DataFrame:
    return data.filter(pl.col("foo") > 1).filter(pl.col("bar").is_not_null())

class Stock(Dataset, CsvFetcher, OtherNiceProcessor, ParquetWriter): ...

So for both approaches we’d need a new Processor subclass. Actually, the change is pretty much identical. And thinking about it, that would be needed whenever a change in the flow is required, be it processing, reading, or writing.

What if we need a client to reach the data at source?

Hmm, actually in this case one is kind of stuck with the class approach, unless the endpoint is passed in the __init__.

Option 1:

class ClientFetcher(Fetcher):
  credentials = ...
  endpoint = None

  def fetch_data(self) -> Any:
    client = init_client(self.credentials)
    return client.data(self.endpoint)

class ClientFetcherEndpoint1(ClientFetcher):
  endpoint = "endpoint1"

class ClientFetcherEndpoint2(ClientFetcher):
  endpoint = "endpoint2"

class Endpoint1Dataset(Dataset, ClientFetcherEndpoint1, ...): ...

And option 2:

class ClientFetcher(Fetcher):
  credentials = ...

  def __init__(self, endpoint):
    self.endpoint = endpoint

  def fetch_data(self) -> Any:
    client = init_client(self.credentials)
    return client.data(self.endpoint)

class ClientDataset(Dataset, ClientFetcher, NiceProcessor, ParquetWriter):
  def __init__(self, endpoint):
    super().__init__(endpoint)

or is it this?

class ClientDataset(NiceProcessor, ClientFetcher, Dataset, ParquetWriter):
  def __init__(self, endpoint):
    super().__init__(endpoint)

So with classes only, we need a new class for each endpoint; with the object approach, however, we need to pay close attention to the inheritance order, since the args must be passed to the correct mixin.
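
When in doubt, printing the MRO at least shows the order in which __init__ (and every other method) is looked up. For the first ClientDataset variant above:

print(ClientDataset.__mro__)
# ClientDataset -> Dataset -> ClientFetcher -> Fetcher -> NiceProcessor
# -> Processor -> ParquetWriter -> Writer -> ABC -> object

In this toy example only ClientFetcher defines an __init__, so both orderings happen to route the endpoint to it; as soon as several mixins define their own __init__, the base order decides who receives which positional argument.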

Which option do you think is better?

What if we want to re-use steps?

Umm not sure actually. We had the NiceProcessor and OtherNiceProcessor which looked like:

class NiceProcessor(Processor):
  def process_data(self, data: pl.DataFrame) -> pl.DataFrame:
    return data.filter(pl.col("foo") > 1)

class OtherNiceProcessor(Processor):
  def process_data(self, data: pl.DataFrame) -> pl.DataFrame:
    return data.filter(pl.col("foo") > 1).filter(pl.col("bar").is_not_null())

and they do share functionality, here filter on the foo column.

The intuition is of course: inheritance! Let’s modify the second processor:

class OtherNiceProcessor(NiceProcessor):
  def process_data(self, data: pl.DataFrame) -> pl.DataFrame:
    data = super().process_data(data)
    return data.filter(pl.col("bar").is_not_null())

Looks good, right?

Well, it seems like each tiny step would lead to a new class. So each different filter, each select, every single column processing step would lead to a new class. This is insane!

You could still generalize a bit, no? Configure the columns to filter on as a class attribute? Or, I know! Have each step as a different configurable class!

You mean like:

class Filter(Processor):
  filters = None

  def process_data(self, data: pl.DataFrame) -> pl.DataFrame:
    return data.filter(self.filters)

class Select(Processor):
  select_columns = None

  def process_data(self, data: pl.DataFrame) -> pl.DataFrame:
    return data.select(self.select_columns)

class BetterNiceProcessor(Filter, Select):
  filters = (pl.col("foo") > 1)
  select_columns = ["foo", "bar"]

  def process_data(self, data: pl.DataFrame) -> pl.DataFrame:
    data = Filter.process_data(self, data)
    data = Select.process_data(self, data)
    return data

class ClientDataset(Dataset, ClientFetcher, BetterNiceProcessor, ParquetWriter):
  def __init__(self, endpoint):
    super().__init__(endpoint)

I don’t know about you, but I don’t even want to imagine how this would look with more processing steps, say 10. Especially when other datasets might configure each step differently, meaning yet another processor must be written.

Just, no.

Does it really need to be this tedious?

What if we have multiple outputs?

Sometimes you end up splitting an input dataset into multiple ones; of course you don’t want to do the same work several times!

Hmm surely we only need to modify the writer, right?

Well, in the easy case yea, sure. Just specify some extra paths. But likely each output would have some different steps. How do you handle that now? Pipeline within pipeline?

Inception

And the same applies for a dataset which has multiple inputs, or say unions some datasets together. One might need to apply some processing steps to the datasets before or after the union; there are many ways to make a DAG out of this.

This time I would not even add the code here. It is simply too intricate to set up all this using this approach.

What about adding logging?

Yes! This is where this approach shines.

class Dataset(Fetcher, Processor, Writer):
+  @log
  def load(self):
    data = self.fetch_data()
    data = self.process_data(data)
    self.write_data(data)

Here the @log decorator contains your logging implementation. You could also add it to the individual read-process-write steps. Either way, you write it once and it applies everywhere!
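
A minimal sketch of such a decorator, using the standard logging module (the name and messages are of course up to you):

import functools
import logging

logger = logging.getLogger(__name__)


def log(func):
  @functools.wraps(func)
  def wrapper(*args, **kwargs):
    logger.info("Starting %s", func.__qualname__)
    result = func(*args, **kwargs)
    logger.info("Finished %s", func.__qualname__)
    return result
  return wrapper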

But what if we need to log the actual steps taken during processing?

Uh-oh. Well with our code, we’d need to modify every single Processor implementation; no thank you!

Isn’t there a better abstraction for the processing steps?

Summary

So what we have seen thus far is that using OOP for the read & write operations - so the IO - seems to work quite well. However, as soon as we get into the data processing part, it all goes south. And this applies to both the class and the object approaches!

The number of inherited classes is awkward, not to mention the MRO. We have to fight potentially overlapping argument and attribute names, deal with *args & **kwargs everywhere, and so on.

It just seems that the classes here are too specific and too general at the same time. The specific steps are short and neat, but then we have the Dataset, which needs to tie everything together nicely for all the distinct scenarios.

So it feels like this Dataset does too many things at once, doesn’t it? Can’t we split its responsibilities?

Separated dataset & processing

What if we had wrappers for the data, and the processing happens elsewhere? So only the IO is encapsulated in this wrapper. Let’s give it a try.

class Dataset(abc.ABC):
  @abc.abstractmethod
  def read(self) -> Any:
    raise NotImplementedError

  @abc.abstractmethod
  def write(self, data: Any) -> None:
    raise NotImplementedError

class Sales(Dataset):
  def read(self) -> Any:
    return pl.read_csv("sales.csv")

  def write(self, data: Any) -> None:
    data.write_parquet("sales.parquet")

def process_sales_data(data): ...

def load_sales():
  sales = Sales()
  data = sales.read()
  data = process_sales_data(data)
  sales.write(data)

load_sales()

Hmm seems fine. A bit specific though. We don’t really have the nicely configurable readers and writers anymore. Though we could provide default implementations, right? For example:

class CsvReader:
  def read(self) -> pl.DataFrame:
    return pl.read_csv("path")

class ParquetWriter:
  def write(self, data: pl.DataFrame) -> None:
    data.write_parquet("path")

class Sales(CsvReader, ParquetWriter, Dataset): ...

Wait, but isn’t that the same as before? What does Sales now even do? And actually, what does it even mean to read the dataset here? Is it from the source or post-processing?

All valid points. It is confusing. It is also indeed very similar to the previous example, except that the processing is now done in a random function. So is it even worth going any further this way? If we do not repeat the Processor-as-a-class approach, the only alternative left is using "simple" functions.

And indeed this is where this blogpost ends. In the next post, we will delve into how to write pipelines with a functional approach, forgetting everything about OOP.

Will that be the better option?

Addendum

Our pipelines are not that complicated! We just use tool Y.

Hey, if you’ve been using tool Y for a while and see no issues with it, then by all means keep using it. Don’t change something that works well just because it might be required in a hypothetical future.

What about defaults in functions/classes aka hidden behavior?

Very good question! This is, from my point of view, another bad pattern when overused. In the examples of this blogpost, we had simple overrides of class attributes, such as file paths.

Consider however what happens when these attributes actually contain flags for processing steps taken within the flow. There might be random flags, such as drop_duplicates or db_name hidden deep in the inheritance chain. One cannot see those details unless manually ctrl-clicking the entire chain in an IDE! And oh boy, these chains can get long.

So be careful with default values when they affect the flow in such a way. If you ever wonder "does this pipeline apply X?", then that X should be incredibly easy to find.
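
A contrived sketch of the problem (class names invented for illustration):

class BaseIngestion:
  drop_duplicates = True  # hidden default; imagine the base class's load flow consults it

class RestIngestion(BaseIngestion):
  ...

class SalesPipeline(RestIngestion):
  ...  # nothing here hints that duplicates are being dropped

Reading SalesPipeline on its own, you would never know the deduplication happens at all.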

Can’t we just let AI pick the best design for us? #vibecode

Alright, let’s ask Gemini. I have filtered the output down to the essentials.

class Config:
  def __init__(self,
    input_type: Literal['csv', 'database', 'api'],
    output_type: Literal['csv', 'database', 'api'],
    input_path: str = None,
    input_query: str = None,
    output_path: str = None,
    output_table: str = None,
    processing_steps: List[Dict[str, Any]] = None
  ):
    ...

class DataProcessor:
  def __init__(self):
    self._readers: Dict[str, Callable[..., pd.DataFrame]] = {
      'csv': self._read_csv,
      'api': self._read_api,
    }
    self._writers: Dict[str, Callable[..., None]] = {
      'csv': self._write_csv,
      'database': self._write_database,
    }
    self._processors: Dict[str, Callable[..., pd.DataFrame]] = {
      'clean_data': self._clean_data,
      'transform_features': self._transform_features,
      'aggregate_data': self._aggregate_data,
    }

  def run(self, config: Config) -> None:
    read_func = self._readers.get(config.input_type)
    if config.input_type == 'csv':
      df = read_func(config.input_path)
    elif config.input_type == 'api': ...

    for step in config.processing_steps:
      step_name = step.get('name')
      step_args = step.get('args', {})
      process_func = self._processors.get(step_name)
      df = process_func(df, **step_args)

    write_func = self._writers.get(config.output_type)
    if config.output_type == 'database':
      write_func(df, config.output_table)
    elif config.output_type == 'csv': ...

config1 = Config(
  input_type='csv',
  input_path='input_data.csv',
  output_type='csv',
  output_path='output_data_cleaned.csv',
  processing_steps=[
    {'name': 'clean_data', 'args': {'feature_cols': ['col_x', 'col_y']}},
  ]
)

processor = DataProcessor()
processor.run(config1)

Yet another design, which doesn’t look so bad :) We see the idea of a config object, and OOP used the "proper" way, with objects.

The I/O is encapsulated in these if/else statements, which looks fine when there aren’t many options. Consider, however, what happens when one needs to ingest from tens of different APIs.

The processing steps are also generalized, though they likely suffer from the same issue identified earlier: they can differ so much that the mapping becomes insanely long! Lastly, we notice that the complex scenarios of unions or multiple outputs cannot really be handled here.


Feel free to repeat the exercise yourself with your favorite AI. The prompt I used:

write a template of a data processing pipeline in Python; be concise, without explaining it; the template should allow different I/O options and also different processing steps, when configuring various pipelines and datasets
