When we need multi-source data integration, Strawberry Shake allows us to wire up GraphQL with Subscriptions to implement a pub-sub system.
For instructions on creating a Strawberry Shake GraphQL client to communicate using the GraphQL API of our Strawberry Shake series, check out the previous article.
Multi-Source Data Integration Requirements
When expanding our Strawberry Shake GraphQL project with multi-source data, we must consider the best way to manage it and keep its integrity. Thanks to Strawberry Shake’s Subscriptions, we can have a robust and scalable pub-sub system as our solution.
First, we have to define the requirements for our sources and pub-sub system. For our example, the two events will represent the data sources. The client ShippingContainerClient
subscribes to these events in API. The first onShippingContainerAdded
event signals that a new available container has appeared. The second onShippingContainerSpaceChanged
event produces a notification of the changed parameters of the containers. Thus, our GetShippingContainers
data is always updated.
Now that we have our plan, we can start implementing it.
Add Real-Time Functionality to GraphQL Server
Subscriptions in GraphQL are reactive APIs like a stream of query responses. WebSockets represent a default implementation of a transport protocol for a subscription. Protocols like SignalR and gRPC are worth considering depending on the goals, but they are not available at the time of writing this article.
To add subscriptions to the GraphQL server we update the request pipeline to use WebSockets and add subscriptions pub-sub system.
Let’s add WebSockets to the server in Program
class:
app.UseWebSockets();
It is important to note that we should add this middleware before GraphQL.
Now, it’s time to add InMemory
subscriptions:
builder.Services .AddGraphQLServer() .AddQueryType<Query>() .AddMutationType<Mutation>() .AddInMemorySubscriptions();
In this case, the AddInMemorySubscriptions()
method adds an in-memory pub-sub system for GraphQL subscriptions to our schema.
Create Subscriptions
Let’s create a new Subscription
class with the required events:
public class Subscriptions { [Subscribe] [Topic] public async Task<ShippingContainer> OnShippingContainerAddedAsync( [EventMessage] string shippingContainerId, [Service] ApplicationDbContext dbContext, CancellationToken cancellationToken) => await dbContext.ShippingContainers .FirstAsync(x => x.Id == shippingContainerId, cancellationToken); [Subscribe] [Topic] public async Task<ShippingContainer> OnShippingContainerSpaceChangedAsync( [EventMessage] string shippingContainerName, [Service] ApplicationDbContext dbContext, CancellationToken cancellationToken) => await dbContext.ShippingContainers .FirstAsync(x => x.Name == shippingContainerName, cancellationToken); }
Our first subscription OnShippingContainerAddedAsync
event sends a message with the newly added container. While OnShippingContainerSpaceChangedAsync
event sends information with updated container volume.
We inject the dbContext
directly into methods instead of the constructor. In the GraphQL Strawberry Shake framework, injecting services directly into resolver methods using [Service]
is recommended over constructor injection because it optimizes resolver execution and simplifies refactoring. Constructor injection should be avoided as it turns GraphQL type definitions into singletons, leading to potential synchronization issues during request execution.
To register subscriptions, we need to add them to the schema builder in Program
class:
.AddSubscriptionType<Subscriptions>()
Create Mutations
Let’s create a new Mutation
class with AddAvaliableShippingContainerAsync()
and UpdateShippingContainerAsync()
methods. They will be used for sending data to subscriptions. AddAvaliableShippingContainerAsync()
creates a ShippingContainer
object from the input, saves it to the database, and sends an event using eventSender
. If an exception occurs, it logs the error and returns null
:
public async Task<AddShippingContainerPayload> AddAvaliableShippingContainerAsync( AddShippingContainerInput input, [Service] ApplicationDbContext dbContext, [Service] ITopicEventSender eventSender, CancellationToken cancellationToken) { AddShippingContainerPayload payload; try { var shippingContainer = new ShippingContainer { Id = dbContext.ShippingContainers.Count().ToString(), Name = input.Name, Space = new ShippingContainer.AvailableSpace { Length = input.Length, Width = input.Width, Height = input.Height } }; dbContext.ShippingContainers.Add(shippingContainer); await dbContext.SaveChangesAsync(cancellationToken); await eventSender.SendAsync( nameof(Subscriptions.OnShippingContainerAddedAsync), shippingContainer.Id, cancellationToken); payload = new AddShippingContainerPayload(shippingContainer); } catch (Exception exception) { Console.WriteLine(exception.Message); return null!; } return payload; }
In the same way, let’s create UpdateShippingContainerAsync()
method that updates the dimensions of an existing shipping container and notifies subscribers via an event. It retrieves the ShippingContainer
object by its name, updates its dimensions, saves the changes to the database, and sends an event using eventSender
. If an exception occurs, it logs the error and returns null
:
public async Task<UpdateShippingContainerPayload> UpdateShippingContainerAsync( UpdateShippingContainerInput input, [Service] ApplicationDbContext dbContext, [Service] ITopicEventSender eventSender, CancellationToken cancellationToken) { UpdateShippingContainerPayload payload; try { var shippingContainer = await dbContext.ShippingContainers.FirstAsync(x => x.Name == input.Name, cancellationToken); shippingContainer!.Space!.Length = input.Length; shippingContainer!.Space!.Width = input.Width; shippingContainer!.Space!.Height = input.Height; dbContext.ShippingContainers.Update(shippingContainer); await dbContext.SaveChangesAsync(cancellationToken); await eventSender.SendAsync( nameof(Subscriptions.OnShippingContainerSpaceChangedAsync), shippingContainer.Name, cancellationToken); payload = new UpdateShippingContainerPayload(shippingContainer); } catch (Exception exception) { Console.WriteLine(exception.Message); return null!; } return payload; }
After we save the new ShippingContainer
to the database, we use the ITopicEventSender
interface to send a new message.
In the same way, let’s send an event whenever we update the container size in the UpdateShippingContainerAsync()
method:
await eventSender.SendAsync( nameof(Subscriptions.OnShippingContainerSpaceChangedAsync), shippingContainer.Name, cancellationToken);
Now, it is time to add subscriptions to the GraphQL client schema.
Strawberry Shake Subscriptions
Strawberry Shake client subscribes to a stream of events instead of one request and updates a store dynamically for each event.
Let’s run our server and create a new GraphQL schema for the client:
dotnet graphql init http://localhost:5299/graphql/ -n ShippingContainerSubClient -p .\GraphQLStrawberryShakeSubs
Further, let’s add a subscription file for StrawberryShake, created by the .NET GraphQL tool, when a newly available shipping container appears:
subscription OnContainerAdded { onShippingContainerAdded { name space { length width height volume } } }
Identically, we create an updated space subscription in the shipping container:
subscription OnContainerSpaceChanged { onShippingContainerSpaceChanged { name space { volume } } }
These subscriptions will tie up to previously created mutations.
Configuring Transport Settings
Analogous to the GraphQL server, let’s add the StrawberryShake.Transport.WebSockets
package to the GraphQLStrawberryShakeSubs
project. We have to build our GraphQL client to kick off the code generation.
At this point, we are ready to register WebSockets in transport settings:
builder.Services .AddShippingContainerSubClient() .ConfigureHttpClient(client => client.BaseAddress = new Uri(graphQLUrl)) .ConfigureWebSocketClient(client => client.Uri = new Uri(wsGraphQLUrl));
The added method ConfigureWebSocketClient()
allows us to configure the WebSockets client for GraphQL subscriptions.
Subscribe to the GraphQL Response Stream
At last, we are ready to create added
and updated
subscription methods to our ShippingContainerController
. Two endpoints use Strawberry Shake subscriptions to handle real-time updates about shipping containers:
[HttpGet("added")] public async Task<IActionResult> OnContainerAdded() { var name = ""; var tcs = new TaskCompletionSource<string>(); IDisposable subscription = null!; subscription = client.OnContainerAdded .Watch() .Subscribe(result => { result.EnsureNoErrors(); name = result.Data!.OnShippingContainerAdded.Name; subscription?.Dispose(); tcs.SetResult(name); }); name = await tcs.Task; return Ok(JsonSerializer.Serialize(name)); }
The OnContainerAdded()
method subscribes to the OnContainerAdded()
event, waits for a new container to be added, captures its name, and returns it. Now let’s create another subscription for updated
container:
[HttpGet("updated")] public async Task<IActionResult> OnContainerSpaceChanged() { var tcs = new TaskCompletionSource<(string, double)>(); IDisposable subscription = null!; subscription = client.OnContainerSpaceChanged .Watch() .Subscribe(result => { result.EnsureNoErrors(); tcs.SetResult((result.Data!.OnShippingContainerSpaceChanged.Name, result.Data!.OnShippingContainerSpaceChanged.Space!.Volume)); subscription?.Dispose(); }); (var name, var volume) = await tcs.Task; return Ok(JsonSerializer.Serialize(new { name, volume })); }
The OnContainerSpaceChanged()
method subscribes to the OnContainerSpaceChanged()
event, waits for a container space change, captures the container’s name and new volume, and returns them as a JSON response.
It must be remembered that the subscription
variable is an IDisposable
. Hence, whenever we call the Dispose()
method the subscription
will stop receiving events.
Conclusion
To sum it up, after the second article of our Strawberry Shake series, we now know how to create subscriptions on the server and client sides using the WebSockets transport protocol to implement multi-source data integration.
We hope that this tutorial shows something new. For any questions, please leave them in the comments section below, and make sure to check out our next article.