Event Sourcing Series Part 2: Azure Building Blocks
September 10, 2025 · 8 min read
Event Sourcing, Azure, Cosmos DB, Azure Functions, Service Bus
This is Part 2 of a 5-part series on Event Sourcing and Saga Orchestration with Azure.
Now that you've decided event sourcing is right for your domain (you did read Part 1, right?), let's build it properly with Azure services.
The Architecture Overview
┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   Command   │────▶│  Azure Function  │────▶│    Cosmos DB    │
│   (Write)   │     │ (Command Handler)│     │  (Event Store)  │
└─────────────┘     └──────────────────┘     └────────┬────────┘
                                                      │ Change Feed
                                                      ▼
┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
│    Query    │◀────│  Azure Function  │◀────│   Service Bus   │
│   (Read)    │     │   (Projection)   │     │    (Events)     │
└─────────────┘     └──────────────────┘     └─────────────────┘
Why These Services?
Cosmos DB as Event Store
- Append-only writes - Events are immutable, so the store only ever inserts new documents and never updates them
- Partition by aggregate ID - All events for an order live together
- Change Feed - Built-in event streaming without polling
- Global distribution - If you need multi-region
Azure Functions for Handlers
- Serverless - Pay per execution, scale automatically
- Triggers - HTTP for commands, Change Feed for projections
- Durable Functions - For saga orchestration (Part 4)
Service Bus for Event Distribution
- Reliable delivery - At-least-once guarantees
- Topics/Subscriptions - Multiple consumers per event
- Dead letter queues - Handle failures gracefully
Step 1: Define Your Events
Events are the heart of the system. Get these right.
// Base event class
public abstract record DomainEvent
{
public string EventId { get; init; } = Guid.NewGuid().ToString();
public string EventType => GetType().Name;
public DateTime OccurredAt { get; init; } = DateTime.UtcNow;
public int Version { get; init; }
}
// Order aggregate events
public record OrderCreated(
string OrderId,
string CustomerId,
DateTime CreatedAt
) : DomainEvent;
public record OrderItemAdded(
string OrderId,
string ProductId,
string ProductName,
decimal UnitPrice,
int Quantity
) : DomainEvent;
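public record OrderItemRemoved(
string OrderId,
string ProductId
) : DomainEvent;
// Raised by Order.SubmitForPayment in Step 3
public record OrderSubmitted(
string OrderId,
decimal Total,
DateTime SubmittedAt
) : DomainEvent;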
public record PaymentReceived(
string OrderId,
decimal Amount,
string PaymentMethod,
string TransactionId
) : DomainEvent;
public record OrderShipped(
string OrderId,
string TrackingNumber,
string Carrier,
DateTime EstimatedDelivery
) : DomainEvent;
public record OrderCancelled(
string OrderId,
string Reason,
string CancelledBy
) : DomainEvent;
Step 2: Set Up Cosmos DB Event Store
Container Configuration
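// Partition key: /aggregateId
// This keeps all events for an order together in one logical partition
// Unique key: /version
// Needed for optimistic concurrency: because "id" is a random event ID, two writers
// appending the same version would otherwise both succeed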
{
"id": "evt_abc123", // Unique event ID
"aggregateId": "order_456", // Partition key
"aggregateType": "Order",
"eventType": "OrderCreated",
"version": 1, // For optimistic concurrency
"occurredAt": "2025-09-10T10:30:00Z",
"data": {
"orderId": "order_456",
"customerId": "cust_789",
"createdAt": "2025-09-10T10:30:00Z"
}
}
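The container settings above could be created in code roughly as follows. This is a sketch using the Microsoft.Azure.Cosmos SDK, not a definitive setup: the `EventStoreSetup` class is a hypothetical helper, the database name `eventstore` is borrowed from the Change Feed trigger in Step 5, and the unique key on `/version` is an assumption added so that a second writer appending the same version for an aggregate is rejected with 409 Conflict.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical helper; names other than the SDK types are illustrative.
public static class EventStoreSetup
{
    // Creates the "events" container partitioned by /aggregateId.
    public static async Task<Container> EnsureEventsContainerAsync(CosmosClient client)
    {
        Database database = await client.CreateDatabaseIfNotExistsAsync("eventstore");

        var properties = new ContainerProperties(id: "events", partitionKeyPath: "/aggregateId")
        {
            // Unique keys are enforced per logical partition, so each version
            // number can be stored only once per aggregate.
            UniqueKeyPolicy = new UniqueKeyPolicy
            {
                UniqueKeys = { new UniqueKey { Paths = { "/version" } } }
            }
        };

        Container container = await database.CreateContainerIfNotExistsAsync(properties);
        return container;
    }
}
```

A unique key policy can only be set when the container is created, so it is worth deciding on it before any events are written.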
Event Store Repository
public class CosmosEventStore : IEventStore
{
private readonly Container _container;
public CosmosEventStore(CosmosClient client, string databaseName)
{
_container = client.GetContainer(databaseName, "events");
}
public async Task<IEnumerable<DomainEvent>> GetEventsAsync(
string aggregateId,
CancellationToken ct = default)
{
var query = new QueryDefinition(
"SELECT * FROM c WHERE c.aggregateId = @id ORDER BY c.version")
.WithParameter("@id", aggregateId);
var events = new List<DomainEvent>();
using var iterator = _container.GetItemQueryIterator<EventDocument>(query);
while (iterator.HasMoreResults)
{
var response = await iterator.ReadNextAsync(ct);
events.AddRange(response.Select(doc => doc.ToDomainEvent()));
}
return events;
}
public async Task AppendEventsAsync(
string aggregateId,
string aggregateType,
IEnumerable<DomainEvent> events,
int expectedVersion,
CancellationToken ct = default)
{
var batch = _container.CreateTransactionalBatch(
new PartitionKey(aggregateId));
var version = expectedVersion;
foreach (var evt in events)
{
version++;
var document = new EventDocument
{
Id = evt.EventId,
AggregateId = aggregateId,
AggregateType = aggregateType,
EventType = evt.EventType,
Version = version,
OccurredAt = evt.OccurredAt,
Data = evt
};
batch.CreateItem(document);
}
var response = await batch.ExecuteAsync(ct);
if (!response.IsSuccessStatusCode)
{
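// 409 Conflict = optimistic concurrency failure: another writer already stored
// this version (relies on a unique key on /version within the partition)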
if (response.StatusCode == HttpStatusCode.Conflict)
{
throw new ConcurrencyException(
$"Aggregate {aggregateId} was modified by another process");
}
throw new EventStoreException(
$"Failed to append events: {response.StatusCode}");
}
}
}
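The repository above calls `doc.ToDomainEvent()` without showing it. One possible shape, as a minimal sketch: the `EventDocument` layout, the `KnownTypes` registry, and `FromDomainEvent` are all assumptions for illustration, and only two event types are registered. It uses System.Text.Json and stores the payload as raw JSON next to the event type name.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public abstract record DomainEvent
{
    public string EventId { get; init; } = Guid.NewGuid().ToString();
    public DateTime OccurredAt { get; init; } = DateTime.UtcNow;
}

public record OrderCreated(string OrderId, string CustomerId, DateTime CreatedAt) : DomainEvent;
public record OrderItemRemoved(string OrderId, string ProductId) : DomainEvent;

// Hypothetical stored shape: the payload is kept as raw JSON beside its type name.
public class EventDocument
{
    public string EventType { get; init; } = "";
    public JsonElement Data { get; init; }

    // Hypothetical registry; a real one would list every event type.
    private static readonly Dictionary<string, Type> KnownTypes = new()
    {
        [nameof(OrderCreated)] = typeof(OrderCreated),
        [nameof(OrderItemRemoved)] = typeof(OrderItemRemoved),
    };

    public static EventDocument FromDomainEvent(DomainEvent evt) => new()
    {
        EventType = evt.GetType().Name,
        // Pass the runtime type so the subclass properties are serialized too.
        Data = JsonSerializer.SerializeToElement(evt, evt.GetType()),
    };

    public DomainEvent ToDomainEvent()
    {
        // Map the stored type name back to a CLR type before deserializing.
        if (!KnownTypes.TryGetValue(EventType, out var type))
            throw new InvalidOperationException($"Unknown event type '{EventType}'");
        return (DomainEvent)Data.Deserialize(type)!;
    }
}
```

An explicit registry keeps stored type names decoupled from assembly-qualified names, which makes renaming or moving an event class less risky.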
Step 3: Build Your Aggregate
The aggregate reconstructs itself from events:
public class Order
{
public string Id { get; private set; }
public string CustomerId { get; private set; }
public OrderStatus Status { get; private set; }
public List<OrderItem> Items { get; private set; } = new();
public decimal Total => Items.Sum(i => i.UnitPrice * i.Quantity);
public int Version { get; private set; }
private readonly List<DomainEvent> _uncommittedEvents = new();
public IReadOnlyList<DomainEvent> UncommittedEvents => _uncommittedEvents;
// Reconstruct from events
public static Order FromEvents(IEnumerable<DomainEvent> events)
{
var order = new Order();
foreach (var evt in events)
{
order.Apply(evt);
order.Version++;
}
return order;
}
// Command: Create new order
public static Order Create(string orderId, string customerId)
{
var order = new Order();
order.RaiseEvent(new OrderCreated(orderId, customerId, DateTime.UtcNow));
return order;
}
// Command: Add item
public void AddItem(string productId, string productName, decimal price, int qty)
{
if (Status != OrderStatus.Draft)
throw new InvalidOperationException("Cannot modify a submitted order");
RaiseEvent(new OrderItemAdded(Id, productId, productName, price, qty));
}
// Command: Submit order
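public void SubmitForPayment()
{
if (Status != OrderStatus.Draft)
throw new InvalidOperationException("Order has already been submitted");
if (!Items.Any())
throw new InvalidOperationException("Cannot submit empty order");
RaiseEvent(new OrderSubmitted(Id, Total, DateTime.UtcNow));
}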
// Apply events to state
private void Apply(DomainEvent evt)
{
switch (evt)
{
case OrderCreated e:
Id = e.OrderId;
CustomerId = e.CustomerId;
Status = OrderStatus.Draft;
break;
case OrderItemAdded e:
Items.Add(new OrderItem(e.ProductId, e.ProductName, e.UnitPrice, e.Quantity));
break;
case OrderItemRemoved e:
Items.RemoveAll(i => i.ProductId == e.ProductId);
break;
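case OrderSubmitted e:
// Assumes OrderStatus defines a Submitted value (the enum is not shown in this post)
Status = OrderStatus.Submitted;
break;
case PaymentReceived e:
Status = OrderStatus.Paid;
break;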
case OrderShipped e:
Status = OrderStatus.Shipped;
break;
case OrderCancelled e:
Status = OrderStatus.Cancelled;
break;
}
}
private void RaiseEvent(DomainEvent evt)
{
Apply(evt);
_uncommittedEvents.Add(evt);
}
public void ClearUncommittedEvents() => _uncommittedEvents.Clear();
}
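Rebuilding an aggregate is a left fold over its history: start from an empty state and apply each event in order. The sketch below shows that idea in isolation with simplified stand-in types (`OrderEvent`, `ItemAdded`, `ItemRemoved`, `OrderState` are hypothetical and not the classes above), so the replay logic can be read without the command methods around it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public abstract record OrderEvent;
public record ItemAdded(string ProductId, decimal UnitPrice, int Quantity) : OrderEvent;
public record ItemRemoved(string ProductId) : OrderEvent;

// State derived purely from the events applied so far.
public record OrderState(IReadOnlyDictionary<string, decimal> LineTotals, int Version)
{
    public static readonly OrderState Empty = new(new Dictionary<string, decimal>(), 0);
    public decimal Total => LineTotals.Values.Sum();
}

public static class Replay
{
    // One step of the fold: previous state + one event -> next state.
    public static OrderState Apply(OrderState state, OrderEvent evt)
    {
        var lines = new Dictionary<string, decimal>(state.LineTotals);
        switch (evt)
        {
            case ItemAdded e:
                lines.TryGetValue(e.ProductId, out var current);
                lines[e.ProductId] = current + e.UnitPrice * e.Quantity;
                break;
            case ItemRemoved e:
                lines.Remove(e.ProductId);
                break;
        }
        // Every applied event advances the version by one.
        return new OrderState(lines, state.Version + 1);
    }

    public static OrderState FromEvents(IEnumerable<OrderEvent> history) =>
        history.Aggregate(OrderState.Empty, Apply);
}
```

Replaying two `ItemAdded` events followed by an `ItemRemoved` for the second product leaves only the first line in the total and a `Version` of 3, which is the value a command handler would pass as `expectedVersion` on the next append.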
Step 4: Command Handler (Azure Function)
public class OrderCommandHandler
{
private readonly IEventStore _eventStore;
private readonly IServiceBusPublisher _publisher;
public OrderCommandHandler(IEventStore eventStore, IServiceBusPublisher publisher)
{
_eventStore = eventStore;
_publisher = publisher;
}
[Function("CreateOrder")]
public async Task<IActionResult> CreateOrder(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req)
{
var command = await req.ReadFromJsonAsync<CreateOrderCommand>();
// Create aggregate
var order = Order.Create(Guid.NewGuid().ToString(), command.CustomerId);
// Add initial items
foreach (var item in command.Items)
{
order.AddItem(item.ProductId, item.ProductName, item.Price, item.Quantity);
}
// Persist events
await _eventStore.AppendEventsAsync(
order.Id,
"Order",
order.UncommittedEvents,
expectedVersion: 0);
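// Publish events to Service Bus
// Note: this second write is not atomic with the append above; if it fails,
// the events are stored but never published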
foreach (var evt in order.UncommittedEvents)
{
await _publisher.PublishAsync(evt);
}
return new OkObjectResult(new { OrderId = order.Id });
}
[Function("AddItemToOrder")]
public async Task<IActionResult> AddItem(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "orders/{orderId}/items")]
HttpRequest req,
string orderId)
{
var command = await req.ReadFromJsonAsync<AddItemCommand>();
// Load aggregate from events
var events = await _eventStore.GetEventsAsync(orderId);
var order = Order.FromEvents(events);
// Execute command
order.AddItem(command.ProductId, command.ProductName, command.Price, command.Quantity);
// Persist new events
await _eventStore.AppendEventsAsync(
orderId,
"Order",
order.UncommittedEvents,
expectedVersion: order.Version);
// Publish events (not atomic with the append above)
foreach (var evt in order.UncommittedEvents)
{
await _publisher.PublishAsync(evt);
}
return new OkResult();
}
}
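The handlers above let a `ConcurrencyException` escape to the caller. A common alternative is to reload the stream and re-run the command a few times. The following is a minimal sketch of that loop under stated assumptions: `InMemoryEventStore` is a hypothetical stand-in that models events as strings and checks `expectedVersion` against the stream length, and `CommandRetry` is an illustrative helper, not part of any Azure SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// Hypothetical in-memory stand-in, just enough to show the version check.
public class InMemoryEventStore
{
    private readonly Dictionary<string, List<string>> _streams = new();

    public Task<IReadOnlyList<string>> GetEventsAsync(string aggregateId)
    {
        IReadOnlyList<string> events = _streams.TryGetValue(aggregateId, out var s)
            ? s.ToArray()
            : Array.Empty<string>();
        return Task.FromResult(events);
    }

    public Task AppendEventsAsync(string aggregateId, IEnumerable<string> events, int expectedVersion)
    {
        if (!_streams.TryGetValue(aggregateId, out var stream))
            _streams[aggregateId] = stream = new List<string>();
        if (stream.Count != expectedVersion)
            throw new ConcurrencyException($"Expected version {expectedVersion}, found {stream.Count}");
        stream.AddRange(events);
        return Task.CompletedTask;
    }
}

public static class CommandRetry
{
    // Reloads the stream and re-runs the decision each time the append loses a race.
    // Returns the number of attempts it took.
    public static async Task<int> ExecuteAsync(
        InMemoryEventStore store,
        string aggregateId,
        Func<IReadOnlyList<string>, IEnumerable<string>> decide,
        int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            var history = await store.GetEventsAsync(aggregateId);
            var newEvents = decide(history);
            try
            {
                await store.AppendEventsAsync(aggregateId, newEvents, expectedVersion: history.Count);
                return attempt;
            }
            catch (ConcurrencyException) when (attempt < maxAttempts)
            {
                // Another writer got there first; loop to reload and decide again.
            }
        }
    }
}
```

The decision function runs again on every attempt, so business rules are always checked against the latest history. Once `maxAttempts` is reached, the exception propagates to the caller.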
Step 5: Projections with Change Feed
The Change Feed automatically captures new events. Build read models from them:
public class OrderProjectionHandler
{
private readonly IOrderReadModelRepository _readRepo;
public OrderProjectionHandler(IOrderReadModelRepository readRepo)
{
_readRepo = readRepo;
}
[Function("ProjectOrders")]
public async Task Run(
[CosmosDBTrigger(
databaseName: "eventstore",
containerName: "events",
Connection = "CosmosConnection",
LeaseContainerName = "leases",
CreateLeaseContainerIfNotExists = true)]
IReadOnlyList<EventDocument> events,
ILogger log)
{
foreach (var eventDoc in events)
{
await ProjectEvent(eventDoc);
}
}
private async Task ProjectEvent(EventDocument doc)
{
switch (doc.EventType)
{
case "OrderCreated":
var created = doc.Data.Deserialize<OrderCreated>();
await _readRepo.CreateAsync(new OrderReadModel
{
Id = created.OrderId,
CustomerId = created.CustomerId,
Status = "Draft",
CreatedAt = created.CreatedAt,
ItemCount = 0,
Total = 0
});
break;
case "OrderItemAdded":
var itemAdded = doc.Data.Deserialize<OrderItemAdded>();
await _readRepo.UpdateAsync(itemAdded.OrderId, order =>
{
order.ItemCount++;
order.Total += itemAdded.UnitPrice * itemAdded.Quantity;
});
break;
case "OrderShipped":
var shipped = doc.Data.Deserialize<OrderShipped>();
await _readRepo.UpdateAsync(shipped.OrderId, order =>
{
order.Status = "Shipped";
order.TrackingNumber = shipped.TrackingNumber;
order.ShippedAt = doc.OccurredAt; // event time, so replaying the feed yields the same read model
});
break;
}
}
}
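The Change Feed trigger delivers at least once, so a batch can be handed to the function again after a failure, and an update like `order.ItemCount++` would then be applied twice. One way to guard against that, sketched here with hypothetical types (`OrderSummary`, `ItemAddedEnvelope`, `IdempotentProjector`) and an in-memory dictionary in place of a real read store, is to record the last event version applied to each read model and skip anything at or below it.

```csharp
using System.Collections.Generic;

// Hypothetical read model that remembers the last event version it has applied.
public class OrderSummary
{
    public int ItemCount { get; set; }
    public decimal Total { get; set; }
    public int LastAppliedVersion { get; set; }
}

// Hypothetical envelope: the event payload plus its version in the aggregate's stream.
public record ItemAddedEnvelope(string OrderId, int Version, decimal UnitPrice, int Quantity);

public class IdempotentProjector
{
    private readonly Dictionary<string, OrderSummary> _readModels = new();

    public OrderSummary Get(string orderId) => _readModels[orderId];

    // Returns true if the event was applied, false if it was skipped as a duplicate.
    public bool Project(ItemAddedEnvelope evt)
    {
        if (!_readModels.TryGetValue(evt.OrderId, out var summary))
            _readModels[evt.OrderId] = summary = new OrderSummary();

        // Events for one aggregate arrive in version order, so anything at or
        // below the last applied version has been seen before.
        if (evt.Version <= summary.LastAppliedVersion)
            return false;

        summary.ItemCount++;
        summary.Total += evt.UnitPrice * evt.Quantity;
        summary.LastAppliedVersion = evt.Version;
        return true;
    }
}
```

This relies on the Change Feed preserving order within a partition key, which holds here because all events for an order share the same `aggregateId`.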
Step 6: Service Bus for Event Distribution
Multiple services need to react to events. Use Service Bus topics:
public class ServiceBusPublisher : IServiceBusPublisher
{
private readonly ServiceBusSender _sender;
public ServiceBusPublisher(ServiceBusClient client)
{
_sender = client.CreateSender("domain-events");
}
public async Task PublishAsync(DomainEvent evt)
{
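// Serialize with the runtime type; the declared type DomainEvent would drop subclass properties
var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(evt, evt.GetType()))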
{
MessageId = evt.EventId,
Subject = evt.EventType,
ContentType = "application/json",
ApplicationProperties =
{
["EventType"] = evt.EventType,
["OccurredAt"] = evt.OccurredAt.ToString("O")
}
};
await _sender.SendMessageAsync(message);
}
}
Subscribers listen on their own subscriptions:
[Function("InventoryEventHandler")]
public async Task HandleInventoryEvents(
[ServiceBusTrigger("domain-events", "inventory-subscription", Connection = "ServiceBus")]
ServiceBusReceivedMessage message,
ILogger log)
{
var eventType = message.ApplicationProperties["EventType"].ToString();
if (eventType == "OrderShipped")
{
var evt = JsonSerializer.Deserialize<OrderShipped>(message.Body);
await _inventoryService.ReserveStock(evt.OrderId);
}
}
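The handler above receives every event on the topic and filters in code. Because the publisher sets `Subject` to the event type, the subscription itself could do that filtering instead. The following is a sketch using the administration client from Azure.Messaging.ServiceBus; the `SubscriptionSetup` class and the rule name `OrderShippedOnly` are made up for illustration, and in practice this kind of setup often lives in infrastructure templates rather than application code.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper; the topic and subscription names match the trigger above.
public static class SubscriptionSetup
{
    public static async Task EnsureInventorySubscriptionAsync(string connectionString)
    {
        var admin = new ServiceBusAdministrationClient(connectionString);

        var exists = await admin.SubscriptionExistsAsync("domain-events", "inventory-subscription");
        if (exists.Value)
            return;

        // Only messages whose Subject is "OrderShipped" are copied into this subscription.
        await admin.CreateSubscriptionAsync(
            new CreateSubscriptionOptions("domain-events", "inventory-subscription"),
            new CreateRuleOptions("OrderShippedOnly", new CorrelationRuleFilter { Subject = "OrderShipped" }));
    }
}
```

With a filter like this in place, messages for other event types never reach the inventory function, so the `if (eventType == "OrderShipped")` check becomes a safety net rather than the primary filter.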
Key Takeaways
- Cosmos DB - Partition by aggregate ID, use Change Feed for projections
- Events are immutable - Never update, only append
- Optimistic concurrency - Version numbers detect conflicting writes so the losing writer can reload and retry
- Projections are separate - Read models built from events asynchronously
- Service Bus - Decouple event consumers with topics/subscriptions
Coming Up Next
In Part 3, we'll tackle the hard problem: what happens when a business process spans multiple aggregates? That's where Saga orchestration comes in.
This is Part 2 of a 5-part series on Event Sourcing and Saga Orchestration:
- Part 1: The Honest Truth About Event Sourcing
- Part 2: Event Sourcing with Azure - Building Blocks (You are here)
- Part 3: Saga Orchestration - Distributed Transactions Done Right
- Part 4: Implementing a Saga Orchestrator with Azure Durable Functions
- Part 5: Putting It All Together - Interview-Ready Knowledge