Skip to content

Commit

Permalink
finish consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
GabrielZuany committed Nov 29, 2023
1 parent 2522ccd commit 023023f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
10 changes: 8 additions & 2 deletions kafka/read_from_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
print(str(e))
exit(1)


try:
consumer = KafkaConsumer(TOPIC,
group_id=None,
Expand Down Expand Up @@ -70,8 +69,15 @@
values_str = values_str[:-1]
values_str = values_str.replace("nan", "NULL")

# check if exists
cursor.execute(f"SELECT EXISTS(SELECT * FROM {TABLE_NAME} WHERE timestamp='{values[0]}')")
if cursor.fetchone()[0] is not None:
print("Data already exists")
continue

cursor.execute(f"INSERT INTO {TABLE_NAME}({columns}) VALUES ({values_str})", values)
conn.commit()
sleep(1)


cursor.close()
conn.close()
28 changes: 12 additions & 16 deletions kafka/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ GREEN='\033[0;32m'
NC='\033[0m' # No Color
BLUE='\033[0;34m'

# echo -e "${GREEN}|=======================|Downloading Kafka|=======================|${NC}"
# wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
echo -e "${GREEN}|=======================|Downloading Kafka|=======================|${NC}"
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz

# echo -e "${GREEN}|=======================|Extracting Kafka|=======================|${NC}"
# tar -xzf kafka_2.12-2.8.0.tgz
# mv kafka_2.12-2.8.0 kafka
# rm kafka_2.12-2.8.0.tgz
echo -e "${GREEN}|=======================|Extracting Kafka|=======================|${NC}"
tar -xzf kafka_2.12-2.8.0.tgz
mv kafka_2.12-2.8.0 kafka
rm kafka_2.12-2.8.0.tgz

# echo -e "${GREEN}|=======================|Starting Zookeeper|=======================|${NC}"
# gnome-terminal -- kafka_2.12-2.8.0/bin/zookeeper-server-start.sh kafka_2.12-2.8.0/config/zookeeper.properties
# sleep 10
echo -e "${GREEN}|=======================|Starting Zookeeper|=======================|${NC}"
gnome-terminal -- kafka_2.12-2.8.0/bin/zookeeper-server-start.sh kafka_2.12-2.8.0/config/zookeeper.properties
sleep 10

# echo -e "${GREEN}|=======================|Starting Server|=======================|${NC}"
# gnome-terminal -- kafka_2.12-2.8.0/bin/kafka-server-start.sh kafka_2.12-2.8.0/config/server.properties
# sleep 10
echo -e "${GREEN}|=======================|Starting Server|=======================|${NC}"
gnome-terminal -- kafka_2.12-2.8.0/bin/kafka-server-start.sh kafka_2.12-2.8.0/config/server.properties
sleep 10

echo -e "${GREEN}|=======================|Creating Topic|=======================|${NC}"
python3 kafka/create_topic.py
Expand All @@ -26,7 +26,3 @@ gnome-terminal -- python3 kafka/write_in_queue.py

echo -e "${GREEN}|=======================|Starting Consumer|=======================|${NC}"
gnome-terminal -- python3 kafka/read_from_queue.py

# echo -e "${GREEN}|=======================|Starting Consumer|=======================|${NC}"
# ./kafka_2.12-2.8.0/bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

0 comments on commit 023023f

Please sign in to comment.