When data is in motion, it is fluid, just like water flowing in a river. Kafka is truly a powerful platform for real-time data streaming.
In the Kafka ecosystem, a Kafka Producer is a crucial component responsible for publishing data to Kafka topics. These producers play a pivotal role in real-time data streaming, ensuring that data is efficiently and reliably delivered to Kafka brokers and consuming applications. A well-configured Kafka producer is essential for maintaining data integrity, low latency, and scalability, making it a cornerstone of any data streaming architecture.
Ignoring best practices can lead to a range of issues, including data loss, inefficient resource utilisation, and operational challenges. Let’s explore some Kafka producer best practices, and understand why they are vital.
Optimise Batch Size
Kafka producers can send messages in batches, which is more efficient than sending individual messages. However, it’s important to strike a balance between batch size and latency. Smaller batches reduce latency, while larger batches maximise throughput. Related configurations include `batch.size` and `linger.ms`.
When sending messages as soon as possible matters more than high throughput, set `linger.ms` to `0`. This was the producer's default through Kafka 3.x, so check the default for your client version.
When high throughput matters more, enable batching on the producer by setting `linger.ms` to a positive value and `batch.size` to a reasonable value. Experiment to find the optimal batch size and linger value for your use case. It helps to set up metrics first, so that statistics are available for analysis while you experiment with different combinations of these settings; see the Monitoring and Metrics section below for more information.
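As a minimal sketch of the two profiles described above, the following builds producer properties for a latency-first and a throughput-first setup. The class and method names are hypothetical, and the values (a 20 ms linger and a 64 KB batch) are illustrative starting points rather than recommendations; only the `linger.ms` and `batch.size` keys come from Kafka.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings for the two batching profiles.
final class BatchingProfiles {

    // Latency first: do not wait for more records before sending.
    static Properties lowLatency() {
        Properties props = new Properties();
        props.setProperty("linger.ms", "0");
        return props;
    }

    // Throughput first: wait briefly so that more records share one batch.
    // The numbers are assumptions to tune against your own metrics.
    static Properties highThroughput() {
        Properties props = new Properties();
        props.setProperty("linger.ms", "20");                        // wait up to 20 ms for a batch to fill
        props.setProperty("batch.size", String.valueOf(64 * 1024));  // upper bound of a batch, in bytes
        return props;
    }
}
```

Either result can be merged into the `Properties` passed to the `KafkaProducer` constructor, alongside `bootstrap.servers` and the serializers.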
Acknowledgments
Kafka provides several acknowledgment modes through the `acks` setting, ranging from `all` (the strongest guarantee) to `0` (the weakest guarantee).
Choose the appropriate `acks` setting based on your data requirements. `all` gives strong guarantees by ensuring data is safely replicated, `0` offers low latency at the cost of possibly losing data, and `1` strikes a balance between performance and reliability. Note that Kafka versions before 3.0 default to `1`, while 3.0 and later default to `all`, which is generally the safer option. Choose a lower value only when data loss is tolerable and low latency is critical.
When `acks` is set to `all`, make sure `min.insync.replicas` is set to a reasonable value (for example, when the replication factor is `3`, `min.insync.replicas` is typically set to `2`). This is because `all` here means all the replicas that are currently in sync (at least `2` in the previous example, otherwise the write is rejected), not necessarily all assigned replicas (`3` in the previous example).
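The relationship between the replication factor and `min.insync.replicas` can be sketched with a little arithmetic. The helper below is hypothetical and only illustrates the reasoning: with `acks=all`, writes keep succeeding as long as the number of in-sync replicas does not drop below `min.insync.replicas`. Note that `min.insync.replicas` is set on the topic or broker, not on the producer.

```java
import java.util.Properties;

// Hypothetical helper illustrating acks=all together with min.insync.replicas.
final class DurabilitySketch {

    // Producer-side setting: wait for all in-sync replicas to acknowledge.
    static Properties durableProducerProps() {
        Properties props = new Properties();
        props.setProperty("acks", "all");
        return props;
    }

    // Number of replicas that can be out of sync while acks=all writes are still accepted.
    static int tolerableReplicaFailures(int replicationFactor, int minInSyncReplicas) {
        if (minInSyncReplicas < 1 || minInSyncReplicas > replicationFactor) {
            throw new IllegalArgumentException(
                "min.insync.replicas must be between 1 and the replication factor");
        }
        return replicationFactor - minInSyncReplicas;
    }
}
```

With a replication factor of 3 and `min.insync.replicas` of 2, `tolerableReplicaFailures(3, 2)` gives 1: one replica can fall out of sync and writes still succeed, while losing a second one causes writes to be rejected.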
Compression
Enable compression to reduce the amount of data sent over the network and stored in Kafka brokers. This can significantly improve throughput and storage efficiency at the cost of some CPU overhead for compression. Compression works on whole batches, so it is more effective when batching is enabled as described in the Optimise Batch Size section above.
Compression can be configured at the producer, broker, or topic level, but it is not enabled by default. Refer to this blog for more details.
It is recommended to:
- Leave the topic `compression.type` at its default value `producer`, which retains the original compression codec set by the producer.
- Specify `compression.type` in the producer.
It is not recommended to:
- Specify different compression algorithms in these different places, which makes the broker do extra decompression and recompression work unnecessarily, wasting precious resources.
- Leave `compression.type` in the producer unspecified while setting a `compression.type` codec on the topic, which results in compression being done on the broker instead of the producer. The exception is when the producer is already CPU intensive, in which case we do not want to add more CPU usage to the producer.
When compression is enabled in the producer, the producer compresses data before sending it to the broker, and the consumer decompresses the data before use. The broker always does at least some decompression work to validate data integrity. In three scenarios, the broker does a full decompression:
- When the batch received from the producer is compressed with a codec different from the one configured on the topic.
- When running Confluent Server brokers with Schema Validation enabled on the topic.
- When compaction is enabled, in which case the broker periodically decompresses batches to filter out records eligible for compaction.
It is advisable to verify integration with intended consumers once compression is enabled, making sure they are able to decompress messages successfully, especially when heterogeneous systems are involved, for example a legacy system, a third-party system, or a non-JVM-based system.
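A minimal sketch of the recommended setup, where the codec is chosen in the producer and the topic is left at its default of `producer`. The class and method names are hypothetical, and the codec passed in is whatever suits your workload.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: sets the compression codec on the producer side only.
final class CompressionSketch {

    // Values accepted by the producer's compression.type setting.
    private static final List<String> CODECS =
        Arrays.asList("none", "gzip", "snappy", "lz4", "zstd");

    static Properties compressedProducerProps(String codec) {
        if (!CODECS.contains(codec)) {
            throw new IllegalArgumentException("Unknown compression codec: " + codec);
        }
        Properties props = new Properties();
        props.setProperty("compression.type", codec); // whole batches are compressed before sending
        return props;
    }
}
```

For example, `CompressionSketch.compressedProducerProps("lz4")` could be merged into the producer configuration, with no `compression.type` override on the topic.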
Comparison between compression algorithms:
Compression type | Compression ratio | CPU usage | Compression speed | Network bandwidth usage |
---|---|---|---|---|
Gzip | Highest | Highest | Slowest | Lowest |
Snappy | Medium | Moderate | Moderate | Medium |
Lz4 | Low | Lowest | Fastest | Highest |
Zstd | Medium | Moderate | Moderate | Medium |
Error Handling
Implement robust error handling and retries in your Kafka producer code. Handle transient errors gracefully to prevent data loss. For example, if data loss is not acceptable, we can use the callback mechanism of `KafkaProducer` to verify that messages are published as expected.
Example Java code that uses a callback in the producer:
producer.send(producerRecord, (recordMetadata, exception) -> {
    if (exception == null) {
        // The broker acknowledged the record according to the configured acks.
        System.out.println("Record written to offset " +
                recordMetadata.offset() + " partition " +
                recordMetadata.partition());
    } else {
        // The send failed, after any automatic retries were exhausted.
        System.err.println("An error occurred");
        exception.printStackTrace(System.err);
    }
});
By default, the Kafka producer automatically retries sending messages when it encounters retriable errors such as `NotEnoughReplicasException` and `TimeoutException`.
Non-retriable errors, such as `RecordTooLargeException` and `TopicAuthorizationException`, are returned to the application, e.g. via the callback. Applications should take these scenarios into consideration and implement error handling logic to log errors, raise alerts, or even halt message publishing, depending on the use case.
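The automatic retries are bounded by a few producer settings. The sketch below, with a hypothetical class name and illustrative values, sets `retries`, `retry.backoff.ms`, `request.timeout.ms`, and `delivery.timeout.ms`, and checks the constraint that `delivery.timeout.ms` must be at least `linger.ms` plus `request.timeout.ms`, which the producer also validates when it is constructed.

```java
import java.util.Properties;

// Hypothetical helper: bounds how long the producer keeps retrying a record.
final class RetrySettingsSketch {

    static Properties retryProps(long lingerMs, long requestTimeoutMs, long deliveryTimeoutMs) {
        // Mirror the producer's own validation so a bad combination fails early.
        if (deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
            throw new IllegalArgumentException(
                "delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        Properties props = new Properties();
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));   // keep retrying until the delivery timeout
        props.setProperty("retry.backoff.ms", "100");                      // pause between attempts (illustrative)
        props.setProperty("linger.ms", String.valueOf(lingerMs));
        props.setProperty("request.timeout.ms", String.valueOf(requestTimeoutMs));
        props.setProperty("delivery.timeout.ms", String.valueOf(deliveryTimeoutMs)); // total budget for send plus retries
        return props;
    }
}
```

Once `delivery.timeout.ms` elapses without success, the record fails with a timeout, so the callback remains the place where the final outcome is handled.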
Throttling
Monitor your producer’s performance and implement throttling mechanisms to prevent overloading Kafka brokers.
Kafka’s producer configurations include `max.request.size`, `linger.ms`, and `max.in.flight.requests.per.connection`, which can be adjusted to control the sending rate. For example, increasing `linger.ms` introduces a delay before a batch of records is sent. Setting `max.in.flight.requests.per.connection` to 1 ensures that a new request is not sent on a connection until the previous one is acknowledged.
We can also implement rate limiting logic at the application level. Here is an example of Rate Limiting in Golang.
Another way to implement throttling is to configure the brokers to enforce quotas on clients (including producers and consumers). Refer to Kafka: The Definitive Guide, 2nd edition, page 75 for more information.
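For application-level rate limiting in a Java producer, one common approach is a token bucket. The class below is a hypothetical sketch, not part of the Kafka client: tokens refill at a fixed rate, and a record may be sent only when a token is available. The clock value is passed in by the caller to keep the logic easy to test.

```java
// Hypothetical application-level rate limiter (token bucket); not part of the Kafka client.
final class TokenBucket {
    private final double capacity;       // maximum burst size, in records
    private final double refillPerNano;  // tokens added per nanosecond
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(double permitsPerSecond, double capacity, long nowNanos) {
        this.capacity = capacity;
        this.refillPerNano = permitsPerSecond / 1_000_000_000.0;
        this.tokens = capacity;          // start full so an initial burst is allowed
        this.lastRefillNanos = nowNanos;
    }

    // Returns true if one record may be sent now; pass System.nanoTime() as nowNanos.
    synchronized boolean tryAcquire(long nowNanos) {
        long elapsed = nowNanos - lastRefillNanos;
        if (elapsed > 0) {
            tokens = Math.min(capacity, tokens + elapsed * refillPerNano);
            lastRefillNanos = nowNanos;
        }
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

Before each `producer.send(...)`, the application would call `tryAcquire(System.nanoTime())` and either wait, buffer, or shed load when it returns false.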
Idempotence
Enable idempotence in your Kafka producer settings to ensure that messages sent to Kafka are not duplicated when the producer retries. To enable the idempotent producer, set `enable.idempotence` to true.
Another benefit of enabling idempotence is that message ordering within a partition is preserved even when requests are retried. Also, when we start using transactions, the producer enables idempotence automatically.
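Idempotence comes with a few constraints on other producer settings: `acks` must be `all`, retries must be enabled, and `max.in.flight.requests.per.connection` must not exceed 5. A minimal sketch that sets these together, with a hypothetical class name:

```java
import java.util.Properties;

// Hypothetical helper: producer settings that are compatible with idempotence.
final class IdempotenceSketch {

    static Properties idempotentProps(int maxInFlightRequests) {
        // The idempotent producer supports at most 5 in-flight requests per connection.
        if (maxInFlightRequests < 1 || maxInFlightRequests > 5) {
            throw new IllegalArgumentException(
                "max.in.flight.requests.per.connection must be between 1 and 5");
        }
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all"); // required by idempotence
        props.setProperty("max.in.flight.requests.per.connection",
            String.valueOf(maxInFlightRequests));
        return props;
    }
}
```

Leaving `retries` at its default keeps retries enabled, which is why the sketch does not set it.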
Transactions
Transactions enable atomic writes to multiple Kafka topics and partitions. All the messages in the transaction will be successfully written, or none will be.
Consider using transactions in the following scenarios:
- There is a need to publish multiple messages atomically, regardless of whether they go to the same or different partitions or topics.
- In addition to publishing messages, there is a need to consume messages from source topics and commit offsets, and all of this should be done atomically.
Example Java code to initiate transactions in a producer:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    }
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();
The purpose of the `transactional.id` is to enable transaction recovery across multiple sessions of a single producer instance. It would typically be derived from the shard identifier in a partitioned, stateful application. As such, it should be unique to each producer instance running within a partitioned application.
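One way to derive such an identifier is to combine an application name with the shard number. The naming scheme and the class below are assumptions for illustration only; the point is that the value is stable for a given shard across restarts and differs between shards.

```java
// Hypothetical naming scheme for transactional.id: <application>-<shard>.
final class TransactionalIds {

    static String forShard(String applicationName, int shardId) {
        if (applicationName == null || applicationName.isEmpty()) {
            throw new IllegalArgumentException("applicationName must not be empty");
        }
        if (shardId < 0) {
            throw new IllegalArgumentException("shardId must not be negative");
        }
        // Same shard -> same id after a restart; different shards -> different ids.
        return applicationName + "-" + shardId;
    }
}
```

The result would be used as the value of `transactional.id`, for example `props.put("transactional.id", TransactionalIds.forShard("order-service", 3))`, where `order-service` is a made-up application name.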
Message Keys
Select a meaningful message key. Keys determine the partition to which a message is sent, which can be important for maintaining message order within a partition. Common choices for the key format are string, JSON string, and Avro.
Generally speaking, a string is lightweight and simple but lacks structure. Avro supports structure (for example, three fields can form a composite key), yet it implies a dependency on a Schema Registry, which serves the purpose of managing all Avro schemas. A JSON string also supports structure, and it may or may not require a Schema Registry, depending on your needs and on how schema management is centralised.
Also, please note that with the JSON string approach, the two JSON strings `{"a":1,"b":2}` and `{"b":2,"a":1}` are represented by different byte sequences, hence they could end up being assigned to different partitions even though they look equivalent.
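One way to avoid this is to serialise JSON keys in a canonical form, for example with field names in sorted order. The sketch below is a deliberately small, hypothetical helper that handles only flat keys with integer values and field names that need no escaping; a real application would more likely configure its JSON library to sort properties.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: builds the same key bytes regardless of field insertion order.
final class CanonicalKeySketch {

    // Assumes flat integer fields whose names need no JSON escaping.
    static byte[] canonicalKeyBytes(Map<String, Integer> fields) {
        StringBuilder json = new StringBuilder("{");
        // TreeMap iterates in sorted key order, which makes the output deterministic.
        for (Map.Entry<String, Integer> entry : new TreeMap<>(fields).entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(entry.getKey()).append("\":").append(entry.getValue());
        }
        return json.append('}').toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

With this helper, a map built as `a` then `b` and a map built as `b` then `a` both serialise to the bytes of `{"a":1,"b":2}`, so they hash to the same partition.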
Be careful when producers are written in different languages, for example some in Java and some in Python. Pay attention to The Difference in Hashing Algorithms: partitioner implementations in different languages and libraries choose different default hashing algorithms, so messages with the same key can be published to different partitions of the same topic. The algorithm is configurable, though, so in this situation we should choose one consistent hashing algorithm across the platform, or even the entire enterprise.
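To make the effect concrete, the sketch below compares two partitioning schemes for the same key: a positive murmur2 hash modulo the partition count, as used for keyed records by the Java client's default partitioner, and a CRC32 hash modulo the partition count, which is assumed here to represent the default of librdkafka-based clients. The murmur2 function is transcribed for illustration, and the class name is hypothetical, so verify the details against the libraries you actually run.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical comparison of two key-to-partition schemes.
final class PartitionHashSketch {

    // 32-bit murmur2, transcribed as a sketch of the hash used by the Java client.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        int tail = length & ~3; // index of the first byte not covered by the loop
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Java-client style: clear the sign bit, then take the remainder.
    static int murmur2Partition(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    // CRC32 style (assumed to match librdkafka-based defaults).
    static int crc32Partition(String key, int numPartitions) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % numPartitions);
    }
}
```

Running both methods over a set of keys shows that they frequently disagree, which is exactly why a mixed-language platform should standardise on one partitioner.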
Monitoring and Metrics
Implement comprehensive monitoring and metrics collection to gain insights into your producer’s performance. Tools like Prometheus and Grafana, combined with Kafka’s built-in metrics, can provide valuable information. Visit here for a reference of the built-in producer metrics that are available. For adding custom metrics, keep an eye on KIP-877.
Below are some metrics that are of interest when we want to understand how batches are performing in a Kafka producer:
- batch-size-avg
- records-per-request-avg
- record-queue-time-avg
- record-send-rate
- record-size-avg
- compression-rate-avg
And metrics for requests in general:
- request-rate
- requests-in-flight
- request-latency-avg
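The Java producer exposes these metrics over JMX by default, under MBeans named like `kafka.producer:type=producer-metrics,client-id=...`. As a rough sketch with a hypothetical class name, the following reads one metric for every producer registered in the current JVM using only the standard JMX API; it returns an empty map when no producer is running in the process.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical helper: reads a built-in producer metric through JMX.
final class ProducerMetricsSketch {

    // Returns client-id -> metric value for each producer in this JVM.
    static Map<String, Object> readMetric(String metricName) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Map<String, Object> values = new TreeMap<>();
        try {
            ObjectName pattern =
                new ObjectName("kafka.producer:type=producer-metrics,client-id=*");
            for (ObjectName name : server.queryNames(pattern, null)) {
                values.put(name.getKeyProperty("client-id"),
                    server.getAttribute(name, metricName));
            }
        } catch (JMException e) {
            // For example, the attribute does not exist on the MBean.
            throw new IllegalStateException("Could not read metric " + metricName, e);
        }
        return values;
    }
}
```

For example, `ProducerMetricsSketch.readMetric("batch-size-avg")` could be logged periodically while experimenting with `batch.size` and `linger.ms`.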
Testing
Perform thorough testing of your Kafka producer in different scenarios, including high-load conditions, network failures, and broker outages. Testing can uncover potential issues before they affect your production environment.
For unit testing, we can leverage `MockProducer`, and here is an example of its usage.
For integration testing, we can leverage EmbeddedKafka if your application is written in Spring Boot; Testcontainers is another option in environments where containers are supported.
For benchmark testing, Kafka has a built-in tool, `kafka-producer-perf-test`; for richer features we can take a look at Trogdor.
Version Compatibility
Ensure that your Kafka producer’s version is compatible with your Kafka broker version to avoid compatibility issues. If Confluent Platform is being used, we can refer to this table to check compatibility.
Security
Follow security best practices when configuring your Kafka producer, such as using SSL/TLS for encryption and SASL for authentication, or enabling mTLS (Mutual TLS).
Beyond transport security, we can leverage existing producer features such as interceptors to enhance the security of event data in transit and at rest. To learn more about this, please follow the LimePoint talk Securing Kafka Event Data & Selective Encryption.
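As a minimal sketch of the transport settings, the following builds producer properties for TLS encryption with SASL authentication. The class name is hypothetical and the choice of SCRAM-SHA-512 is an assumption for illustration; the mechanism, truststore, and credentials depend on how your cluster is set up, and secrets should come from a secret store rather than source code.

```java
import java.util.Properties;

// Hypothetical helper: producer settings for SASL authentication over TLS.
final class SecureProducerProps {

    static Properties saslSsl(String username, String password,
                              String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");   // TLS encryption plus SASL authentication
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512"); // assumed mechanism
        props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        props.setProperty("ssl.truststore.location", truststorePath);      // CA certificates trusted by the client
        props.setProperty("ssl.truststore.password", truststorePassword);
        return props;
    }
}
```

For mTLS, the client additionally presents its own certificate, which is configured through the `ssl.keystore.location` and `ssl.keystore.password` settings.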
Conclusion
By following these Kafka producer best practices, you can create robust and efficient data streaming pipelines. Properly configured producers help ensure data reliability, low latency, and efficient use of resources, making Kafka an invaluable tool for real-time data processing.
🚀 Take Your #Kafka journey to the next level! 🚀
Ready to delve deeper and unlock the full potential of data streams? As a Confluent Premier Partner, LimePoint is here to guide your journey. Reach out to us today — we’re eager to collaborate with you!