no

Apache Kafka Producers and Consumers in Code

1. Overview

Apache Kafka is simply another event-driven stream processing conceived to provide high-throughput but low latency messaging system.

This project demonstrates producing and consuming kafka messages using Apache Avro (data serialization framework).

A random temperature is produced every given time that simulates reading the value from an IOT device. The value is then written on a Kafka topic and read by another service and written on the console.

2. Prerequirements

You must have the following installed on your local machine:
  • Docker
  • Java 11
  • IntelliJ/Eclipse

3. Project Modules

For this exercise, we will provide the following projects:

  • random-temp-to-kafka-service - generates and writes a random temperature (integer) to a Kafka topic
    public void onStatus(TempDto wordDto) {
    
    	RandomTempAvroModel avroModel = randomTempToAvroTransformer.getRandomAvroModelFromDto(wordDto);
    	kafkaProducer.send(kafkaConfigData.getTopicName(), avroModel.getId(), avroModel);
    }
      
  • kafka-to-console-service - reads and writes the random temperature to console from a Kafka topic.
    @Override
    @KafkaListener(id = "randomTempTopicListener", topics = "${kafka-config.topic-name}")
    public void receive(@Payload List<RandomTempAvroModel> messages,
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Long> keys,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    
    	messages.size(),
    	keys.toString(),
    	partitions.toString(),
    	offsets.toString(),
    	Thread.currentThread().getId());
    
    	LOG.info("messages received={}", transformer.getTempModels(messages));
    }
      
  • kafka-to-console-service-v2 - this version uses the Spring framework to initialize Kafka.
  • kafka-producer - contains initialization parameters for a Kafka producer
    kafka-config:
      bootstrap-servers: localhost:19092, localhost:29092, localhost:39092
      schema-registry-url-key: schema.registry.url
      schema-registry-url: http://localhost:8081
      topic-name: random-temp-topic
      topic-names-to-create:
        - random-temp-topic
      num-of-partitions: 3
      replication-factor: 3
    
    kafka-producer-config:
      key-serializer-class: org.apache.kafka.common.serialization.LongSerializer
      value-serializer-class: io.confluent.kafka.serializers.KafkaAvroSerializer
      compression-type: snappy
      acks: all
      batch-size: 16384
      batch-size-boost-factor: 100
      linger-ms: 5
      request-timeout-ms: 60000
      retry-count: 5
      
    @Bean
    public Map<String, Object> producerConfig() {
    
    	Map<String, Object> props = new HashMap<>();
    	props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.getBootstrapServers());
    	props.put(kafkaConfigData.getSchemaRegistryUrlKey(), kafkaConfigData.getSchemaRegistryUrl());
    	props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerConfigData.getKeySerializerClass());
    	props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerConfigData.getValueSerializerClass());
    	props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerConfigData.getBatchSize() *
    		kafkaProducerConfigData.getBatchSizeBoostFactor());
    	props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfigData.getLingerMs());
    	props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaProducerConfigData.getCompressionType());
    	props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerConfigData.getAcks());
    	props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerConfigData.getRequestTimeoutMs());
    	props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerConfigData.getRetryCount());
    
    	return props;
    }
      
  • kafka-consumer - contains initialization parameters for a Kafka consumer
    kafka-config:
      bootstrap-servers: localhost:19092, localhost:29092, localhost:39092
      schema-registry-url-key: schema.registry.url
      schema-registry-url: http://localhost:8081
      topic-name: random-temp-topic
      topic-names-to-create:
        - random-temp-topic
    
    kafka-consumer-config:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      consumer-group-id: random-words-topic-consumer
      auto-offset-reset: earliest
      specific-avro-reader-key: specific.avro.reader
      specific-avro-reader: true
      batch-listener: true
      auto-startup: true
      concurrency-level: 3
      session-timeout-ms: 10000
      heartbeat-interval-ms: 3000
      max-poll-interval-ms: 300000
      max-poll-records: 500
      max-partition-fetch-bytes-default: 1048576
      max-partition-fetch-bytes-boost-factor: 1
      poll-timeout-ms: 150
      
    @Bean
    public Map<String, Object> consumerConfigs() {
    
    	Map<String, Object> props = new HashMap<>();
    	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.getBootstrapServers());
    	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.getKeyDeserializer());
    	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.getValueDeserializer());
    	props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerConfigData.getConsumerGroupId());
    	props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerConfigData.getAutoOffsetReset());
    	props.put(kafkaConfigData.getSchemaRegistryUrlKey(), kafkaConfigData.getSchemaRegistryUrl());
    	props.put(kafkaConsumerConfigData.getSpecificAvroReaderKey(), kafkaConsumerConfigData.getSpecificAvroReader());
    	props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConsumerConfigData.getSessionTimeoutMs());
    	props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.getHeartbeatIntervalMs());
    	props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.getMaxPollIntervalMs());
    	props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
    		kafkaConsumerConfigData.getMaxPartitionFetchBytesDefault() *
    			kafkaConsumerConfigData.getMaxPartitionFetchBytesBoostFactor());
    	props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerConfigData.getMaxPollRecords());
    	return props;
    }
      
  • kafka-config - common configuration classes for Retry & Kafka producer and consumer
  • kafka-model - common model class
    The common model used is generated from a JSON file.
    {
      "namespace": "com.czetsuyatech.kafka.avro.model",
      "type": "record",
      "name": "RandomTempAvroModel",
      "fields": [
        {
          "name": "deviceId",
          "type": "long"
        },
        {
          "name": "id",
          "type": "long"
        },
        {
          "name": "temp",
          "type": {
            "type": "bytes",
            "logicalType": "decimal",
            "precision": 4,
            "scale": 1
          }
        },
        {
          "name": "createdAt",
          "type": [
            "null",
            "long"
          ],
          "logicalType": [
            "null",
            "date"
          ]
        }
      ]
    }
      
    And this is the maven plugin that converts the JSON file to a Java class.
    <plugin>
    	<groupId>org.apache.avro</groupId>
    	<artifactId>avro-maven-plugin</artifactId>
    	<version>${avro.version}</version>
    	<configuration>
    	  <stringType>String</stringType>
    	</configuration>
    	<executions>
    	  <execution>
    		<phase>generate-sources</phase>
    		<goals>
    		  <goal>schema</goal>
    		</goals>
    		<configuration>
    		  <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
    		  <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
    		</configuration>
    	  </execution>
    	</executions>
    </plugin>
      
  • kafka-admin - orchestrates the creation of the required Kafka topics

4. Running the Demo Application

You can import all the projects in IntelliJ and create a Run Configuration for these 3 projects:
  • random-temp-to-kafka-service
  • kafka-to-console-service-v1
  • kafka-to-console-service-v2
Or you can build and run the projects in the terminal using the maven command.

5. Viewing the Kafka Topic

For viewing the Kafka topic, you may download and use KafkaTool.

6. Summary

In this blog, we learn how to create Kafka consumer and producer how they interact together with a Kafka broker. A more in depth explanation and demonstration is in the video.

7. Github Repository

Related

spring 6767157782745748357

Post a Comment Default Comments

Outsourcing

Are you looking for Java Developers in the Philippines? Get in touch.

Donations

If you like what I do, you can support this channel by buying me a coffee. I would be grateful for your contribution!

item