When we need multi-source data integration, Strawberry Shake allows us to wire up GraphQL with Subscriptions to implement a pub-sub system.

To download the source code for this article, you can visit our GitHub repository.

For instructions on creating a Strawberry Shake GraphQL client to communicate using the GraphQL API of our Strawberry Shake series, check out the previous article.

Multi-Source Data Integration Requirements

When expanding our Strawberry Shake GraphQL project with multi-source data, we must consider the best way to manage it and keep its integrity. Thanks to Strawberry Shake’s Subscriptions, we can have a robust and scalable pub-sub system as our solution.


First, we have to define the requirements for our sources and pub-sub system. In our example, two events represent the data sources, and the ShippingContainerClient client subscribes to them through the API. The first event, onShippingContainerAdded, signals that a new available container has appeared. The second event, onShippingContainerSpaceChanged, notifies us that the dimensions of a container have changed. This way, the data returned by GetShippingContainers always stays up to date.

Now that we have our plan, we can start implementing it.

Add Real-Time Functionality to GraphQL Server

Subscriptions in GraphQL are reactive APIs that behave like a stream of query responses. WebSockets are the default transport protocol for subscriptions. Protocols like SignalR and gRPC are worth considering depending on the goals, but they are not available at the time of writing this article.

To add subscriptions to the GraphQL server, we update the request pipeline to use WebSockets and register a pub-sub system for subscriptions.

Let’s add WebSockets to the server in the Program class:

app.UseWebSockets();

It is important to note that we should add this middleware before the GraphQL middleware in the request pipeline.

Now, it’s time to add in-memory subscriptions:

builder.Services
    .AddGraphQLServer()
    .AddQueryType<Query>()
    .AddMutationType<Mutation>()
    .AddInMemorySubscriptions();

In this case, the AddInMemorySubscriptions() method adds an in-memory pub-sub system for GraphQL subscriptions to our schema.

Create Subscriptions

Let’s create a new Subscriptions class with the required events:

public class Subscriptions
{
    [Subscribe]
    [Topic]
    public async Task<ShippingContainer> OnShippingContainerAddedAsync(
        [EventMessage] string shippingContainerId,
        [Service] ApplicationDbContext dbContext,
        CancellationToken cancellationToken) => await dbContext.ShippingContainers
            .FirstAsync(x => x.Id == shippingContainerId, cancellationToken);

    [Subscribe]
    [Topic]
    public async Task<ShippingContainer> OnShippingContainerSpaceChangedAsync(
        [EventMessage] string shippingContainerName,
        [Service] ApplicationDbContext dbContext,
        CancellationToken cancellationToken) => await dbContext.ShippingContainers
            .FirstAsync(x => x.Name == shippingContainerName, cancellationToken);
}

Our first subscription, OnShippingContainerAddedAsync(), receives the Id of the newly added container and returns that container to subscribers. The second one, OnShippingContainerSpaceChangedAsync(), receives the container name and returns the container with its updated dimensions.

We inject the dbContext directly into the methods instead of the constructor. In Hot Chocolate, the GraphQL server framework that pairs with the Strawberry Shake client, injecting services directly into resolver methods using [Service] is recommended over constructor injection because it optimizes resolver execution and simplifies refactoring. Constructor injection should be avoided because GraphQL type definitions are singletons, which can lead to synchronization issues during request execution.

To register subscriptions, we need to add them to the schema builder in the Program class:

.AddSubscriptionType<Subscriptions>()
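
Putting the server-side pieces together, the relevant parts of the Program class might look like the following sketch. It assumes the Query, Mutation, and Subscriptions classes from this series, an ApplicationDbContext that is registered elsewhere, and the default endpoint mapping through MapGraphQL(), so it should be adjusted to the actual project:

```csharp
var builder = WebApplication.CreateBuilder(args);

// Assumption: ApplicationDbContext is already registered with the service collection.
builder.Services
    .AddGraphQLServer()
    .AddQueryType<Query>()
    .AddMutationType<Mutation>()
    .AddSubscriptionType<Subscriptions>()
    .AddInMemorySubscriptions();

var app = builder.Build();

// WebSockets go first so that subscription requests can be upgraded
app.UseWebSockets();
app.MapGraphQL();

app.Run();
```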

Create Mutations

Let’s create a new Mutation class with AddAvaliableShippingContainerAsync() and UpdateShippingContainerAsync() methods. They will be used for sending data to subscriptions. AddAvaliableShippingContainerAsync() creates a ShippingContainer object from the input, saves it to the database, and sends an event using eventSender. If an exception occurs, it logs the error and returns null:

public async Task<AddShippingContainerPayload> AddAvaliableShippingContainerAsync(
    AddShippingContainerInput input,
    [Service] ApplicationDbContext dbContext,
    [Service] ITopicEventSender eventSender,
    CancellationToken cancellationToken)
{
    AddShippingContainerPayload payload;
    try
    {
        var shippingContainer = new ShippingContainer
        {
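            // A count-based Id is only suitable for a demo; concurrent inserts could collide
            Id = (await dbContext.ShippingContainers.CountAsync(cancellationToken)).ToString(),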
            Name = input.Name,
            Space = new ShippingContainer.AvailableSpace
            {
                Length = input.Length,
                Width = input.Width,
                Height = input.Height
            }
        };

        dbContext.ShippingContainers.Add(shippingContainer);

        await dbContext.SaveChangesAsync(cancellationToken);
        await eventSender.SendAsync(
            nameof(Subscriptions.OnShippingContainerAddedAsync),
            shippingContainer.Id,
            cancellationToken);

        payload = new AddShippingContainerPayload(shippingContainer);
    }
    catch (Exception exception)
    {
        Console.WriteLine(exception.Message);
        return null!;
    }

    return payload;
}

In the same way, let’s create the UpdateShippingContainerAsync() method, which updates the dimensions of an existing shipping container and notifies subscribers via an event. It retrieves the ShippingContainer object by its name, updates its dimensions, saves the changes to the database, and sends an event using eventSender. If an exception occurs, it logs the error and returns null:

public async Task<UpdateShippingContainerPayload> UpdateShippingContainerAsync(
    UpdateShippingContainerInput input,
    [Service] ApplicationDbContext dbContext,
    [Service] ITopicEventSender eventSender,
    CancellationToken cancellationToken)
{
    UpdateShippingContainerPayload payload;
    try
    {
        // FirstAsync() throws if no container matches, so the result is never null
        var shippingContainer = await dbContext.ShippingContainers
            .FirstAsync(x => x.Name == input.Name, cancellationToken);
        shippingContainer.Space!.Length = input.Length;
        shippingContainer.Space!.Width = input.Width;
        shippingContainer.Space!.Height = input.Height;

        dbContext.ShippingContainers.Update(shippingContainer);
        await dbContext.SaveChangesAsync(cancellationToken);
        await eventSender.SendAsync(
            nameof(Subscriptions.OnShippingContainerSpaceChangedAsync),
            shippingContainer.Name,
            cancellationToken);

        payload = new UpdateShippingContainerPayload(shippingContainer);
    }
    catch (Exception exception)
    {
        Console.WriteLine(exception.Message);
        return null!;
    }

    return payload;
}

In AddAvaliableShippingContainerAsync(), after we save the new ShippingContainer to the database, we use the ITopicEventSender interface to send a new message to the topic named after the subscription method.

In the same way, UpdateShippingContainerAsync() sends an event whenever we update the container size:

await eventSender.SendAsync(
    nameof(Subscriptions.OnShippingContainerSpaceChangedAsync),
    shippingContainer.Name,
    cancellationToken);
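
To make the topic mechanism more tangible, here is a minimal, self-contained sketch of topic-based publish-subscribe built only on System.Threading.Channels. It is not how Hot Chocolate implements AddInMemorySubscriptions(); the TopicBus class and its members are hypothetical names used purely for illustration:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical illustration of topic-based pub-sub; not Hot Chocolate's implementation.
public sealed class TopicBus
{
    // One list of subscriber channels per topic name
    private readonly ConcurrentDictionary<string, List<Channel<string>>> _topics = new();

    // Registers a subscriber and returns the reader it can consume messages from
    public ChannelReader<string> Subscribe(string topic)
    {
        var channel = Channel.CreateUnbounded<string>();
        var subscribers = _topics.GetOrAdd(topic, _ => new List<Channel<string>>());
        lock (subscribers)
        {
            subscribers.Add(channel);
        }

        return channel.Reader;
    }

    // Delivers the message to every subscriber of the topic; other topics are unaffected
    public async ValueTask SendAsync(
        string topic,
        string message,
        CancellationToken cancellationToken = default)
    {
        if (!_topics.TryGetValue(topic, out var subscribers))
        {
            return; // nobody is listening on this topic
        }

        // Copy the list under the lock, then write outside of it
        Channel<string>[] snapshot;
        lock (subscribers)
        {
            snapshot = subscribers.ToArray();
        }

        foreach (var channel in snapshot)
        {
            await channel.Writer.WriteAsync(message, cancellationToken);
        }
    }
}
```

In this sketch, a reader obtained from Subscribe("OnShippingContainerAddedAsync") only sees messages sent to that exact topic name, which mirrors why our mutations pass nameof() of the subscription method to SendAsync().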

Now, it is time to add subscriptions to the GraphQL client schema.

Strawberry Shake Subscriptions

A Strawberry Shake client subscribes to a stream of events instead of sending a single request, and it updates its store dynamically for each event.

Let’s run our server and create a new GraphQL schema for the client:

dotnet graphql init http://localhost:5299/graphql/ -n ShippingContainerSubClient -p .\GraphQLStrawberryShakeSubs

Next, alongside the schema files created by the .NET GraphQL tool, let’s add a Strawberry Shake subscription document that notifies us when a newly available shipping container appears:

subscription OnContainerAdded {
  onShippingContainerAdded {
    name
    space {
      length
      width
      height
      volume
    }
  }
}

Similarly, we create a subscription for changes to the available space in a shipping container:

subscription OnContainerSpaceChanged {
  onShippingContainerSpaceChanged {
    name
    space {
      volume
    }
  }
}

These subscriptions tie in with the mutations we created earlier.

Configuring Transport Settings

Just as we enabled WebSockets on the GraphQL server, we need WebSockets support on the client, so let’s add the StrawberryShake.Transport.WebSockets package to the GraphQLStrawberryShakeSubs project. Then, we have to build our GraphQL client project to kick off the code generation.

At this point, we are ready to register WebSockets in transport settings:

builder.Services
    .AddShippingContainerSubClient()
    .ConfigureHttpClient(client => client.BaseAddress = new Uri(graphQLUrl))
    .ConfigureWebSocketClient(client => client.Uri = new Uri(wsGraphQLUrl));

The added method ConfigureWebSocketClient() allows us to configure the WebSockets client for GraphQL subscriptions.
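
The registration above relies on the graphQLUrl and wsGraphQLUrl variables being defined earlier in the Program class. A minimal sketch, assuming the server listens on port 5299 as in the init command and accepts WebSocket connections on the same /graphql path with the ws scheme:

```csharp
// Assumed values; adjust the host, port, and path to the running GraphQL server
const string graphQLUrl = "http://localhost:5299/graphql/";
const string wsGraphQLUrl = "ws://localhost:5299/graphql/";
```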

Subscribe to the GraphQL Response Stream

At last, we are ready to add the added and updated subscription methods to our ShippingContainerController. These two endpoints use Strawberry Shake subscriptions to handle real-time updates about shipping containers:

[HttpGet("added")]
public async Task<IActionResult> OnContainerAdded()
{
    var tcs = new TaskCompletionSource<string>();
    IDisposable subscription = null!;

    subscription = client.OnContainerAdded
        .Watch()
        .Subscribe(result =>
        {
            result.EnsureNoErrors();
            // TrySetResult() does not throw if another event arrives before disposal
            tcs.TrySetResult(result.Data!.OnShippingContainerAdded.Name);
            subscription?.Dispose();
        });

    // Wait for the first event, then continue with the captured name
    var name = await tcs.Task;

    return Ok(JsonSerializer.Serialize(name));
}

The OnContainerAdded() method subscribes to the OnContainerAdded subscription, waits for a new container to be added, captures its name, and returns it. Now, let’s create another endpoint for the updated container:

[HttpGet("updated")]
public async Task<IActionResult> OnContainerSpaceChanged()
{
    var tcs = new TaskCompletionSource<(string, double)>();
    IDisposable subscription = null!;

    subscription = client.OnContainerSpaceChanged
        .Watch()
        .Subscribe(result =>
        {
            result.EnsureNoErrors();

            tcs.TrySetResult((result.Data!.OnShippingContainerSpaceChanged.Name,
                result.Data!.OnShippingContainerSpaceChanged.Space!.Volume));
            subscription?.Dispose();
        });

    (var name, var volume) = await tcs.Task;

    return Ok(JsonSerializer.Serialize(new { name, volume }));
}

The OnContainerSpaceChanged() method subscribes to the OnContainerSpaceChanged subscription, waits for a container space change, captures the container’s name and new volume, and returns them as a JSON response.

Keep in mind that the subscription variable is an IDisposable. Once we call its Dispose() method, the subscription stops receiving events.
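
The pattern used in both endpoints, awaiting the first event of an observable stream and then unsubscribing, can be sketched without Strawberry Shake. The example below is a minimal illustration using only base class library types. FirstEventAwaiter, FirstAsync(), and CallbackObserver are hypothetical names, and the cancellation handling is an assumption about how we might avoid waiting forever when no event arrives:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: completes a task with the first value an observable produces.
public static class FirstEventAwaiter
{
    public static async Task<T> FirstAsync<T>(
        IObservable<T> source,
        CancellationToken cancellationToken = default)
    {
        var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Cancel the wait if the caller gives up, for example when the HTTP request is aborted
        using var registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));

        // Disposing the subscription at the end of the method stops further notifications
        using var subscription = source.Subscribe(new CallbackObserver<T>(tcs));

        return await tcs.Task;
    }

    private sealed class CallbackObserver<T> : IObserver<T>
    {
        private readonly TaskCompletionSource<T> _tcs;

        public CallbackObserver(TaskCompletionSource<T> tcs) => _tcs = tcs;

        // The Try* methods tolerate a second event arriving before disposal
        public void OnNext(T value) => _tcs.TrySetResult(value);

        public void OnError(Exception error) => _tcs.TrySetException(error);

        public void OnCompleted() =>
            _tcs.TrySetException(
                new InvalidOperationException("The stream completed without producing a value."));
    }
}
```

Passing the request’s cancellation token to such a helper would let an endpoint stop waiting when the caller disconnects, which the controller methods above do not handle on their own.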

Conclusion

To sum it up, after the second article of our Strawberry Shake series, we now know how to create subscriptions on the server and client sides using the WebSockets transport protocol to implement multi-source data integration.

We hope that this tutorial shows something new. For any questions, please leave them in the comments section below, and make sure to check out our next article.
