我使用Jupyter notebook with Pyspark和以下docker image:Jupyter all-spark-notebook

现在我想写一篇pyspark streaming application which consumes messages from Kafka.在Spark-Kafka Integration guide篇文章中,他们描述了如何使用spark submit部署这样的应用程序(它需要链接一个外部jar——解释见3. Deploying).但由于我使用的是Jupyter notebook,我从来没有实际运行spark-submit命令,我假设如果我按execute,它会在后面运行.

spark-submit命令中,您可以指定一些参数,其中一个是-jars,但我不清楚如何从笔记本(或外部通过环境变量)设置此参数.我假设我可以通过SparkConfSparkContext对象动态链接这个外部jar.有没有人有过如何从笔记本上正确链接的经验?

推荐答案

我已经设法让它在jupyter笔记本中工作,该笔记本从all spark容器中运行.

我在jupyterhub中启动了一个python3笔记本,并覆盖PYSPARK_SUBMIT_ARGS标志,如下所示.卡夫卡消费者图书馆是从maven存储库下载的,并放在我的主目录/home/jovyan中:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = 
  '--jars /home/jovyan/spark-streaming-kafka-assembly_2.10-1.6.1.jar pyspark-shell'

import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

sc = pyspark.SparkContext()
ssc = StreamingContext(sc,1)

broker = "<my_broker_ip>"
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test1"],
                        {"metadata.broker.list": broker})
directKafkaStream.pprint()
ssc.start()

Note:别忘了环境变量中的pysparkshell !

Extension:如果希望包含spark软件包中的代码,可以使用--packages标志.有关如何在all spark笔记本中执行此操作的示例,请参见here

Python-3.x相关问答推荐

我在创建Pandas DataFrame时感到困惑

如何有效地计算Kernel/Matrix

如何通过Pandas为不同的列集垂直设置列数据?

当索引大于一个整数而小于前一个索引时,我如何返回列值?

CSV-DAT 转换时将引号添加到数据中

torch.stack([t1, t1, t1], dim=1)与torch.hstack([t1, t1, t1])之间有什么区别?

无法使用 curve_fit() 在 python 中复制高斯函数的曲线拟合

如何在Pandas 中按条件计算分组?

是否可以将多个 if 转换为数组?

如何在带有 GUI 的 python 游戏中设置回答时间限制?

排队多个子进程

为什么不切换到 Python 3.x?

为什么 virtualenv 会有效地禁用 Python 3 制表符补全?

sys.stdin.readline() 读取时没有提示,返回 'nothing in between'

我们如何获得 __repr__() 的默认行为?

是否在未完成初始化的对象上调用了 del?

如何使用已打开并使用登录凭据登录的浏览器

用于 unicode 大写单词的 Python 正则表达式

使用 python 3.0 的 Numpy

在 Visual Studio Code 中调试 Scrapy 项目