Part 3: Write to an External Data Store with Persistent Subscription
Now that the event that triggers multiple writes has been appended to KurrentDB, you will learn how to handle this event to update an external data store (Postgres) in an eventually consistent way.
In this tutorial you will achieve this by running the order processor application.
This is a sample application within the order fulfillment system that listens for OrderPlaced events through a KurrentDB persistent subscription. Whenever it receives the event, it kickstarts an order fulfillment process, simulated by inserting a record into a PostgreSQL table.
Step 4. Create a KurrentDB Persistent Subscription Consumer Group
To handle the triggering event, a persistent subscription consumer group is created on KurrentDB. This allows client applications to subscribe to events from KurrentDB.
Persistent Subscription and Consumer Group
Persistent subscriptions in KurrentDB are a type of subscription that is maintained by the server rather than the client. Unlike other subscription types, persistent subscriptions keep their position (checkpoint) on the server side, allowing them to continue from where they left off if the connection is closed or interrupted.
Consumer groups are a concept used with persistent subscriptions that enable the competing consumers pattern. All clients belonging to a single consumer group receive a portion of events, allowing for load balancing and parallel processing. Multiple consumer groups can be created for the same stream, each operating independently with their own checkpoint position maintained by the server.
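The competing consumers pattern described above can be illustrated with a small in-memory simulation. This is only a sketch and not the KurrentDB API: the ConsumerGroup class, the worker names, the second "billing" group, and the round-robin hand-out are all assumptions made for illustration.

```python
from collections import defaultdict
from itertools import cycle


class ConsumerGroup:
    """Toy stand-in for a server-side consumer group (not the KurrentDB API)."""

    def __init__(self, name):
        self.name = name
        self.checkpoint = 0      # next stream position to deliver, kept "server side"
        self.consumers = []

    def connect(self, consumer_name):
        self.consumers.append(consumer_name)

    def dispatch(self, stream):
        """Hand each undelivered event to exactly one consumer, round-robin."""
        delivered = defaultdict(list)
        if not self.consumers:
            return delivered
        turn = cycle(self.consumers)
        for position in range(self.checkpoint, len(stream)):
            delivered[next(turn)].append(stream[position])
        self.checkpoint = len(stream)   # the group, not the client, remembers the position
        return delivered


stream = ["order-1", "order-2", "order-3", "order-4"]

fulfillment = ConsumerGroup("fulfillment")
fulfillment.connect("worker-a")
fulfillment.connect("worker-b")

billing = ConsumerGroup("billing")      # hypothetical second group on the same stream
billing.connect("worker-c")

fulfillment_result = dict(fulfillment.dispatch(stream))
billing_result = dict(billing.dispatch(stream))

print(fulfillment_result)   # events split between worker-a and worker-b
print(billing_result)       # worker-c sees every event independently
```

Within one group each event goes to a single worker, while the two groups progress through the same stream independently, each with its own checkpoint.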
Click here for more information about persistent subscription and consumer groups.
Run this command in the terminal to create the persistent subscription:
curl -i -X PUT -d $'{ "minCheckPointCount": 0, "maxCheckPointCount": 0, "resolveLinktos": true, "maxRetryCount": 100 }' \
  http://localhost:2113/subscriptions/%24ce-order/fulfillment \
  -H "Content-Type: application/json"
You will see the following message:
{ "correlationId": "f7aac1a3-b5ea-415c-9e3d-d54c0b465d29", "reason": "", "result": "Success", "label": "ClientMessageCompleted" }
This creates a consumer group called fulfillment that subscribes to the $ce-order stream that you navigated to in the previous step.
Review and adjust these settings before production
The checkpoint and retry configuration values above are for demonstration only and may not be suitable for production use. To avoid performance bottlenecks or unexpected behavior, be sure to test and tune them based on your expected workload.
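The request URL in the curl command earlier writes the stream name as %24ce-order, which is the percent-encoded form of $ce-order. One practical reason is that an unquoted $ce in a shell command would be treated as a variable reference. The sketch below shows how such a URL could be built; the subscription_url helper is hypothetical and only mirrors the path layout used in that command.

```python
from urllib.parse import quote


def subscription_url(base_url, stream, group):
    """Build the subscription URL, percent-encoding each path segment."""
    # safe="" forces reserved characters such as "$" and "/" to be encoded.
    return f"{base_url}/subscriptions/{quote(stream, safe='')}/{quote(group, safe='')}"


url = subscription_url("http://localhost:2113", "$ce-order", "fulfillment")
print(url)  # http://localhost:2113/subscriptions/%24ce-order/fulfillment
```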
Checkpoint and Retry Configurations Explained
The configurations above determine how your consumer group processes events and manages checkpoints:
| Configuration Option | Explanation |
| --- | --- |
| minCheckPointCount | The minimum number of messages that must be processed before a checkpoint may be written |
| maxCheckPointCount | The maximum number of messages not checkpointed before forcing a checkpoint, preventing excessive event reprocessing after failures |
| maxRetryCount | The maximum number of retries (due to timeout) before a message is considered to be parked, preventing infinite retry loops for problematic events |
Click here for more information about checkpoints.
Click here for more information about retries.
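To make the effect of maxRetryCount more concrete, here is a minimal simulation of retry-then-park behavior. It is a sketch under stated assumptions, not the server's implementation: the function names and the flaky handler are hypothetical, and it assumes a message is parked after the first delivery plus max_retry_count retries, which may not match the server's exact counting.

```python
def deliver_with_retries(events, handler, max_retry_count):
    """Deliver each event until the handler acknowledges it or retries run out."""
    acked, parked = [], []
    for event in events:
        attempts = 0
        while True:
            attempts += 1
            if handler(event, attempts):      # True means the consumer acknowledged
                acked.append(event)
                break
            if attempts > max_retry_count:    # first delivery + max_retry_count retries used up
                parked.append(event)          # set aside instead of retrying forever
                break
    return acked, parked


def flaky_handler(event, attempt):
    """Hypothetical consumer: 'order-2' succeeds on its third attempt, 'order-3' never does."""
    if event == "order-2":
        return attempt >= 3
    return event != "order-3"


acked, parked = deliver_with_retries(
    ["order-1", "order-2", "order-3"], flaky_handler, max_retry_count=5
)
print(acked)   # ['order-1', 'order-2']
print(parked)  # ['order-3']
```

A transient failure (order-2) is absorbed by the retries, while a message that can never be processed (order-3) is parked so it does not block the rest of the stream.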
Step 5. Review the Consumer Group from KurrentDB Admin UI
Navigate to the KurrentDB admin UI that you visited in the previous part of the tutorial.
Tips
If you are unsure what its URL is, you can execute the following script to find out:
./scripts/get-kurrentdb-ui-url.sh
Click the Persistent Subscriptions link from the top navigation bar.
Click on $ce-order directly below the Stream/Group(s) column header in the dashboard. You will see the fulfillment consumer group listed under $ce-order.
Step 6. Start the Order Processor Application
When the order processor application starts, it connects to the fulfillment subscription created in Step 4 and begins to process any events in the $ce-order stream.
Run this command in the terminal to start the order processor application:
./scripts/start-app.sh
You will receive a message, like below, printed in the terminal:
All apps are running.
Run this command in the terminal to view the application log of the order processor application:
docker compose --profile app logs
You should see messages indicating that 2 order fulfillment processes have been created:
orderprocessor | OrderProcessor started
orderprocessor | Subscribing events from stream
orderprocessor | Received event #0 in $ce-order stream
orderprocessor | Order fulfillment for order-b0d1a15a21d24ffa97785ce7b345a87e created.
orderprocessor | Received event #1 in $ce-order stream
orderprocessor | Order fulfillment for order-f16847c9a2e44b23bdacc5c92b6dbb25 created.
Run this command in the terminal to start PostgreSQL CLI:
docker exec -it postgres psql -U postgres
You will receive a message, like below, printed in the terminal:
psql (16.8 (Debian 16.8-1.pgdg120+1))
Type "help" for help.

postgres=#
Run this command in Postgres CLI to list the orders that have started the order fulfillment process:
select orderid from OrderFulfillment;
You should see 2 orders in the table, and they should match the orders listed in the application log.
Tips
If you're stuck with the output and can't exit, press q to exit. You're likely in paging mode because the output has overflowed.
Exit Postgres CLI by running the command:
exit
Alternative Ways to Kickstart the Order Fulfillment Process
Inserting a record into a relational database is just one of many ways to kickstart the order fulfillment process. Alternatively, the order processor could have triggered it by:
- Making a REST call to the order fulfillment API
- Publishing a message to a message broker
- Sending an email to a staff member to manually kickstart the process
- Appending an event to an OrderFulfillment stream in KurrentDB
- etc.
For demonstration purposes in this tutorial, an insert to a table would suffice.
Step 7. Examine the Order Processor Application Codebase
Run this command in the terminal to open the main program for the order processor application:
code ./OrderProcessor/Program.cs
Most of the code snippets leveraged in this step can be found within this file.
Locate and examine the code that subscribes to the stream:
await using var subscription = kurrentdb.SubscribeToStream(  // Subscribe to the $ce-order stream in KurrentDB
    "$ce-order",
    "fulfillment");
A subscription is created that subscribes to events from the $ce-order stream via the fulfillment consumer group.
Different Types of Subscriptions
This sample uses persistent subscriptions to subscribe to events. You can also use catch-up subscriptions or connectors to achieve a similar result.
Click here for more information about catch-up subscriptions
Click here for more information about connectors
Locate and examine the code that processes each event:
await foreach (var message in subscription.Messages)  // Iterate through the messages in the subscription
{
    if (message is PersistentSubscriptionMessage.NotFound)  // Skip this message if the subscription is not found
    {
        Console.WriteLine("Persistent subscription consumer group not found." +
                          "Please recreate it.");
        continue;
    }

    if (message is not PersistentSubscriptionMessage.Event(var e, _))  // Skip this message if it is not an event
        continue;

    try
    {
        Console.WriteLine($"Received event #{e.Link.EventNumber} in " +  // Log the event number of the event in the $ce-order stream
                          $"{e.Link.EventStreamId} stream");

        if (EventEncoder.Decode(e.Event.Data, "order-placed")  // Try to deserialize the event to an OrderPlaced event
            is not OrderPlaced orderPlaced)                    // Skip this message if it is not an OrderPlaced event
            continue;

        repository.StartOrderFulfillment(orderPlaced.orderId);  // Process the OrderPlaced event by inserting an order fulfillment record into Postgres

        await subscription.Ack(e);  // Send an acknowledge message to the consumer group so that it will send the next event
    }
The code shows a key part of an event processing pipeline that:
- Processes subscription messages from the KurrentDB event store's $ce-order stream
- Performs filtering by:
  - Skipping non-event messages
  - Skipping messages where the subscription isn't found
  - Only processing "order-placed" events
- Handles order events by:
  - Deserializing the event data into an OrderPlaced object
  - Starting order fulfillment by calling repository.StartOrderFulfillment()
- Acknowledges successful processing with await subscription.Ack(e) to tell the consumer group to send the next event
Tips
It is important to send an acknowledge message to the consumer group; otherwise, KurrentDB will assume the consumer has not received the event and will retry delivering it.
Run this command in the terminal to open the OrderFulfillmentRepository class used to insert an order fulfillment record in PostgreSQL:
code ./OrderProcessor/OrderFulfillmentRepository.cs
Locate and examine the code that inserts the order fulfillment record:
public void StartOrderFulfillment(string? orderId)
{
    if (string.IsNullOrEmpty(orderId))
        throw new ArgumentException("Order ID cannot be null or empty", nameof(orderId));

    var sql = @"
        INSERT INTO OrderFulfillment (OrderId, Status)
        VALUES (@OrderId, 'Started')";

    try
    {
        _dataAccess.Execute(sql, new { OrderId = orderId });
        Console.WriteLine($"Order fulfillment for {orderId} created.");
    }
    catch (PostgresException ex) when (ex.SqlState == "23505")  // If the error is a unique violation (duplicate key)..
    {                                                            // then it means the order fulfillment already exists.
        Console.WriteLine($"Order fulfillment for {orderId} " +  // Ignore the error and log a message
                          "create request ignored. Already exists.");
    }
}
This method inserts a new fulfillment record with 'Started' status into the database and handles duplicates by catching and ignoring PostgreSQL unique constraint violations.
Ensuring Idempotency in Persistent Subscriptions
Functions used within persistent subscription processing loops must be idempotent to prevent data duplication.
Persistent subscriptions in KurrentDB provide at-least-once delivery guarantees, which means:
- Events may be delivered and processed multiple times
- The same event might be retried after failures or connection issues
- Your processing logic must handle duplicate events gracefully
For example, the StartOrderFulfillment() function demonstrates proper idempotent design because:
- It attempts to insert a record with a unique orderId constraint
- If the same event is processed again, the database rejects the duplicate
- The function gracefully handles this case without error or side effects
This approach ensures the same order fulfillment record won't be inserted multiple times, even when the same OrderPlaced event is processed repeatedly.
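The same idempotency technique can be tried out in isolation with a self-contained sketch. It uses Python's built-in sqlite3 module as a stand-in for PostgreSQL; the table definition (OrderId as primary key) and the start_order_fulfillment helper are assumptions for illustration, since the tutorial does not show the actual schema.

```python
import sqlite3


def start_order_fulfillment(conn, order_id):
    """Insert a fulfillment row; return False if the order was already recorded."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO OrderFulfillment (OrderId, Status) VALUES (?, 'Started')",
                (order_id,),
            )
        return True
    except sqlite3.IntegrityError:
        # Unique constraint violation: the row already exists, so ignore the duplicate.
        return False


conn = sqlite3.connect(":memory:")
# Assumed schema: the primary key provides the unique constraint on OrderId.
conn.execute("CREATE TABLE OrderFulfillment (OrderId TEXT PRIMARY KEY, Status TEXT NOT NULL)")

first = start_order_fulfillment(conn, "order-1")
second = start_order_fulfillment(conn, "order-1")   # simulated redelivery of the same event
row_count = conn.execute("SELECT COUNT(*) FROM OrderFulfillment").fetchone()[0]

print(first, second, row_count)  # True False 1
```

Letting the database enforce uniqueness avoids a separate "check then insert" step, which could race when several consumers in the same group process events concurrently.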
Step 8. Process New Events in Real-Time
In this step, you will learn how persistent subscriptions can process new events in real time.
Run this command in the terminal to append 2 more OrderPlaced events to KurrentDB:
./scripts/2-generate-data-to-test-real-time.sh
You will receive a message, like below, printed in the terminal:
Appended data to KurrentDB
Run this command in the terminal to view the application log of the order processor application:
docker compose --profile app logs
You should see messages indicating that 4 order fulfillment processes have been created:
orderprocessor | OrderProcessor started
orderprocessor | Subscribing events from stream
orderprocessor | Received event #0 in $ce-order stream
orderprocessor | Order fulfillment for order-b0d1a15a21d24ffa97785ce7b345a87e created.
orderprocessor | Received event #1 in $ce-order stream
orderprocessor | Order fulfillment for order-f16847c9a2e44b23bdacc5c92b6dbb25 created.
orderprocessor | Received event #2 in $ce-order stream
orderprocessor | Order fulfillment for order-44c1c762ca1d440bb2e03a2655ba7edb created.
orderprocessor | Received event #3 in $ce-order stream
orderprocessor | Order fulfillment for order-c49064f930344a72bd6173db57e43f78 created.
Run this command in the terminal to start PostgreSQL CLI:
docker exec -it postgres psql -U postgres
Run this command in Postgres CLI to list the orders that have started the order fulfillment process:
select orderid from OrderFulfillment;
You should see 4 orders in the table, and they should match the orders listed in the application log.
Built-in Real-time Processing with Persistent Subscription
Persistent subscriptions deliver events in real-time to subscribers after catching up with historical events.
Exit Postgres CLI by running the command:
exit