Skip to content

Commit

Permalink
producer finished
Browse files Browse the repository at this point in the history
  • Loading branch information
GabrielZuany committed Nov 29, 2023
1 parent 806f1f6 commit d191be9
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 29 deletions.
6 changes: 4 additions & 2 deletions kafka/create_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
print(str(e))
exit(1)

# delete topics
admin_client.delete_topics([TOPIC])

if TOPIC in admin_client.list_topics():
print("Topic already exists")
exit(0)

try:
topic_list = []
Expand Down
11 changes: 0 additions & 11 deletions kafka/producer.py

This file was deleted.

File renamed without changes.
32 changes: 16 additions & 16 deletions kafka/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ GREEN='\033[0;32m'
NC='\033[0m' # No Color
BLUE='\033[0;34m'

echo -e "${GREEN}|=======================|Downloading Kafka|=======================|${NC}"
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
# echo -e "${GREEN}|=======================|Downloading Kafka|=======================|${NC}"
# wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz

echo -e "${GREEN}|=======================|Extracting Kafka|=======================|${NC}"
tar -xzf kafka_2.12-2.8.0.tgz
mv kafka_2.12-2.8.0 kafka
rm kafka_2.12-2.8.0.tgz
# echo -e "${GREEN}|=======================|Extracting Kafka|=======================|${NC}"
# tar -xzf kafka_2.12-2.8.0.tgz
# mv kafka_2.12-2.8.0 kafka
# rm kafka_2.12-2.8.0.tgz

echo -e "${GREEN}|=======================|Starting Zookeeper|=======================|${NC}"
gnome-terminal -- kafka_2.12-2.8.0/bin/zookeeper-server-start.sh kafka_2.12-2.8.0/config/zookeeper.properties
sleep 10
# echo -e "${GREEN}|=======================|Starting Zookeeper|=======================|${NC}"
# gnome-terminal -- kafka_2.12-2.8.0/bin/zookeeper-server-start.sh kafka_2.12-2.8.0/config/zookeeper.properties
# sleep 10

echo -e "${GREEN}|=======================|Starting Server|=======================|${NC}"
gnome-terminal -- kafka_2.12-2.8.0/bin/kafka-server-start.sh kafka_2.12-2.8.0/config/server.properties
sleep 10
# echo -e "${GREEN}|=======================|Starting Server|=======================|${NC}"
# gnome-terminal -- kafka_2.12-2.8.0/bin/kafka-server-start.sh kafka_2.12-2.8.0/config/server.properties
# sleep 10

# echo -e "${GREEN}|=======================|Creating Topic|=======================|${NC}"
# ./kafka_2.12-2.8.0/bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
echo -e "${GREEN}|=======================|Creating Topic|=======================|${NC}"
python3 kafka/create_topic.py

# echo -e "${GREEN}|=======================|Starting Producer|=======================|${NC}"
# ./kafka_2.12-2.8.0/bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
echo -e "${GREEN}|=======================|Starting Producer|=======================|${NC}"
gnome-terminal -- python3 kafka/write_in_queue.py

# echo -e "${GREEN}|=======================|Starting Consumer|=======================|${NC}"
# ./kafka_2.12-2.8.0/bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
Expand Down
21 changes: 21 additions & 0 deletions kafka/write_in_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from time import sleep
from kafka import KafkaProducer
import json
import pandas as pd

TOPIC = "SensorsDataStream"
INPUT_FILE = "data/full.csv"

df = pd.read_csv(INPUT_FILE)

producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for idx in range(len(df)):
print(df.iloc[idx].to_dict())
producer.send(TOPIC, df.iloc[idx].to_dict())
producer.flush()
sleep(1)

producer.close()

0 comments on commit d191be9

Please sign in to comment.