我有以下 python 脚本,可以从 twitter 中提取推文并将其发送到 kafka 主题。该脚本运行完美,但是当我尝试在 docker 容器内运行它时,它无法导入 kafka 库。它说“语法错误:语法无效”。
以下是 python 脚本(twitter_app.py)的内容:
import socket
import sys
import requests
import requests_oauthlib
import json
import kafka
from kafka import KafkaProducer
import time
from kafka import SimpleProducer
from kafka import KafkaClient
###################################################
# My own twitter access tokens
####################################################
ACCESS_TOKEN = '28778811-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
ACCESS_SECRET = 'HBGjTXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
CONSUMER_KEY = '#################################'
CONSUMER_SECRET = '############################################'
my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
####################################################
# Kafka Producer
####################################################
twitter_topic="twitter_topic"
client = KafkaClient("10.142.0.2:9092")
producer = SimpleProducer(client)
#producer = kafka.KafkaProducer(bootstrap_servers='10.128.0.2:9092')
def get_tweets():
print("#########################get_tweets called################################")
url = 'https://stream.twitter.com/1.1/statuses/filter.json'
#query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track' ,'#')]
#query_data = [('language', 'en'), ('locations', '-3.7834,40.3735,-3.6233,40.4702'),('track','#')]
query_data = [('language', 'en'), ('locations', '-3.7834,40.3735,-3.6233,40.4702'),('track','Madrid')]
query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
#print("Query url is", query_url)
response = requests.get(query_url, auth=my_auth, stream=True)
print(query_url, response)
return response
def send_tweets_to_kafka(http_resp):
print("########################send_tweets_to_kafka called#################################")
for line in http_resp.iter_lines():
print("reading tweets")
try:
full_tweet = json.loads(line)
tweet_text = full_tweet['text']
print("Tweet Text: " + tweet_text)
print ("------------------------------------------")
tweet_text = tweet_text + '\n'
producer.send_messages(twitter_topic, tweet_text.encode())
#producer.send(twitter_topic, tweet_text.encode())
time.sleep(0.2)
except:
print("Error received")
e = sys.exc_info()[0]
print("Error: %s" % e)
print("Done reading tweets")
##############
# Actual Execution starts here
###############
resp = get_tweets()
send_tweets_to_kafka(resp)
但是,现在我尝试在 docker 容器内运行此脚本,但它失败了,并且出现以下错误:
Traceback (most recent call last):
File "twitter_app.py", line 6, in <module>
import kafka
File "/usr/local/lib/python3.7/site-packages/kafka/__init__.py", line 23, in <module>
from kafka.producer import KafkaProducer
File "/usr/local/lib/python3.7/site-packages/kafka/producer/__init__.py", line 4, in <module>
from .simple import SimpleProducer
File "/usr/local/lib/python3.7/site-packages/kafka/producer/simple.py", line 54
return '<SimpleProducer batch=%s>' % self.async
^
SyntaxError: invalid syntax
以下是 Dockerfile 的内容供您参考(请注意,当我使用相同的 Dockerfile 和一个不使用 kafka 的简单脚本时,它工作得很好):
FROM python:3
MAINTAINER kamal.nandan@<myemailservice>
RUN apt-get update
RUN apt-get install -y python3
RUN pip install requests
RUN pip install requests_oauthlib
RUN pip install kafka
ADD twitter_app.py /
CMD python3 twitter_app.py
这几天我一直在与它斗争,但我一直无法弄清楚问题所在。任何帮助将非常感激。提前致谢。