Meditations on data processing pipelines - Part 3

Published on Sep 1, 2025 · 17 min read


Prelude

We tried implementing data processing pipelines in pure OOP and pure FOP. Both approaches worked but not without significant downsides. Ok, maybe Functional-Oriented Programming had fewer of those, but still.

Here we will try to get the best out of both approaches.

A mix

So what were the good parts of each approach?

In OOP, we saw that structure was enforced by the code itself, which came with a lack of flexibility. Adding new data ingestions was, in principle, simple and streamlined; the same went for changes that affect all pipelines. However, as soon as we encountered an edge case, it all crumbled like a house of cards.

In FOP, it was the opposite. One could do virtually anything. However, structure became the responsibility of the developers; all developers, across time. Unlike OOP, it did not require much initial thought, hence it allowed for what business likes most: speed. Changes could easily be implemented; however, without developers maintaining a reasonable structure, it quickly led to spaghetti code; of many types!

So before trying to unite these approaches, let’s review what this design really depends on.

Considerations

These are details that should be considered when starting a project with data processing pipelines.

  • Programming language and its ecosystem. All the examples we have seen so far are in Python, which provides certain perks, such as multiple-inheritance. These perks affect the overall final design.

  • Expected project lifespan. Is this more of a POC, perhaps a couple of months long, or something business has already committed to long-term? With a POC, clearly one needs to provide value fast to justify the existence of such a project, which naturally leads to taking shortcuts.

  • Number of pipelines. Do we expect a lot of datasets, or just a few?

  • Number of sources and downstream users. Does data come from various sources? Is it readily available? How much effort to extract it from each place? In how many places and ways will we need to provide data?

  • Data complexity. Do we have simple tables, or also other formats? Are there many columns? What kind of update patterns happen in the source? Can we (easily) re-use code? Can we rely on a single common data format internally, e.g. a pandas DataFrame, or do we need to accommodate more?

  • Data size. Can we process everything on a single machine or do we need distributed tools?

  • Batch vs streaming. Does batch processing, even when every 10 minutes, suffice? Do we need a proper streaming solution, say processing in under a few seconds?

  • Architecture. Are we aiming for a data lake, lakehouse, or insert your favorite buzzword? Medallion architecture? Or does a simple staging + production setup suffice? This influences the amount of generalizations we need to introduce.

  • Features. Logging? Metrics? Lineage? Catalog? How important are these? Is historization / SCD2 required?

  • Processing complexity. Do we have complex steps requiring many functions, or simply a few maps and filters? Do we require materializing intermediate steps, even if just for debugging?

  • Inter-connectivity. Does the processing of a dataset rely on the processing of another, regarding scheduling? Does it rely on its own previous processing, e.g. to fetch its own last timestamp?

  • Configuration. Will this project enter a “maintenance” mode, i.e. require minimal effort to add/remove datasets? Does any configuration rely (directly) on business?


Yes.

… is the default answer. There are always unknowns.

Perhaps these questions are less relevant in the grand scheme of things. They are rather generic, aren’t they? We work in an exact science, where things should be quantifiable to have meaning. And this is the curse (or blessing) of data engineering: the challenges can be wildly different depending on the domain and simply on the environment at that point in time, be it an already established data corner or one where you are just now bringing the revolution. Yet somehow people still deliver projects and find ways to make everything work.

We should nevertheless always aim to write such pipelines in the best way, right?

Some might be thinking we could simply tl;dr here and say: choose functions! Which, funnily enough, would probably work in more cases than not. The point of these blogposts, however, is to highlight the nuances of these approaches: both the actual code and the external factors.

Regarding the latter, my advice (or wish?) is that as soon as it is clear that a project is not a POC, do revisit these considerations. Take the time to prepare the codebase for the future, and it will pay off exponentially.

OOP - FOP Harmony

OK but still, what is this better combined approach? Is it actually better?

Base classes

Recall that reading and writing, i.e. the IO, are fairly standard. There are not that many differences between pipelines there, and it lends itself well to configuration. A more OOP approach would work well.

The processing steps however, can be very different overall and yet only slightly different between each pair of pipelines. Here we need the flexibility of functions. The question is how to bring some structure to these, and here is where some learnings from OOP may apply.

So here goes.

import abc
from typing import Any, Iterable

class PipelineStep(abc.ABC):
  @abc.abstractmethod
  def run(self, *previous_output: Any) -> Any:
    raise NotImplementedError

class Input(PipelineStep): ...

class Output(PipelineStep): ...

class Pipeline:
  def __init__(self, steps: Iterable[PipelineStep]):
    # materialize so we can index into the steps and check their number
    self.steps = list(steps)
    assert len(self.steps) > 1

  def run_step(self, step: PipelineStep, *previous_output: Any) -> Any:
    return step.run(*previous_output)

  def run(self) -> Any:
    output_ = self.steps[0].run()
    for step in self.steps[1:]:
      # a step may return several values as a tuple; unpack them for the next step
      output_to_pass = [output_] if not isinstance(output_, tuple) else output_
      output_ = self.run_step(step, *output_to_pass)

    return output_

Hey, this still looks very much like pure OOP!

True. However these are all the base classes; no more! We really only have two concepts: a pipeline, and a step in the pipeline. A step can be executed, even on its own, and a pipeline executes a list of steps.

A step however can include anything. This is where the functions come into play. One is free to write and re-use functions as they see fit. No restrictions.

We step away from the static pipeline-as-config approach here. We do not want to implement all possible processing steps and let the “user” - here still us - just pass arguments. So we won’t have, e.g., a FilterProcessingStep auto-magically added to the Pipeline while we configure the filters in Pipeline.__init__. If such a step is needed, it is implemented or added as a PipelineStep directly.

Why the Input & Output? They do nothing!

Can you think about why it’s important to distinguish between things interacting with a 3rd party and those that do not? We will dive into that shortly. For now, let’s try implementing these base classes for a simple pipeline.

import polars as pl

class CsvInput(Input):
  def __init__(self, path: str):
    self.path = path

  def run(self, *previous_output) -> pl.DataFrame:
    return pl.read_csv(self.path)

class CsvOutput(Output):
  def __init__(self, path: str):
    self.path = path

  def run(self, df: pl.DataFrame, *previous_output) -> None:
    df.write_csv(self.path)

class ProcessCsv(PipelineStep):
  def run(self, df: pl.DataFrame, *previous_output) -> pl.DataFrame:
    return df.filter(...).select(...)  # placeholder for the actual transformations

Pipeline([
  CsvInput("path.csv"),
  ProcessCsv(),
  CsvOutput("path_output.csv")
]).run()

So we implemented 3 steps: a read, a process, and a write. We then link them in a Pipeline object and run. This is it!

Since we instantiate the objects, we can pass exactly what each step needs. This works well for static things, such as paths or credentials. Dynamic things can then be handled in two ways: by sub-classing Pipeline, or by doing them within the steps and passing them to the next via *args. The design here is not restrictive.

Do also note that the idea is that nothing expensive happens on object instantiation. This is important. And it is the responsibility of the dev to use this pattern responsibly. We do not connect to DBs or other 3rd parties before running the steps. We thus ensure that things remain testable without overusing mocking.
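To make this rule concrete, here is a rough sketch using Python's built-in sqlite3 and a hypothetical SqliteInput step (not one of the classes above): the constructor only stores what is needed, and the connection is only opened inside run.

import sqlite3
from typing import Any, List, Tuple

class SqliteInput(Input):
  def __init__(self, db_path: str, query: str):
    # cheap: only remember how to connect and what to fetch
    self.db_path = db_path
    self.query = query

  def run(self, *previous_output: Any) -> List[Tuple]:
    # expensive: the connection is only opened when the step actually runs
    connection = sqlite3.connect(self.db_path)
    try:
      return connection.execute(self.query).fetchall()
    finally:
      connection.close()

Unit tests can then instantiate such a step freely, or swap it out, without any database being available.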

Re-using steps

class CsvSimplePipeline(Pipeline):
  def __init__(self, input_path: str, output_path: str):
    super().__init__([
      CsvInput(input_path),
      ProcessCsv(),
      CsvOutput(output_path)
    ])

CsvSimplePipeline("sales.csv", "sales_output.csv").run()
CsvSimplePipeline("purchases.csv", "purchases_output.csv").run()

Of course! Just subclass Pipeline to fix the steps, and this is it!

What about different processing? Maybe always with the same input/output pattern?

class ProcessCsvDifferently(PipelineStep):
  def run(self, df: pl.DataFrame, *previous_output) -> pl.DataFrame:
    ...

Pipeline([
  CsvInput("path1.csv"),
  ProcessCsv(),
  CsvOutput("path1_output.csv")
]).run()
Pipeline([
  CsvInput("path2.csv"),
  ProcessCsvDifferently(),
  CsvOutput("path2_output.csv")
]).run()

We can keep the ‘barebones’ approach above, using the generic Pipeline, or again generalize:

class CsvIOPipeline(Pipeline):
  def __init__(self, input_path: str, output_path: str, steps: Iterable[PipelineStep]):
    super().__init__([
      CsvInput(input_path),
      *steps,
      CsvOutput(output_path)
    ])

CsvIOPipeline("path1.csv", "path1_output.csv", [ProcessCsv()]).run()
CsvIOPipeline("path2.csv", "path2_output.csv", [ProcessCsvDifferently()]).run()

Context

So how do you pass context between steps? What if there’s a lot of context? Aren’t we running into the same issue as in the OOP approach, where the order of inheritance is important?

Indeed, this is a challenge. While we do use simple examples here, the recommendation is to not overuse the PipelineStep. One should not create a separate step for every simple thing, such as a single filter, a map, etc. Instead, they should be combined such that they constitute a bigger unit of work. Nevertheless, it is not a problem to have some of the steps short. The recommendation here is really to KISS.

Talking about the *args, one could group some together in a Context object. However again, one needs to be careful. The recommendation is to not just dump everything in there. One should not store data in some generic container of all config because this allows any step to modify it, which again leads to spaghetti code.

Think about Airflow tasks; tasks do not really pass data between them. The things they can pass are simple. It is these simple things that can be grouped in a Context object, as exemplified below.

from typing import Any, Optional, Tuple

class Context: ...

class PipelineStepWithContext(PipelineStep):
  def run(self, data: Any, context: Context) -> Tuple[Any, Context]:
    raise NotImplementedError

class PipelineWithContext(Pipeline):
  def run_step(self, step: PipelineStepWithContext, data: Any, context: Context) -> Optional[Tuple[Any, Context]]:
    return super().run_step(step, data, context)

  def run(self) -> Any:
    # the first step is expected to create the context (see CreateContext below)
    data, context = self.steps[0].run(None, None)
    for step in self.steps[1:]:
      result = self.run_step(step, data, context)
      if result is None:  # e.g. a final output step returning nothing
        break
      data, context = result

    return data, context

Of course, one could also initialize the Context object in the __init__, however this again opens up the possibility of spaghetti code. The aim would be that anything stored in the context is immutable: either return a new Context on each step, or enforce that once something is added to the context, it can no longer be modified. Similar to a frozen dataclass, if you will. For simplicity here, the context is an empty class.

class CreateContext(PipelineStepWithContext):
  def run(self, data: Any, context: Context) -> Tuple[Any, Context]:
    return None, Context()

class CsvInputWithContext(PipelineStepWithContext):
  def __init__(self, path: str):
    self.path = path

  def run(self, data: Any, context: Context) -> Tuple[pl.DataFrame, Context]:
    context.input_path = self.path
    return pl.read_csv(self.path), context

class CsvOutputWithContext(PipelineStepWithContext):
  def run(self, data: pl.DataFrame, context: Context) -> None:
    output_path = "_".join(["output", context.input_path])
    data.write_csv(output_path)

class ProcessCsvWithContext(PipelineStepWithContext):
  def run(self, data: pl.DataFrame, context: Context) -> Tuple[pl.DataFrame, Context]:
    return data.filter(...).select(...), context  # placeholder transformations

PipelineWithContext([
  CreateContext(),
  CsvInputWithContext("path.csv"),
  ProcessCsvWithContext(),
  CsvOutputWithContext()
]).run()

Notice how the new classes above were introduced mostly to simplify yet generalize the type hints. In this latest snippet, we see an actual implementation exemplifying the behavior. We now know there is always a context object, although CsvInputWithContext looks rather ugly, as it never receives any data, only a None. The purpose here was to make it clear that the data should not be stored within the context.

One could alternatively use this Context object to actually store the pipeline configuration too. Feel free to imagine how that would look; below is one minimal sketch.
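Just one possible shape, using a frozen dataclass with hypothetical fields (input_path, output_path, run_date):

from dataclasses import dataclass, replace
from typing import Optional

@dataclass(frozen=True)
class PipelineConfig:
  # hypothetical configuration fields; adapt as needed
  input_path: str
  output_path: str
  run_date: Optional[str] = None

# frozen=True means a step cannot mutate the config in place;
# it has to create a new object, which keeps changes explicit
config = PipelineConfig(input_path="sales.csv", output_path="sales_output.csv")
updated = replace(config, run_date="2025-09-01")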


There have already been quite a few recommendations on how to use this pattern. Some might argue it is therefore not a good pattern, since the responsibility still falls on the devs. This happens indeed because the pattern is so flexible, which in turn is because pipelines can vary so much. We will revisit these recommendations in a more docstring-like fashion later, along with what steps one could take to enforce some of them and hence ease the burden on the dev.

Generic change

Can we add logging to every pipeline? What about to the steps? Or only to some pipelines?

Of course! One just needs to extend the base Pipeline class for the relevant pipelines. Whether this logging is applied to the pipeline as a whole, or to each step, is just a matter of choosing which Pipeline method to apply it to.

class Pipeline:
+  @log
  def run_step(self, step: PipelineStep, *previous_output) -> Any:
    ...

# or
class Pipeline:
+  @log
  def run(self) -> Any:
    ...

And of course, one could subclass Pipeline separately to add the logging, in case it should not affect all the pipelines. Then use this new subclass where needed, for example:

- class PipelineWithContext(Pipeline):
+ class PipelineWithContext(PipelineWithLogging):
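The @log decorator itself was left abstract above. A minimal sketch of what it and a PipelineWithLogging could look like, using the standard logging module (the wrapper details are my own choice, not a prescribed implementation):

import functools
import logging

logger = logging.getLogger(__name__)

def log(method):
  """Log the start and end of a Pipeline method, including the step being run."""
  @functools.wraps(method)
  def wrapper(self, *args, **kwargs):
    # when wrapping run_step, args[0] is the step; when wrapping run, fall back to the pipeline
    name = type(args[0]).__name__ if args else type(self).__name__
    logger.info("running %s", name)
    result = method(self, *args, **kwargs)
    logger.info("finished %s", name)
    return result
  return wrapper

class PipelineWithLogging(Pipeline):
  @log
  def run_step(self, step: PipelineStep, *previous_output):
    return super().run_step(step, *previous_output)

With that, the diff above boils down to making the relevant pipelines inherit from PipelineWithLogging instead of Pipeline.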

No context

Can we see an example of passing things between steps though? Say, an API client? Or SparkContext!

Passing around an API client should not be required; perhaps that’s an indication that it should only be used within a single PipelineStep. However, a Spark session is indeed a good example. So let’s pass that along, and also the DataFrame, of course, for processing and writing.

from pyspark.sql import DataFrame, SparkSession

class CreateSparkSession(PipelineStep):
  def run(self) -> SparkSession:
    return SparkSession.builder.appName(__name__).getOrCreate()

class ReadDf(PipelineStep):
  def run(self, spark: SparkSession) -> Tuple[SparkSession, DataFrame]:
    return spark, spark.read.csv("path.csv")

class ProcessDf(PipelineStep):
  def run(self, spark: SparkSession, df: DataFrame) -> Tuple[SparkSession, DataFrame]:
    return spark, df.filter(...)  # placeholder filter

class WriteDf(PipelineStep):
  def run(self, spark: SparkSession, df: DataFrame) -> None:
    df.write.parquet("path/to/your/output.parquet")

Pipeline([
  CreateSparkSession(),
  ReadDf(),
  ProcessDf(),
  WriteDf(),
]).run()

Arguably, the spark session did not need to be passed past the ReadDf; it could actually have been created within it! But this is just an example :)

Inputs and Outputs

So about those inputs & outputs?

Yes! If you have been reading my blog, you know I care about data lineage. Let’s say we simply want to keep track of the input and output datasets used in a pipeline. We do not need to do this at runtime - or ok, most of the time. Some of you already have flashbacks about those dynamic pipelines that have to figure out at runtime what needs to be processed. And actually that problem could be solved too with this overall design, but I digress.

Let’s say we want to see which inputs and outputs are used in a pipeline. Without diving too deep into the possibilities to achieve that, one can easily notice that figuring out whether a PipelineStep reads or writes data is just a matter of sticking to the Input and Output implementations of it. So given a Pipeline object, one simply needs to check whether these super classes are present in the MRO of each step of the pipeline.

pipeline = Pipeline(...)

for step in pipeline.steps:
  # mro() lives on the class, not the instance
  if Input in type(step).mro():
    print("found an `Input`!")

OK, OK. This indeed requires the pipeline object; it is not a static analysis that leaves the current codebase untouched.

But it could be! Because we keep a clear structure, it would not actually be that hard to implement. I attempted something similar here.

And of course, one could implement a @lineage decorator in the same way @log was implemented, as exemplified when trying out OpenLineage here.

Multiple inputs & outputs

Well here it gets interesting. The simplest approach is to have multiple Input or Output in the steps, then pass them around.

There could also be the situation that some of these inputs or outputs have to go through the same processing steps. In this case one can actually do something interesting: a pipeline as a step. Here’s what I mean:

class ProcessInputInParallel(PipelineStep):
  def __init__(self, paths: Iterable[str]):
    self.paths = paths

  def run(self) -> pl.DataFrame:
    outputs = []
    for path in self.paths:
      # a pipeline used as a step: read and process each file, no output step needed
      sub_pipeline = Pipeline([
        CsvInput(path),
        ProcessCsv()
      ])
      outputs.append(sub_pipeline.run())

    return pl.concat(outputs)

Pipeline([
  ProcessInputInParallel(["file1.csv", "file2.csv"]),
  CsvOutput("combined.csv")
]).run()

Notice how one could also dictate how those things are processed in parallel. It does not have to be a for loop. One could easily set up subprocesses, threads, or other parallel mechanisms!
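For instance, a minimal sketch with threads, assuming the reads are mostly IO-bound and reusing the ProcessInputInParallel step from above:

from concurrent.futures import ThreadPoolExecutor

class ProcessInputWithThreads(ProcessInputInParallel):
  def run(self) -> pl.DataFrame:
    def run_sub_pipeline(path: str) -> pl.DataFrame:
      return Pipeline([CsvInput(path), ProcessCsv()]).run()

    # run one sub-pipeline per file on a small thread pool
    with ThreadPoolExecutor(max_workers=4) as executor:
      outputs = list(executor.map(run_sub_pipeline, self.paths))

    return pl.concat(outputs)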

It is also true that this kind of recursiveness can make lineage more complicated to track; however, due to the structure, I still argue it is simple enough.

Summary Docs

Alright, let’s summarize the conventions, as a sort of tutorial on how to use this recommended pattern.

  • Imagine all data processing as DAGs. Nodes are data(!) objects, be it a file or something in memory, and edges are computations applied to them. A Pipeline is naturally a path of one or more edges through the DAG, and a PipelineStep is a single edge that produces some meaningful data. Do note that data here can also mean an API call, or just some unit of work using that data.

  • Stick to a naming convention. Perhaps redundant, but it is easier to understand what happens in the code when Pipelines are suffixed Pipeline, and PipelineSteps are suffixed PipelineStep or even just Step. As a corollary, make it obvious which steps do any IO by suffixing them with Input or Output and relying on separate super classes. These could be enforced via some meta-programming tests (a sketch of such a test follows after this list).

  • Do not connect to any 3rd party on Pipeline and PipelineStep object instantiation. No database connections or API calls shall be made in the __init__; these belong in the run methods. One shall of course store anything needed to connect to those 3rd parties in the __init__. This could be enforced by testing with no internet connection.

  • Only pass immutable data or context between PipelineSteps. As a corollary, do not store the main data in a context object. By immutable, I mean not allowing data to be modified “globally”, thus affecting all steps. By main data, I mean things that can get big and/or rely on being fetched from somewhere; a hardcoded simple mapping or connection credentials that never change are fine. This one cannot really be enforced, but it will show up as pipelines using a lot of memory, or devs being confused as to why data changes “oddly” when switching around the PipelineSteps.
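As for enforcing the naming convention mentioned above, a rough sketch of such a meta-programming test (pytest-style; the pipelines.steps / pipelines.base module layout is a made-up assumption):

import inspect

import pipelines.steps  # hypothetical module where all concrete steps live
from pipelines.base import Input, Output, PipelineStep  # hypothetical module with the base classes

def test_steps_follow_naming_convention():
  for name, obj in inspect.getmembers(pipelines.steps, inspect.isclass):
    if not issubclass(obj, PipelineStep) or obj in (PipelineStep, Input, Output):
      continue
    if issubclass(obj, (Input, Output)):
      # IO steps advertise themselves through their name
      assert name.endswith(("Input", "Output")), name
    else:
      assert name.endswith("Step"), name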

And if you think about it, even if you do not follow the pattern I propose in this blogpost, the conventions above are still useful!

Integrations

Without going too deep, it is of course useful - even required - to consider how this pattern would integrate with other tools. For example, we could have orchestration via Airflow or Dagster, lineage and catalog via OpenMetadata, monitoring with Prometheus, processing via Spark or dbt, etc. These tools may overlap or even provide similar patterns. Perhaps some of you even use Hamilton for those long and complicated data processing steps.

Any pattern you choose for the pipelines must somehow work with the tools. Though perhaps it is a bit of a chicken and the egg situation: should the tools depend on the pipelines design, or the design on the tools?

As we love to say,

It depends

So is this the final design? Is this the design to rule them all?

Ask again later ;)

Addendum

IO as OOP

You said that IO is fairly static then completely ignored this; what’s up with that?

Indeed. I left IO as edges, though one could extend this further by introducing the concept of datasets. Think of this as Dagster assets.

So for example we could have:

class Dataset(abc.ABC):
  @abc.abstractmethod
  def read(self):
    raise NotImplementedError

  @abc.abstractmethod
  def write(self):
    raise NotImplementedError

class CsvFile(Dataset): ...

class DBTable(Dataset): ...

And these datasets would be used within the PipelineStep.
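For instance, a rough sketch of a step wrapping such a dataset (LoadDataset is my own name, not something from the snippets above):

class LoadDataset(PipelineStep):
  def __init__(self, dataset: Dataset):
    self.dataset = dataset

  def run(self, *previous_output) -> Any:
    # the step stays a thin edge; the dataset node knows how to read itself
    return self.dataset.read()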

The question would then be how to tackle those PipelineSteps labeled as Input or Output. Are they still useful? Imo, yes.

The choice is really based on our interpretation of the DAGs; are the 3rd party inputs and outputs Dataset nodes? Is an API endpoint a dataset?

In the context of extracting lineage, I would argue that it is easier to extract it when we start from the edges than from the nodes. Given each edge must have a start and an end, the lineage is already there. The read is in the edge. The write is in the edge. Imagine having a box of puzzle pieces that you need to put together vs lifting a bunch of bead strings.

Common data format

Pipelines indeed seem to depend on the data format passed along each step. Once we start using a DataFrame, be it from polars or Spark, we kind of stick to it. This makes it hard to migrate from one to another, and also to re-use functionality by simply running the same computations but on a different backend.

Here perhaps one should use a library like narwhals. Do check it out!
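A tiny sketch of the idea, based on narwhals' from_native / to_native API (the column names are made up; do double-check the details against the narwhals docs):

import narwhals as nw

class ProcessAnyBackend(PipelineStep):
  def run(self, native_df, *previous_output):
    # wrap whatever backend the previous step produced (pandas, polars, ...)
    df = nw.from_native(native_df)
    processed = df.filter(nw.col("amount") > 0).select("id", "amount")
    # hand back the original backend's type to the next step
    return processed.to_native()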

Typing

It’s true that maintaining a single super class with generic typing that is actually useful is difficult. We tried subclassing our PipelineSteps and overriding the type hints everywhere. It is very easy to break type safety without realizing it. It is also very easy to just add *args and **kwargs everywhere. The responsibility of keeping this design sane falls solely on the developer, especially when dealing with a flexible language such as Python.
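For the curious, a rough sketch of what a generically typed step could look like, using typing.Generic (my own variation, not something from the earlier snippets):

import abc
from typing import Generic, TypeVar

import polars as pl

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")

class TypedPipelineStep(abc.ABC, Generic[TIn, TOut]):
  @abc.abstractmethod
  def run(self, previous_output: TIn) -> TOut:
    raise NotImplementedError

# a concrete step now declares its input and output types explicitly
class ProcessCsvTyped(TypedPipelineStep[pl.DataFrame, pl.DataFrame]):
  def run(self, previous_output: pl.DataFrame) -> pl.DataFrame:
    return previous_output.filter(...).select(...)  # placeholder transformations

Even then, the generic Pipeline.run loop erases these types while chaining steps, which is a big part of why keeping the typing useful is hard.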

Notice something wrong? Have an additional tip?

Contribute to the discussion here

Want to work with me? My socials are below!