Building a Real-Time C# Kafka Producer Using a Web API for User Creation Events
In modern application architectures, real-time data streaming and event-driven communication are becoming increasingly important. Apache Kafka has emerged as a leading distributed streaming platform, enabling seamless event processing. In this article, we will explore how to build a C# Kafka producer using a Web API to publish user creation events in real time, ensuring efficient and reliable data streaming.
The Scenario
Imagine you are developing a web application where users can sign up and create accounts. You want to capture user creation events and publish them to a Kafka topic in real time. This will allow other services or consumers to react to new user registrations immediately.
Setting up the Environment
Before diving into the implementation, ensure you have the following prerequisites:
a) Kafka Cluster: Set up a Kafka cluster or use an existing one. Make sure you have the necessary configurations, such as broker addresses and topic details.
b) Confluent.Kafka Library: Install the Confluent.Kafka NuGet package into your C# project. This library provides the essential components for creating Kafka producers.
c) ASP.NET Web API: Create a new ASP.NET Web API project or use an existing one for user registration.
Configuring Kafka Producer
First, let's create an interface that defines the operations for the producer:
public interface IProducerService
{
Task Send(string message);
Task SetTopic(string topic);
}
After that, let's create the producer class that inherits our interface:
public class ProducerService : IProducerService
We need a variable to receive the configuration of the Kafka cluster
private readonly ProducerConfig config = default!;
Anotthe two variables that we are going to use to produce the message
private readonly IProducer<int, string> producer = default!;
private readonly ProducerBuilder<int, string> builder = default!;
And finally, we need a variable to store the Kafka topic
public string Topic { get; private set; }
Let's configure our service in the class constructor
public ProducerService(ProducerConfig config, ILogger<ProducerService> logger)
{
this.config = config;
this.logger = logger;
Topic = "";
builder = new ProducerBuilder<int, string>(config);
producer = builder.Build();
}
Next step, implement the methods to send messages and set the topic:
public async Task Send(string message)
{
await Task.Run(() =>
{
var messagePacket = new Message<int, string>() { Key = 1, Value = message };
producer.ProduceAsync(Topic, messagePacket, CancellationToken.None);
});
}
public async Task SetTopic(string topic)
{
await Task.Run(() => { Topic = topic; });
}
Implementing the User Registration API
In your ASP.NET Web API project, add an endpoint to handle user registrations. When a new user is successfully created, the Web API will publish the user creation event to the Kafka topic using the previously configured Kafka producer.
[Route("api/[controller]")]
[ApiController]
public class UsersController : ControllerBase
{
private readonly IProducerService producerService;
private const string topic = "USER_REGISTRATION";
public UsersController(IProducerService producerService)
{
this.producerService = producerService;
}
[HttpPost]
public async Task<IActionResult> CreateUserAsync([FromBody] UserRegistrationDto userDto)
{
// Your user registration logic here...
// Assuming the user creation is successful, proceed to publish the event
// Example user creation event
string userCreationEvent = $"New user created: {userDto.Email}";
// Publish the event to Kafka
await producerService.SetTopic(topic);
await producerService.Send(userCreationEvent);
return Ok("User created successfully!");
}
}
Configuring Kafka Topic and Web API
Make sure to add the necessary Kafka and topic configuration settings to your appsettings.json:
{
"ProducerConfig": {
"BootstrapServers": "XXXXXXXXX",
"SaslMechanism": "PLAIN",
"SaslUsername": "XXXXXXX",
"SaslPassword": "XXXXXXX",
"SecurityProtocol": "SaslSsl"
}
}