Implementing Saga 002 : Message Broker

In the earlier part of this series, we created the building blocks of our microservice example: the individual services. To recap, we built three services, namely Order Service, Inventory Service, and Payment Service.

In this part, we will continue building our example Saga implementation. In the previous post, we created endpoints in the Order Service to create an order. Ideally, when an order is created, the items specified in it, if available in the inventory, should be reserved until the payment is completed or the order is cancelled for other reasons. Similarly, if the Inventory Service fails to reserve the items specified in the order, the Order Service, which had previously created the order in a pending state, should mark the order with the appropriate state (for the sake of simplicity, we will not explore retries in this example).

This kind of asynchronous communication calls for a choreography-based Saga pattern, which can be built using a message broker. In our case, we will use RabbitMq as the message broker.

Setting up RabbitMq Docker Container

Let us now set up RabbitMq. We will use Docker containers for this purpose. The following is our RabbitMq entry in the docker compose file.

saga.demo.rabbitmq:
    image: "rabbitmq:3.10.6-management-alpine"
    hostname: "sagademo"
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER:-admin}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS:-admin}

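We have asked docker compose to create a RabbitMq container with the management plugin available, exposing the AMQP port (5672) and the management UI port (15672). We have also set the default username and password, both of which fall back to admin when the corresponding environment variables are not set. We could dockerize our services as well. The complete docker compose file looks like the following.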

version: '3.4'

services:
  saga.services.inventoryservice:
    image: ${DOCKER_REGISTRY-}sagaservicesinventoryservice
    build:
      context: .
      dockerfile: InventoryService/Dockerfile
    ports:
      - 8080:80
      - 9000:443

  saga.services.orderservice:
    image: ${DOCKER_REGISTRY-}sagaservicesorderservice
    build:
      context: .
      dockerfile: OrderService/Dockerfile
    ports:
      - 8081:80
      - 9001:443


  saga.services.paymentservice:
    image: ${DOCKER_REGISTRY-}sagaservicespaymentservice
    build:
      context: .
      dockerfile: PaymentService/Dockerfile
    ports:
      - 8082:80
      - 9002:443

  saga.demo.rabbitmq:
    image: "rabbitmq:3.10.6-management-alpine"
    hostname: "sagademo"
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER:-admin}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS:-admin}

Configure RabbitMq in Order Service

Having set up our docker container for RabbitMq, it is now time to ensure our services use it. Let us start with the Order Service. Open appsettings.json and add the RabbitMq configuration details to it.

  "RabbitMqSettings": {
    "Uri": "sagademo",
    "UserName": "admin",
    "Password": "admin"
  }

Now, in Program.cs, we will ensure the configuration is read and used for initializing the services required for working with RabbitMq. We will use the MassTransit library to interact with RabbitMq.

// Set up RabbitMq: bind the "RabbitMqSettings" section of appsettings.json
var rabbitMqSettings = builder.Configuration.GetSection(nameof(RabbitMqSettings)).Get<RabbitMqSettings>();
if (rabbitMqSettings is null) throw new InvalidOperationException("Unable to find RabbitMq Settings");

// Register MassTransit once and point it at the RabbitMq host
builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((cntxt, cfg) =>
    {
        cfg.Host(rabbitMqSettings.Uri, "/", c =>
        {
            c.Username(rabbitMqSettings.UserName);
            c.Password(rabbitMqSettings.Password);
        });
    });
});
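The RabbitMqSettings type that the configuration section is bound to is not shown in this post. A minimal sketch, assuming it simply mirrors the three keys of the JSON section above (the actual class in the repository may differ):

```csharp
// Hypothetical settings class: the property names are assumed to match the keys
// under "RabbitMqSettings" in appsettings.json so the configuration binder can fill them.
public class RabbitMqSettings
{
    // Host name of the broker ("sagademo" in this post), even though the key is called "Uri".
    public string Uri { get; set; } = string.Empty;

    public string UserName { get; set; } = string.Empty;

    public string Password { get; set; } = string.Empty;
}
```

Because nameof(RabbitMqSettings) is used as the section name, the class name and the JSON section name have to stay in sync.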

Now that we have configured the RabbitMq services, it is time to start using them. Each time an order is placed by the Order Service, we need to raise an event (a message to the message broker) to notify the dependent services. Let us go ahead and define the message contract first.

For this, we will create a separate class library, Saga.Shared.Contracts, which is shared among the services interested in the same contracts. This could be distributed as a NuGet package as well.

[RabbitQueue("order-creation-initiated")]
public record OrderCreationInitiated : IBaseEvent<OrderCreationInitiated>
{
    public Guid EventId { get; init; }
    public Guid OrderId { get; init; }

    public Guid CustomerId { get; init; }
    public IReadOnlyList<OrderItemEntry> OrderItems { get; init; } = null!;
}

public record OrderItemEntry
{
    public Guid ItemId { get; init; }
    public int Qty { get; init; }
}

Where IBaseEvent<T> is defined as

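// Requires: using System.Reflection; (for GetCustomAttribute<T>())
public interface IBaseEvent<TDerieved> where TDerieved : IBaseEvent<TDerieved>
{
    public Guid EventId { get; init; }
    // Queue name taken from the [RabbitQueue] attribute on the derived event; empty if the attribute is missing
    public static string QueueName => typeof(TDerieved).GetCustomAttribute<RabbitQueueAttribute>()?.Name ?? string.Empty;
}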
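The RabbitQueueAttribute used above is a custom attribute, not part of MassTransit, and its definition is not shown in this post. A minimal sketch, assuming it only carries the queue name. The QueueNames helper and the SampleEvent record are hypothetical, added only to show the reflection lookup in isolation:

```csharp
using System;
using System.Reflection;

// Assumed shape of the custom attribute: it only stores a queue name.
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public sealed class RabbitQueueAttribute : Attribute
{
    public RabbitQueueAttribute(string name) => Name = name;

    public string Name { get; }
}

// Hypothetical helper performing the same lookup as IBaseEvent<T>.QueueName.
public static class QueueNames
{
    public static string For<T>() =>
        typeof(T).GetCustomAttribute<RabbitQueueAttribute>()?.Name ?? string.Empty;
}

// Hypothetical event used only to exercise the lookup.
[RabbitQueue("sample-event")]
public record SampleEvent;
```

With these in place, QueueNames.For<SampleEvent>() returns "sample-event", and a type without the attribute yields an empty string.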

Similarly, create the OrderCreationFailed and OrderCreationSucceeded contracts to be used by the services. I will leave out the details here, but you can find the entire code discussed in this blog post on my GitHub.

Publishing Messages

In the earlier step, we defined the contracts. We can now proceed to publish the OrderCreationInitiated message from the Order Service when an order is created, so that the dependent services can take the necessary action.

Let us head back to our OrderService and make changes to the CreateOrder() method, so that each time an order is successfully created, a message is sent to the message broker.

public class OrderService : IOrderService
{
    private readonly IOrderRepository _orderRepository;
    private readonly ILogger<OrderService> _logger;
    private readonly IBus _bus;
    public OrderService(IOrderRepository orderRepository,ILogger<OrderService> logger,IBus bus)
    {
        _orderRepository = orderRepository;
        _logger = logger;
        _bus = bus;
    }
 
    public Order CreateOrder(Order order)
    {
        _logger.LogInformation("Inserting new Order");
        order.State = OrderState.Initiated;
        var currentOrder = _orderRepository.Insert(order);
        if (currentOrder.Id != default)
        {
            // Publish returns a Task; it is not awaited here, so a failure to publish would go unobserved
            _bus.Publish(new OrderCreationInitiated
            {
                EventId = Guid.NewGuid(),
                OrderId = currentOrder.Id,
                CustomerId = currentOrder.CustomerId,
                OrderItems = currentOrder.OrderItems.Select(x => new OrderItemEntry()
                {
                    ItemId = x.OrderItemId,
                    Qty = x.Quantity
                }).ToList().AsReadOnly()
            });
        }

        return currentOrder;
    }
}

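Notice that we are using IBus instead of IPublishEndpoint. This is because of the associated lifetimes. IBus is registered as a singleton, so it can be injected safely into the IOrderService implementation, which is also a singleton in our case. IPublishEndpoint, on the other hand, has a scoped lifetime, and a scoped service should not be injected into a singleton: with scope validation enabled the container rejects it, and without validation the singleton would hold on to a single scoped instance for its whole lifetime.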

Consuming Message

Now that the message is being published, let us turn to the dependent services that are interested in it. In our case, the Inventory Service is interested in the message, so that it can reserve the inventory items for the particular order. We first need to define a Consumer for consuming the message in the Inventory Service.

public class OrderCreationInitiatedConsumer : IConsumer<OrderCreationInitiated>
{
    private readonly IInventoryService _inventoryService;
    private readonly IPublishEndpoint _publishEndPoint;
    private readonly ILogger<OrderCreationInitiatedConsumer> _logger;

    // Constructor parameters are resolved by the container; [FromServices] is not needed here
    public OrderCreationInitiatedConsumer(IInventoryService inventoryService,
                                          IPublishEndpoint publishEndPoint,
                                          ILogger<OrderCreationInitiatedConsumer> logger)
    {
        _inventoryService = inventoryService;
        _publishEndPoint = publishEndPoint;
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<OrderCreationInitiated> context)
    {
        var orderInfo = new OrderDto
        {
            OrderId = context.Message.OrderId,
            Items = context.Message.OrderItems.ToDictionary(x => x.ItemId, y => y.Qty)
        };

        try
        {
            _inventoryService.ReserveStock(orderInfo);

            // Await the publish so that a broker failure surfaces here instead of being lost
            await _publishEndPoint.Publish(new OrderCreationSucceeded
            {
                OrderId = context.Message.OrderId,
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Order Creation Failed with Message [{Message}]. Reverting Changes", ex.Message);
            await _publishEndPoint.Publish(new OrderCreationFailed
            {
                OrderId = context.Message.OrderId
            });
        }
    }
}

public class OrderCreationInitiatedConsumerDefinition : ConsumerDefinition<OrderCreationInitiatedConsumer>
{
    public OrderCreationInitiatedConsumerDefinition()
    {
        EndpointName = $"{IBaseEvent<OrderCreationInitiated>.QueueName}-inventory-service";
    }
}

As you can observe from the code, the consumer OrderCreationInitiatedConsumer implements the mandatory IConsumer interface from MassTransit. On receiving the message, it uses the InventoryService to reserve the stock. On success, it publishes another message, OrderCreationSucceeded (on failure, it publishes OrderCreationFailed). This is then consumed by the Order Service to change the state of the order it has created from Initiated to Pending.

We have also defined an OrderCreationInitiatedConsumerDefinition. The consumer definition describes the characteristics of the consumer. In our case, we have used it to define the name of the RabbitMq queue it needs to listen to. Remember, when a message is published, it is sent to the exchange (in our case, we are using the fan-out configuration). The exchange then forwards the message to the individual queues, with each of our services/consumers listening to a specific queue.
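To make the exchange-to-queue fan-out concrete, here is a toy in-memory model. It is only an illustration and does not use RabbitMq or MassTransit; the FanoutExchange class is made up for this example.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a fan-out exchange: every bound queue receives its own copy of each message.
public class FanoutExchange<TMessage>
{
    private readonly Dictionary<string, Queue<TMessage>> _queues = new();

    // Bind a named queue to the exchange (binding the same name twice returns the same queue).
    public Queue<TMessage> Bind(string queueName)
    {
        if (!_queues.TryGetValue(queueName, out var queue))
        {
            queue = new Queue<TMessage>();
            _queues[queueName] = queue;
        }

        return queue;
    }

    // Publishing does not target a queue; the exchange copies the message to all bound queues.
    public void Publish(TMessage message)
    {
        foreach (var queue in _queues.Values)
        {
            queue.Enqueue(message);
        }
    }
}
```

Binding order-creation-initiated-inventory-service and a second, hypothetical queue such as order-creation-initiated-audit-service, and then publishing one message, leaves one copy in each queue. This is why each consuming service gets its own endpoint name.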

But for the consumer to work, we need to ensure the pipeline knows about it. This is done by registering it in the code that adds the RabbitMq/MassTransit services to the pipeline.

builder.Services.AddMassTransit(x =>
{
    // Register the consumer together with its definition (which sets the queue name)
    x.AddConsumer<OrderCreationInitiatedConsumer>(typeof(OrderCreationInitiatedConsumerDefinition));

    x.UsingRabbitMq((cntxt, cfg) =>
    {
        cfg.Host(rabbitMqSettings.Uri, "/", c =>
        {
            c.Username(rabbitMqSettings.UserName);
            c.Password(rabbitMqSettings.Password);
        });
        // Create receive endpoints for all registered consumers
        cfg.ConfigureEndpoints(cntxt);
    });
});

As seen in the code, we have added a new consumer, OrderCreationInitiatedConsumer, to the pipeline. We have also specified the consumer definition (OrderCreationInitiatedConsumerDefinition) it should use.

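The gist of the messaging mechanism can be summarized as follows: the Order Service publishes OrderCreationInitiated to the exchange, the Inventory Service consumes it from its own queue and tries to reserve the stock, and it then publishes either OrderCreationSucceeded or OrderCreationFailed, which the Order Service consumes to update the state of the order.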

Similarly, we need to modify the Order Service to support OrderCreationSucceededConsumer and OrderCreationFailedConsumer. I am skipping the complete code here, but if you are interested in the complete source code discussed in this tutorial, you can find it on my GitHub.
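The state change those two consumers apply can be sketched as a small pure function. This sketch rests on assumptions: only the Initiated and Pending states are named in this post, so the Failed state and the OrderTransitions helper are hypothetical, and the enum below is a stand-in for the real OrderState.

```csharp
using System;

// Stand-in for the OrderState used by the Order Service.
public enum OrderState
{
    Initiated,
    Pending,
    Failed // hypothetical: the post does not name the failure state
}

// Hypothetical helper mapping the inventory outcome to the next order state.
public static class OrderTransitions
{
    // Only an order that is still Initiated reacts to the inventory outcome;
    // an order in any other state is returned unchanged.
    public static OrderState Apply(OrderState current, bool stockReserved)
    {
        if (current != OrderState.Initiated)
        {
            return current;
        }

        return stockReserved ? OrderState.Pending : OrderState.Failed;
    }
}
```

An OrderCreationSucceeded consumer would call this with stockReserved set to true, and an OrderCreationFailed consumer with false, before saving the order.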
