At its core, Kafka is a distributed streaming platform designed to handle large volumes of data in real time. Its exceptional performance, fault tolerance, and scalability make it a pivotal component in event-driven architectures.
In this article, we’ll learn how to integrate Kafka with an ASP.NET Core Web API using a custom Docker Compose setup and the Confluent.Kafka NuGet package.
Understanding Kafka and Event-Driven Architecture
Kafka utilizes a publish-subscribe model. In this model, producers publish messages to topics, and consumers subscribe to those topics to receive the messages. This model allows for seamless communication between different system components and promotes the creation of scalable and responsive applications.
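To make the model concrete, here is a toy, in-memory C# sketch. It is not Kafka’s API: ToyTopic, Publish(), and Poll() are hypothetical names. Producers append messages to a topic’s log, and each consumer group tracks its own read position (offset), so several subscribers can read the same events independently:

```csharp
using System;
using System.Collections.Generic;

// Toy, in-memory model of a single-partition topic; NOT Kafka's API.
// Producers append to the log; each consumer group keeps its own read position (offset).
public sealed class ToyTopic
{
    private readonly List<string> _log = new();                     // append-only message log
    private readonly Dictionary<string, int> _groupOffsets = new(); // next offset to read, per group

    // Publish: append the message and return the offset it was stored at.
    public int Publish(string message)
    {
        _log.Add(message);
        return _log.Count - 1;
    }

    // Poll: return the next unread message for the group, or null if the group is caught up.
    // Reading does not remove the message, so other groups can still read it.
    public string? Poll(string groupId)
    {
        var offset = _groupOffsets.TryGetValue(groupId, out var stored) ? stored : 0;
        if (offset >= _log.Count)
        {
            return null;
        }

        _groupOffsets[groupId] = offset + 1; // "commit" the group's new position
        return _log[offset];
    }
}
```

Real Kafka persists this log on the brokers, splits it into partitions, and stores each group’s committed offsets in the cluster, but the core idea carries over: consuming a message does not delete it.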
Kafka comprises several components that work together to provide a reliable and scalable messaging system:
In this diagram, we have producers that create new events or data messages and publish them to specific topics in Kafka.
The central component is the Kafka Cluster, which comprises multiple Brokers for high availability, fault tolerance, and load balancing. The cluster receives the messages from producers, stores them, and serves them to consumers. Each topic is divided into partitions, which Kafka distributes across the Brokers in the cluster: spreading partitions enables parallel processing, and replicating each partition on several Brokers provides fault tolerance.
Zookeeper plays the vital role of managing and coordinating the Kafka Brokers. It maintains the list of live brokers, tracks the status of nodes, and, most importantly, elects the controller broker, which in turn handles leader elections for partitions.
The final element represented in the diagram is the consumers. Consumers subscribe to one or more topics in the Kafka Cluster, consume the data feed, and process it. We can organize consumers into consumer groups to optimize this process, enabling load balancing across the system. The consumers act as the end receivers of the streaming channel.
Event-driven Architecture
Event-driven architecture is an architectural pattern that organizes a system as loosely coupled components that communicate and coordinate through events.
People often associate Kafka with microservices because it lets services communicate without depending directly on one another.
However, Kafka isn’t only for microservices. It fits into all kinds of setups, including large applications that are starting to adopt the event-driven approach.
Kafka’s importance in event-driven applications arises from its high throughput, fault tolerance, and ordering guarantees: messages within a partition are stored and delivered in the order they were written. Because events are foundational to such architectures, Kafka’s efficient event management is crucial for building reliable, scalable systems.
Let’s see how to set up Kafka.
Application Description
Let’s simulate a basic scenario in an e-commerce application:
Here, the customer calls the Web API, which publishes an “OrderPlaced” event to a Kafka topic, which an Order Confirmation Service subscribes to.
We implement the producer responsible for publishing the event in the Web API project, while a console application represents the consumer component, which consumes and processes events.
Here, Kafka topics act as a logical channel for message publication, enabling categorization, organization, and parallel processing.
Setting Up Kafka
To leverage Kafka for building scalable and event-driven applications within an ASP.NET Core Web API, we need to establish a connection between our Web API project and the Kafka broker. To run the broker locally, we use a Docker Compose file in our project that orchestrates the deployment of the Kafka and Zookeeper containers.
For a complete reference, feel free to check out the official Confluent GitHub repository, which provides a comprehensive Docker Compose file for setting up an entire Kafka ecosystem.
Next, let’s start the containers:
docker-compose up -d
We use Developer PowerShell in Visual Studio to run the command from the directory containing our docker-compose.yml file.
Now, let’s check the output:
[+] Running 15/2
 broker 11 layers Pulled
 zookeeper 2 layers Pulled
[+] Running 2/3
 Network kafka_default Created
 Container zookeeper Started
 Container broker Started
As we can see, both the Zookeeper and Kafka broker are running successfully.
Next, let’s create a topic:
docker exec -it broker kafka-topics --create --topic order-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
We create an “order-events” topic, specifying its partitions and replication factor. Partitions determine how many parallel streams of data Kafka can handle for the topic: within a consumer group, each partition is read by at most one consumer, so the partition count caps consumer parallelism. A replication factor of “1” keeps a single copy of each partition, which is all our single-broker setup can support; in production environments, we can increase it (up to the number of brokers) for fault tolerance and high availability.
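As a simplified sketch of how partitions and their replicas could be spread over brokers, the hypothetical ReplicaPlacementSketch.Assign() method below places replicas round-robin. Kafka’s real assignment also randomizes the starting broker and can be rack-aware, so the exact layout differs, but the sketch shows why a single broker limits us to a replication factor of 1:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified sketch of spreading partition replicas across brokers (NOT Kafka's exact algorithm).
public static class ReplicaPlacementSketch
{
    // Returns, for each partition, the broker IDs holding its replicas (first entry = preferred leader).
    public static List<int[]> Assign(int partitions, int replicationFactor, int brokerCount)
    {
        // Each replica of a partition must live on a different broker.
        if (replicationFactor > brokerCount)
        {
            throw new ArgumentException(
                $"Replication factor {replicationFactor} exceeds available brokers {brokerCount}.");
        }

        var assignment = new List<int[]>();
        for (var p = 0; p < partitions; p++)
        {
            // Round-robin: partition p starts on broker p % brokerCount, followers go on the next brokers.
            assignment.Add(Enumerable.Range(0, replicationFactor)
                .Select(r => (p + r) % brokerCount)
                .ToArray());
        }

        return assignment;
    }
}
```

Calling Assign(3, 2, 3) spreads three partitions over three brokers with two replicas each, while Assign(1, 2, 1) fails, much as the broker rejects a topic whose replication factor exceeds the number of available brokers.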
We follow a consistent, descriptive naming convention such as “order-events”, which helps us understand the purpose of each topic within the Kafka-based system.
Integrating Kafka With ASP.NET Core Web API
To begin with, let’s create a Web API project and install the Confluent.Kafka package in it:
dotnet add package Confluent.Kafka --version 2.3.0
Confluent.Kafka is Confluent’s .NET client for Apache Kafka, the leading open-source event streaming platform. It wraps librdkafka, a high-performance C client library, and exposes producer, consumer, and admin APIs that simplify building Kafka-based applications.
Configuring the Kafka Producer
Now, let’s start by configuring the producer and adding it to our services in the Program.cs file:
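using Confluent.Kafka;

var producerConfig = new ProducerConfig
{
    // Helper.GetKafkaBrokerPort() reads the broker's host port from docker-compose.yml
    BootstrapServers = $"localhost:{Helper.GetKafkaBrokerPort(Directory.GetParent(Environment.CurrentDirectory)?.FullName!)}",
    ClientId = "order-producer"
};

// One long-lived producer instance shared across the whole application
builder.Services.AddSingleton(
    new ProducerBuilder<string, string>(producerConfig).Build());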
Here, we initialize the ProducerConfig object, specify the Kafka broker’s address, and give the producer a logical name (ClientId). Apache Kafka uses port 9092 by default for communication between clients and brokers; the brokers listen on it for incoming connections from producers and consumers.
We retrieve the Kafka broker’s port using the GetKafkaBrokerPort() utility method that we create inside the Shared library. Within the method, we read the docker-compose.yml file to extract the port configuration for the Kafka broker. For brevity, we have excluded the implementation from the article. Please refer to the GitHub repository to see the implementation.
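Since the real implementation lives in the repository, the following is only a hypothetical sketch of one way such a helper could work, not the article’s code. ComposePortSketch and its methods are made-up names, and the sketch assumes the broker service in docker-compose.yml publishes container port 9092 as a list item such as - "29092:9092", falling back to 9092 when no such mapping exists:

```csharp
using System;
using System.IO;
using System.Text.RegularExpressions;

// HYPOTHETICAL helper, not the implementation from the article's repository.
// Assumption: docker-compose.yml maps the broker's container port 9092 as "HOST_PORT:9092".
public static class ComposePortSketch
{
    // Matches list items such as:  - "9092:9092"   or   - 29092:9092   and captures the host port.
    private static readonly Regex BrokerPortMapping =
        new(@"^\s*-\s*[""']?(\d+):9092[""']?\s*$", RegexOptions.Multiline);

    public static int ParseBrokerPort(string composeYaml)
    {
        var match = BrokerPortMapping.Match(composeYaml);
        // Fall back to Kafka's default port when no mapping is found.
        return match.Success ? int.Parse(match.Groups[1].Value) : 9092;
    }

    // Reads docker-compose.yml from the given directory and extracts the broker's host port.
    public static int ReadBrokerPort(string directory) =>
        ParseBrokerPort(File.ReadAllText(Path.Combine(directory, "docker-compose.yml")));
}
```

Matching only the ports list items (rather than any “:9092” in the file) avoids picking up listener URLs such as PLAINTEXT://0.0.0.0:9092 from the environment section.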
The ClientId is a logical name for the producer that the client sends with its requests. Brokers use it in their logs, metrics, and quotas, which makes it easier to trace which application sent a given request. It does not need to be unique, and it has no effect on how messages are distributed across partitions.
After that, we register the Kafka producer as a singleton service within the ASP.NET Core dependency injection container. We use the ProducerBuilder class to construct and configure the producer and its Build() method to create it. A singleton fits well here because the producer is thread-safe and designed to be long-lived and shared.
Configuring the Kafka Consumer
Let’s create a console app and configure the consumer in its Program.cs file:
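using Confluent.Kafka;

var consumerConfig = new ConsumerConfig
{
    // The console app runs from its output folder (e.g. bin/Debug/net8.0),
    // so we walk up to the folder that contains docker-compose.yml
    BootstrapServers = $"localhost:{Helper.GetKafkaBrokerPort(Directory.GetParent(Environment.CurrentDirectory)?.Parent?.Parent?.Parent?.FullName!)}",
    GroupId = "order-consumer",
    // Start from the oldest retained message when the group has no committed offset
    AutoOffsetReset = AutoOffsetReset.Earliest
};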
We start by initializing the ConsumerConfig class to configure the Kafka consumer. As with the producer, we fetch the Kafka broker’s address. Then, we set the consumer group identifier (GroupId) to “order-consumer” and set AutoOffsetReset to Earliest.
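AutoOffsetReset determines where the consumer starts reading when its group has no committed offset for a partition (for example, the first time the group runs) or when the committed offset is no longer valid. We select AutoOffsetReset.Earliest so that the consumer starts from the earliest message still retained in the topic, ensuring a new consumer group processes every available message. Once the group has committed offsets, a restarted consumer resumes from them regardless of this setting.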
Another option for this setting is AutoOffsetReset.Latest. When set to Latest, the consumer starts at the end of the partition: it skips the messages already in the topic and only receives those published after it starts.
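To summarize how the reset setting interacts with committed offsets, here is a simplified C# model. OffsetReset and StartOffsetSketch.Resolve() are hypothetical stand-ins (the real setting is Confluent.Kafka’s AutoOffsetReset), and logStart/logEnd represent the oldest retained offset and the offset the next produced message will receive:

```csharp
using System;

// Hypothetical stand-in for Confluent.Kafka's AutoOffsetReset.
public enum OffsetReset { Earliest, Latest }

// Simplified model of where a consumer starts reading a partition.
public static class StartOffsetSketch
{
    // logStart = oldest offset still retained; logEnd = offset the next produced message will get.
    public static long Resolve(long? committedOffset, long logStart, long logEnd, OffsetReset reset)
    {
        // A valid committed offset always wins: the reset setting is ignored.
        if (committedOffset is long committed && committed >= logStart && committed <= logEnd)
        {
            return committed;
        }

        // No committed offset (new group) or it fell out of range (e.g. removed by retention).
        return reset == OffsetReset.Earliest ? logStart : logEnd;
    }
}
```

For a brand-new group on a partition holding offsets 0 to 9, Earliest starts at 0 and Latest at 10, while a group that has committed offset 4 resumes at 4 either way.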
Next, let’s register the Kafka consumer just below the consumerConfig initialization:
var builder = Host.CreateDefaultBuilder(args)
    .ConfigureServices((hostContext, services) =>
    {
        services.AddSingleton(
            new ConsumerBuilder<string, string>(consumerConfig).Build());
    });
We use the CreateDefaultBuilder() method to configure the host, and we register the Kafka consumer as a singleton service using the specified consumerConfig.
Producing Messages with Kafka in ASP.NET Core
Publishing messages to Kafka from an ASP.NET Core Web API involves configuring a Kafka producer and sending messages to a specific topic.
First, let’s create an OrderDetails class inside the Shared library:
public class OrderDetails
{
    public int OrderId { get; set; }
    public string? ProductName { get; set; }
    public decimal Price { get; set; }
    public DateTime OrderDate { get; set; }
}
For the sake of simplicity, we have placed both the OrderDetails entity and the utility method in a single Shared library project. While this approach suffices for demonstration purposes, in a production environment it’s best practice to separate entities and utilities into distinct projects, as they handle different concerns.
Next, let’s create an OrderController class in the Web API:
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json; // JsonConvert requires the Newtonsoft.Json NuGet package

[ApiController]
[Route("api/[controller]")]
public class OrderController(IProducer<string, string> producer) : ControllerBase
{
    private readonly IProducer<string, string> _producer = producer;
    private const string Topic = "order-events";

    [HttpPost("place-order")]
    public async Task<IActionResult> PlaceOrder(OrderDetails orderDetails)
    {
        try
        {
            // Serialize the order to JSON; the message key is left null
            var kafkaMessage = new Message<string, string>
            {
                Value = JsonConvert.SerializeObject(orderDetails)
            };

            // Completes once the broker acknowledges the message; throws ProduceException on failure
            await _producer.ProduceAsync(Topic, kafkaMessage);

            return Ok("Order placed successfully");
        }
        catch (ProduceException<string, string> ex)
        {
            return BadRequest($"Error publishing message: {ex.Error.Reason}");
        }
    }
}
As a first step, we inject the Kafka producer through the primary constructor and set the Kafka topic to “order-events”. Next, we create a POST endpoint that accepts orderDetails as the input parameter.
When producing messages to Kafka, it’s essential to serialize complex data structures like the orderDetails object into a format that Kafka can handle.
In this case, we use JSON serialization to convert the orderDetails object into a string. Kafka itself stores keys and values as byte arrays; because we build the producer as ProducerBuilder<string, string>, the client’s built-in string serializer encodes our JSON string as UTF-8 bytes, so the Message class lets us set the serialized value directly.
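As an illustration, the sketch below performs the same round trip with System.Text.Json, the serializer built into .NET, instead of the article’s Newtonsoft.Json JsonConvert; OrderSerialization is a hypothetical helper, and OrderDetails is repeated so the block compiles on its own. It also shows the UTF-8 encoding step that the client’s default string serializer (Serializers.Utf8) performs before the value goes over the wire:

```csharp
using System;
using System.Text;
using System.Text.Json;

// Mirrors the article's OrderDetails class so this sketch is self-contained.
public class OrderDetails
{
    public int OrderId { get; set; }
    public string? ProductName { get; set; }
    public decimal Price { get; set; }
    public DateTime OrderDate { get; set; }
}

// Hypothetical helper using System.Text.Json (swapped in for Newtonsoft.Json's JsonConvert).
public static class OrderSerialization
{
    // Object -> JSON string: this string becomes Message<string, string>.Value.
    public static string ToJson(OrderDetails order) => JsonSerializer.Serialize(order);

    // String -> bytes: the UTF-8 encoding a string serializer applies before sending.
    public static byte[] ToWireBytes(string json) => Encoding.UTF8.GetBytes(json);

    // JSON string -> object, as the consumer does after receiving the message.
    public static OrderDetails? FromJson(string json) => JsonSerializer.Deserialize<OrderDetails>(json);
}
```

If we switch serializers, we should switch on both the producer and the consumer side, or at least confirm that both produce and accept the same JSON shape for our types.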
It’s worth noting that while Kafka messages can include both keys and values, in this scenario we only deal with the value, which is set to the serialized JSON string of the orderDetails object. The key defaults to null if we don’t specify it, which is acceptable for our use case.
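Finally, we invoke the ProduceAsync() method to send the message to the “order-events” topic asynchronously. The returned task completes when the broker acknowledges the message, and it throws a ProduceException if delivery fails, which our catch block handles.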
Consuming Kafka Messages in ASP.NET Core
To process the messages published by our ASP.NET Core Web API, we must create a Kafka consumer within our console application.
Now, let’s create the ConsumerService class in the console app:
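using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;

public class ConsumerService(IConsumer<string, string> consumer) : IHostedService
{
    private readonly IConsumer<string, string> _consumer = consumer;
    private readonly CancellationTokenSource _stoppingCts = new();
    private Task? _consumeLoop;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _consumer.Subscribe("order-events");

        // Run the blocking consume loop on a background thread until StopAsync() cancels it
        var stoppingToken = _stoppingCts.Token;
        _consumeLoop = Task.Run(() =>
        {
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    // Blocks until a message arrives; throws OperationCanceledException on cancellation
                    var consumeResult = _consumer.Consume(stoppingToken);
                    if (consumeResult is null)
                    {
                        continue;
                    }

                    var orderDetails = JsonConvert.DeserializeObject<OrderDetails>(consumeResult.Message.Value);
                    Console.WriteLine("Received message: " +
                        $"Order Id: {orderDetails?.OrderId}, Product name: {orderDetails?.ProductName}, " +
                        $"Price: {orderDetails?.Price}, Order date: {orderDetails?.OrderDate}");
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the service is stopped
            }
        });

        return Task.CompletedTask;
    }

    // IHostedService requires StopAsync(): stop the loop, then leave the consumer group cleanly
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _stoppingCts.Cancel();
        if (_consumeLoop is not null)
        {
            await _consumeLoop;
        }

        _consumer.Close();
    }
}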
We define a ConsumerService class that implements the IHostedService interface, serving as the background service responsible for consuming messages from a Kafka topic.
After that, we implement the StartAsync() method, which runs when the hosted service starts. Within this method, the Kafka consumer subscribes to the “order-events” topic. Then, we start a background task using the Task.Run() method, which runs a loop that continuously listens for incoming Kafka messages until cancellation is signaled.
We check whether the consumeResult is null before accessing its properties to avoid a NullReferenceException.
Finally, we deserialize the JSON string into an instance of the OrderDetails class and print its details to the console.
Now, let’s register the ConsumerService class in the Program class and start the consumer:
var builder = Host.CreateDefaultBuilder(args)
    .ConfigureServices((hostContext, services) =>
    {
        services.AddSingleton(
            new ConsumerBuilder<string, string>(consumerConfig).Build());
        services.AddHostedService<ConsumerService>();
    });

var host = builder.Build();

// Resolve the hosted service and start it manually
var kafkaConsumerService = host.Services.GetRequiredService<IHostedService>();
await kafkaConsumerService.StartAsync(default);

// Keep the console app alive until it is terminated manually
await Task.Delay(Timeout.Infinite);
Here, we add the ConsumerService class as a hosted service to manage the Kafka consumer’s lifecycle.
Next, we build the host and retrieve the Kafka consumer service. Then, we invoke the StartAsync() method on it, initiating the consumption of Kafka messages.
Finally, we call the Task.Delay(Timeout.Infinite) method to keep the application running. Passing Timeout.Infinite to Task.Delay() creates a delay that never completes, so the application stays active and keeps processing messages until we terminate it manually.
Testing the Kafka Integration
Now, it’s time to test our implementation. First, let’s run the Web API and the console app simultaneously by configuring the solution with multiple startup projects.
Now, let’s invoke the endpoint:
Here, we successfully publish the message to the Kafka topic.
Now, let’s inspect the consumer:
Received message: Order Id: 1, Product name: Mobile, Price: 100.0, Order date: 30-03-2024 16:28:00
As we can see, the consumer receives the message we published.
Optimizing Kafka Usage
We can optimize Kafka usage in our ASP.NET Core Web API by implementing several practical tips to enhance performance and efficiency.
First, we should consider batching multiple messages into a single request to reduce overhead and improve throughput. The BatchSize property in the ProducerConfig class sets the maximum size of a batch in bytes (1000000 by default in the underlying librdkafka client), while BatchNumMessages caps the number of messages per batch and LingerMs controls how long the producer waits to fill a batch before sending it. We can adjust these values based on our application’s requirements.
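To picture how a count limit and a byte limit interact, here is a toy C# model of a batch accumulator. BatchAccumulatorSketch is hypothetical and far simpler than librdkafka’s implementation: it loosely mirrors BatchNumMessages and BatchSize, ignores the LingerMs timer and protocol overhead, and only flushes when the next message would exceed either limit:

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Toy model of producer-side batching, NOT librdkafka's implementation.
public sealed class BatchAccumulatorSketch
{
    private readonly int _maxMessages;              // loosely mirrors BatchNumMessages
    private readonly int _maxBytes;                 // loosely mirrors BatchSize (bytes)
    private readonly List<string> _current = new();
    private int _currentBytes;

    // Batches that would have been sent to the broker, one request each.
    public List<List<string>> SentBatches { get; } = new();

    public BatchAccumulatorSketch(int maxMessages, int maxBytes)
    {
        _maxMessages = maxMessages;
        _maxBytes = maxBytes;
    }

    public void Add(string message)
    {
        var size = Encoding.UTF8.GetByteCount(message);

        // Send the current batch first if this message would push it over either limit.
        if (_current.Count > 0 &&
            (_current.Count + 1 > _maxMessages || _currentBytes + size > _maxBytes))
        {
            Flush();
        }

        _current.Add(message);
        _currentBytes += size;
    }

    // Send whatever is buffered as one batch (what a linger timeout or an explicit flush triggers).
    public void Flush()
    {
        if (_current.Count == 0)
        {
            return;
        }

        SentBatches.Add(new List<string>(_current));
        _current.Clear();
        _currentBytes = 0;
    }
}
```

With a limit of three messages, seven messages go out as batches of three, three, and one, i.e. three requests instead of seven.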
Next, we should partition Kafka topics to enable parallel processing and make efficient use of resources. Implementing monitoring to track Kafka metrics and performance is also crucial, so that we can detect and resolve issues promptly.
Lastly, we should optimize message serialization/deserialization processes by selecting efficient serialization formats. By incorporating these strategies, we can ensure Kafka’s optimal performance in our ASP.NET Core Web API.
Best Practices for Kafka Integration in ASP.NET Core
Finally, let’s explore the best practices for seamlessly integrating Kafka into our ASP.NET Core application.
First, we should implement comprehensive error-handling mechanisms, managing issues like connection errors or message processing failures. By doing so, we ensure a resilient and reliable system.
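As one example of handling message processing failures, the hypothetical helper below retries an operation with exponential backoff before letting the exception surface. It is a generic sketch, not part of Confluent.Kafka, and a real consumer would still need a policy for messages that keep failing, such as logging them and moving on:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, generic retry helper (not a Confluent.Kafka API).
public static class RetrySketch
{
    public static async Task<T> WithBackoffAsync<T>(
        Func<Task<T>> operation, int maxAttempts, TimeSpan initialDelay, CancellationToken ct = default)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            // Retry only while attempts remain; on the last attempt the exception propagates.
            catch (Exception) when (attempt < maxAttempts && !ct.IsCancellationRequested)
            {
                // Wait 1x, 2x, 4x, ... the initial delay before the next attempt.
                var delay = TimeSpan.FromTicks(initialDelay.Ticks * (1L << (attempt - 1)));
                await Task.Delay(delay, ct);
            }
        }
    }
}
```

Wrapping the body of the consume loop in such a helper lets transient failures (a briefly unavailable downstream service, for instance) resolve themselves without stopping the consumer.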
Next, we recommend configuring our Kafka producers to be idempotent, so that retries don’t produce duplicate messages. We can accomplish this by setting the EnableIdempotence property in ProducerConfig to true. The producer then attaches a producer ID and a sequence number to each batch, which lets the broker discard retried batches it has already written, even during transient failures, while preserving message order.
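The following simplified C# model illustrates the broker-side check that idempotence relies on. IdempotentPartitionSketch is hypothetical; the real broker tracks sequences per producer ID and partition and only remembers a few recent batches, but the rule is the same: accept the next expected sequence number, ignore repeats, and reject gaps:

```csharp
using System;
using System.Collections.Generic;

public enum AppendOutcome { Appended, DuplicateIgnored, OutOfOrder }

// Simplified model of one partition's duplicate detection, NOT Kafka's actual implementation.
public sealed class IdempotentPartitionSketch
{
    private readonly Dictionary<long, int> _lastSequence = new(); // producerId -> last accepted sequence

    public List<string> Log { get; } = new();

    public AppendOutcome Append(long producerId, int sequence, string message)
    {
        var last = _lastSequence.TryGetValue(producerId, out var seen) ? seen : -1;

        if (sequence <= last)
        {
            return AppendOutcome.DuplicateIgnored; // a retry of something already written
        }

        if (sequence != last + 1)
        {
            return AppendOutcome.OutOfOrder;       // a gap: an earlier message is missing
        }

        Log.Add(message);
        _lastSequence[producerId] = sequence;
        return AppendOutcome.Appended;
    }
}
```

A retried send carries the same sequence number as the original, so it is recognized and dropped instead of being written twice.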
Now, let’s shift our focus to scalability.
We should leverage consumer groups for load balancing and parallel message processing, which allows our application to scale efficiently as workloads increase.
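Each consumer group has its own GroupId, which Kafka uses to track the group’s committed offsets and to distribute partitions among its members. When we deploy multiple instances of our consumer app with the same GroupId, Kafka automatically assigns each partition to exactly one instance. Note that the partition count caps this parallelism: with our single-partition “order-events” topic, a second instance would remain idle until we add partitions.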
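The core rule of group assignment can be sketched as a simple round-robin over the group’s members. GroupAssignmentSketch is hypothetical; real assignment strategies (range, round-robin, cooperative-sticky) run during a rebalance and differ in details, but each partition always goes to exactly one member of the group:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified round-robin assignment of a topic's partitions to the members of one consumer group.
public static class GroupAssignmentSketch
{
    public static Dictionary<string, List<int>> Assign(IReadOnlyList<string> members, int partitionCount)
    {
        if (members.Count == 0)
        {
            throw new ArgumentException("A consumer group needs at least one member.", nameof(members));
        }

        // Every member starts with no partitions; members beyond the partition count stay idle.
        var assignment = members.ToDictionary(member => member, _ => new List<int>());
        for (var partition = 0; partition < partitionCount; partition++)
        {
            assignment[members[partition % members.Count]].Add(partition);
        }

        return assignment;
    }
}
```

With a single partition, two members yield one busy instance and one idle instance; with four partitions, the load splits two and two.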
Conclusion
In this article, we’ve explored how to integrate Kafka into an ASP.NET Core Web API using the Confluent.Kafka client, underscoring its pivotal role in enabling scalable, resilient, and responsive event-driven architectures.
By integrating Kafka, we can leverage its robust features to build dynamic and efficient systems that meet the demands of modern software development.