Python Transactions
Published on Sep 2, 2024
10 min read
Prelude
You have some Python functions that turn out to be related. You run them in such a sequence that if something were to go wrong, you need to rollback certain operations applied to an external system.
This is particularly common in data engineering. One might have a single pipeline updating multiple output tables, or perhaps one needs to send an event to a Kafka topic after writing new data. If something goes wrong in the middle, wouldn’t it be nice to automatically roll back the partially succeeded operations? No human intervention, no midnight call, nada. Maybe the problem was just a fluke.
This concept exists as transactions since forever, particularly in finance and databases. It just groups a number of operations into a single unit, such that either all or none may succeed. We should be able to use the same concept for arbitrary functions too.
Note that there already exist some libraries that provide this functionality, such as this and this, however I find them either too bloated, or not flexible enough. Let’s see if we could solve this differently (better?).
Design
So what do we want really?
Perhaps a decorator or a context manager; they seem like natural solutions. But how should it look like?
Maybe something like:
@transaction
def coupled_functions():
function1()
function2()
or
with transaction() as t:
function1()
function2()
Where we simply use this transaction
concept to mark the 2 operations (function1
& function2
) as a single unit.
But then how do we code the rollbacks?
If we were dealing purely with database sessions, perhaps a simple .rollback()
would suffice, however we want to be generic here.
Something like the SQLAlchemy session basics is pretty much what we need.
Rollback
How should it look though?
We could follow the rather verbose approach from here which would be similar to:
with transaction() as t:
t.run(function1, rollback_function1, *args, **kwargs)
t.run(function2, rollback_function2, *args, **kwargs)
However I find this rather cumbersome; it also requires one to declare these functions and their rollbacks within this context. Perhaps we could do something like:
@transaction(rollback_function=rollback_coupled_functions)
def coupled_functions():
function1()
function2()
But then we completely miss the connections between the functions and their respective rollbacks. Recall that we only need to roll back the operations actually applied, and not in any way, but in reverse order. For example, if we first update table A, then try to update table B which fails, we need to first revert anything that might have been written while updating table B, then roll back the update from table A.
What if we decorated each function with its rollback? Something like:
@rollback(rollback_function1)
def function1():
...
@rollback(rollback_function2)
def function2():
...
@transaction
def coupled_functions():
function1()
function2()
It looks ok, right?
These rollback-able functions could then technically be any level deep, not just within the main function (here coupled_functions
).
We shouldn’t need to worry about args & kwargs either; surely if each rollback-able function requires certain args to finish, it would not require different args to rollback.
Right?
We just need to find a way to connect these rollback
with the transaction
.
Enough talk though. Let’s write it, shall we?
Code
Recall what we want out of our transaction solution:
- to link functions to counterpart rollback functions, wherever required,
- to apply these rollback functions whenever something goes wrong,
- to apply them only for the functions which succeeded so far,
- to apply them in reverse order.
Note I choose only decorators going forward, however one could also implement transaction
as a context manager.
We follow exactly the format from the previous example, defining our transaction
and rollback
decorators.
import functools
from collections import deque
from typing import Optional, Callable
_rollback_queue: Optional[deque] = None
def transaction(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
global _rollback_queue
try:
_rollback_queue = deque()
return func(*args, **kwargs)
except Exception as e:
for rollback_fn, rollback_args, rollback_kwargs in _rollback_queue:
rollback_fn(*rollback_args, **rollback_kwargs)
raise e
finally:
_rollback_queue = None
return wrapper
def rollback(rollback_func: Callable):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
global _rollback_queue
_rollback_queue.appendleft((rollback_func, args, kwargs))
return func(*args, **kwargs)
return wrapper
return decorator
Let’s go through it.
First, we need a way to add things and pop them in reverse.
This is most naturally a stack data structure.
The only difference here is that I am using deque
(which is a double-ended queue data structure), purely s.t. I can more naturally iterate “backwards”.
We define it globally, such that both decorators could use it.
We need this stack globally, since we essentially have 2 coupled decorators that need to interact somehow.
The @transaction
identifies the beginning (and end) of a transaction, so it creates this stack, and clears it in the end.
Then during runtime, whenever a function decorated with @rollback
runs, it registers this rollback in the stack.
This way, if something goes wrong, @transaction
can simply iterate and apply the relevant rollback functions corresponding to the operations
already attempted.
We also store the original args and kwargs passed to the functions s.t. we can pass them to their rollback functions, if needed. Indeed, this means we assume the rollback function does not need anything different or additional to perform its role. Should probably not store entire DataFrame’s either, but you could :)
Wait,
global
, really? AlsoNone
as default value? What is this nonsense?!
Let me explain.
global
We need a way to link these decorators.
We are not using a construct such as the context manager, in which case the scope of this rollback_queue
would be within that context.
We need something more… global.
Let’s dispel some misconceptions though.
global
is not global to execution, just global in this file/module, which is small and doesn’t lead to spaghetti code.
By design, it is not meant to be used elsewhere; it also will not name-clash with something else in the codebase.
I further mark it as “private” via the underscore, which prevents an import *
to import it by default.
Even if explicitly imported, one needs to put in extra work to mess with it; try importing _rollback_queue
in a different file and modifying it, I dare you.
So is global really so terrible here?
None
OK the None
.
Nulls get a bad reputation mostly for being used in unexpected places.
For example, if a Purchase object is suddenly null in the middle of the program due to some bug, this will be rather frustrating to debug, as the exception
will not be able to pinpoint where it became a null, just when it was observed as a null.
However nulls, at their core, simply mean the absence of something.
Here, we use it to implicitly show that we are not in a transaction.
Is it worth creating a second global bool variable just so we can keep _rollback_queue
always initialized?
Edge Cases
Is this really it? That’s all the code?
Well, no. There are some extra details to consider.
Inception
One could place these decorators anywhere, particularly the @transaction
.
Arguably one can never foresee everything the user could do.
However, let’s tackle one specific scenario which does not really make sense: overlapping transactions.
We do not want to start another transaction while already within a transaction.
We could prevent that easily by updating our transaction wrapper:
+ class TransactionAlreadyExistsException(Exception):
+ pass
...
+ if _rollback_queue is not None:
+ raise TransactionAlreadyExistsException()
Isn’t it nice we just used None
as a “not-exists” value?
An empty queue would have been entirely plausible when initiating a second transaction before reaching any rollback-able function.
Function outside the transaction
If you paid close attention, you might have noticed a bug in our code.
Our rollback
decorator just updates the queue, without considering if the queue exists!
This is a perfect example showcasing the dangers of using nulls.
Nevertheless, we are dealing with very simple code, so let’s just fix it:
- _rollback_queue.appendleft((rollback_func, args, kwargs))
+ if _rollback_queue is not None:
+ _rollback_queue.appendleft((rollback_func, args, kwargs))
This will now make sure that one can still use the function even if not within a transaction.
Note again the explicit not None
check, as opposed to just if _rollback_queue
.
Helping the user
What is the point of code if not to be helpful? Let’s put ourselves in the shoes of those that might use it.
What if we forget to define our rollback functions and start using a transaction? Or what if we don’t realize that our rollback functions do not have the same parameters as their counterpart? While again, we cannot foresee everything a user might do, let’s make it as easy as possible.
Considering the first question, one could simply add a warning for the user to see in our finally
clause within @transaction
:
+ logger = logging.getLogger()
...
+ def warn_when_missing_rollback_decorators():
+ logger.warning("Found no function decorated with a rollback; are you sure you need a @transaction?")
...
+ if len(_rollback_queue) == 0:
+ warn_when_missing_rollback_decorators()
Now whenever someone uses @transaction
but forgets to add any @rollback
, they will see a warning the first time they run the code.
Remember someone could also mistakenly remove @rollback
without realizing the impact; arguably, this could even be considered an Exception.
So I’d say this is a worthwhile addition.
For the latter question, it is more complicated. One could for example statically analyze the code and issue similar warnings. However, is it worth spending the time to tackle this scenario? In this blogpost, we skip it.
Parallel Processing
Well, well, you found a weakness.
Let’s consider threads. As we know, threads share the memory between them; more specifically, the heap memory which stores all data structures, including our queue. This immediately makes it clear that our code won’t work. We would need to integrate some synchronization primitives and likely create a different queue per thread. This topic opens a whole can of worms, so will accept this for now as a weakness of this approach.
What about processes?
Here there’s no problem. Each process has its own stack and heap memory, so our code will work just fine. Check the repository for some example tests :)
Rollback fails
What if a rollback function itself fails?
Unfortunately there is not much we can do here. As it is, this exception would take precedence to the other one, and escalate immediately, leaving the system in an unknown state as any additional rollback functions would not be executed. This is arguably better than trying to execute the remaining rollbacks regardless. To ease debugging, further logging is required. We could extend our decorators, or the user could also include logging for this scenario in their rollback functions.
BaseException
Why did you catch
Exception
?
As opposed to a custom one or its super BaseException
?
An argument against a custom exception, is that the user would need to explicitly raise it if something went wrong. We prefer to be minimally invasive here.
BaseException
includes a few exceptions which are not really application-specific, such as KeyboardInterrupt
or SystemExit
.
We choose not to rollback in those cases to avoid, for example, being stuck with a frozen system due to a slow network.
Those are exceptions that should flow through and bypass any application workflow.
Hence, we stick to Exception
.
Overriding args/kwargs
Recall our assumption about *args
and **kwargs
: the rollback function will receive the same as its counterpart function.
What if the user does want to use different ones?
Will not make this blogpost any longer, however it would definitely be possible to extend our rollback decorator to accept custom arguments, e.g.
@rollback(rollback_function=..., args=..., kwargs=...)
At this point, however, I would argue that this might be an error in the design of the application. Consider inserting data into a table: at the minimum, you’d need the rows, and the table name. To revert this change, you’d need exactly the same things.
Addendum
Can we see the full code?
Yes, it is posted here.
What prompted you to write this blogpost?
It happened more than once that we were writing data pipelines but also the exception handling within them, only to realize that some steps are in fact coupled, and if trying to recover from a failure we’d need to either:
- check that each operation within a “unit” has succeeded before proceeding in our retry, or
- avoid doing the above by automatically rolling back, or
- write the code such that everything agrees to at-least-once processing.
Turns out not all of those options are fun to reason about :)