Apache Kafka Project: Real-Time Twitter Streaming with Python


💡 Introduction

Welcome to another blog in the Python-for-DevOps series! Today, we’ll explore Apache Kafka and how it can be used to stream tweets between two services.

For those unfamiliar with Kafka, it is an open-source distributed event streaming platform that enables high-throughput, low-latency communication between services, whether in a client-server setup or a microservices architecture.

In this project, we’ll create:
✅ A producer.py script to fetch tweets and send them to Kafka.
✅ A consumer.py script to receive and process the streamed tweets.
✅ A Dockerized setup to run Kafka, Zookeeper, and Elasticsearch for real-time tweet visualization.

Enough with the theory—let’s dive straight into the implementation! 🚀


💡 Prerequisites

Before we dive into the implementation, make sure you have the following requirements in place:

🔹 Docker Installed – We’ll use Docker to run Apache Kafka, Zookeeper, and Elasticsearch.
🔹 Basic Knowledge of Docker, Python, and Kafka – A foundational understanding will help you follow along.
🔹 Overview of Elasticsearch & Kibana – These tools will help us store and visualize tweet data in real time.
🔹 Twitter Developer Portal Account – Required to generate a Bearer Token for accessing the Twitter API.

If you’re new to any of these, consider brushing up on them before proceeding. Now, let’s set up our environment! 🚀

Make sure your Python version is 3.10 or lower, because the kafka-python module doesn't work properly with Python 3.11 or later.
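
If you want to fail fast on an incompatible interpreter, you can add a small guard like the following at the top of your scripts (a minimal sketch, assuming you prefer a hard stop over a warning):

import sys

# kafka-python has known issues on Python 3.11+, so stop early with a clear message.
if sys.version_info >= (3, 11):
    raise SystemExit("Please run this project with Python 3.10 or lower.")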


💡 Project Setup & Dependencies

The complete project files are hosted on my GitHub repository:
🔗 GitHub Repo: twitter-streams

Clone the repository and navigate inside the project directory:

git clone https://github.com/Pravesh-Sudha/twitter-streams.git
cd twitter-streams

1️⃣ Setting Up a Virtual Environment

To keep the project's dependencies isolated, create a virtual environment named kafka-env:

python3 -m venv kafka-env
source kafka-env/bin/activate  # On Windows, use kafka-env\Scripts\activate

2️⃣ Installing Required Dependencies

Use pip to install the necessary Python modules:

pip3 install kafka-python tweepy six requests

3️⃣ Running Kafka, Zookeeper, Elasticsearch, and Kibana with Docker

Before running the project, we need to start the required services using Docker.

Start Zookeeper

docker run -d --name zookeeper -p 2181:2181 zookeeper

Zookeeper manages and coordinates Kafka brokers.

Start Kafka

docker run -d --name kafka \
  --link zookeeper \
  -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka

Kafka will act as our message broker, streaming Twitter data.

Start Elasticsearch

docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.17.10

Elasticsearch will store the streamed tweet data.
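
Before moving on, you can confirm Elasticsearch is responding with a quick Python check (a small sketch using the requests library, the same one the consumer script relies on):

import requests

# The root endpoint returns cluster and version info once Elasticsearch is ready.
response = requests.get("http://localhost:9200")
print(response.status_code, response.json())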

Start Kibana

docker run -d --name kibana --link elasticsearch -p 5601:5601 kibana:7.17.10

Kibana will help visualize the tweets stored in Elasticsearch.


💡 Creating a Kafka Topic & Implementing Producer-Consumer

1️⃣ Creating a Kafka Topic

Before running our producer script, we need to create a Kafka topic named twitter-stream. Run the following command:

docker exec -it kafka kafka-topics --create --topic twitter-stream --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

This will create a topic where tweets will be streamed from the producer to the consumer.
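
If you prefer to verify the topic from Python instead of the Kafka CLI, a short sketch like this asks the broker for the topics it knows about:

from kafka import KafkaConsumer

# Connect to the local broker and list its topics; "twitter-stream" should appear in the set.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
print(consumer.topics())
consumer.close()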

After the topic is created, ensure all four containers (Zookeeper, Kafka, Elasticsearch, and Kibana) are running:

docker ps


2️⃣ Implementing Kafka Producer (producer.py)

Now, let’s take a look at our Kafka Producer script:

import tweepy
from kafka import KafkaProducer
import json
import time

# Twitter API credentials
BEARER_TOKEN = "Your-Bearer-Token"

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda x: json.dumps(x).encode("utf-8")
)

# Authenticate Twitter API
client = tweepy.Client(bearer_token=BEARER_TOKEN)

def fetch_tweets():
    query = "#AI OR #Tech -is:retweet"  # Search for tweets with these hashtags, excluding retweets
    tweets = client.search_recent_tweets(query=query, tweet_fields=["created_at", "text", "id"], max_results=10)

    if tweets.data:
        for tweet in tweets.data:
            tweet_data = {
                "id": tweet.id,
                "text": tweet.text,
                "timestamp": str(tweet.created_at)
            }
            producer.send("twitter-stream", value=tweet_data)  # Send to Kafka
            print(f"Tweet sent: {tweet_data}")

# Run every 30 seconds to fetch new tweets
while True:
    fetch_tweets()
    time.sleep(30)  # Wait before fetching again

How It Works

✅ Imports the Kafka Producer and Tweepy for Twitter API authentication.
✅ Uses a Twitter API bearer token for authentication.
✅ Defines a query for tweets containing #AI or #Tech (excluding retweets).
✅ Fetches tweets every 30 seconds and sends them to Kafka under the twitter-stream topic.
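
One small improvement worth considering: instead of hardcoding the bearer token in producer.py, you can read it from an environment variable. Here is a minimal sketch; the TWITTER_BEARER_TOKEN variable name is just an example, not part of the original script:

import os

# Read the token from the environment so it never ends up in version control.
BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN")
if not BEARER_TOKEN:
    raise SystemExit("Set the TWITTER_BEARER_TOKEN environment variable first.")

Export the variable in your shell before launching the producer, for example: export TWITTER_BEARER_TOKEN=your-token.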

Now, run the producer.py script to start streaming tweets into Kafka:

python3 producer.py


3️⃣ Implementing Kafka Consumer (consumer.py)

Now, let’s build the Kafka Consumer that will process the tweets and store them in Elasticsearch:

from kafka import KafkaConsumer
import json
import requests

# Elasticsearch endpoint
ELASTICSEARCH_URL = "http://localhost:9200/twitter/_doc/"

# Kafka Consumer
consumer = KafkaConsumer(
    "twitter-stream",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda x: json.loads(x.decode("utf-8"))
)

print("Listening for tweets...")

for message in consumer:
    tweet = message.value
    print(f"Received tweet: {tweet}")

    # Send tweet to Elasticsearch
    response = requests.post(ELASTICSEARCH_URL, json=tweet)
    print(f"Elasticsearch response: {response.json()}")

How It Works

✅ Connects to Kafka and listens to the twitter-stream topic.
✅ Reads tweets and prints them to the console.
✅ Sends each tweet to Elasticsearch for storage and further visualization in Kibana.
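
One caveat: because the producer re-fetches recent tweets every 30 seconds, the same tweet can be indexed more than once. A simple way to avoid duplicates is to index each tweet under its own ID, so repeats overwrite the existing document instead of creating a new one. The sketch below shows how the consumer's indexing call could look, using Elasticsearch's PUT /twitter/_doc/<id> endpoint instead of the POST used above:

import requests

def index_tweet(tweet):
    # Use the tweet's own ID as the document ID so re-sent tweets don't create duplicates.
    url = f"http://localhost:9200/twitter/_doc/{tweet['id']}"
    response = requests.put(url, json=tweet)
    response.raise_for_status()
    return response.json()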

Run the consumer.py script to start consuming tweets:

python3 consumer.py


4️⃣ Verifying Data in Elasticsearch

To check if tweets are successfully stored in Elasticsearch, run:

curl -X GET "http://localhost:9200/twitter/_search?pretty=true"

If everything is working correctly, you should see the stored tweets in JSON format. 🎉
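
You can also run more targeted queries. For example, this small sketch uses a standard match query to find tweets whose text mentions "AI":

import requests

# A basic match query against the "text" field of the twitter index; adjust the term as needed.
query = {"query": {"match": {"text": "AI"}}}
response = requests.get("http://localhost:9200/twitter/_search", json=query)
hits = response.json()["hits"]["hits"]
print(f"Found {len(hits)} matching tweets")
for hit in hits:
    print(hit["_source"]["text"])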


💡 Visualizing Tweets in Kibana

Now that we have streamed tweets into Elasticsearch, let's visualize them in Kibana.

1️⃣ Access Kibana

Open your browser and navigate to:

🔗 http://localhost:5601

2️⃣ Create an Index Pattern

  1. Click on "Management" → "Stack Management"

  2. Under "Data", select "Index Patterns"

  3. Click "Create Index Pattern"

  4. Set the Index Pattern Name as twitter

  5. Select "timestamp" as the time field (if required)

  6. Click "Create Index Pattern"

3️⃣ Explore Tweets in Kibana

Now, navigate to:
🔹 Discover – to browse and filter tweets.
🔹 Visualize – to create charts, graphs, and dashboards for deeper insights.


💡 Conclusion

In this blog, we built a real-time Twitter data streaming pipeline using Apache Kafka, Python, Elasticsearch, and Kibana. Here's a quick recap of what we achieved:

✅ Set up Kafka, Zookeeper, Elasticsearch, and Kibana using Docker.
✅ Created a Kafka topic (twitter-stream) to stream tweets.
✅ Built a Kafka Producer (producer.py) to fetch tweets from the Twitter API and send them to Kafka.
✅ Developed a Kafka Consumer (consumer.py) to retrieve tweets from Kafka and store them in Elasticsearch.
✅ Used Kibana to visualize and explore the streamed tweets.

This project demonstrates how Apache Kafka can be leveraged for real-time data streaming and analytics. You can extend it further by:
🚀 Adding Sentiment Analysis to categorize tweets as positive, negative, or neutral.
🚀 Deploying the solution on cloud platforms like AWS, GCP, or Azure.
🚀 Scaling Kafka to handle high-volume Twitter data streams.

I hope you found this project helpful! Feel free to check out the GitHub repo and experiment with enhancements. Happy coding! 🚀

✨ For more informative blogs, follow me on Hashnode, X (Twitter), and LinkedIn.