kafka

$ sudo apt update 
[sudo] password for kafka: 

kafka@kafkaubuntu2004:~$ sudo apt install default-jdk
Reading package lists... Done
Building dependency tree   


kafka@kafkaubuntu2004:~$ java --version
openjdk 11.0.21 2023-10-17
OpenJDK Runtime Environment (build 11.0.21+9-post-Ubuntu-0ubuntu120.04)
OpenJDK 64-Bit Server VM (build 11.0.21+9-post-Ubuntu-0ubuntu120.04, mixed mode, sharing)
kafka@kafkaubuntu2004:~$ mkdir ~/kafka && cd ~/kafka
kafka@kafkaubuntu2004:~/kafka$ tar -xvzf ~/Downloads/kafka.tgz --strip 1
kafka_2.12-3.4.1/LICENSE
kafka_2.12-3.4.1/NOTICE



kafka@kafkaubuntu2004:~/kafka$ vi ~/kafka/config/server.properties
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ pwd
/home/kafka/kafka
kafka@kafkaubuntu2004:~/kafka$ sudo vi /etc/systemd/system/zookeeper.service
kafka@kafkaubuntu2004:~/kafka$ sudo nano /etc/systemd/system/kafka.service
kafka@kafkaubuntu2004:~/kafka$ sudo vi /etc/systemd/system/kafka.service
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ sudo systemctl start kafka
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ sudo systemctl status kafka
● kafka.service
     Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
     Active: active (running) since Wed 2023-12-13 00:04:33 UTC; 8s ago
   Main PID: 17310 (sh)
      Tasks: 72 (limit: 19173)
     Memory: 341.5M
     CGroup: /system.slice/kafka.service
             ├─17310 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.>
             └─17311 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccup>

Dec 13 00:04:33 kafkaubuntu2004 systemd[1]: Started kafka.service.
...skipping...
● kafka.service
     Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
     Active: active (running) since Wed 2023-12-13 00:04:33 UTC; 8s ago
   Main PID: 17310 (sh)
      Tasks: 72 (limit: 19173)
     Memory: 341.5M
     CGroup: /system.slice/kafka.service
             ├─17310 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.>
             └─17311 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccup>

Dec 13 00:04:33 kafkaubuntu2004 systemd[1]: Started kafka.service.
~
~
~
~
~
~
~
~
~
~
~
~
~
~
~
~

kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ sudo systemctl enable zookeeper
Created symlink /etc/systemd/system/multi-user.target.wants/zookeeper.service → /etc/systemd/system/zookeeper.service.
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ sudo systemctl enable kafka
Created symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /etc/systemd/system/kafka.service.
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
        at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
        at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
        at joptsimple.OptionParser.parse(OptionParser.java:396)
        at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:567)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
kafka@kafkaubuntu2004:~/kafka$ cat ~/kafka/bin/kafka-topics.sh
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#    http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
        at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
        at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
        at joptsimple.OptionParser.parse(OptionParser.java:396)
        at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:567)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
kafka@kafkaubuntu2004:~/kafka$ ./kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
-bash: ./kafka-topics.sh: No such file or directory
kafka@kafkaubuntu2004:~/kafka$ ~/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
Created topic test-topic.
kafka@kafkaubuntu2004:~/kafka$ ~/kafka/bin/kafka-topics.sh --create --topic TutorialTopi^Cbootstrap-server localhost:9092 --replication-factor 1 --partitions 4
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Tut^C
kafka@kafkaubuntu2004:~/kafka$ ~/kafka/bin/kafka-topics.sh --create  --replication-factor 1 --partitions 1 --topic TutorialTopic
Exception in thread "main" java.lang.IllegalArgumentException: --bootstrap-server must be specified
        at kafka.admin.TopicCommand$TopicCommandOptions.checkArgs(TopicCommand.scala:619)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:48)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
kafka@kafkaubuntu2004:~/kafka$ ~/kafka/bin/kafka-topics.sh --create  --topic test-topic ^Creplication-factor 1 --partitions 1 --topic TutorialTopic
kafka@kafkaubuntu2004:~/kafka$ 
kafka@kafkaubuntu2004:~/kafka$ ./kafka-topics.sh --create --topic TutorialTopic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
-bash: ./kafka-topics.sh: No such file or directory
kafka@kafkaubuntu2004:~/kafka$ ~/kafka/bin/kafka-topics.sh --create --topic TutorialTopic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
Created topic TutorialTopic.
kafka@kafkaubuntu2004:~/kafka$ echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
kafka@kafkaubuntu2004:~/kafka$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
Hello, World
Hello World from Sammy at DigitalOcean!







kafka@kafkaubuntu2004:~$ history
    1  mkdir ~/Downloads
    2  curl "https://downloads.apache.org/kafka/3.4.1/kafka_2.12-3.4.1.tgz" -o ~/Downloads/kafka.tgz
    3  sudo apt update 
    4  java --version
    5  sudo apt install default-jdk
    6  java --version
    7  mkdir ~/kafka && cd ~/kafka
    8  tar -xvzf ~/Downloads/kafka.tgz --strip 1
    9  vi ~/kafka/config/server.properties
   10  pwd
   11  sudo vi /etc/systemd/system/zookeeper.service
   12  sudo nano /etc/systemd/system/kafka.service
   13  sudo vi /etc/systemd/system/kafka.service
   14  sudo systemctl start kafka
   15  sudo systemctl status kafka
   16  sudo systemctl enable zookeeper
   17  sudo systemctl enable kafka
   18  ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
   19  cat ~/kafka/bin/kafka-topics.sh
   20  ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
   21  ./kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
   22  ~/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
   23  ~/kafka/bin/kafka-topics.sh --create  --replication-factor 1 --partitions 1 --topic TutorialTopic
   24  ./kafka-topics.sh --create --topic TutorialTopic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
   25  ~/kafka/bin/kafka-topics.sh --create --topic TutorialTopic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
   26  echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
   27  ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
   28  history

kafka@kafkaubuntu2004:~$ kafkat

kafkat 0.3.0: Simplified command-line administration for Kafka brokers
usage: kafkat [command] [options]

Here's a list of supported commands:

  brokers                                                             Print available brokers from Zookeeper.
  clean-indexes                                                       Delete untruncated Kafka log indexes from the filesystem.
  cluster-restart help                                                Determine the server restart sequence for kafka
  controller                                                          Print the current controller.
  drain <broker id> [--topic <t>] [--brokers <ids>]                   Reassign partitions from a specific broker to destination brokers.
  elect-leaders [topic]                                               Begin election of the preferred leaders.
  partitions [topic]                                                  Print partitions by topic.
  partitions [topic] --under-replicated                               Print partitions by topic (only under-replicated).
  partitions [topic] --unavailable                                    Print partitions by topic (only unavailable).
  reassign [topics] [--brokers <ids>] [--replicas <n>]                Begin reassignment of partitions.
  resign-rewrite <broker id>                                          Forcibly rewrite leaderships to exclude a broker.
  resign-rewrite <broker id> --force                                  Same as above but proceed if there are no available ISRs.
  set-replication-factor [topic] [--newrf <n>] [--brokers id[,id]]    Set the replication factor of
  shutdown <broker id>                                                Gracefully remove leaderships from a broker (requires JMX).
  topics                                                              Print all topics.
  verify-replicas  [--topics] [--broker <id>] [--print-details] [--print-summary]Check if all partitions in a topic have same number of replicas.


kafka@kafkaubuntu2004:~$ kafkat topics


TutorialTopic
test-topic
__consumer_offsets


kafka@kafkaubuntu2004:~$ kafkat brokers

Broker          Socket
0               kafkaubuntu2004.us-west1-a.c.timebase-ts.internal:9092
kafka@kafkaubuntu2004:~$ 

kafka@kafkaubuntu2004:~$ kafkat controller

The current controller is '0' (kafkaubuntu2004.us-west1-a.c.timebase-ts.internal:9092).

kafka@kafkaubuntu2004:~$ kafkat partitions TutorialTopic

Topic           Partition       Leader          Replicas                                                ISRs
TutorialTopic   0               0               [0]                                                     [0]

kafka@kafkaubuntu2004:~$ kafkat partitions test-topic


Topic           Partition       Leader          Replicas                                                ISRs
test-topic      0               0               [0]                                                     [0]
test-topic      1               0               [0]                                                     [0]
test-topic      2               0               [0]                                                     [0]
test-topic      3               0               [0]                                                     [0]

kafka@kafkaubuntu2004:~$ 










Apache Kafka Server on Ubuntu 20.04

Task 1. Set up Kafka


In the Cloud Console, open the Navigation menu and click Marketplace.

Locate the Apache Kafka® deployment by searching for Apache Kafka.

Click on Apache Kafka Server on Ubuntu Server 20.04. It should look like this:










While you're waiting for deployment, you can check out this quick start which shows how to run the WordCount demo application that is included in Kafka.

Here's the gist of the code, converted to use Java 8 lambda expressions so that it is easier to read (taken from the variant WordCountLambdaExample):

Start the Kafka environment


In the SSH window, you will run the following commands to start all services in the correct order.


Run the following command to start the ZooKeeper service:

cd /opt/kafka/

sudo bin/zookeeper-server-start.sh config/zookeeper.properties

Open another SSH session

Start the Kafka broker service

Run the following command to first change your current path to the Kafka installation directory and start the Kafka broker service:


cd /opt/kafka/
sudo bin/kafka-server-start.sh config/server.properties

Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.

Note: The Kafka application is now configured to use the connector. 

Open another SSH session.

Task 2. Prepare the topics and the input data

You will now send some input data to a Kafka topic, which will be subsequently processed by a Kafka Streams application.

First change your current path to the Kafka installation directory:

cd /opt/kafka/

Now you'll need to create the input topic streams-plaintext-input.


In the same SSH window, execute the following command:


sudo bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input

Next, create the output topic streams-wordcount-output:


sudo bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output

Task 3. Process the input data with Kafka streams


Now that you have generated some input data, you can run your first Kafka Streams based Java application.

You will run the WordCount demo application, which is included in Kafka. It implements the WordCount algorithm, which computes a word occurrence histogram from an input text.

However, unlike other WordCount examples you might have seen before that operate on finite, bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an infinite, unbounded stream of input data.

Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed "all" the input data.

This is a typical difference between the class of algorithms that operate on unbounded streams of data and, say, batch processing algorithms such as Hadoop MapReduce. It will be easier to understand this difference once you inspect the actual output data later on.

Kafka's WordCount demo application is bundled with Confluent Platform, which means you can run it without further ado, i.e. you do not need to compile any Java sources and so on.

Now, execute the following command to run the WordCount demo application. You can safely ignore any warn log messages:


ketan_patel@kafka-ubuntu-1-vm:~$ cd /opt/kafka
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ sudo bin/kafka-topics.sh --create \
>     --bootstrap-server localhost:9092 \
>     --replication-factor 1 \
>     --partitions 1 \
>     --topic streams-plaintext-input
Created topic streams-plaintext-input.
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ sudo bin/kafka-topics.sh --create \
>     --bootstrap-server localhost:9092 \
>     --replication-factor 1 \
>     --partitions 1 \
>     --topic streams-wordcount-output
Created topic streams-wordcount-output.
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ tail /tmp/file-input.txt 
all streams lead to kafka
hello kafka streams
join kafka summit
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ cat /tmp/file-input.txt | sudo bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ sudo bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
[2023-12-12 21:06:06,959] WARN Using an OS temp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS. Resolved state.dir: [/tmp/kafka-streams] (org.apache.kafka.streams.processor.internals.StateDirectory)
[2023-12-12 21:06:07,287] WARN Error while loading kafka-streams-version.properties (org.apache.kafka.streams.internals.metrics.ClientMetrics)
java.lang.NullPointerException: inStream parameter is null
        at java.base/java.util.Objects.requireNonNull(Objects.java:246)
        at java.base/java.util.Properties.load(Properties.java:406)
        at org.apache.kafka.streams.internals.metrics.ClientMetrics.<clinit>(ClientMetrics.java:53)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:894)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:856)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:826)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:738)
        at org.apache.kafka.streams.examples.wordcount.WordCountDemo.main(WordCountDemo.java:92)
[2023-12-12 21:06:07,774] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 2 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:07,882] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 4 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:07,985] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 7 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,088] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 10 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,191] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 13 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,293] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 16 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,396] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 18 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,497] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 20 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2023-12-12 21:06:08,600] WARN [Consumer clientId=streams-wordcount-49801b8a-a532-40e8-85c1-9dd57f442a40-StreamThread-1-consumer, groupId=streams-wordcount] Error while fetching metadata with correlation id 25 : {streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)


^Cketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 


Task 4. Inspect the output data


You can now inspect the output of the WordCount demo application by reading from its output topic streams-wordcount-output:



ANOTHER SSH WINDOW:

ketan_patel@kafka-ubuntu-1-vm:~$ cd /opt/kafka
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
>     --topic streams-wordcount-output \
>     --from-beginning \
>     --formatter kafka.tools.DefaultMessageFormatter \
>     --property print.key=true \
>     --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>     --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1



^C^C
Processed a total of 11 messages
ketan_patel@kafka-ubuntu-1-vm:/opt/kafka$ 


AppEngine - Python

tudent_04_347b5286260a@cloudshell:~/python-docs-samples/appengine/standard_python3/hello_world (qwiklabs-gcp-00-88834e0beca1)$ sudo apt upda...