Part 4: Error Handling of Writes to External Data Stores
Writing to multiple data stores consistently is a critical challenge in distributed systems. Without proper handling, your data can become:
- Lost - when failures occur during processing
- Duplicated - when the same operation is performed multiple times
- Corrupted - when partial updates leave data in an inconsistent state
Traditional solutions often rely on distributed transactions, which add complexity and can impact performance. KurrentDB provides built-in features specifically designed to address these challenges in a more elegant way.
In this section, you'll explore practical failure scenarios that commonly occur when writing to external data stores like PostgreSQL, and how they can be handled with a combination of KurrentDB features and application code.
Step 9: Handle Application Outage with Checkpoints
Event processing applications like the order processor can sometimes go down while processing historic or live events from a persistent subscription. When this happens, we typically want to avoid reprocessing all previous messages for 2 main reasons:
Data duplication risks: Reprocessing messages can cause duplicate data in external systems, especially those that don't support idempotency or deduplication (such as email sending operations).
Performance concerns: Reprocessing large volumes of events can significantly impact system performance, particularly when the stream contains many events.
KurrentDB addresses this challenge using checkpoints in persistent subscriptions.
In this step, we'll examine how event processing applications can recover from an outage, and how KurrentDB's checkpoint mechanism minimizes the number of events that need to be reprocessed when one occurs.
Checkpoint in Persistent Subscription
Checkpoints in persistent subscriptions are records of the last successfully processed event position, stored as events in system streams.
They enable subscriptions to resume from their last position after interruptions (like server restarts or leader changes), though this may result in some events being received multiple times by consumers since checkpoints are written periodically rather than after every event.
Click here for more information about checkpoints in persistent subscriptions.
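To make the checkpoint mechanism concrete, below is a minimal sketch of how a consumer might receive and acknowledge events from the fulfillment consumer group. It assumes the EventStore.Client-style .NET persistent subscriptions client; method and type names can differ between client versions, so treat it as an illustration rather than the tutorial's exact code. The key point is that the server checkpoints acknowledged positions, so an unacknowledged event is redelivered after a restart:

using EventStore.Client;

var client = new EventStorePersistentSubscriptionsClient(
    EventStoreClientSettings.Create("esdb://localhost:2113?tls=false"));

await client.SubscribeToStreamAsync(
    "$ce-order",      // the category stream the tutorial subscribes to
    "fulfillment",    // the consumer group name
    async (subscription, resolvedEvent, retryCount, cancellationToken) =>
    {
        // Write to PostgreSQL (or any other external store) here...

        // Ack tells the consumer group the event was processed successfully.
        // The server periodically persists a checkpoint based on acknowledged positions,
        // which is what allows the subscription to resume after an outage.
        await subscription.Ack(resolvedEvent);
    });

// Keep the process alive so the subscription can keep receiving events.
await Task.Delay(Timeout.InfiniteTimeSpan);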
Run this command in the terminal to start PostgreSQL CLI:
docker exec -it postgres psql -U postgres
Run this command in Postgres CLI to list the orders that have started the order fulfillment process:
select orderid from OrderFulfillment;
You should see the same 4 orders in the table from step 8:
orderid
----------------------------------------
order-b0d1a15a21d24ffa97785ce7b345a87e
order-f16847c9a2e44b23bdacc5c92b6dbb25
order-44c1c762ca1d440bb2e03a2655ba7edb
order-c49064f930344a72bd6173db57e43f78
Exit Postgres CLI by running the command:
exit
Run this command in the terminal to display the current information of the consumer group:
curl -i -X GET http://localhost:2113/subscriptions/%24ce-order/fulfillment/info
You should see that 4 items have been processed by the consumer group so far:
"totalItemsProcessed": 4,
This means the consumer group has 4 acks - one for each order you queried previously.
You should also see that the last known event number is 3:
"lastKnownEventNumber": 3,
This is the event number of the last (4th) OrderPlaced event in $ce-order. (3 is displayed because event numbers are zero-based.)
Finally, you should also see that the last checkpointed event position is 3:
"lastCheckpointedEventPosition": 3,
This means that, even after an application outage, the consumer group will resume from this checkpoint rather than from the beginning of the stream.
Run this command in the terminal to stop the order processor application. This simulates an application outage:
docker compose --profile app stop
Run this command in the terminal to append 2 more new OrderPlaced events in KurrentDB while the application is down:
./scripts/3-generate-data-during-app-outage.sh
Run this command in the terminal to display the current information of the consumer group:
curl -i -X GET http://localhost:2113/subscriptions/%24ce-order/fulfillment/info
You should see that the totalItemsProcessed is still 4:
"totalItemsProcessed": 4,
This is expected even though 2 new OrderPlaced events were appended previously. This is because the application is currently down.
However, you should see that the lastKnownEventNumber is 5 instead of 3:
"lastKnownEventNumber": 5,
This means the consumer group is aware that 2 more events were appended.
Finally, you should also see that the last checkpointed event position is still 3:
"lastCheckpointedEventPosition": 3,
This is because no new events have been processed yet, so the checkpoint has not been updated.
Run this command in the terminal to start the order processor application. This simulates an application recovery:
docker compose --profile app start
Run this command in the terminal to view the application log after the application has restarted:
docker compose --profile app logs
You should see messages indicating that the 2 new events have been received and their order fulfillments created:
orderprocessor | OrderProcessor started
orderprocessor | Subscribing events from stream
orderprocessor | Received event #4 in $ce-order stream
orderprocessor | Order fulfillment for order-36bf75fe641e453b906946ba4b5288c5 created.
orderprocessor | Received event #5 in $ce-order stream
orderprocessor | Order fulfillment for order-babc43583617421a90d4c7d039900142 created.
Notice how the processor received events starting from event #4 because of the previously saved checkpoint.
How often are Checkpoints Saved?
The frequency at which checkpoints are saved depends on three key configuration parameters:
- minCheckpointCount - Minimum number of events before a checkpoint may be written
- maxCheckpointCount - Maximum number of events before forcing a checkpoint
- checkPointAfterMilliseconds - Time-based threshold for checkpoint creation
For this tutorial, we've configured these parameters to their minimum values to ensure checkpoints are saved after each processed event. This makes the behavior more predictable and easier to observe.
Performance Note
While frequent checkpointing provides better recovery guarantees, it's not necessarily the best practice for production environments. Each checkpoint operation triggers a disk write, so excessive checkpointing can introduce significant performance overhead. In production, you should balance recovery needs with performance considerations.
See Step 4 for more information.
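For reference, here is a hedged sketch of how a consumer group with this kind of aggressive checkpointing could be created from code, assuming the EventStore.Client-style .NET persistent subscriptions client. Parameter names differ between client versions (older clients call the bounds minCheckPointCount and maxCheckPointCount), and the tutorial already provisions the consumer group for you, so this is purely illustrative:

using EventStore.Client;

var client = new EventStorePersistentSubscriptionsClient(
    EventStoreClientSettings.Create("esdb://localhost:2113?tls=false"));

// Checkpoint as often as possible: after every acknowledged event, or at most 1 second apart.
var settings = new PersistentSubscriptionSettings(
    resolveLinkTos: true,                      // $ce-order contains link events, so resolve them
    checkPointLowerBound: 1,                   // minCheckpointCount: allow a checkpoint after 1 event
    checkPointUpperBound: 1,                   // maxCheckpointCount: force a checkpoint after 1 event
    checkPointAfter: TimeSpan.FromSeconds(1),  // checkPointAfterMilliseconds: time-based threshold
    maxRetryCount: 10);

await client.CreateToStreamAsync("$ce-order", "fulfillment", settings);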
Run this command in the terminal to display the current information of the consumer group:
curl -i -X GET http://localhost:2113/subscriptions/%24ce-order/fulfillment/info
You should see that the totalItemsProcessed is now 6 instead of 4:
"totalItemsProcessed": 6,
The lastKnownEventNumber is still 5:
"lastKnownEventNumber": 5,
And the checkpoint has been updated to 5:
"lastCheckpointedEventPosition": 5,
Run this command in the terminal to start PostgreSQL CLI:
docker exec -it postgres psql -U postgres
Run this command in Postgres CLI to list the orders that have started the order fulfillment process:
select orderid from OrderFulfillment;
You should now see the 6 orders in total:
orderid
----------------------------------------
order-b0d1a15a21d24ffa97785ce7b345a87e
order-f16847c9a2e44b23bdacc5c92b6dbb25
order-44c1c762ca1d440bb2e03a2655ba7edb
order-c49064f930344a72bd6173db57e43f78
order-36bf75fe641e453b906946ba4b5288c5
order-babc43583617421a90d4c7d039900142
Exit Postgres CLI by running the command:
exit
Step 10. Handle Transient Errors by Retrying Events
Transient errors are temporary failures that resolve themselves over time - such as database disconnections, network issues, or service restarts. When these errors occur, the best strategy is often to retry processing rather than failing permanently.
For example, if PostgreSQL becomes unavailable while the order processor is running:
- The database connection fails
- The OrderPlaced event processing throws an exception
- Without retry logic, this event would be lost
- With retry logic, processing resumes once the database recovers
Most transient errors resolve within seconds or minutes. KurrentDB's persistent subscriptions provide built-in retry capabilities, helping your system maintain data consistency during temporary outages.
In this step, you'll see how these retries prevent data loss when database connectivity is interrupted.
Run this command in the terminal to stop PostgreSQL to simulate a database outage:
docker compose --profile db stop postgres
Run this command in the terminal to append 2 new OrderPlaced events in KurrentDB:
./scripts/4-generate-data-during-db-outage.sh
Run this command in the terminal to view the application log in follow mode:
docker compose --profile app logs -f
Wait for a few seconds and you will start to notice messages indicating that a transient error has been detected and that the application will retry the event:
TODO
Notice that the application retries this continuously for a while.
Press ctrl + c to exit follow mode.
Run this command in the terminal to start PostgreSQL to simulate database recovery:
docker compose --profile db start postgres
Run this command in the terminal to view the application log in follow mode again:
docker compose --profile app logs -f
Notice that the events that were being retried continuously have now been processed successfully.
Duplicated Messages
TODO
Press ctrl + c to exit follow mode.
Run this command in the terminal to start PostgreSQL CLI:
docker exec -it postgres psql -U postgres
Run this command in Postgres CLI to list the orders that have started the order fulfillment process:
select orderid from OrderFulfillment;
You should now see the 8 orders in total:
orderid
----------------------------------------
order-b0d1a15a21d24ffa97785ce7b345a87e
order-f16847c9a2e44b23bdacc5c92b6dbb25
order-44c1c762ca1d440bb2e03a2655ba7edb
order-c49064f930344a72bd6173db57e43f78
order-36bf75fe641e453b906946ba4b5288c5
order-babc43583617421a90d4c7d039900142
order-3d268df88f9c451eae9cae49d10656d5
order-ad53653936ff469ea208cce8726906eb
Exit Postgres CLI by running the command:
exit
Step 11. Examine How Transient Errors are Handled in the Codebase
Run this command in the terminal to open the main program for the order processor application:
code ./OrderProcessor/Program.cs
Locate and examine the code that handles transient errors:
catch (Exception ex)
{
    // ------------------------------------------------------------- //
    // Warning: This is just one example of a transient error check  //
    // You should add more checks based on your needs                //
    // ------------------------------------------------------------- //
    var exceptionIsTransient = // Exception is transient if it matches one of the following patterns:
        ex is SocketException { SocketErrorCode: SocketError.TryAgain } || // Socket error indicating the name of the host could not be resolved (https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socketerror?view=net-9.0)
        ex is NpgsqlException { IsTransient: true };                       // Postgres exception indicating the error is transient (https://www.npgsql.org/doc/api/Npgsql.NpgsqlException.html#Npgsql_NpgsqlException_IsTransient)

    if (exceptionIsTransient) // If the exception is transient..
    {
        Console.WriteLine($"Detected DB transient error {ex.Message}. Retrying.");

        await subscription.Nack(PersistentSubscriptionNakEventAction.Retry, // Send a not acknowledge (Nack) message to the consumer group and request it to retry
            "Detected DB transient error", e);

        Thread.Sleep(1000); // Wait for a second before retrying to avoid overwhelming the database
    }
To handle transient errors, you can:
- Identify a list of errors that are recoverable
- Catch these errors and call subscription.Nack() to send a negative acknowledgement to KurrentDB with the Retry flag
With this approach, KurrentDB will re-send the event for a configured number of times (defined by maxRetryCount). If the error is resolved before this limit is reached, the processor will successfully handle the event. Otherwise, KurrentDB will park it.
Parking Events in Persistent Subscription
While an event is being retried in a persistent subscription, other events in its stream are blocked and not pushed to the processor until the event is acknowledged, skipped, or parked.
When an event is parked, it is stored in KurrentDB for future replay, while the remaining events in the consumer group are unblocked and pushed.
Click here for more information about parking.
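As an aside, parked events can later be replayed to the consumer group once the underlying problem is fixed. A minimal sketch, assuming the EventStore.Client-style .NET client exposes a replay operation for parked messages (the exact method name may vary by client version; the same operation is also available through the HTTP API and the admin UI):

using EventStore.Client;

var client = new EventStorePersistentSubscriptionsClient(
    EventStoreClientSettings.Create("esdb://localhost:2113?tls=false"));

// Ask the server to re-deliver every event currently parked for this consumer group.
// The processor must be able to handle them this time (e.g. after a bug fix is deployed).
await client.ReplayParkedMessagesToStreamAsync("$ce-order", "fulfillment");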
Dangers of Setting a High maxRetryCount Configuration
The maxRetryCount configuration of a consumer group sets the number of times it should retry an event when instructed to do so. A high maxRetryCount increases the chance that a transient error resolves itself while the event is waiting to be retried. On the other hand, it can also increase the load on a server that may already be under distress, making it more difficult for the server to recover.
You should ensure that maxRetryCount is set appropriately so that it does not overload a recovering server.
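One way to reduce the pressure that repeated retries put on a struggling database is to back off progressively before sending the Nack. The sketch below is not part of the tutorial's code; it assumes the handler receives the retryCount supplied by the persistent subscription and uses it to compute a capped exponential delay (the helper name and the 30-second cap are illustrative choices):

using EventStore.Client;

// Hypothetical helper: wait longer on each redelivery before asking for another retry,
// so a recovering database is not hammered with immediate retries.
static async Task NackWithBackoffAsync(
    PersistentSubscription subscription, ResolvedEvent resolvedEvent, int? retryCount, CancellationToken ct)
{
    // Exponential backoff capped at 30 seconds; keep the cap below the subscription's
    // message timeout so the delay itself does not trigger another redelivery.
    var delay = TimeSpan.FromSeconds(Math.Min(30, Math.Pow(2, retryCount ?? 0)));

    Console.WriteLine($"Transient error on retry #{retryCount ?? 0}; waiting {delay.TotalSeconds}s before requesting a retry.");
    await Task.Delay(delay, ct);

    await subscription.Nack(PersistentSubscriptionNakEventAction.Retry,
        "Detected DB transient error", resolvedEvent);
}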
Step 12. Handle Permanent Errors by Skipping Events
Event processors sometimes encounter permanent errors that cannot be resolved through retries. These unrecoverable errors typically result from:
- Malformed events with structural or syntactical problems
- Bugs in the processor code that require deployment of fixes
When these permanent errors occur, continuous retrying is futile and blocks subsequent events in the stream from being processed.
One solution is to skip the problematic event, allowing the processor to continue with other events in the stream.
In this step, you will find out how to detect and skip events that trigger permanent errors.
Info
Events can also be "parked" instead of skipped, allowing them to be replayed later.
This step focuses only on skipping for simplicity.
Click here for more information about parking events.
Run this command in the terminal to generate an invalid OrderPlaced event:
curl -i -X POST \
  -H "Content-Type: application/vnd.eventstore.events+json" \
  http://localhost:2113/streams/order-b3f2d72c-e008-44ec-a612-5f7fbe9c9240 \
  -d '
  [
    {
      "eventId": "fbf4a1a1-b4a3-4dfe-a01f-ec52c34e16e4",
      "eventType": "order-placed",
      "data": {
        "thisEvent": "is invalid"
      }
    }
  ]'
Run this command in the terminal to view the application log of the order processor application:
docker compose --profile app logs
You should see a new log message indicating that a permanent error has been detected and the event has been skipped:
orderprocessor | Received event #8 in $ce-order stream
orderprocessor | Detected permanent error System.ArgumentException: Order ID cannot be null or empty (Parameter 'orderId')
Step 13. Examine How Permanent Errors are Handled in the Codebase
Run this command in the terminal to open the main program for the order processor application:
code ./OrderProcessor/Program.cs
Locate and examine the code that handles permanent errors:
catch (Exception ex)
{
    if (exceptionIsTransient) // If the exception is transient..
    {
        ...
    }
    else // If the exception is not transient (i.e. permanent)..
    {
        Console.WriteLine($"Detected permanent error {ex}. Skipping.");

        await subscription.Nack(PersistentSubscriptionNakEventAction.Skip, // Send a not acknowledge (Nack) message to the consumer group and request it to skip
            "Detected permanent error", e);
    }
Errors not classified as transient are considered permanent. To handle these unrecoverable situations, call subscription.Nack() with the PersistentSubscriptionNakEventAction.Skip flag.
This instructs the consumer group to skip the problematic event and deliver the next available event in the stream.