Task 1. Set up Kafka
In the Cloud Console, open the Navigation menu and click Marketplace.
Locate the Apache Kafka® deployment by searching for Apache Kafka.
Click Apache Kafka Server on Ubuntu Server 20.04.
While you wait for the deployment, you can check out the Kafka Streams quick start, which shows how to run the WordCount demo application that is included in Kafka.
The quick start's code is converted to use Java 8 lambda expressions so that it is easier to read, and is taken from the variant WordCountLambdaExample.
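As a rough stand-in for that listing, the sketch below shows the same three-step shape (split each line into words, group identical words, count each group) using plain java.util.stream instead of the Kafka Streams API. The class name WordCountShape and the method countWords are hypothetical, chosen only for illustration. Unlike the demo application, this version works on a finite list of lines.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration; this is not the Kafka Streams WordCountLambdaExample.
final class WordCountShape {

    // Splits each line into lowercase words, groups identical words, and counts each group.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "all streams lead to kafka",
                "hello kafka streams",
                "join kafka summit");
        // Prints {all=1, hello=1, join=1, kafka=3, lead=1, streams=2, summit=1, to=1}
        System.out.println(countWords(lines));
    }
}
```

The TreeMap is used only so that the printed result has a stable, alphabetical order.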
Start the Kafka environment
In the SSH window, you will run the following commands to start all services in the correct order.
Run the following commands to change to the Kafka installation directory and start the ZooKeeper service:
cd /opt/kafka/
sudo bin/zookeeper-server-start.sh config/zookeeper.properties
Open another SSH session.
Start the Kafka broker service
Run the following commands to change to the Kafka installation directory and start the Kafka broker service:
cd /opt/kafka/
sudo bin/kafka-server-start.sh config/server.properties
Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.
Note: The Kafka application is now configured to use the connector.
Open another SSH session.
Task 2. Prepare the topics and the input data
You will now send some input data to a Kafka topic, which will be subsequently processed by a Kafka Streams application.
First change your current path to the Kafka installation directory:
cd /opt/kafka/
Now you'll need to create the input topic streams-plaintext-input.
In the same SSH window, execute the following command:
sudo bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
Next, create the output topic streams-wordcount-output:
sudo bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output
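Task 3. Process the input data with Kafka Streams
Once the input data has been written to the input topic (the terminal session below creates /tmp/file-input.txt and pipes it through kafka-console-producer.sh), you can run your first Kafka Streams based Java application.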
You will run the WordCount demo application, which is included in Kafka. It implements the WordCount algorithm, which computes a word occurrence histogram from an input text.
Unlike WordCount examples that operate on finite, bounded data, the demo application is designed to operate on an infinite, unbounded stream of input data.
Like the bounded variant, it is a stateful algorithm that tracks and updates word counts. Because it cannot know when it has processed "all" the input data, it periodically outputs its current state and results while it continues to process more data.
This is a typical difference between the class of algorithms that operate on unbounded streams of data and, say, batch processing algorithms such as Hadoop MapReduce. It will be easier to understand this difference once you inspect the actual output data later on.
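The behaviour described above can be sketched in plain Java without Kafka. The class below is a hypothetical illustration, not the WordCountDemo source: it keeps a running count per word and emits the updated count for each word as soon as the word arrives, rather than waiting for the input to end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of an unbounded, stateful word count.
final class StreamingWordCountSketch {

    // Running state: the latest count per word across all lines seen so far.
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one input line and returns one "word count" update per word in it.
    List<String> process(String line) {
        List<String> updates = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue;
            }
            long updated = counts.merge(word, 1L, Long::sum);
            updates.add(word + " " + updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        StreamingWordCountSketch sketch = new StreamingWordCountSketch();
        for (String line : Arrays.asList(
                "all streams lead to kafka",
                "hello kafka streams",
                "join kafka summit")) {
            // Each line produces updates immediately; earlier results are never retracted.
            sketch.process(line).forEach(System.out::println);
        }
    }
}
```

Fed the three input lines used in this post, the sketch prints all 1, streams 1, lead 1, to 1, kafka 1, hello 1, kafka 2, streams 2, join 1, kafka 3, summit 1, which is the same sequence the console consumer shows in Task 4. A real Kafka Streams application may merge some intermediate updates depending on its caching configuration.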
The WordCount demo application ships with the Kafka installation under /opt/kafka, so you can run it directly without compiling any Java sources.
The terminal session below shows the full sequence: creating the two topics from Task 2, writing three lines of input to streams-plaintext-input, and running the WordCount demo application. You can safely ignore any WARN log messages. The demo keeps running until you stop it with Ctrl+C:
ketan_patel@kafka-ubuntu-1-vm:~$ cd /opt/kafka
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ sudo bin/kafka-topics.sh --create \
> --bootstrap-server localhost:9092 \
> --replication-factor 1 \
> --partitions 1 \
> --topic streams-plaintext-input
Created topic streams-plaintext-input.
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ sudo bin/kafka-topics.sh --create \
> --bootstrap-server localhost:9092 \
> --replication-factor 1 \
> --partitions 1 \
> --topic streams-wordcount-output
Created topic streams-wordcount-output.
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ tail /tmp/file-input.txt
all streams lead to kafka
hello kafka streams
join kafka summit
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ cat /tmp/file-input.txt | sudo bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ sudo bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
[2023-12-12 21:06:06,959] WARN Using an OS temp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS. Resolved state.dir: [/tmp/kafka-streams] (org.apache.kafka.streams.processor.internals.StateDirectory)
[2023-12-12 21:06:07,287] WARN Error while loading kafka-streams-version.properties (org.apache.kafka.streams.internals.metrics.ClientMetrics)
java.lang.NullPointerException: inStream parameter is null
at java.base/java.util.Objects.requireNonNull(Objects.java:246)
at java.base/java.util.Properties.load(Properties.java:406)
at org.apache.kafka.streams.internals.metrics.ClientMetrics.<clinit>(ClientMetrics.java:53)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:894)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:856)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:826)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:738)
at org.apache.kafka.streams.examples.wordcount.WordCountDemo.main(WordCountDemo.java:92)
[2023-12-12 21:06:07,774] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 2 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:07,882] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 4 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:07,985] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 7 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,088] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 10 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,191] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 13 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,293] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 16 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,396] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 18 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,497] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 20 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,600] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 25 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
^Cketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
Task 4. Inspect the output data
You can now inspect the output of the WordCount demo application by reading from its output topic streams-wordcount-output. In another SSH window:
ketan_patel@kafka-ubuntu-1-vm:~$ cd /opt/kafka
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic streams-wordcount-output \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
^C^C
Processed a total of 11 messages
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$
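The consumer command sets value.deserializer to LongDeserializer because the counts are written as binary long values, not as text. A minimal sketch of that encoding follows, assuming the usual 8-byte big-endian layout. The class and method names are hypothetical, and plain java.nio stands in for the Kafka serializer classes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration of how a count travels as 8 raw bytes.
final class LongValueSketch {

    // Encodes a long as 8 bytes, most significant byte first (ByteBuffer's default order).
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decodes 8 bytes back into a long; rejects payloads of any other size.
    static long decode(byte[] data) {
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(3L);
        System.out.println(Arrays.toString(raw)); // prints [0, 0, 0, 0, 0, 0, 0, 3]
        System.out.println(decode(raw));          // prints 3
    }
}
```

Read as text, those eight bytes are unprintable control characters, which is why the counts would not display legibly with a string deserializer.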