Skip to content

Commit

Permalink
add files
Browse files Browse the repository at this point in the history
  • Loading branch information
GabrielZuany committed Nov 29, 2023
1 parent fb04820 commit 806f1f6
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 0 deletions.
10 changes: 10 additions & 0 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from kafka import KafkaConsumer
consumer = KafkaConsumer('bankbranch',
group_id=None,
bootstrap_servers=['localhost:29092'],
auto_offset_reset = 'earliest')
print("Hello")
print(consumer)

for msg in consumer:
print(msg.value.decode("utf-8"))
24 changes: 24 additions & 0 deletions kafka/create_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from kafka.admin import KafkaAdminClient,NewTopic

TOPIC="SensorsDataStream"

try:
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id='IndustriALL')
except Exception as e:
print("Exception while connecting Kafka")
print(str(e))
exit(1)

# delete topics
admin_client.delete_topics([TOPIC])

try:
topic_list = []
new_topic = NewTopic(name=TOPIC, num_partitions= 2, replication_factor=1)
topic_list.append(new_topic)
admin_client.create_topics(new_topics=topic_list)
print("Topic created successfully")
except Exception as e:
print("Exception while creating topic")
print(str(e))
exit(1)
11 changes: 11 additions & 0 deletions kafka/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:29092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send("bankbranch", {'atmid':1, 'transid':4545})
producer.send("bankbranch", {'atmid':2, 'transid':2525})

producer.flush()

producer.close()
32 changes: 32 additions & 0 deletions kafka/setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
GREEN='\033[0;32m'
NC='\033[0m' # No Color
BLUE='\033[0;34m'

echo -e "${GREEN}|=======================|Downloading Kafka|=======================|${NC}"
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz

echo -e "${GREEN}|=======================|Extracting Kafka|=======================|${NC}"
tar -xzf kafka_2.12-2.8.0.tgz
mv kafka_2.12-2.8.0 kafka
rm kafka_2.12-2.8.0.tgz

echo -e "${GREEN}|=======================|Starting Zookeeper|=======================|${NC}"
gnome-terminal -- kafka_2.12-2.8.0/bin/zookeeper-server-start.sh kafka_2.12-2.8.0/config/zookeeper.properties
sleep 10

echo -e "${GREEN}|=======================|Starting Server|=======================|${NC}"
gnome-terminal -- kafka_2.12-2.8.0/bin/kafka-server-start.sh kafka_2.12-2.8.0/config/server.properties
sleep 10

# echo -e "${GREEN}|=======================|Creating Topic|=======================|${NC}"
# ./kafka_2.12-2.8.0/bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

# echo -e "${GREEN}|=======================|Starting Producer|=======================|${NC}"
# ./kafka_2.12-2.8.0/bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

# echo -e "${GREEN}|=======================|Starting Consumer|=======================|${NC}"
# ./kafka_2.12-2.8.0/bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

# echo -e "${GREEN}|=======================|Starting Consumer|=======================|${NC}"
# ./kafka_2.12-2.8.0/bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

0 comments on commit 806f1f6

Please sign in to comment.