1. Setting Up Apache Kafka Locally
1.1 Prerequisites
Before we install Kafka, ensure that the following are installed:
- Java (JDK 8 or higher) – Kafka runs on Java.
- .NET SDK (6.0 or later) – Required to build our .NET Core application.
- Docker (Optional) – Can be used to run Kafka in a containerized environment.
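If you prefer the Docker route, a single-broker development setup can be sketched with Docker Compose. The image tags, ports, and environment values below are illustrative assumptions — adjust them to your environment:

```yaml
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0   # version tag is illustrative
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0       # version tag is illustrative
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Clients on the host connect via localhost:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Single-broker dev setup, so internal topics use replication factor 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

With this file in place, `docker compose up -d` starts both containers and the rest of the tutorial works unchanged against localhost:9092.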
1.2 Installing and Configuring Kafka
Step 1: Download Kafka
Download the latest stable version of Apache Kafka from the official downloads page (https://kafka.apache.org/downloads).
Step 2: Extract and Start Kafka
Unzip the downloaded Kafka archive and navigate to the extracted folder.
Step 3: Start ZooKeeper
In its classic deployment mode, Kafka uses ZooKeeper to coordinate brokers (newer releases can also run without it in KRaft mode). Start ZooKeeper using the following command:
bin/zookeeper-server-start.sh config/zookeeper.properties
For Windows users, use:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
Step 4: Start Kafka Broker
Once ZooKeeper is running, start the Kafka broker:
bin/kafka-server-start.sh config/server.properties
For Windows users:
bin\windows\kafka-server-start.bat config\server.properties
Step 5: Verify Kafka Installation
To verify that Kafka is running, create a test topic:
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
List available topics:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Kafka is now installed and ready for use.
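As a further smoke test, you can send a message and read it back with the console clients bundled with Kafka. This assumes the broker from the previous steps is listening on localhost:9092 and the test-topic created above exists:

```shell
# Pipe one message into the topic; the producer exits when stdin closes
echo "hello kafka" | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic

# Read one message back from the beginning of the topic, then exit
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --max-messages 1
```

If the consumer prints "hello kafka", the broker is accepting and serving messages end to end.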
2. Publishing and Consuming Events in .NET Core
To integrate Kafka with .NET Core, we will use the Confluent Kafka .NET Client.
2.1 Install Confluent Kafka NuGet Package
Run the following command to install the Kafka client in your .NET application:
dotnet add package Confluent.Kafka
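The command adds a package reference to your project's .csproj file; the version number below is illustrative, since `dotnet add package` resolves the latest stable release for you:

```xml
<ItemGroup>
  <!-- Version is illustrative; dotnet add package picks the latest stable -->
  <PackageReference Include="Confluent.Kafka" Version="2.3.0" />
</ItemGroup>
```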
2.2 Creating a Kafka Producer
Create a KafkaProducer class in a .NET Core console application:
using Confluent.Kafka;
using System;
using System.Threading.Tasks;

class KafkaProducer
{
    public static async Task Main(string[] args)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
        using var producer = new ProducerBuilder<Null, string>(config).Build();

        for (int i = 1; i <= 10; i++)
        {
            try
            {
                // ProduceAsync awaits the broker's acknowledgment for each message
                var result = await producer.ProduceAsync("test-topic",
                    new Message<Null, string> { Value = $"Message {i}" });
                Console.WriteLine($"Produced message: {result.Message.Value} at {result.TopicPartitionOffset}");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
Run the producer to send messages to Kafka:
dotnet run
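The producer above sends messages with no key, so Kafka distributes them across partitions. Giving each message a key routes all messages with the same key to the same partition, preserving their relative order. A minimal sketch, to be placed inside an async method such as the producer's Main above — the key "order-42" and the reuse of test-topic are illustrative:

```csharp
// Keys route related messages to the same partition (and keep them ordered).
// Assumes the same local broker as above; the key value is illustrative.
using var keyedProducer = new ProducerBuilder<string, string>(
    new ProducerConfig { BootstrapServers = "localhost:9092" }).Build();

var delivery = await keyedProducer.ProduceAsync("test-topic",
    new Message<string, string> { Key = "order-42", Value = "OrderCreated" });
Console.WriteLine($"Keyed message landed on partition {delivery.Partition.Value}");
```

This matters once a topic has more than one partition: Kafka only guarantees ordering within a partition, so related events should share a key.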
2.3 Creating a Kafka Consumer
Create a KafkaConsumer class to read messages from Kafka:
using Confluent.Kafka;
using System;
using System.Threading;

class KafkaConsumer
{
    public static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "test-consumer-group",
            // Start from the earliest message when no committed offset exists
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("test-topic");

        // Let Ctrl+C break out of the poll loop cleanly
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

        try
        {
            while (true)
            {
                var consumeResult = consumer.Consume(cts.Token);
                Console.WriteLine($"Consumed message: {consumeResult.Message.Value} from {consumeResult.TopicPartitionOffset}");
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl+C pressed
        }
        finally
        {
            // Commit final offsets and leave the consumer group cleanly
            consumer.Close();
        }
    }
}
Run the consumer in a separate terminal:
dotnet run
3. Handling Real-Time Data Processing
Kafka can be used for real-time processing of streaming data in various ways. We will implement a simple consume-transform-produce pipeline in .NET Core.
3.1 Creating a Real-Time Event Processor
Modify the consumer to process messages:
using Confluent.Kafka;
using System;
using System.Threading;

class KafkaProcessor
{
    public static void Main(string[] args)
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "event-processor-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
        consumer.Subscribe("test-topic");
        using var producer = new ProducerBuilder<Null, string>(producerConfig).Build();

        // Let Ctrl+C break out of the poll loop cleanly
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

        try
        {
            while (true)
            {
                var consumeResult = consumer.Consume(cts.Token);
                // Simple transformation: uppercase the payload
                var processedMessage = consumeResult.Message.Value.ToUpper();
                // Fire-and-forget publish to the output topic
                producer.Produce("processed-topic", new Message<Null, string> { Value = processedMessage });
                Console.WriteLine($"Processed and forwarded message: {processedMessage}");
            }
        }
        catch (OperationCanceledException) { }
        finally
        {
            // Deliver any messages still queued in the producer, then shut down
            producer.Flush(TimeSpan.FromSeconds(10));
            consumer.Close();
        }
    }
}
This processor consumes messages, converts them to uppercase, and republishes them to a new topic (processed-topic).
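Depending on the broker's settings, topics may or may not be created automatically on first use. To be safe, create the output topic explicitly before running the processor, with the same flags used for the earlier test topic (this assumes the broker is on localhost:9092):

```shell
bin/kafka-topics.sh --create --topic processed-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```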
3.2 Monitoring Kafka Streams
Kafka provides monitoring tools like:
- Kafka Console Consumer: To check messages in a topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic processed-topic --from-beginning
- Kafka Manager: A web-based UI for managing Kafka clusters.
Conclusion
We have successfully implemented an event-driven architecture in .NET Core using Apache Kafka. We covered:
- Installing and configuring Kafka locally
- Publishing and consuming events in .NET Core
- Processing real-time data with a consume-transform-produce pipeline
This setup can be extended for scalable microservices, real-time analytics, log processing, and more. Kafka’s ability to handle high-throughput messaging makes it an ideal choice for building reactive, scalable applications in .NET.
Start exploring advanced Kafka features like partitioning, replication, and stream processing to build robust event-driven systems!