Publishing to Kafka — Synchronous vs Asynchronous (2024)

Kafka is widely used for the asynchronous processing of events/messages.

The Kafka producer's send() call is itself asynchronous and returns a Future. A common pattern, and the one we used, is to block on that Future until the broker acknowledges the message. We can skip the blocking wait if application requirements permit.

Most of our applications waited for a response from the Kafka cluster while publishing messages. During network glitches or slowness in the Kafka cluster, this led to timeouts and then to outages. A more detailed description follows.

Before:

Earlier, messages were published to the Kafka broker synchronously: the Kafka client waited for an acknowledgment from the broker. Although sending to a messaging broker should by nature be asynchronous, our publishing path was synchronous and blocking, which caused outages whenever the Kafka cluster was slow.

If pushing to the primary cluster failed, we tried to send the message to the backup cluster. If the backup cluster failed as well, we wrote the events to files.

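public void send(String destination, String key, StreamMessage streamMessage) {
    Future<RecordMetadata> result =
        kafkaProducer.send(new ProducerRecord<>(destination, key, streamMessage.toJsonBytes()));
    try {
        result.get(); // blocks the calling thread until the broker acknowledges or the send fails
    } catch (InterruptedException | ExecutionException exception) {
        if (exception instanceof InterruptedException) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
        }
        LOG.warn("Sending Data to Backup Cluster", exception);
        sendMessageToBackupCluster(destination, key, streamMessage);
    }
}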

The result.get() call in the above code snippet blocks the calling thread until the broker responds.

Now:

Now the client doesn’t wait for the broker to acknowledge. Instead, messages are placed in the Kafka client’s buffer and control returns to the caller immediately. Any error or exception is handled in the callback.

public void send(String destination, String key, StreamMessage streamMessage) {
    kafkaProducer.send(new ProducerRecord<>(destination, key, streamMessage.toJsonBytes()),
        new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Runs on the producer's I/O thread once the send completes or fails.
                if (exception != null) {
                    LOG.warn("Sending Data to Backup Cluster", exception);
                    sendMessageToBackupCluster(destination, key, streamMessage);
                } else {
                    // LOG.debug("SUCCESS. Metadata {}", metadata);
                }
            }
        });
}

Here the request is not blocked.
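
One caveat is that send() can still block for a bounded time when the client's buffer is full or topic metadata is not yet available. The sketch below collects the producer settings that bound this behaviour as plain java.util.Properties. The keys are standard Kafka producer configuration names, but the class name and all values are illustrative assumptions, not the settings from our setup. Connection and serializer settings are omitted.

```java
import java.util.Properties;

// Sketch only: the values below are illustrative assumptions and should be
// tuned per application. The keys are standard Kafka producer config names.
class AsyncProducerSettings {

    static Properties build() {
        Properties props = new Properties();
        // Total memory (bytes) the client may use to buffer unsent records.
        props.setProperty("buffer.memory", "33554432");
        // Upper bound (ms) that send() may block when the buffer is full
        // or topic metadata is not yet available.
        props.setProperty("max.block.ms", "1000");
        // Upper bound (ms) before a buffered record is reported as failed
        // to the callback.
        props.setProperty("delivery.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A lower max.block.ms keeps request threads responsive when the buffer fills up, at the cost of failing sends sooner, so the fallback path is exercised more often.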

Impact:

The average response time of API endpoints dropped from 100 milliseconds to 3 milliseconds.

Fallback:

If publishing to the primary cluster fails, we try to send the message to the backup Kafka cluster. This is again a network call, and as discussed under Concern below, operations in callbacks should be reasonably fast. Hence, to avoid blocking the producer I/O thread in callbacks, we publish messages to the backup cluster asynchronously as well.

If publishing to the backup cluster also fails, we dump the messages to text files on disk and reprocess them with a cron job.

Here is the pseudo-code:

Send to the primary cluster
Handle response or error in the callback
If error {
    Send to the backup cluster // first fallback
    Handle response or error in the callback
    If error {
        Dump message to text files on disk // second fallback
    }
    If Success {
        print metadata
    }
}
If Success {
    print metadata
}
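
The pseudo-code above can be sketched in Java roughly as follows. This is a minimal illustration, not our production code: AsyncSender is a hypothetical stand-in for an asynchronous producer (it is not part of the Kafka client API), and FallbackPublisher, diskSink and appendTo are names invented for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Consumer;

// Sketch of the two-level fallback. AsyncSender is a hypothetical stand-in
// for an asynchronous producer; it is not part of the Kafka client API.
class FallbackPublisher {

    /** Hypothetical async sender: the callback receives null on success, the error otherwise. */
    interface AsyncSender {
        void send(String message, Consumer<Exception> onCompletion);
    }

    private final AsyncSender primary;
    private final AsyncSender backup;
    private final Consumer<String> diskSink;

    FallbackPublisher(AsyncSender primary, AsyncSender backup, Consumer<String> diskSink) {
        this.primary = primary;
        this.backup = backup;
        this.diskSink = diskSink;
    }

    /** Hands the message to the primary sender; every fallback runs inside a callback. */
    void publish(String message) {
        primary.send(message, primaryError -> {
            if (primaryError == null) {
                return; // delivered to the primary cluster
            }
            // First fallback: asynchronous send to the backup cluster.
            backup.send(message, backupError -> {
                if (backupError != null) {
                    diskSink.accept(message); // second fallback
                }
            });
        });
    }

    /** A disk sink that appends one message per line to the given file. */
    static Consumer<String> appendTo(Path file) {
        return message -> {
            try {
                Files.write(file,
                        (message + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }
}
```

With a real producer, the disk write in the second fallback would also run on the producer's I/O thread, so it may be worth handing it to a separate executor if the files live on slow storage.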

Concern:

We need to ensure that heavy processing is not done in the callback. As the Javadoc for the Kafka producer's send method notes:

Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own java.util.concurrent.Executor in the callback body to parallelize processing.
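
One way to follow this advice is to let the callback do nothing more than enqueue a task on a separate executor. The sketch below assumes a callback shaped like (metadata, exception) and models it with BiConsumer so that it runs without the Kafka client on the classpath. CallbackOffloader, offload and drain are hypothetical names for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Sketch: keeps the producer callback cheap by handing the real work to a
// separate executor. BiConsumer<M, Exception> stands in for the client's
// callback type (metadata, exception); it is an assumption for illustration.
class CallbackOffloader {

    private final ExecutorService callbackExecutor;

    CallbackOffloader(ExecutorService callbackExecutor) {
        this.callbackExecutor = callbackExecutor;
    }

    /** Wraps a slow handler so the returned callback only enqueues a task. */
    <M> BiConsumer<M, Exception> offload(BiConsumer<M, Exception> slowHandler) {
        return (metadata, exception) ->
                callbackExecutor.execute(() -> slowHandler.accept(metadata, exception));
    }

    /** Stops accepting work and waits for queued handlers; false on timeout or interrupt. */
    boolean drain(long timeoutMillis) {
        callbackExecutor.shutdown();
        try {
            return callbackExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CallbackOffloader offloader = new CallbackOffloader(Executors.newFixedThreadPool(2));
        BiConsumer<String, Exception> callback = offloader.offload((metadata, error) ->
                System.out.println("handled on " + Thread.currentThread().getName()));
        callback.accept("metadata", null); // in production the producer would invoke this
        offloader.drain(1000);
    }
}
```

The executor needs its own shutdown handling (drain in this sketch), otherwise queued fallback work can be lost in the same way as unsent messages in the producer buffer.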

Also, since all the messages are placed in the Kafka client buffer, we must ensure that application shutdown is handled gracefully. If the shutdown is not graceful, any messages still in the buffer will be lost.

A graceful shutdown can be achieved by closing all the producers when a context-close event fires.

public void onApplicationEvent(ContextClosedEvent event) {
    LOG.info("starting producer shutdown");

    // Destroy producers here

    LOG.info("completed producer shutdown for all producers");
}
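
The "destroy producers" step could look roughly like the sketch below. It assumes each producer exposes a close() that waits for buffered records to be sent, which is how the Kafka producer's close() behaves. ClosableProducer and ProducerShutdown are hypothetical names used so that the example runs without the Kafka client.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: closes every registered producer and keeps going if one of them
// fails. ClosableProducer is a hypothetical stand-in for a real producer
// whose close() flushes buffered records before returning.
class ProducerShutdown {

    /** Hypothetical producer handle; only the close operation matters here. */
    interface ClosableProducer {
        void close();
    }

    private final List<ClosableProducer> producers = new ArrayList<>();

    void register(ClosableProducer producer) {
        producers.add(producer);
    }

    /** Intended to be called from the context-close handler; returns how many closed cleanly. */
    int closeAll() {
        int closed = 0;
        for (ClosableProducer producer : producers) {
            try {
                producer.close();
                closed++;
            } catch (RuntimeException e) {
                // Keep closing the remaining producers so their buffers are still flushed.
                System.err.println("producer close failed: " + e);
            }
        }
        producers.clear();
        return closed;
    }
}
```

With the real client, the close overload that takes a timeout may be preferable so that a slow cluster cannot hold up shutdown indefinitely.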

Conclusion:

Because handling errors in a callback makes producing a message to the Kafka broker a non-blocking call, it increases throughput significantly. We should use this approach instead of blocking calls unless application requirements specify otherwise.

Author: Arielle Torp
