In this article, we provide a technical guide on implementing the Saga Pattern using Rebus and RabbitMQ in a .NET environment.

Rebus is a .NET library designed to create distributed applications with messaging capabilities, simplifying the process of sending and receiving messages. Check out our detailed article for a deeper understanding of Rebus and its implementation.

The Saga Pattern is a design pattern we can use to manage long-running business transactions by coordinating distributed services. If you are unfamiliar with this pattern, we strongly recommend reading our comprehensive guide to learn more about the Saga Pattern and its applications.

To download the source code for this article, you can visit our GitHub repository.

Let’s get started.

Setting Up the Project to Implement the Saga Pattern Using Rebus

Before we jump into the implementation, let’s prepare our project.

Installing RabbitMQ

First, let’s use Docker to run RabbitMQ locally:

docker run -d --hostname rabbit-broker --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

The docker run command creates and starts a new container, and the -d flag runs it in the background.

The rabbitmq:3-management image includes the management UI, which listens on port 15672. We also map port 5672, the default AMQP port that clients such as Rebus use to communicate with the broker.

As a result, we can now access the management UI by navigating to localhost:15672 in our browser and logging in with the default credentials guest/guest.

Installing Rebus

Now, let’s install the necessary Rebus packages:

dotnet add package Rebus
dotnet add package Rebus.RabbitMq
dotnet add package Rebus.ServiceProvider

These commands install the core Rebus package, the RabbitMQ transport package, and the service provider package that integrates Rebus with .NET dependency injection.

After that, in the Program class, let’s register Rebus in the service container:

builder.Services.AddRebus(configure => configure
    .Routing(routing => routing.TypeBased().MapAssemblyOf<Program>("Rebus.OrderQueue"))
    .Transport(transport => 
        transport.UseRabbitMq(
            builder.Configuration.GetConnectionString("RabbitMq"),
            "Rebus.OrderQueue"))
    .Sagas(saga => saga.StoreInMemory())
);

Here, we configure type-based routing and map all message types in the same assembly as the Program class to the Rebus.OrderQueue queue. As a result, every message of those types that we send is delivered to that queue.

Next, we set RabbitMQ as the transport layer. We read the RabbitMQ connection string from the application configuration and set Rebus.OrderQueue as the name of the input queue.

Finally, we configure in-memory saga storage. In-memory storage is useful for development and testing, but saga data is lost when the application restarts, so we should replace it with persistent storage in production environments.
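The registration above does not show how Rebus discovers our handlers. The following is a minimal sketch of a complete Program class, assuming that the handlers live in the same assembly as Program and that the configuration contains a connection string named RabbitMq (for the local Docker container, a value such as amqp://guest:guest@localhost:5672 would be a reasonable assumption). The AutoRegisterHandlersFromAssemblyOf<T>() call comes from the Rebus.ServiceProvider package:

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Rebus.Config;
using Rebus.Persistence.InMem;
using Rebus.Routing.TypeBased;

var builder = WebApplication.CreateBuilder(args);

// Same bus configuration as shown above
builder.Services.AddRebus(configure => configure
    .Routing(routing => routing.TypeBased().MapAssemblyOf<Program>("Rebus.OrderQueue"))
    .Transport(transport =>
        transport.UseRabbitMq(
            builder.Configuration.GetConnectionString("RabbitMq"),
            "Rebus.OrderQueue"))
    .Sagas(saga => saga.StoreInMemory())
);

// Assumption: all handlers (including sagas) are defined in this assembly.
// This scans the assembly and registers every IHandleMessages<T> implementation.
builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>();

var app = builder.Build();
app.Run();
```

Any dependencies that the handlers need in their constructors also have to be registered in the service container.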

Implementing the Saga Pattern Using Rebus

With RabbitMQ and Rebus in place, we’re ready to implement the saga.

Defining Saga Data

First, let’s define the saga data:

public class OrderSagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }
    public Guid OrderId { get; set; }
    public bool IsOrderPlaced { get; set; }
    public bool IsPaymentProcessed { get; set; }
    public bool IsOrderShipped { get; set; }
}

The OrderSagaData class implements the ISagaData interface and we use it to persist information about our order throughout the saga.

The OrderId property uniquely identifies the order and we will use it for correlating messages, while IsOrderPlaced, IsPaymentProcessed, and IsOrderShipped track the status of the order processing steps.

Additionally, the ISagaData interface requires the Id and Revision properties, which Rebus uses internally to identify the saga instance and to detect concurrent updates.

Commands and Events in the Saga Pattern Using Rebus

Now, let’s define messages for our saga:

public class PlaceOrderCommand
{
    public Guid OrderId { get; set; }
}

public class ProcessPaymentCommand
{
    public Guid OrderId { get; set; }
}

public class ShipOrderCommand
{
    public Guid OrderId { get; set; }
}

public class OrderShippedEvent
{
    public Guid OrderId { get; set; }
}

Here, we create the PlaceOrderCommand, ProcessPaymentCommand, ShipOrderCommand and OrderShippedEvent classes.

To summarize, we can define two message types when implementing the Saga Pattern using Rebus: commands and events.

Commands tell components what action to take, while events notify the saga that a process has completed.

Unlike NServiceBus, Rebus has no marker interfaces such as ICommand or IEvent to distinguish them.

Instead, the difference lies in the naming convention: commands use imperative verbs, while events use the past tense.

Implementing the Saga

With our saga data and messages ready, we can now create handlers to process them.

First, let’s define a new class:

public class OrderSaga : Saga<OrderSagaData>, 
    IAmInitiatedBy<PlaceOrderCommand>,
    IHandleMessages<ProcessPaymentCommand>, 
    IHandleMessages<ShipOrderCommand>,
    IHandleMessages<OrderShippedEvent>
{
    private readonly IBus _bus;
    private readonly IOrderRepository _orderRepository;

    public OrderSaga(IBus bus, IOrderRepository orderRepository)
    {
        _bus = bus;
        _orderRepository = orderRepository;
    }
        
    protected override void CorrelateMessages(ICorrelationConfig<OrderSagaData> config)
    {
        config.Correlate<PlaceOrderCommand>(m => m.OrderId, d => d.OrderId);
        config.Correlate<ProcessPaymentCommand>(m => m.OrderId, d => d.OrderId);
        config.Correlate<ShipOrderCommand>(m => m.OrderId, d => d.OrderId);
        config.Correlate<OrderShippedEvent>(m => m.OrderId, d => d.OrderId);
    }
}

We start by defining a new OrderSaga class that derives from the Saga<OrderSagaData> class.

Next, we implement the IAmInitiatedBy<PlaceOrderCommand> interface. This tells Rebus to create a new saga instance when our service receives a PlaceOrderCommand message that does not correlate with an existing saga.

We also implement the IHandleMessages<T> interface for each of the remaining message types.

Next, in the class constructor, we inject instances of the IBus and IOrderRepository interfaces. We will use the IBus instance to send messages from our handlers.

Lastly, we override the CorrelateMessages() method from the base Saga<T> class, specifying how to match a message with the corresponding saga data.

Implementing Message Handlers in the Saga Pattern Using Rebus

Next, let’s define handlers for all of our messages:

public Task Handle(PlaceOrderCommand message)
{
    Data.OrderId = message.OrderId;
    Data.IsOrderPlaced = true;

    _orderRepository.AddOrder(new()
    {
        OrderId = message.OrderId,
        Status = OrderStatus.Placed
    });

    return Task.CompletedTask;
}

public async Task Handle(ProcessPaymentCommand message)
{
    Data.IsPaymentProcessed = true;

    var order = _orderRepository.GetOrderById(message.OrderId);
    order.Status = OrderStatus.Processing;
        
    await _bus.Send(new ShipOrderCommand { OrderId = Data.OrderId });
}
    
public async Task Handle(ShipOrderCommand message)
{
    Data.IsOrderShipped = true;
        
    await _bus.Send(new OrderShippedEvent { OrderId = Data.OrderId });
}

public Task Handle(OrderShippedEvent message)
{
    var order = _orderRepository.GetOrderById(message.OrderId);
    order.Status = OrderStatus.Completed;
        
    MarkAsComplete();

    return Task.CompletedTask;
}

We create a Handle() method for each message type. These methods will handle incoming PlaceOrderCommand, ProcessPaymentCommand, ShipOrderCommand, and OrderShippedEvent messages and implement the necessary business logic for each step of the saga.

These methods use the _orderRepository to interact with the order data and _bus to send messages. For now, it’s sufficient for the IOrderRepository interface implementation to use a Dictionary<Guid, Order> field for internal data storage. However, in a production environment, we should replace it with database persistence and make the interface methods asynchronous.
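As an illustration of the dictionary-based approach, here is a minimal sketch of such a repository. The Order, OrderStatus, and IOrderRepository shapes are inferred from how the handlers use them, while the InMemoryOrderRepository name, the lock, and the exception thrown for a missing order are our assumptions:

```csharp
using System;
using System.Collections.Generic;

public enum OrderStatus
{
    Placed,
    Processing,
    Completed
}

public class Order
{
    public Guid OrderId { get; set; }
    public OrderStatus Status { get; set; }
}

public interface IOrderRepository
{
    void AddOrder(Order order);
    Order GetOrderById(Guid orderId);
}

// Hypothetical in-memory implementation, suitable only for development and tests
public class InMemoryOrderRepository : IOrderRepository
{
    private readonly Dictionary<Guid, Order> _orders = new();

    // Rebus may run handlers in parallel, so we guard the dictionary with a lock
    private readonly object _lock = new();

    public void AddOrder(Order order)
    {
        lock (_lock)
        {
            _orders[order.OrderId] = order;
        }
    }

    public Order GetOrderById(Guid orderId)
    {
        lock (_lock)
        {
            // Assumption: a missing order is treated as an error
            return _orders.TryGetValue(orderId, out var order)
                ? order
                : throw new KeyNotFoundException($"Order {orderId} was not found.");
        }
    }
}
```

Because the repository returns the stored Order instance itself, the handlers can change its Status property without an explicit save call. To share the stored orders between handler invocations, we would register this class as a singleton in the service container.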

In the handler for the OrderShippedEvent message, we also call the MarkAsComplete() method. We must remember to call it once our saga has finished, as it tells Rebus that the saga data can be safely deleted.

As with NServiceBus, this action is irreversible, so we should call it only when we are sure the saga data will no longer be needed.
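With the handlers in place, the saga starts as soon as a PlaceOrderCommand message arrives. The following is a minimal sketch of how application code could send one. The OrderStarter class and its PlaceOrderAsync() method are hypothetical names that are not part of the original project, and PlaceOrderCommand is the message class we defined earlier:

```csharp
using System;
using System.Threading.Tasks;
using Rebus.Bus;

// Hypothetical helper that kicks off a new saga instance
public class OrderStarter
{
    private readonly IBus _bus;

    public OrderStarter(IBus bus)
    {
        _bus = bus;
    }

    public async Task<Guid> PlaceOrderAsync()
    {
        var orderId = Guid.NewGuid();

        // Type-based routing delivers the command to the Rebus.OrderQueue queue,
        // where OrderSaga picks it up and creates the saga data
        await _bus.Send(new PlaceOrderCommand { OrderId = orderId });

        return orderId;
    }
}
```

The returned OrderId is the value that later messages, such as ProcessPaymentCommand, must carry so that Rebus can correlate them with the same saga instance.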

Testing the Implementation Using Rebus.TestHelpers

Lastly, let’s write unit tests to verify our implementation.

Thankfully, Rebus provides us with the Rebus.TestHelpers package, which makes testing our sagas much easier.

First, let’s install it in our test project:

dotnet add package Rebus.TestHelpers

Testing the Saga Data State

Now we have access to the FakeBus and SagaFixture classes, so let’s use them in our tests:

[Test]
public void WhenPlaceOrderCommandIsReceived_ThenSagaDataIsInitialized()
{
    var busMock = new FakeBus();
    using var fixture = SagaFixture.For(() => new OrderSaga(busMock, _repositoryMock));

    fixture.Deliver(new PlaceOrderCommand
    {
        OrderId = _orderId
    });
    
    var data = fixture.Data
        .OfType<OrderSagaData>()
        .FirstOrDefault();

    Assert.That(data, Is.Not.Null);
    Assert.That(data.OrderId, Is.EqualTo(_orderId));
}

Here, we verify that the saga data is in the expected state after the handler execution.

First, we create an instance of the FakeBus class. In short, FakeBus is an IBus implementation that records the operations performed on it instead of talking to a live message broker like RabbitMQ.

Then, we initialize a new SagaFixture instance for the OrderSaga class, passing a factory method that creates the saga with the bus mock and a repository mock.

After that, we use the Deliver() method to dispatch a PlaceOrderCommand message. Next, we retrieve the saga data from the fixture using the Data property and select the first matching data instance.

Finally, we assert that the saga data is not null and that the OrderId in the saga matches the expected _orderId.

Testing Message Dispatching

Now, let’s define another test to check if our handler dispatches another message:

[Test]
public void WhenProcessPaymentCommandIsReceived_ThenShipOrderCommandIsSent()
{
    var busMock = new FakeBus();
    using var fixture = SagaFixture.For(() => new OrderSaga(busMock, _repositoryMock));
    fixture.Add(new OrderSagaData
    {
        OrderId = _orderId
    });

    fixture.Deliver(new ProcessPaymentCommand
    {
        OrderId = _orderId
    });

    var command = busMock.Events
        .OfType<MessageSent<ShipOrderCommand>>()
        .Single()
        .CommandMessage;

    Assert.That(command, Is.Not.Null);
    Assert.That(command.OrderId, Is.EqualTo(_orderId));
}

In the same fashion as before, we create the FakeBus instance and use it to create a SagaFixture of OrderSaga.

Since the message handler we test expects the saga data to exist, we inject a new OrderSagaData into the fixture using the Add() method.

Next, we dispatch a ProcessPaymentCommand message using the Deliver() method.

Afterward, we retrieve the history of recorded events from the bus mock using its Events property. We look for an instance of the MessageSent<T> class, where T is the expected message type.

Once found, we extract the message using the CommandMessage property.

Finally, we assert that the command is not null and that the OrderId matches the expected _orderId.

Conclusion

In summary, we now know how to implement the Saga Pattern using Rebus and RabbitMQ.

By following this guide, we can use the Saga Pattern to coordinate multi-step workflows and keep data consistent across distributed services. Rebus takes care of message correlation and saga state, while RabbitMQ transports the messages.
