Apache Kafka Producers and Consumers in Code
1. Overview
Apache Kafka is an event-driven stream-processing platform designed for high-throughput, low-latency messaging.
This project demonstrates producing and consuming Kafka messages serialized with Apache Avro, a data serialization framework.
A random temperature is generated at a fixed interval, simulating a reading from an IoT device. The value is then written to a Kafka topic, read by another service, and printed to the console.
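As a rough sketch of the generating side (class name and interval are assumptions for illustration, not taken from the project), a Spring @Scheduled method can simulate the device:

import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Hypothetical sketch of the temperature source; the project's actual class
// name and interval may differ. Requires @EnableScheduling on a
// configuration class.
@Component
public class RandomTempGenerator {

  private static final Logger LOG = LoggerFactory.getLogger(RandomTempGenerator.class);

  private final Random random = new Random();

  // Simulates an IoT device reading every 5 seconds (assumed interval).
  @Scheduled(fixedRate = 5000)
  public void generate() {
    int temp = random.nextInt(50);
    LOG.info("simulated temperature reading={}", temp);
    // the real service hands this value to the Kafka producer shown below
  }
}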
2. Prerequisites
- Docker
- Java 11
- IntelliJ/Eclipse
3. Project Modules
This exercise consists of the following modules:
- random-temp-to-kafka-service - generates and writes a random temperature (integer) to a Kafka topic
public void onStatus(TempDto wordDto) {
  RandomTempAvroModel avroModel = randomTempToAvroTransformer.getRandomAvroModelFromDto(wordDto);
  kafkaProducer.send(kafkaConfigData.getTopicName(), avroModel.getId(), avroModel);
}
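Here kafkaProducer is a thin wrapper around Spring's KafkaTemplate. A minimal sketch of such a wrapper, assuming the names used in the snippet above (the project's actual implementation may add callbacks or error handling):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Minimal sketch of a producer service wrapping KafkaTemplate.
@Service
public class KafkaProducer {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);

  private final KafkaTemplate<Long, RandomTempAvroModel> kafkaTemplate;

  public KafkaProducer(KafkaTemplate<Long, RandomTempAvroModel> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  public void send(String topicName, Long key, RandomTempAvroModel message) {
    LOG.info("Sending message='{}' to topic='{}'", message, topicName);
    // KafkaTemplate#send is asynchronous; the returned future can be used
    // to register success/failure callbacks if needed.
    kafkaTemplate.send(topicName, key, message);
  }
}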
- kafka-to-console-service - reads the random temperature from a Kafka topic and writes it to the console.
@Override
@KafkaListener(id = "randomTempTopicListener", topics = "${kafka-config.topic-name}")
public void receive(@Payload List<RandomTempAvroModel> messages,
                    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Long> keys,
                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
  // log the batch metadata: message count, keys, partitions, offsets, and
  // consumer thread (the format string is assumed around the original arguments)
  LOG.info("{} messages received with keys={}, partitions={}, offsets={}, thread id={}",
      messages.size(), keys.toString(), partitions.toString(), offsets.toString(),
      Thread.currentThread().getId());
  LOG.info("messages received={}", transformer.getTempModels(messages));
}
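Note that the listener receives whole batches (hence the List parameters) because batch-listener is set to true in the consumer configuration shown further below.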
- kafka-to-console-service-v2 - this version uses the Spring framework to initialize Kafka.
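As a rough sketch of what that buys (assuming Spring Boot auto-configures the listener container from its standard spring.kafka.* properties), the consumer can shrink to a plain listener with no hand-built factory beans:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical v2-style listener relying on Spring Boot auto-configuration
// instead of the explicit consumer/factory beans shown below.
@Component
public class ConsoleTempListener {

  private static final Logger LOG = LoggerFactory.getLogger(ConsoleTempListener.class);

  @KafkaListener(topics = "${kafka-config.topic-name}", groupId = "random-temp-console")
  public void receive(RandomTempAvroModel message) {
    LOG.info("temperature received={}", message);
  }
}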
- kafka-producer - contains initialization parameters for a Kafka producer
kafka-config:
  bootstrap-servers: localhost:19092, localhost:29092, localhost:39092
  schema-registry-url-key: schema.registry.url
  schema-registry-url: http://localhost:8081
  topic-name: random-temp-topic
  topic-names-to-create:
    - random-temp-topic
  num-of-partitions: 3
  replication-factor: 3
kafka-producer-config:
  key-serializer-class: org.apache.kafka.common.serialization.LongSerializer
  value-serializer-class: io.confluent.kafka.serializers.KafkaAvroSerializer
  compression-type: snappy
  acks: all
  batch-size: 16384
  batch-size-boost-factor: 100
  linger-ms: 5
  request-timeout-ms: 60000
  retry-count: 5
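These YAML blocks are bound to configuration-data beans such as the kafkaProducerConfigData used below. A minimal sketch of the producer-side holder, assuming Spring Boot's relaxed binding maps the kebab-case keys to camelCase fields (accessors omitted for brevity):

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

// Sketch of the binding class for the kafka-producer-config block above.
@Configuration
@ConfigurationProperties(prefix = "kafka-producer-config")
public class KafkaProducerConfigData {

  private String keySerializerClass;
  private String valueSerializerClass;
  private String compressionType;
  private String acks;
  private Integer batchSize;
  private Integer batchSizeBoostFactor;
  private Integer lingerMs;
  private Integer requestTimeoutMs;
  private Integer retryCount;

  // standard getters and setters omitted for brevity
}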
@Bean
public Map<String, Object> producerConfig() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.getBootstrapServers());
  props.put(kafkaConfigData.getSchemaRegistryUrlKey(), kafkaConfigData.getSchemaRegistryUrl());
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerConfigData.getKeySerializerClass());
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerConfigData.getValueSerializerClass());
  props.put(ProducerConfig.BATCH_SIZE_CONFIG,
      kafkaProducerConfigData.getBatchSize() * kafkaProducerConfigData.getBatchSizeBoostFactor());
  props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfigData.getLingerMs());
  props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaProducerConfigData.getCompressionType());
  props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerConfigData.getAcks());
  props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerConfigData.getRequestTimeoutMs());
  props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerConfigData.getRetryCount());
  return props;
}
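This property map feeds a ProducerFactory and KafkaTemplate. A minimal sketch of the remaining beans in the same configuration class (bean method names assumed):

@Bean
public ProducerFactory<Long, RandomTempAvroModel> producerFactory() {
  // builds the factory from the property map defined above
  return new DefaultKafkaProducerFactory<>(producerConfig());
}

@Bean
public KafkaTemplate<Long, RandomTempAvroModel> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}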
- kafka-consumer - contains initialization parameters for a Kafka consumer
kafka-config:
  bootstrap-servers: localhost:19092, localhost:29092, localhost:39092
  schema-registry-url-key: schema.registry.url
  schema-registry-url: http://localhost:8081
  topic-name: random-temp-topic
  topic-names-to-create:
    - random-temp-topic
kafka-consumer-config:
  key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
  value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
  consumer-group-id: random-words-topic-consumer
  auto-offset-reset: earliest
  specific-avro-reader-key: specific.avro.reader
  specific-avro-reader: true
  batch-listener: true
  auto-startup: true
  concurrency-level: 3
  session-timeout-ms: 10000
  heartbeat-interval-ms: 3000
  max-poll-interval-ms: 300000
  max-poll-records: 500
  max-partition-fetch-bytes-default: 1048576
  max-partition-fetch-bytes-boost-factor: 1
  poll-timeout-ms: 150
@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.getBootstrapServers());
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.getKeyDeserializer());
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.getValueDeserializer());
  props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerConfigData.getConsumerGroupId());
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerConfigData.getAutoOffsetReset());
  props.put(kafkaConfigData.getSchemaRegistryUrlKey(), kafkaConfigData.getSchemaRegistryUrl());
  props.put(kafkaConsumerConfigData.getSpecificAvroReaderKey(), kafkaConsumerConfigData.getSpecificAvroReader());
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConsumerConfigData.getSessionTimeoutMs());
  props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.getHeartbeatIntervalMs());
  props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.getMaxPollIntervalMs());
  props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
      kafkaConsumerConfigData.getMaxPartitionFetchBytesDefault()
          * kafkaConsumerConfigData.getMaxPartitionFetchBytesBoostFactor());
  props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerConfigData.getMaxPollRecords());
  return props;
}
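These properties are then wired into a listener container factory, which is where the batch-listener, concurrency-level, auto-startup, and poll-timeout-ms settings take effect. A sketch of such a bean in the same configuration class (getter names assumed from the YAML keys above):

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, RandomTempAvroModel> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<Long, RandomTempAvroModel> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
  // deliver records to the @KafkaListener in batches (List<...> parameters)
  factory.setBatchListener(kafkaConsumerConfigData.getBatchListener());
  // number of concurrent listener containers; should not exceed the partition count
  factory.setConcurrency(kafkaConsumerConfigData.getConcurrencyLevel());
  factory.setAutoStartup(kafkaConsumerConfigData.getAutoStartup());
  factory.getContainerProperties().setPollTimeout(kafkaConsumerConfigData.getPollTimeoutMs());
  return factory;
}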
- kafka-config - common configuration classes for retry handling and for the Kafka producer and consumer
- kafka-model - common model class
The common model is generated from a JSON schema file:

{
  "namespace": "com.czetsuyatech.kafka.avro.model",
  "type": "record",
  "name": "RandomTempAvroModel",
  "fields": [
    { "name": "deviceId", "type": "long" },
    { "name": "id", "type": "long" },
    {
      "name": "temp",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 4,
        "scale": 1
      }
    },
    { "name": "createdAt", "type": [ "null", "long" ] }
  ]
}
And this is the Maven plugin that generates a Java class from the schema file:

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
  <configuration>
    <stringType>String</stringType>
  </configuration>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
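Running mvn generate-sources (or any later lifecycle phase) regenerates RandomTempAvroModel from the schema under src/main/resources/avro/ into src/main/java/. Once generated, the class is populated through its builder. A sketch of the DTO-to-Avro mapping (TempDto accessors are assumed; since decimal conversions are not enabled in the plugin above, the temp field is carried as raw unscaled bytes):

import java.nio.ByteBuffer;

// Hypothetical transformer method; assumes dto.getTemp() returns a
// BigDecimal with scale 1, matching the schema's decimal(4,1).
public RandomTempAvroModel getRandomAvroModelFromDto(TempDto dto) {
  return RandomTempAvroModel.newBuilder()
      .setDeviceId(dto.getDeviceId())
      .setId(dto.getId())
      .setTemp(ByteBuffer.wrap(dto.getTemp().unscaledValue().toByteArray()))
      .setCreatedAt(System.currentTimeMillis())
      .build();
}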
- kafka-admin - orchestrates the creation of the required Kafka topics
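A minimal sketch of that orchestration using Kafka's AdminClient (class name assumed; the project's retry handling around broker availability is omitted):

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Sketch of programmatic topic creation matching the YAML config above.
public class TopicCreator {

  public static void main(String[] args) throws Exception {
    Map<String, Object> props = Map.of(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        "localhost:19092,localhost:29092,localhost:39092");

    try (AdminClient adminClient = AdminClient.create(props)) {
      // 3 partitions and replication factor 3, per num-of-partitions
      // and replication-factor in kafka-config
      NewTopic topic = new NewTopic("random-temp-topic", 3, (short) 3);
      adminClient.createTopics(List.of(topic)).all().get(); // block until created
    }
  }
}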
4. Running the Demo Application
Start the local Kafka cluster and Schema Registry (via Docker), then run the following services:
- random-temp-to-kafka-service
- kafka-to-console-service-v1
- kafka-to-console-service-v2