所以,我使用Kafka Stream和Twitter API来获取推文并将它们发送到MySQL,但它不起作用

def insert_tweet(tweet,username,pnr,prediction,tweet_id):
   query = "INSERT INTO tweets(tweet,username,pnr,prediction,tweet_id) VALUES ('"+tweet+"','"+username+"',"+str(pnr)+","+str(int(prediction))+","+str(tweet_id)+");"
try:
    conn = MySQLdb.connect("localhost","root","","twitter" )
    cursor = conn.cursor()
    cursor.execute(query)
    print("Database insertion SUCCESSFUL!!")
    conn.commit()
except MySQLdb.Error as e:
    print(e)
    print("Database insertion unsuccessful!!")
finally:
    conn.close()


def process_data(data):

        print("Processing data ...")        

        if (not data.isEmpty()):
            nbModel=bc_model.value
            hashingTF = HashingTF(100000)
            tf = hashingTF.transform(data.map(lambda x: x[0].encode('utf-8','ignore')))
            tf.cache()
            idf = IDF(minDocFreq=2).fit(tf)
            tfidf = idf.transform(tf)
            tfidf.cache()
            prediction=nbModel.predict(tfidf)

            temp = []
            i=0
            for p,q,r in data.collect():
                temp.append([])
                temp[i].append(p.encode('utf-8','ignore'))
                temp[i].append(q)
                temp[i].append(r)
                i+=1
            i=0
            for p in prediction.collect():
                temp[i].append(p)
                i+=1        

            print(temp)
            for i in temp:
                insert_tweet(str(i[0]),str(i[1]),"0",int(i[3]),int(i[2]))
        else:
            print("Empty RDD !!!")        
            pass

twitter=tweets.map(lambda tweet: tweet['data']['id'])
tweet_text = tweets.map(lambda tweet: tweet['data']['text'])

txt = tweets.map(lambda x: (x['data']['text'], x['data']['id'], x['data']['id']))
txt.foreachRDD(process_data)

我试图在MySQL中手动运行查询,但它起作用了,所以我不知道有什么问题. 我使用的是XAMPP 3.3.0版 This is my table

推荐答案

在您的代码中,如果您的tweet包含文字'字符,则会导致您的SQL格式化方式出现问题.

query = "INSERT INTO tweets(tweet,username,pnr,prediction,tweet_id) VALUES ('"+tweet+"'

如果tweet的值为:

He said: 'Sir, I am travelling from Rewa to Bhopal...

然后,生成的SQL将如下所示.注意,多出来的'个字符在MySQL中显示为字符串的末尾!

INSERT INTO tweets(tweet,username,pnr,prediction,tweet_id) VALUES ('He said: 'Sir, I am travelling from Rewa to Bhopal...
                                                                             ^

这混淆了SQL语法.MySQL不明白为什么引号字符串的末尾跟着"先生,我在旅行……"

如果您使用的是Python字符串格式,它不会有任何帮助.

query = "INSERT INTO tweets(tweet,username,pnr,prediction,tweet_id) VALUES ('%s','%s',%s,%s,%s);" % (tweet,username,str(pnr),str(int(prediction)))

仍然有'个字符不匹配,并且在运行查询时会导致SQL语法错误.

查询参数是解决此问题的最佳解决方案.在Python连接器中,它也使用%s占位符,但您不会将字符串值格式化为字符串.此外,不要在SQL查询中将它们放在引号中.

query = "INSERT INTO tweets(tweet,username,pnr,prediction,tweet_id) VALUES (%s,%s,%s,%s,%s)"
cursor.execute(query, (tweet, username, str(pnr), str(int(prediction)), str(tweet_id)))

%s个占位符保留在query字符串中,您将第二个参数传递给execute()-包含用作参数值的值列表.Python连接器将以一种安全的方式组合这些值,因此如果它们包含'或任何其他特殊字符,它们不会扰乱语法.

下面的代码示例显示了这一点:https://dev.mysql.com/doc/connector-python/en/connector-python-example-cursor-transaction.html

另外,为了便于将来参考,请记住,一个查询参数只能用于一个标量SQL值(例如,在其他情况下,您将使用带引号的字符串文字或数字文字).不能将参数用于SQL标识符,如表名或列名、SQL关键字或表达式或值列表.

Python相关问答推荐

模型序列化器中未调用现场验证器

如何根据情况丢弃大Pandas 的前n行,使大Pandas 的其余部分完好无损

有条件地采样我的大型DF的最有效方法

计算所有前面行(当前行)中列的值

根据给定日期的状态过滤查询集

当密钥是复合且唯一时,Pandas合并抱怨标签不唯一

返回nxon矩阵的diag元素,而不使用for循环

根据不同列的值在收件箱中移动数据

韦尔福德方差与Numpy方差不同

基于索引值的Pandas DataFrame条件填充

使用密钥字典重新配置嵌套字典密钥名

导入...从...混乱

索引到 torch 张量,沿轴具有可变长度索引

基于行条件计算(pandas)

基于多个数组的多个条件将值添加到numpy数组

Pandas:填充行并删除重复项,但保留不同的值

在numpy数组中寻找楼梯状 struct

Python日志(log)库如何有效地获取lineno和funcName?

Python OPCUA,modbus通信代码运行3小时后出现RuntimeError

对数据帧进行分组,并按组间等概率抽样n行