所以,我使用Kafka Stream和Twitter API来获取推文并将它们发送到MySQL,但它不起作用
def insert_tweet(tweet,username,pnr,prediction,tweet_id):
query = "INSERT INTO tweets(tweet,username,pnr,prediction,tweet_id) VALUES ('"+tweet+"','"+username+"',"+str(pnr)+","+str(int(prediction))+","+str(tweet_id)+");"
try:
conn = MySQLdb.connect("localhost","root","","twitter" )
cursor = conn.cursor()
cursor.execute(query)
print("Database insertion SUCCESSFUL!!")
conn.commit()
except MySQLdb.Error as e:
print(e)
print("Database insertion unsuccessful!!")
finally:
conn.close()
def process_data(data):
print("Processing data ...")
if (not data.isEmpty()):
nbModel=bc_model.value
hashingTF = HashingTF(100000)
tf = hashingTF.transform(data.map(lambda x: x[0].encode('utf-8','ignore')))
tf.cache()
idf = IDF(minDocFreq=2).fit(tf)
tfidf = idf.transform(tf)
tfidf.cache()
prediction=nbModel.predict(tfidf)
temp = []
i=0
for p,q,r in data.collect():
temp.append([])
temp[i].append(p.encode('utf-8','ignore'))
temp[i].append(q)
temp[i].append(r)
i+=1
i=0
for p in prediction.collect():
temp[i].append(p)
i+=1
print(temp)
for i in temp:
insert_tweet(str(i[0]),str(i[1]),"0",int(i[3]),int(i[2]))
else:
print("Empty RDD !!!")
pass
twitter=tweets.map(lambda tweet: tweet['data']['id'])
tweet_text = tweets.map(lambda tweet: tweet['data']['text'])
txt = tweets.map(lambda x: (x['data']['text'], x['data']['id'], x['data']['id']))
txt.foreachRDD(process_data)
我试图在MySQL中手动运行查询,但它起作用了,所以我不知道有什么问题. 我使用的是XAMPP 3.3.0版 This is my table个