Updated February 28, 2023
Introduction to Kafka Console Producer
A Kafka-console-producer is a program that comes with Kafka packages which are the source of data in Kafka. It is used to read data from standard input or command line and write it to a Kafka topic (place holder of messages). Kafka-console-producer keeps data into the cluster whenever we enter any text into the console. Topics are made of partitions where producers write this data. In other words, “it creates messages from command line input (STDIN)”.
In Kafka, there are two types of producers.
- Sync -It sends messages directly in the background.
- A sync-It send messages whenever considering the number of messages with higher throughput.
The producer automatically finds broker and partition where data to write. It removes the dependency by connecting to Kafka and then the producer that is going to produce messages to respective broker and partitions. When there is a broker failure, and for some reason, the broker is going down, the producer will automatically recover; this producer provides a booster among the partition and broker. It’s just well-programmed .simply we don’t have to implement the features.
How does Kafka Console Producer work?
Here how does it works:
We have a producer who is sending data to partition 0 of broker 1 of topic A. Producer Know which brokers to write to. Basically, if the producer sends data without a key, then it will choose a broker based on the round-robin algorithm. The producer does a load balancer among the actual brokers. The producer used to write data by choosing to receive an acknowledgement of data.
Types of Confirmation or Acknowledgment Node
- ack=0; in this case, we don’t have actual knowledge about the broker. If the producer sends data to a broker and it’s already down, there is a chance of data loss and danger to use as well.
- ack=1; This is the default confirmation from the brokers where a producer will wait for a leader that is a broker. In this case, the broker is present and ready to accept data from the producer
- ack=all; In this case, we have a combination of Leader and Replicas . If any broker is a failure, the same set of data is present in replica, and possibly there is possibly no data loss.
Features of Kafka console Producer
- It is Thread-safe: -In each producer has a buffer space pool that holds records, which is not yet transmitted to the server. The I/O thread which is used to send these records as a request to the cluster.
- Kafka-console producer is Durable: -The acks is responsible for providing criteria under which the request ace considered complete. With the help ofack=” all”, blocking on the full commit of the record, this setting considered as the durable setting.
- It provides scalability:-The producer maintains buffers of unsent records for each partition. These buffers are sent based on the batch size, which also handles a large number of messages simultaneously.
- Kafka-console producers Fault-tolerant: –When there is a node failure down, the producer has an essential feature to provide resistance to a node and recover automatically.
The Kafka console producer is idempotent, which strengthens delivery semantics from at least once to exactly-once delivery. It has also used a transactional mode that allows an application to send messages to multiple partitions, including a topic.
Example to Implement Kafka Console Producer
Below are the examples mentioned:
Example #1
Code:
C:\kafka_2.12-2.4.1\bin\windows>kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_Program
>my first Kafka
>itsawesome
>happy learning
If you haven’t received any error, it means it is producing the above messages successfully.
Example #2
Passing acks with valued parameter
Code:
C:\kafka_2.12-2.4.1\bin\windows>kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_Program --producer-property acks=all
>this is acked property message
>learning new property called acked
Different option available with Kafka console Producers
- –batch-size <Integer: size>:- We are defining the single batch for sending the Number of messages
- –broker-list <String: broker-list>: -This is required options for the Kafka-console- producer, the broker list string in the form HOST: PORT
- –compression-codec [String: compression-codec]:- This option is used to compress either ‘none’ or ‘gzip’.If specified without a value, then it defaults to ‘gzip’
- –Help: – It will display the usage information.
- –metadata-expiry-ms<Long: metadata expiration interval>:- The period in milliseconds after which we force a refresh of metadata even if we haven’t seen any leadership changes. (Default: 300000).
- –producer-property <String: producer_prop>:-This parameter is used to set user-defined properties as key=value pair to the producer.
- –producer.config<String: config file>:-This is the properties file containing all configuration related to the producer.
- –property <String: prop>:- This attribute provides the liberty to pass user-defined properties to the message reader. These properties allow custom configuration and defined in the form of key=value.
- –request-required-acks<String: request required acks>:- The required acks of the producer requests (default: 1)
- –request-timeout-ms<Integer: request timeout ms>:- The ack timeout of the producer Value must be non-negative and non-zero (default: 1500).
- –Sync: – If a set message sends, requests to the brokers are synchronous, one at a time as they arrive.
- –timeout <Integer: timeouts>:- If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms.
- –topic <String: topic>:- this option is required .basically, the topic id to produce messages to.
- –Version: -Display Kafka version.
Maven Kafka Dependencies
Maven Kafka Dependencies for the below programs:
Code:
<? xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kafka.example</groupId>
<artifactId>kafka-beginner</artifactId>
<version>1.0</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.4.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
</project>
Producer Code with JAVA
Code:
importorg.apache.kafka.clients.producer.KafkaProducer;
importorg.apache.kafka.clients.producer.ProducerConfig;
importorg.apache.kafka.clients.producer.ProducerRecord;
importorg.apache.kafka.common.serialization.StringSerializer;
importjava.io.BufferedReader;
importjava.io.IOException;
importjava.io.InputStreamReader;
importjava.util.Properties;
public class MessageToProduce {
public static void main(String[] args) {
String bootstrapServers = "127.0.0.1:9092";
String msg = null;
// create Producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producerprogramatically
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
System.out.print("Enter message to send to kafka broker : ");
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
try {
msg = reader.readLine();
} catch (IOException e) {
e.printStackTrace();
}
ProducerRecord<String, String> record =
newProducerRecord<String, String>("first_Program",msg);
producer.send(record);
producer.flush();
producer.close();
}
}
Output:
Recommended Articles
This is a guide to Kafka Console Producer. Here we discuss introducing Kafka Console Producer, How does it work, Examples, different options, and dependencies. You can also go through our other related articles to learn more –