NashTech Blog

Table of Contents
Apply ESLint in Node.js

Implementing Kafka for Event Streaming- Producer

Apache Kafka has transformed the way modern applications handle real-time data streams. It serves as a highly scalable, fault-tolerant, and distributed streaming platform that successful in handling high-throughput data pipelines. In this blog, we will walk through the process of setting up and implementing a Kafka producer in .NET Core, capable of streaming simulated temperature data.

Introduction to Kafka

Kafka is a distributed streaming platform designed to handle real-time data feeds with high-throughput and low-latency. It is originally developed by LinkedIn and later open-sourced through the Apache Software Foundation, Kafka has become a foundation for building scalable event-driven architectures.

Set up Kafka Environment

To start on Kafka, you need to set up a environment.

  • Download and install Kafka: Visit the website of Apache Kafka to download and installation instruction specific to your operating system.
  • Start Kafka and Zookeeper: Kafka relies on Zookeeper for coordination. Start Kafka and Zookeeper brokers using the commands.
  • Kafka topic: Topics are used to categorize data streams. You can create a topic using Kafka command line or through program.

Creating a Kafka Producer

Create a .NET Core Project

Set up a .NET Core Project and configure it to use Kafka

dotnet new console -n KafkaTemperatureProducer
cd KafkaTemperatureProducer

Add necessary NuGet packages.

dotnet add package Confluent.Kafka

In your Project directory, “Confluent.Kafka” NuGet Package provides a .NET client for Kafka.

Configuring Kafka Producer

Configure Kafka producer to connect to Kafka broker and send data.
‘appsettings.json’

{
    "Kafka": {
        "BootstrapServers": "Localhost:9092", // Replace with your Kafka broker addess
        "Topic": "topic-name",
        "ClientId": "KafkaExampleProducer"
    },
    "Device":{
    "DeviceId": ""
    }
}

Producer Configuration

using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

public static class KafkaProducerConfig
{
    public static ProducerConfig GetConfig(IConfiguration configuration)
    {
        return new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            ClientId = configuration["Kafka:ClientId"]
        };
    }
}

Create model to send simulated temperature data

public class TemperatureData
{
    public string? MessageId { get; set; }
    public string? DeviceId { get; set; }
    public double Temperature { get; set; }
    public double Humidity { get; set; }
}

Program.cs

using System;
using Confluent.Kafka;
using System.Threading;
using System.Text.Json;
//using Newtonsoft.Json;
using Microsoft.Extensions.Configuration;
//using Microsoft.Extensions.Configuration.Json;

namespace KafkaProducer
{

    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                //Load configuration from appsettings.json
                var configuration = new ConfigurationBuilder()
                    .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
                    .Build();

                var kafkaConfig = KafkaProducerConfig.GetConfig(configuration);
                string topic = configuration["Kafka:Topic"];
                string deviceId = configuration["Device:DeviceId"] ;

                using var producer = new ProducerBuilder<Null, string>(kafkaConfig).Build();

                var random = new Random();
                for (int i = 0; i < 1000; i++)
                {
                    var data = new TemperatureData
                    {
                        MessageId = Guid.NewGuid().ToString(),
                        DeviceId = deviceId,
                        Temperature = random.NextDouble() * 100, //Random temperature betn 0 - 100
                        Humidity = random.NextDouble() * 100 //Random humidity betn 0 - 100
                    };

                    var jsonMessage = JsonSerializer.Serialize(data);

                    var deliveryReport = producer.ProduceAsync(topic, new Message<Null, string> { Value = jsonMessage }).Result;

                    Console.WriteLine($"Produced message to {deliveryReport.Topic} partition {deliveryReport.Partition} @ offset {deliveryReport.Offset}");

                    Thread.Sleep(1000); // Delay betn msgs
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error: {ex.Message}");
            }
        }
    }
}

Key Flow

  • Temperature Generator generates random temperature and humidity data.
  • Producer Application reads configuration from appsettings.json, connects to the Kafka Broker via the Kafka .NET client.
  • JSON messages containing temperature data are produced to the Kafka Topic (Demo-topic).
  • Messages can then be consumed by downstream applications or stored for further processing.

This diagram provides a high-level overview of how Kafka is implemented in a .NET Core environment for event streaming, demonstrating the flow of data from simulation to ingestion into Kafka.

Kafka Producer implementation

  • Temperature Generator:
    – Simulates temperature data and generates JSON messages containing temperature and humidity information.
  • Producer Application (.NET Core):
    -Reads configuration settings from appsettings.json.
    -Uses Kafka .NET client to connect to Kafka broker (BootstarpServers configured in appsettings.json).
  • Kafka Broker:
    -Central component of Kafka architecture that handles storage, distribution, and replication of data topics.
  • Kafka Topic:
    -Logical channel where messages are published by producers and consumed by consumers.
    -Demo-topic as configured in appsettings.json is where our producer sends messages.
  • JSON Message:
    -Serialized representation of TemperatureData containing MessageId, DeviceId, Temperature, and Humidity.
  • appsettings.json Configuration:
    _Stores Kafka-related configuration (BootstrapServers, ClientId, Topic) and device specific information (DeviceId).

Summary

By following above, You’ve learned how to implement Kafka producer. We have configured Kafka environment, then configured a .NET Core project, created a Kafka Producer to sends simulated temperature data and used json for data serialization.

This post provide a overview and implementation guide for setting up a Kafka producer in .NET Core focusing on sending simulated temperature data.

Read previous blog on Introduction to Kafka by following this link:
https://blog.nashtechglobal.com/introduction-to-kafka-and-setup-on-linux/

Picture of MudassirQ

MudassirQ

Leave a Comment

Your email address will not be published. Required fields are marked *

Suggested Article

Scroll to Top