In this article, we provide a technical guide on implementing the Saga Pattern using Rebus and RabbitMQ in a .NET environment.
Rebus is a .NET library designed to create distributed applications with messaging capabilities, simplifying the process of sending and receiving messages. Check out our detailed article for a deeper understanding of Rebus and its implementation.
The Saga Pattern is a design pattern we can use to manage long-running business transactions by coordinating distributed services. If you are unfamiliar with this pattern, we strongly recommend reading our comprehensive guide to learn more about the Saga Pattern and its applications.
Let’s get started.
Setting Up the Project to Implement the Saga Pattern Using Rebus
Before we jump into the implementation, let’s prepare our project.
Installing RabbitMQ
First, let’s use Docker to run RabbitMQ locally:
docker run -d --hostname rabbit-broker --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
We use the docker run command to create and start a new Docker container.
The rabbitmq:3-management image includes a management UI, which we expose on port 15672. Additionally, we map port 5672, the default port that clients use to communicate with RabbitMQ.
As a result, we can now access the management UI by navigating to localhost:15672 in our browser and logging in with the default credentials guest/guest.
Installing Rebus
Now, let’s install the necessary Rebus packages:
dotnet add package Rebus
dotnet add package Rebus.RabbitMq
dotnet add package Rebus.ServiceProvider
These commands install the core Rebus package, the RabbitMQ transport package, and the service provider package that integrates Rebus with .NET dependency injection.
After that, in the Program class, let’s register Rebus in the service container:
builder.Services.AddRebus(configure => configure
    .Routing(routing => routing.TypeBased().MapAssemblyOf<Program>("Rebus.OrderQueue"))
    .Transport(transport => transport.UseRabbitMq(
        builder.Configuration.GetConnectionString("RabbitMq"),
        "Rebus.OrderQueue"))
    .Sagas(saga => saga.StoreInMemory())
);
Here, we enable type-based routing and map all message types in the same assembly as the Program class to the Rebus.OrderQueue queue. This ensures that every message of those types we send is routed to that queue.
Next, we set RabbitMQ as the transport layer. We specify the connection string for RabbitMQ and set Rebus.OrderQueue as the input queue name.
Finally, we set the saga storage to use in-memory storage. In-memory storage is useful for development and testing, but it loses all saga data when the process stops, so we should replace it with a persistent storage solution for production environments.
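The registration above reads a connection string named RabbitMq from configuration. A minimal sketch of the matching appsettings.json entry could look like this, assuming the broker started by the Docker command earlier and its default guest/guest credentials; the URI is an assumption for a local setup, not a value taken from the original project:

```json
{
  "ConnectionStrings": {
    "RabbitMq": "amqp://guest:guest@localhost:5672"
  }
}
```

For anything beyond local development, the credentials would normally come from a secret store or environment variables rather than from this file.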
Implementing the Saga Pattern Using Rebus
With RabbitMQ and Rebus in place, we’re ready to implement the saga.
Defining Saga Data
First, let’s define the saga data:
using Rebus.Sagas;

public class OrderSagaData : ISagaData
{
    // Required by ISagaData and managed by Rebus
    public Guid Id { get; set; }
    public int Revision { get; set; }

    // Correlation id and progress flags for our order
    public Guid OrderId { get; set; }
    public bool IsOrderPlaced { get; set; }
    public bool IsPaymentProcessed { get; set; }
    public bool IsOrderShipped { get; set; }
}
The OrderSagaData class implements the ISagaData interface, and we use it to persist information about our order throughout the saga.
The OrderId property uniquely identifies the order, and we will use it for correlating messages, while IsOrderPlaced, IsPaymentProcessed, and IsOrderShipped track the status of the order processing steps.
Additionally, the ISagaData interface requires the Id and Revision properties, which Rebus uses internally to identify the saga instance and to detect concurrent updates.
Commands and Events in the Saga Pattern Using Rebus
Now, let’s define messages for our saga:
public class PlaceOrderCommand
{
    public Guid OrderId { get; set; }
}

public class ProcessPaymentCommand
{
    public Guid OrderId { get; set; }
}

public class ShipOrderCommand
{
    public Guid OrderId { get; set; }
}

public class OrderShippedEvent
{
    public Guid OrderId { get; set; }
}
Here, we create the PlaceOrderCommand, ProcessPaymentCommand, ShipOrderCommand, and OrderShippedEvent classes.
When implementing the Saga Pattern using Rebus, we work with two kinds of messages: commands and events.
Commands tell a component what action to take, while events notify the saga that something has already happened.
Unlike NServiceBus, Rebus has no marker interfaces like ICommand or IEvent to distinguish them.
Instead, the difference lies in the naming convention: commands use imperative verbs, while events use the past tense.
Implementing the Saga
With our saga data and messages ready, we can now create handlers to process them.
First, let’s define a new class:
using Rebus.Bus;
using Rebus.Handlers;
using Rebus.Sagas;

public class OrderSaga : Saga<OrderSagaData>,
    IAmInitiatedBy<PlaceOrderCommand>,
    IHandleMessages<ProcessPaymentCommand>,
    IHandleMessages<ShipOrderCommand>,
    IHandleMessages<OrderShippedEvent>
{
    private readonly IBus _bus;
    private readonly IOrderRepository _orderRepository;

    public OrderSaga(IBus bus, IOrderRepository orderRepository)
    {
        _bus = bus;
        _orderRepository = orderRepository;
    }

    // Every message is matched to its saga instance by OrderId
    protected override void CorrelateMessages(ICorrelationConfig<OrderSagaData> config)
    {
        config.Correlate<PlaceOrderCommand>(m => m.OrderId, d => d.OrderId);
        config.Correlate<ProcessPaymentCommand>(m => m.OrderId, d => d.OrderId);
        config.Correlate<ShipOrderCommand>(m => m.OrderId, d => d.OrderId);
        config.Correlate<OrderShippedEvent>(m => m.OrderId, d => d.OrderId);
    }
}
We start by defining a new OrderSaga class that derives from the Saga<OrderSagaData> class.
Next, we implement the IAmInitiatedBy<PlaceOrderCommand> interface. This indicates that when our service receives a PlaceOrderCommand message and no existing saga instance correlates with it, Rebus should create a new saga instance.
We also implement the IHandleMessages<T> interface for each of the remaining message types.
Next, in the class constructor, we inject instances of the IBus and IOrderRepository interfaces. We will use the IBus instance to send messages out of our handlers.
Lastly, we override the CorrelateMessages() method from the base Saga<T> class, specifying how to match a message with the corresponding saga data.
Implementing Message Handlers in the Saga Pattern Using Rebus
Next, let’s define handlers for all of our messages:
// Starts the saga: stores the correlation id and records the new order
public Task Handle(PlaceOrderCommand message)
{
    Data.OrderId = message.OrderId;
    Data.IsOrderPlaced = true;

    _orderRepository.AddOrder(new()
    {
        OrderId = message.OrderId,
        Status = OrderStatus.Placed
    });

    return Task.CompletedTask;
}

// Marks the payment as processed and requests shipping
public async Task Handle(ProcessPaymentCommand message)
{
    Data.IsPaymentProcessed = true;

    var order = _orderRepository.GetOrderById(message.OrderId);
    order.Status = OrderStatus.Processing;

    await _bus.Send(new ShipOrderCommand { OrderId = Data.OrderId });
}

// Marks the order as shipped and notifies the saga about it
public async Task Handle(ShipOrderCommand message)
{
    Data.IsOrderShipped = true;

    await _bus.Send(new OrderShippedEvent { OrderId = Data.OrderId });
}

// Final step: completes the order and ends the saga
public Task Handle(OrderShippedEvent message)
{
    var order = _orderRepository.GetOrderById(message.OrderId);
    order.Status = OrderStatus.Completed;

    MarkAsComplete();

    return Task.CompletedTask;
}
We create a Handle() method for each message type. These methods will handle incoming PlaceOrderCommand, ProcessPaymentCommand, ShipOrderCommand, and OrderShippedEvent messages and implement the necessary business logic for each step of the saga.
These methods use the _orderRepository to interact with the order data and _bus to send messages. For now, it’s sufficient for the IOrderRepository interface implementation to use a Dictionary<Guid, Order> field for internal data storage. However, in a production environment, we should replace it with database persistence and make the interface methods asynchronous.
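A dictionary-backed repository like the one described above could be sketched as follows. The shapes of Order, OrderStatus, and IOrderRepository are assumptions inferred from how the handlers use them, and the InMemoryOrderRepository name is hypothetical; the original project may define them differently:

```csharp
using System;
using System.Collections.Generic;

// Assumed shapes, inferred from how the saga handlers use them.
public enum OrderStatus { Placed, Processing, Completed }

public class Order
{
    public Guid OrderId { get; set; }
    public OrderStatus Status { get; set; }
}

public interface IOrderRepository
{
    void AddOrder(Order order);
    Order GetOrderById(Guid orderId);
}

// Hypothetical implementation for development and testing only.
public class InMemoryOrderRepository : IOrderRepository
{
    private readonly object _lock = new();
    private readonly Dictionary<Guid, Order> _orders = new();

    public void AddOrder(Order order)
    {
        lock (_lock)
        {
            _orders[order.OrderId] = order;
        }
    }

    // Returns the stored instance itself, so a handler that sets
    // order.Status changes the stored order without a separate save call.
    // Throws KeyNotFoundException when the id is unknown.
    public Order GetOrderById(Guid orderId)
    {
        lock (_lock)
        {
            return _orders[orderId];
        }
    }
}
```

Because the data lives in a field, this sketch only keeps its state if the container hands out a single shared instance, for example through a singleton registration.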
In the handler for the OrderShippedEvent message, we additionally call the MarkAsComplete() method. It is important to remember to call it when our saga is completed, as it indicates that the saga data can be safely deleted.
However, as with NServiceBus, this action is irreversible, so we must call it only when we are sure the saga data will not be needed.
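For the saga to receive anything, the OrderSaga class also has to be registered as a handler in the service container, and something has to send the first PlaceOrderCommand. A minimal sketch of both, assuming an ASP.NET Core minimal API host; the /orders endpoint is hypothetical and not part of the original project:

```csharp
using Rebus.Bus;
using Rebus.Config;

var builder = WebApplication.CreateBuilder(args);

// The AddRebus(...) call shown earlier and an IOrderRepository
// registration go here; they are left out to keep the sketch short.

// Register the saga so Rebus can resolve it when a message arrives.
builder.Services.AddRebusHandler<OrderSaga>();

var app = builder.Build();

// Hypothetical endpoint that starts a new saga instance.
app.MapPost("/orders", async (IBus bus) =>
{
    var orderId = Guid.NewGuid();

    // Type-based routing sends the command to Rebus.OrderQueue, where
    // IAmInitiatedBy<PlaceOrderCommand> creates the saga instance.
    await bus.Send(new PlaceOrderCommand { OrderId = orderId });

    return Results.Accepted($"/orders/{orderId}", new { orderId });
});

app.Run();
```

Returning 202 Accepted fits here because the order is only queued at this point; the saga processes it asynchronously.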
Testing the Implementation Using Rebus.TestHelpers
Lastly, let’s write unit tests to verify our implementation.
Thankfully, Rebus provides us with the Rebus.TestHelpers package, which makes testing our sagas much easier.
First, let’s install it in our test project:
dotnet add package Rebus.TestHelpers
Testing the Saga Data State
Now we have access to the FakeBus and SagaFixture classes, so let’s use them in our tests:
[Test]
public void WhenPlaceOrderCommandIsReceived_ThenSagaDataIsInitialized()
{
    var busMock = new FakeBus();
    using var fixture = SagaFixture.For(() => new OrderSaga(busMock, _repositoryMock));

    fixture.Deliver(new PlaceOrderCommand { OrderId = _orderId });

    var data = fixture.Data
        .OfType<OrderSagaData>()
        .FirstOrDefault();

    Assert.That(data, Is.Not.Null);
    Assert.That(data.OrderId, Is.EqualTo(_orderId));
}
Here, we verify that the saga data is in the expected state after the handler execution.
First, we create an instance of the FakeBus class. In short, the FakeBus class simulates the behavior of the actual IBus interface without requiring a live message broker like RabbitMQ.
Then, we initialize a new SagaFixture instance for the OrderSaga class, passing the bus mock and a repository mock as parameters.
After that, we use the Deliver() method to dispatch a PlaceOrderCommand message. Next, we retrieve the saga data from the fixture using the Data property and select the first matching data instance.
Finally, we assert that the saga data is not null and that the OrderId in the saga matches the expected _orderId.
Testing Message Dispatching
Now, let’s define another test to check if our handler dispatches another message:
[Test]
public void WhenProcessPaymentCommandIsReceived_ThenShipOrderCommandIsSent()
{
    var busMock = new FakeBus();
    using var fixture = SagaFixture.For(() => new OrderSaga(busMock, _repositoryMock));
    fixture.Add(new OrderSagaData { OrderId = _orderId });

    fixture.Deliver(new ProcessPaymentCommand { OrderId = _orderId });

    var command = busMock.Events
        .OfType<MessageSent<ShipOrderCommand>>()
        .Single()
        .CommandMessage;

    Assert.That(command, Is.Not.Null);
    Assert.That(command.OrderId, Is.EqualTo(_orderId));
}
In the same fashion as before, we create the FakeBus instance and use it to create a SagaFixture of OrderSaga.
Since the message handler we test expects the saga data to exist, we inject a new OrderSagaData into the fixture using the Add() method.
Next, we dispatch a ProcessPaymentCommand message using the Deliver() method.
Afterward, we retrieve the history of the bus events from the FakeBus instance using its Events property. We look for an instance of the MessageSent<T> class, where T is the expected message type.
Once found, we extract the message using the CommandMessage property.
Finally, we assert that the command is not null and that the OrderId matches the expected _orderId.
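The same helpers can also cover the end of the saga. The sketch below assumes NUnit, as in the tests above, and that IOrderRepository exposes only the AddOrder() and GetOrderById() methods used by the handlers. The FakeOrderRepository class is a hypothetical stand-in for the _repositoryMock field. The test delivers the first and the last message of the flow and checks that MarkAsComplete() removed the saga data:

```csharp
using System;
using System.Collections.Generic;
using NUnit.Framework;
using Rebus.TestHelpers;

[TestFixture]
public class OrderSagaCompletionTests
{
    // Hypothetical in-memory stand-in for IOrderRepository.
    private class FakeOrderRepository : IOrderRepository
    {
        private readonly Dictionary<Guid, Order> _orders = new();

        public void AddOrder(Order order) => _orders[order.OrderId] = order;

        public Order GetOrderById(Guid orderId) => _orders[orderId];
    }

    [Test]
    public void WhenOrderShippedEventIsReceived_ThenSagaIsCompleted()
    {
        var orderId = Guid.NewGuid();
        var repository = new FakeOrderRepository();
        using var fixture = SagaFixture.For(() => new OrderSaga(new FakeBus(), repository));

        // Starts the saga and stores the order through the first handler.
        fixture.Deliver(new PlaceOrderCommand { OrderId = orderId });

        // Last step of the flow; its handler calls MarkAsComplete().
        fixture.Deliver(new OrderShippedEvent { OrderId = orderId });

        Assert.That(repository.GetOrderById(orderId).Status, Is.EqualTo(OrderStatus.Completed));
        Assert.That(fixture.Data, Is.Empty);
    }
}
```

Delivering PlaceOrderCommand first, instead of seeding the fixture with Add(), lets the saga populate the repository itself, so the final handler finds the order it needs to update.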
Conclusion
In summary, we now know how to implement the Saga Pattern using Rebus and RabbitMQ.
By following this guide, we can coordinate multi-step workflows and keep data consistent across distributed services. Before moving to production, we should remember to replace the in-memory saga storage with a persistent one.