Implementing Kafka for Event Streaming: Producer
Apache Kafka has transformed the way modern applications handle real-time data streams. It is a highly scalable, fault-tolerant, distributed streaming platform that excels at handling high-throughput data pipelines. In this blog, we will walk through setting up and implementing a Kafka producer in .NET Core that streams simulated temperature data.
Introduction to Kafka
Kafka is a distributed streaming platform designed to handle real-time data feeds with high throughput and low latency. Originally developed at LinkedIn and later open-sourced through the Apache Software Foundation, Kafka has become a foundation for building scalable event-driven architectures.
Setting up the Kafka Environment
To get started with Kafka, you first need to set up an environment:
- Download and install Kafka: visit the Apache Kafka website for download and installation instructions specific to your operating system.
- Start ZooKeeper and Kafka: Kafka relies on ZooKeeper for coordination, so start ZooKeeper first and then the Kafka broker using the provided scripts.
- Create a Kafka topic: topics are used to categorize data streams. You can create a topic using the Kafka command-line tools or programmatically.
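As a rough sketch of these steps, assuming Kafka was downloaded and extracted as a tarball on Linux (the paths and the topic name Demo-topic are illustrative; Demo-topic matches the topic referenced later in this post):

```shell
# Run from the Kafka installation directory

# 1. Start ZooKeeper (Kafka uses it for cluster coordination)
bin/zookeeper-server-start.sh config/zookeeper.properties

# 2. In a second terminal, start the Kafka broker
bin/kafka-server-start.sh config/server.properties

# 3. Create a topic for the temperature stream
bin/kafka-topics.sh --create --topic Demo-topic \
  --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```

A single partition with replication factor 1 is enough for local experimentation; production topics would use more of both.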
Creating a Kafka Producer
Create a .NET Core Project
Set up a .NET Core project and configure it to use Kafka:
dotnet new console -n KafkaTemperatureProducer
cd KafkaTemperatureProducer
Add the necessary NuGet package:
dotnet add package Confluent.Kafka
Run this in your project directory; the Confluent.Kafka NuGet package provides a .NET client for Kafka.
Configuring Kafka Producer
Configure the Kafka producer to connect to the Kafka broker and send data.
appsettings.json
{
  "Kafka": {
    "BootstrapServers": "localhost:9092", // Replace with your Kafka broker address
    "Topic": "topic-name",
    "ClientId": "KafkaExampleProducer"
  },
  "Device": {
    "DeviceId": ""
  }
}
Producer Configuration
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

public static class KafkaProducerConfig
{
    public static ProducerConfig GetConfig(IConfiguration configuration)
    {
        return new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            ClientId = configuration["Kafka:ClientId"]
        };
    }
}
Create a model for the simulated temperature data:
public class TemperatureData
{
    public string? MessageId { get; set; }
    public string? DeviceId { get; set; }
    public double Temperature { get; set; }
    public double Humidity { get; set; }
}
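To sanity-check the model, you can serialize an instance with System.Text.Json, as the producer does below. By default the serializer keeps the PascalCase property names as declared (the sample values here are only illustrative, not from the post):

```csharp
using System;
using System.Text.Json;

var sample = new TemperatureData
{
    MessageId = "m-001",       // illustrative values
    DeviceId = "device-42",
    Temperature = 21.5,
    Humidity = 40.0
};

// System.Text.Json keeps PascalCase property names by default
Console.WriteLine(JsonSerializer.Serialize(sample));
// {"MessageId":"m-001","DeviceId":"device-42","Temperature":21.5,"Humidity":40}

public class TemperatureData
{
    public string? MessageId { get; set; }
    public string? DeviceId { get; set; }
    public double Temperature { get; set; }
    public double Humidity { get; set; }
}
```

This is exactly the JSON shape that downstream consumers of the topic will need to deserialize.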
Program.cs
using System;
using System.Text.Json;
using System.Threading;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

namespace KafkaProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                // Load configuration from appsettings.json
                var configuration = new ConfigurationBuilder()
                    .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
                    .Build();

                var kafkaConfig = KafkaProducerConfig.GetConfig(configuration);
                string topic = configuration["Kafka:Topic"];
                string deviceId = configuration["Device:DeviceId"];

                using var producer = new ProducerBuilder<Null, string>(kafkaConfig).Build();
                var random = new Random();

                for (int i = 0; i < 1000; i++)
                {
                    var data = new TemperatureData
                    {
                        MessageId = Guid.NewGuid().ToString(),
                        DeviceId = deviceId,
                        Temperature = random.NextDouble() * 100, // Random temperature between 0 and 100
                        Humidity = random.NextDouble() * 100     // Random humidity between 0 and 100
                    };

                    var jsonMessage = JsonSerializer.Serialize(data);

                    // Block until the broker acknowledges the message so we can log its offset
                    var deliveryReport = producer
                        .ProduceAsync(topic, new Message<Null, string> { Value = jsonMessage })
                        .GetAwaiter().GetResult();

                    Console.WriteLine($"Produced message to {deliveryReport.Topic} partition {deliveryReport.Partition} @ offset {deliveryReport.Offset}");
                    Thread.Sleep(1000); // Delay between messages
                }

                // Ensure any buffered messages are delivered before exiting
                producer.Flush(TimeSpan.FromSeconds(10));
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error: {ex.Message}");
            }
        }
    }
}

Key Flow
- Temperature Generator: generates random temperature and humidity data.
- Producer Application: reads configuration from appsettings.json and connects to the Kafka broker via the Kafka .NET client.
- JSON messages containing the temperature data are produced to the Kafka topic (Demo-topic).
- Messages can then be consumed by downstream applications or stored for further processing.
This flow provides a high-level overview of how Kafka is implemented in a .NET Core environment for event streaming, from data simulation to ingestion into Kafka.
Kafka Producer Implementation
- Temperature Generator: simulates temperature readings and generates JSON messages containing temperature and humidity information.
- Producer Application (.NET Core): reads configuration settings from appsettings.json and uses the Kafka .NET client to connect to the Kafka broker (the BootstrapServers configured in appsettings.json).
- Kafka Broker: the central component of the Kafka architecture, handling storage, distribution, and replication of topic data.
- Kafka Topic: the logical channel where messages are published by producers and consumed by consumers. Demo-topic, as configured in appsettings.json, is where our producer sends messages.
- JSON Message: the serialized representation of TemperatureData, containing MessageId, DeviceId, Temperature, and Humidity.
- appsettings.json Configuration: stores Kafka-related settings (BootstrapServers, ClientId, Topic) and device-specific information (DeviceId).
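The producer above awaits each ProduceAsync call so it can log the offset, which serializes sends one at a time. As a sketch of a higher-throughput alternative (not part of the original post), the Confluent.Kafka client also offers a non-blocking Produce overload that takes a delivery-handler callback, with Flush draining the queue before shutdown; the broker address and topic name here are assumptions matching the earlier configuration:

```csharp
using System;
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; // assumed broker address

using var producer = new ProducerBuilder<Null, string>(config).Build();

for (int i = 0; i < 10; i++)
{
    // Produce returns immediately; the callback fires once the broker responds
    producer.Produce("Demo-topic", new Message<Null, string> { Value = $"reading-{i}" },
        report =>
        {
            if (report.Error.IsError)
                Console.WriteLine($"Delivery failed: {report.Error.Reason}");
            else
                Console.WriteLine($"Delivered to {report.TopicPartitionOffset}");
        });
}

// Wait (up to 10 seconds) for all outstanding deliveries before disposing the producer
producer.Flush(TimeSpan.FromSeconds(10));
```

Running this requires a broker listening on localhost:9092. The trade-off: callbacks give much higher throughput because sends are batched and pipelined, while the awaited ProduceAsync style used in this post keeps the logging simple and strictly ordered.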
Summary
By following the steps above, you've learned how to implement a Kafka producer. We set up the Kafka environment, configured a .NET Core project, created a Kafka producer that sends simulated temperature data, and used JSON for data serialization.
This post provides an overview and implementation guide for setting up a Kafka producer in .NET Core, focusing on sending simulated temperature data.
Read the previous blog, Introduction to Kafka, by following this link:
https://blog.nashtechglobal.com/introduction-to-kafka-and-setup-on-linux/