
Kafka Comprehensive Tutorial – Part 4

Kafka Broker | Command-line Options and Procedure


2. What is Kafka Broker?

A Kafka broker is also known as a Kafka server or a Kafka node; all three names refer to the same thing. In simple terms, a broker is a mediator between two parties. More precisely, a Kafka broker is a message broker that mediates the conversation between different computer systems, guaranteeing delivery of messages to the correct parties.

Hence, a Kafka cluster typically consists of multiple brokers, and the cluster uses ZooKeeper to maintain its state. A single broker can handle thousands of reads and writes per second and, without a performance impact, can hold terabytes of messages. ZooKeeper also performs broker leader election.

Working of Broker in Kafka

Basically, a broker in Kafka is modeled as a KafkaServer, which hosts topics. Since topics are partitioned across the brokers in a cluster, a single broker hosts partitions of one or more topics, even when a topic has only a single partition.

Producers send messages to a broker; on receiving them, the broker stores them on disk keyed by a unique offset. The broker then allows consumers to fetch messages by topic, partition, and offset. Brokers form a cluster by sharing information with each other, directly or indirectly through ZooKeeper, and exactly one broker in the cluster acts as the controller. A single Kafka broker can be started with the kafka-server-start.sh script.

3. How to Start Kafka Broker?

  • Start ZooKeeper:

  ./bin/zookeeper-server-start.sh config/zookeeper.properties

We can start the Kafka server only once ZooKeeper is up and running (the broker will connect to ZooKeeper):

  ./bin/kafka-server-start.sh config/server.properties

kafka-server-start.sh script

The kafka-server-start.sh script starts a broker:

  $ ./bin/kafka-server-start.sh
  USAGE: ./bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*


kafka-server-start.sh uses config/log4j.properties for its logging configuration, which we can override using the KAFKA_LOG4J_OPTS environment variable:

  KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:config/log4j.properties"

kafka-server-start.sh also accepts the KAFKA_HEAP_OPTS and EXTRA_ARGS environment variables.

4. Kafka Command-line Options

-name 
Defaults to kafkaServer when in daemon mode.
-loggc
Enabled when in daemon mode.
-daemon
Enables daemon mode.
--override property=value
A value that overrides the value set for the property in the server.properties file.

  $ ./bin/kafka-server-start.sh config/server.properties --override broker.id=100

Output:

  INFO [KafkaServer id=100] started (kafka.server.KafkaServer)

Kafka Queuing: Apache Kafka as a Messaging System

Queuing is one of the traditional models for messaging. Let's begin with a brief introduction to Kafka as a messaging system, which will help us understand Kafka queuing well, and then look at some applications of the Kafka queue to make the concept clearer. So, let's start the Kafka queuing tutorial.


2. Kafka as a Messaging System

Traditionally, there are two models for messaging: queuing and publish-subscribe.

i. Kafka Queue

In the queuing model, a pool of consumers may read from a server, and each record goes to exactly one of them. This model has strengths as well as weaknesses. Its strength is that it permits us to divide up the processing of data over multiple consumer instances, which helps us scale our processing. Its weakness is that it is not multi-subscriber: as soon as one process reads the data, it is gone.

ii. Kafka Publish-Subscribe

In the publish-subscribe model, the record is broadcast to all the consumers, which permits us to broadcast data to multiple processes. Its limitation is that there is no way to scale the processing, because every message goes to every subscriber.
In Kafka, these two concepts are generalized by the consumer group. As with a queue, the consumer group permits us to divide up processing over a collection of processes (the members of the consumer group). As with publish-subscribe, Kafka permits us to broadcast messages to multiple consumer groups.

So, the main benefit of Kafka's model is that both of these properties are available for every Kafka topic: it can scale processing and it is also multi-subscriber, so we do not have to choose one or the other.
Kafka also has stronger ordering guarantees than a traditional messaging system.
A traditional queue loses ordering as soon as messages are handed to consumers in parallel, because they may be completed out of order. Kafka handles parallelism differently: by assigning the partitions of a topic to the consumers in a group, it can offer both ordering guarantees (within a partition) and load balancing over a pool of consumer processes.
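To make the consumer-group idea concrete, here is a minimal sketch (reusing the localhost:9092 broker and normal-topic topic that appear later in this tutorial; the class name and group ids are purely illustrative). Run two copies with the same group.id and the topic's partitions are split between them, which is the queue behavior; run them with different group.id values and each copy receives every record, which is the publish-subscribe behavior.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupDemoConsumer {
    public static void main(String[] args) {
        // Pass a group id on the command line, e.g. "cg-queue" or "cg-broadcast-1".
        String groupId = args.length > 0 ? args[0] : "cg-demo";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("normal-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Records from the same partition always arrive in order within one consumer.
                    System.out.printf("group=%s partition=%d offset=%d value=%s%n",
                            groupId, record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}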

3. Need for Kafka Queuing

One application of Kafka queuing is microservice architecture, which essentially demands some sort of message queuing system. Let's understand microservice architecture first: it is the practice of decoupling an interconnected monolithic application into independent modules that talk to external data sources through APIs. Message queuing comes into the picture to handle communication between microservices and external sources as well as among the microservices themselves.
In addition, when we divide a big monolithic application into smaller, loosely coupled microservices, the number of REST API calls among those microservices increases, and so does the number of connections to external data sources.

Keeping such a large system synchronous is not desirable, because it can render the entire application unresponsive and defeat the whole purpose of splitting it into microservices in the first place.
Having Kafka in this situation makes the whole data flow easier: it is distributed and highly fault-tolerant, and the broker nodes are constantly monitored through services like ZooKeeper, which makes the system efficient to operate.

4. Message Queuing in ML Solutions Pipeline

Apart from that, we can also use Kafka queuing in various ML solution pipelines. ML solutions are typically built as:
A user interface (on the client side, mobile/web) → an API server and the database → a machine-learning black box.
The ML black box is very compute-heavy, so it is not practical to serve those requests in a blocking, synchronous mode. Instead, all requests can be placed in a queue, and the consumer API can be configured to take those requests one by one and feed them into the ML black box. This pipeline can easily handle compute-intensive tasks, such as recognizing objects in thousands of images, that might take considerable time, without missing any requests.

Basically, microservices deployed in containers, mediated by fault-tolerant distributed clusters of Kafka broker nodes and monitored with ZooKeeper, look like a new way of building enterprise software.

How to Create Kafka Clients: Avro Producer & Consumer Client

In this part on Kafka clients, we will learn to create Apache Kafka clients using the Kafka API. Kafka clients can be written for different message processing needs: at-most-once, at-least-once, and exactly-once. In this Kafka clients tutorial, we will look at all three in detail, and we will also see how to use the Avro client.

So, let’s start Kafka Client Tutorial.


2. What are Kafka Clients?

  • Prerequisites to create Kafka clients
  1. First, to create Kafka clients, we have to set up Apache Kafka on our local machine.
  2. Before starting to create Kafka clients, a locally installed single-node Kafka instance must be running on our local machine, along with a running ZooKeeper.


Further, the command to create a topic named normal-topic with two partitions is:

  ~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1

Execute the following command to check the status of the created topic:

  ~/kafka/bin/kafka-topics.sh --list --topic normal-topic --zookeeper localhost:2181

If the topic needs to be altered, for example to increase the number of partitions, execute the following command:

  ~/kafka/bin/kafka-topics.sh --alter --topic normal-topic --zookeeper localhost:2181 --partitions 2

3. Kafka Producer Client

The following code implements a Kafka producer client. It sends text messages; adjust the loop to control the number of messages that are sent:

//import util.properties package
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;

public class ProducerClient {

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ProducerExample ...");
        sendMessages();
    }

    private static void sendMessages() throws InterruptedException, IOException {
        Producer<String, String> producer = createProducer();
        sendMessages(producer);
        // Allow the producer to complete sending the messages before the program exits.
        Thread.sleep(20);
    }

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        // Controls how many bytes the sender batches up before publishing to Kafka.
        props.put("batch.size", 10);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(props);
    }

    private static void sendMessages(Producer<String, String> producer) {
        String topic = "normal-topic";
        int partition = 0;
        long record = 1;
        for (int i = 1; i <= 10; i++) {
            producer.send(
                    new ProducerRecord<String, String>(topic, partition, Long.toString(record), Long.toString(record++)));
        }
    }
}

4.6 Message Delivery Semantics

The definitions below are quoted from the Akka documentation:

at-most-once delivery

means that for each message handed to the mechanism, that message is delivered zero or one times; in more casual terms it means that messages may be lost.

at-least-once delivery

means that for each message handed to the mechanism potentially multiple attempts are made at delivering it, such that at least one succeeds; again, in more casual terms this means that messages may be duplicated but not lost.

exactly-once delivery

means that for each message handed to the mechanism exactly one delivery is made to the recipient; the message can neither be lost nor duplicated.

The first one is the cheapest—highest performance, least implementation overhead—because it can be done in a fire-and-forget fashion without keeping state at the sending end or in the transport mechanism. The second one requires retries to counter transport losses, which means keeping state at the sending end and having an acknowledgement mechanism at the receiving end. The third is most expensive—and has consequently worst performance—because in addition to the second it requires state to be kept at the receiving end in order to filter out duplicate deliveries.

Now that we understand a little about how producers and consumers work, let’s discuss the semantic guarantees Kafka provides between producer and consumer. Clearly there are multiple possible message delivery guarantees that could be provided:

  • At most once—Messages may be lost but are never redelivered.
  • At least once—Messages are never lost but may be redelivered.
  • Exactly once—this is what people actually want, each message is delivered once and only once.

It’s worth noting that this breaks down into two problems: the durability guarantees for publishing a message and the guarantees when consuming a message.

Many systems claim to provide "exactly once" delivery semantics, but it is important to read the fine print; most of these claims are misleading (i.e. they don't translate to the case where consumers or producers can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost).

Kafka’s semantics are straight-forward. When publishing a message we have a notion of the message being “committed” to the log. Once a published message is committed it will not be lost as long as one broker that replicates the partition to which this message was written remains “alive”. The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be described in more detail in the next section. For now let’s assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and experiences a network error it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key.

Prior to 0.11.0.0, if a producer failed to receive a response indicating that a message was committed, it had little choice but to resend the message. This provides at-least-once delivery semantics since the message may be written to the log again during resending if the original request had in fact succeeded. Since 0.11.0.0, the Kafka producer also supports an idempotent delivery option which guarantees that resending will not result in duplicate entries in the log. To achieve this, the broker assigns each producer an ID and deduplicates messages using a sequence number that is sent by the producer along with every message. Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transaction-like semantics: i.e. either all messages are successfully written or none of them are. The main use case for this is exactly-once processing between Kafka topics (described below).
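As a hedged sketch of how these producer-side features are enabled (none of this comes from the article's own examples; the topic names and transactional.id are placeholders, and it assumes Kafka 0.11+ brokers and client libraries), an idempotent, transactional producer looks roughly like this:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Idempotence deduplicates retries within a single producer session (requires acks=all).
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        // A transactional.id additionally enables atomic writes across partitions and topics.
        props.put("transactional.id", "demo-tx-producer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                // Either both records become visible to read_committed consumers, or neither does.
                producer.send(new ProducerRecord<>("output-topic-a", "key", "value"));
                producer.send(new ProducerRecord<>("output-topic-b", "key", "value"));
                producer.commitTransaction();
            } catch (KafkaException e) {
                // For fatal errors such as ProducerFencedException the producer should be closed instead.
                producer.abortTransaction();
                throw e;
            }
        }
    }
}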

Not all use cases require such strong guarantees. For uses which are latency sensitive we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message being committed this can take on the order of 10 ms. However the producer can also specify that it wants to perform the send completely asynchronously or that it wants to wait only until the leader (but not necessarily the followers) have the message.

Now let’s describe the semantics from the point-of-view of the consumer. All replicas have the exact same log with the same offsets. The consumer controls its position in this log. If the consumer never crashed it could just store this position in memory, but if the consumer fails and we want this topic partition to be taken over by another process the new process will need to choose an appropriate position from which to start processing. Let’s say the consumer reads some messages — it has several options for processing the messages and updating its position.

  1. It can read the messages, then save its position in the log, and finally process the messages. In this case there is a possibility that the consumer process crashes after saving its position but before saving the output of its message processing. In this case the process that took over processing would start at the saved position even though a few messages prior to that position had not been processed. This corresponds to “at-most-once” semantics as in the case of a consumer failure messages may not be processed.
  2. It can read the messages, process the messages, and finally save its position. In this case there is a possibility that the consumer process crashes after processing messages but before saving its position. In this case when the new process takes over the first few messages it receives will already have been processed. This corresponds to the “at-least-once” semantics in the case of consumer failure. In many cases messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself).

So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a Kafka Streams application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer’s position is stored as a message in a topic, so we can write the offset to Kafka in the same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer’s position will revert to its old value and the produced data on the output topics will not be visible to other consumers, depending on their “isolation level.” In the default “read_uncommitted” isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in “read_committed,” the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction).
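A hedged sketch of this consume-transform-produce pattern follows. The input topic normal-topic matches the earlier examples, but the output topic processed-topic, the group id cg-tx, the transactional.id, and the toUpperCase() "transformation" are assumptions made for illustration. The key point is that producer.sendOffsetsToTransaction() commits the consumed offsets inside the same transaction as the produced records, so a downstream consumer running with isolation.level=read_committed only ever sees the results of committed transactions.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class ConsumeTransformProduceSketch {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put("bootstrap.servers", "localhost:9092");
        cProps.put("group.id", "cg-tx");
        cProps.put("enable.auto.commit", "false");
        // Hide records that belong to aborted or still-open transactions.
        cProps.put("isolation.level", "read_committed");
        cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pProps = new Properties();
        pProps.put("bootstrap.servers", "localhost:9092");
        pProps.put("transactional.id", "cg-tx-producer");
        pProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
            consumer.subscribe(Arrays.asList("normal-topic"));
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    // "Transform" and forward each record to the output topic.
                    producer.send(new ProducerRecord<>("processed-topic", record.key(), record.value().toUpperCase()));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                // Commit the consumed offsets atomically with the produced output.
                producer.sendOffsetsToTransaction(offsets, "cg-tx");
                producer.commitTransaction();
            }
        }
    }
}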

When writing to an external system, the limitation is in the need to coordinate the consumer’s position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase commit between the storage of the consumer position and the storage of the consumers output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, consider a Kafka Connect connector which populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication.

So effectively Kafka supports exactly-once delivery in Kafka Streams, and the transactional producer/consumer can be used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. Exactly-once delivery for other destination systems generally requires cooperation with such systems, but Kafka provides the offset which makes implementing this feasible (see also Kafka Connect). Otherwise, Kafka guarantees at-least-once delivery by default, and allows the user to implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages.

4. Various Ways a Consumer Can Register With Kafka

Before explaining how to register at-most-once, at-least-once, or exactly-once consumers, let us look at the two ways a consumer can register with a Kafka broker.

  1. Registration using the subscribe method call. When a consumer registers with Kafka via a subscribe method call, Kafka rebalances the available consumers when a topic or partition gets added or deleted, or when a consumer gets added or deleted. This registration offers two variants: (a) along with the registration call, the consumer can also provide a listener as a second parameter to the subscribe method call. When a consumer is registered this way, Kafka notifies the listener whenever a rebalance occurs. The listener gives the consumer an opportunity to manually manage the offset, which is very useful for an exactly-once type of consumer. (b) The consumer subscribes without providing the optional second listener parameter.
  2. Registration with an assign method call. When a consumer is registered with an assign method call, Kafka does not perform automatic rebalancing of the consumers.

Either of the above registration options (1 or 2) can be used by at-most-once, at-least-once, or exactly-once consumers.

In the consumer examples explained below, the at-most-once and at-least-once consumers register with Kafka using option (1, b). For the exactly-once consumer there are two examples: the first registers with Kafka using option (1, a), and the second using option (2).

To recap, there are two methods by which a Kafka consumer client can register with a Kafka broker: the subscribe() method call and the assign() method call. Let's look at both of these methods in a bit more detail.

a. Using the Subscribe() Method Call

When using a subscribe method call, Kafka automatically rebalances the available consumers:

  • when a topic or partition gets added or deleted.
  • when a consumer gets added or deleted.

b. Using the Assign() Method Call

When a consumer is registered with an assign method call, Kafka does not perform an automatic rebalance of the consumers.
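As a small illustrative sketch of the two registration styles (the class name, group id, and partition choice are not taken from the article's examples), the only difference lies in the call used to attach the consumer to the topic:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RegistrationStylesDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "cg-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        boolean useSubscribe = args.length == 0 || "subscribe".equals(args[0]);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            if (useSubscribe) {
                // Group-managed registration: Kafka assigns partitions to the group members and
                // rebalances them automatically when topics/partitions or consumers come and go.
                consumer.subscribe(Arrays.asList("normal-topic"));
            } else {
                // Manual registration: this consumer reads exactly the listed partitions and
                // no automatic rebalancing takes place.
                consumer.assign(Arrays.asList(new TopicPartition("normal-topic", 0)));
            }
            // A first poll joins the group (for subscribe) and fetches the current assignment.
            consumer.poll(100);
            System.out.println("Registered via " + (useSubscribe ? "subscribe()" : "assign()")
                    + ", assignment: " + consumer.assignment());
        }
    }
}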

Either of the above registration options can be used by at-most-once, at-least-once or exactly-once consumers.
i. At-most-once Kafka Consumer (Zero or More Deliveries)
Basically, it is the default behavior of a Kafka Consumer.
In order to configure this type of consumer in Kafka Clients, follow these steps:

  • First, set 'enable.auto.commit' to true.
  • Also, set 'auto.commit.interval.ms' to a low value.
  • Make sure not to call consumer.commitSync() from the consumer. With this configuration, Kafka auto-commits the offsets at the specified interval.

A consumer configured this way could exhibit either at-most-once or at-least-once behavior. Still, let's call this consumer at-most-once, because at-most-once is the weaker messaging guarantee. Let's discuss both behaviors in detail:

  • At-most-once scenario

This scenario happens when the commit interval has elapsed, which triggers Kafka to automatically commit the last used offset, but the consumer then crashes before it has finished processing the messages. When the consumer restarts, it starts receiving messages from the last committed offset, so it can lose the few messages that were polled but not yet processed.

  • At-least-once scenario

This scenario happens when the consumer processes a message and commits it into its persistent store, but then crashes before Kafka has had a chance to auto-commit the offset to the broker (because the commit interval has not yet passed). When the consumer restarts, it is delivered a few older messages again, starting from the last committed offset.
Code for Kafka Consumer:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceConsumer {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AtMostOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions in that topic. 'assign' could be used here
        // instead of 'subscribe' to subscribe to a specific partition.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Set this property if auto commit should happen.
        props.put("enable.auto.commit", "true");
        // Auto commit interval; Kafka commits the offset at this interval.
        props.put("auto.commit.interval.ms", "101");
        // This is how to control the amount of data being read in each poll.
        props.put("max.partition.fetch.bytes", "135");
        // Set this if you want to always read from the beginning.
        // props.put("auto.offset.reset", "earliest");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
        }
    }

    private static void process() throws InterruptedException {
        // Create some delay to simulate processing of the message.
        Thread.sleep(20);
    }
}

ii. At-least-once Kafka Consumer (One or More Message Deliveries, Duplicate Possible)
In order to configure this type of consumer, follow these steps:

  • Set 'enable.auto.commit' to false, or
  • Set 'enable.auto.commit' to true with 'auto.commit.interval.ms' set to a very high number.

The consumer should then take control of committing message offsets to Kafka by calling consumer.commitSync().
In addition, to avoid reprocessing duplicate messages, implement idempotent behavior within the consumer, because with this type of consumer duplicate message delivery can happen.

Code:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AutoOffsetGuranteedAtLeastOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions in that topic. 'assign' could be used here
        // instead of 'subscribe' to subscribe to a specific partition.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Set this property if auto commit should happen.
        props.put("enable.auto.commit", "true");
        // Make the auto commit interval a big number so that auto commit does not happen;
        // we control the offset commit via consumer.commitSync() after processing the message.
        props.put("auto.commit.interval.ms", "999999999999");
        // This is how to control the amount of data being read in each poll.
        props.put("max.partition.fetch.bytes", "135");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
            // The call below is important to control the offset commit. Do this call after you
            // finish processing the business logic.
            consumer.commitSync();
        }
    }

    private static void process() throws InterruptedException {
        // Create some delay to simulate processing of the record.
        Thread.sleep(20);
    }
}

iii. Exactly-once Kafka Dynamic Consumer via Subscribe (One and Only One Message Delivery)
Here, the consumer registers with Kafka via a 'subscribe' (1, a) registration method call.
Note that the offset must be manually managed in this case. In order to set up the exactly-once scenario in Kafka clients, follow these steps:

  • At first, set enable.auto.commit = false.
  • After processing the message, don’t make calls to consumer.commitSync().
  • Register the consumer to a topic by making a 'subscribe' call.
  • To start reading from a specific offset of that topic/partition, implement a ConsumerRebalanceListener and call consumer.seek(topicPartition, offset) within the listener.
  • As a safety net, implement idempotent processing.

Code:

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceDynamicConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage2");

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting ExactlyOnceDynamicConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Manually control the offset, but register the consumer to the topic to get dynamically
        // assigned partitions. Inside MyConsumerRebalancerListener, use
        // consumer.seek(topicPartition, offset) to control from which offset messages are read.
        consumer.subscribe(Arrays.asList("normal-topic"),
                new MyConsumerRebalancerListener(consumer));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg3";
        props.put("group.id", consumeGroup);
        // To turn off auto commit, below is a key setting.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control the maximum data on each poll; make sure this value is bigger than the maximum single message size.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                // Save the processed offset in external storage.
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {

    private OffsetManager offsetManager = new OffsetManager("storage2");
    private Consumer<String, String> consumer;

    public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
        }
    }
}
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * The partition offsets are stored in external storage, in this case in the local file system
 * where the program runs.
 */
public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrites the offset for the topic and partition in external storage.
     *
     * @param topic     Topic name.
     * @param partition Partition of the topic.
     * @param offset    Offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {
        try {
            FileWriter writer = new FileWriter(storageName(topic, partition), false);
            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {
        try {
            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    private String storageName(String topic, int partition) {
        return storagePrefix + "-" + topic + "-" + partition;
    }
}


iv. Exactly-Once Kafka Static Consumer via Assign (One and Only One Message Delivery)
Here, the consumer registers with Kafka via an 'assign' (2) registration method call.
Note that the offset must be manually managed in this case. In order to set up an exactly-once Kafka static consumer via assign, follow these steps:

  • At first, set enable.auto.commit = false
  • Remember, after processing the message, don’t make calls to consumer.commitSync().
  • Register the consumer to the specific partition by using the 'assign' call.
  • On startup of the consumer, seek to the specific message offset by calling consumer.seek(topicPartition, offset).
  • As a safety net, implement idempotent processing.

Code:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceStaticConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage1");

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ExactlyOnceStaticConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException, IOException {
        KafkaConsumer<String, String> consumer = createConsumer();
        String topic = "normal-topic";
        int partition = 1;
        TopicPartition topicPartition =
                registerConsumerToSpecificPartition(consumer, topic, partition);
        // Read the offset for the topic and partition from external storage.
        long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
        // Use seek to go to the exact offset for that topic and partition.
        consumer.seek(topicPartition, offset);
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg2";
        props.put("group.id", consumeGroup);
        // To turn off auto commit, below is a key setting.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control the maximum data on each poll; make sure this value is bigger than the maximum single message size.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Manually listens to a specific topic partition. For an example of how to dynamically
     * listen to partitions while manually controlling the offset, see ExactlyOnceDynamicConsumer.java.
     */
    private static TopicPartition registerConsumerToSpecificPartition(
            KafkaConsumer<String, String> consumer, String topic, int partition) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        List<TopicPartition> partitions = Arrays.asList(topicPartition);
        consumer.assign(partitions);
        return topicPartition;
    }

    /**
     * Processes data and stores the offset in an external store. Best practice is to do these
     * operations atomically.
     */
    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

5. Avro Producer and Consumer

Avro is an open-source binary message exchange protocol. We use it to send compact, optimized messages across the wire, which reduces network overhead. For messages whose structure can be described in JSON, Avro can enforce a schema, and from these schemas Avro can generate binding objects in various programming languages. Using Avro with Kafka is natively supported and highly recommended.

Below is a simple Avro consumer and producer.

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class AvroConsumerExample {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AutoOffsetAvroConsumerExample ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, byte[]> consumer = createConsumer();
        // Assign to a specific topic and partition.
        consumer.assign(Arrays.asList(new TopicPartition("avro-topic", 0)));
        processRecords(consumer);
    }

    private static void processRecords(KafkaConsumer<String, byte[]> consumer) {
        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, byte[]> record : records) {
                GenericRecord genericRecord = AvroSupport.byteArrayToData(AvroSupport.getSchema(), record.value());
                String firstName = AvroSupport.getValue(genericRecord, "firstName", String.class);
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), firstName);
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            consumer.commitSync();
        }
    }

    private static KafkaConsumer<String, byte[]> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "100");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return new KafkaConsumer<String, byte[]>(props);
    }
}

public class AvroProducerExample {

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ProducerAvroExample ...");
        sendMessages();
    }

    private static void sendMessages() throws InterruptedException, IOException {
        Producer<String, byte[]> producer = createProducer();
        sendRecords(producer);
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<String, byte[]>(props);
    }

    private static void sendRecords(Producer<String, byte[]> producer) throws IOException {
        String topic = "avro-topic";
        int partition = 0;
        while (true) {
            for (int i = 1; i < 100; i++) {
                producer.send(new ProducerRecord<String, byte[]>(topic, partition, Integer.toString(0), record(i + "")));
            }
        }
    }

    private static byte[] record(String name) throws IOException {
        GenericRecord record = new GenericData.Record(AvroSupport.getSchema());
        record.put("firstName", name);
        return AvroSupport.dataToByteArray(AvroSupport.getSchema(), record);
    }
}
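The Avro examples above call into an AvroSupport helper that is never shown in the article. Below is a minimal, hypothetical sketch of such a helper, built only on the standard Apache Avro API (Schema.Parser, GenericDatumWriter/GenericDatumReader, and the binary encoder/decoder factories); the one-field schema and the method names are assumptions chosen so that the calls in the examples above line up.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroSupport {

    // A minimal schema with the single "firstName" field used by the examples above (an assumption).
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
                    + "{\"name\":\"firstName\",\"type\":\"string\"}]}");

    public static Schema getSchema() {
        return SCHEMA;
    }

    // Serialize a GenericRecord to Avro binary bytes.
    public static byte[] dataToByteArray(Schema schema, GenericRecord record) throws IOException {
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // Deserialize Avro binary bytes back into a GenericRecord.
    public static GenericRecord byteArrayToData(Schema schema, byte[] bytes) {
        try {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Read a field from the record and cast it to the requested type.
    public static <T> T getValue(GenericRecord record, String field, Class<T> type) {
        Object value = record.get(field);
        // Avro decodes string fields as Utf8; convert to String when that is what the caller wants.
        if (value != null && type == String.class) {
            return type.cast(value.toString());
        }
        return type.cast(value);
    }
}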


Amir Masoud Sefidian
Data Scientist, Researcher, Software Developer