I want to implement the Pandas `merge_asof` function in Spark.

Here is an example:

from datetime import datetime

# Assumes an active SparkSession bound to the name `spark`
df1 = spark.createDataFrame(
    [
        (datetime(2019, 2, 3, 13, 30, 0, 23), "GOOG", 720.5, 720.93),
        (datetime(2019, 2, 3, 13, 30, 0, 23), "MSFT", 51.95, 51.96),
        (datetime(2019, 2, 3, 13, 30, 0, 20), "MSFT", 51.97, 51.98),
        (datetime(2019, 2, 3, 13, 30, 0, 41), "MSFT", 51.99, 52.0),
        (datetime(2019, 2, 3, 13, 30, 0, 48), "GOOG", 720.5, 720.93),
        (datetime(2019, 2, 3, 13, 30, 0, 49), "AAPL", 97.99, 98.01),
        (datetime(2019, 2, 3, 13, 30, 0, 72), "GOOG", 720.5, 720.88),
        (datetime(2019, 2, 3, 13, 30, 0, 75), "MSFT", 52.1, 52.03),
    ],
    ("time", "ticker", "bid", "ask"),
)

df2 = spark.createDataFrame(
    [
        (datetime(2019, 2, 3, 13, 30, 0, 23), "MSFT", 51.95, 75),
        (datetime(2019, 2, 3, 13, 30, 0, 38), "MSFT", 51.95, 155),
        (datetime(2019, 2, 3, 13, 30, 0, 48), "GOOG", 720.77, 100),
        (datetime(2019, 2, 3, 13, 30, 0, 48), "GOOG", 720.92, 100),
        (datetime(2019, 2, 3, 13, 30, 0, 48), "AAPL", 98.0, 100),
    ],
    ("time", "ticker", "price", "quantity"),
)

Using Pandas:

import pandas as pd

# merge_asof requires both frames to be sorted by the `on` key
d1 = df1.toPandas().sort_values("time", ascending=True)
d2 = df2.toPandas().sort_values("time", ascending=True)

pd.merge_asof(d2, d1, on="time", by="ticker")

Output:

                        time ticker   price  quantity     bid     ask
0 2019-02-03 13:30:00.000023   MSFT   51.95        75   51.95   51.96
1 2019-02-03 13:30:00.000038   MSFT   51.95       155   51.95   51.96
2 2019-02-03 13:30:00.000048   GOOG  720.77       100  720.50  720.93
3 2019-02-03 13:30:00.000048   GOOG  720.92       100  720.50  720.93
4 2019-02-03 13:30:00.000048   AAPL   98.00       100     NaN     NaN
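For context, `merge_asof` does a backward-looking nearest-key join: each left row takes the most recent right row whose `on` key is less than or equal to its own, matched within each `by` group. A minimal self-contained pandas sketch (the tiny `trades`/`quotes` frames below are illustrative, not the data above):

```python
import pandas as pd

# Two trades and two quotes for one ticker; the trade at t=5 picks up
# the quote at t=3, the most recent quote at or before it.
trades = pd.DataFrame({"time": [2, 5], "ticker": ["MSFT", "MSFT"],
                       "price": [51.95, 51.99]})
quotes = pd.DataFrame({"time": [1, 3], "ticker": ["MSFT", "MSFT"],
                       "bid": [51.90, 51.97]})

merged = pd.merge_asof(trades, quotes, on="time", by="ticker")
print(merged["bid"].tolist())  # [51.9, 51.97]
```

The default `direction="backward"` is what makes this an "as of" join; `tolerance` and `allow_exact_matches` can further restrict which quotes qualify.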

Using a Pandas UDF in Spark:

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="ticker")


df2.sort("time").groupby("ticker").cogroup(
    df1.sort("time").groupby("ticker")
).applyInPandas(
    asof_join,
    schema="time timestamp, ticker string, price float, quantity int, bid float, ask float",
).show(10, False)

Output:

+--------------------------+------+------+--------+-----+------+
|time                      |ticker|price |quantity|bid  |ask   |
+--------------------------+------+------+--------+-----+------+
|2019-02-03 13:30:00.000048|AAPL  |98.0  |100     |null |null  |
|2019-02-03 13:30:00.000048|GOOG  |720.77|100     |720.5|720.93|
|2019-02-03 13:30:00.000048|GOOG  |720.92|100     |720.5|720.93|
|2019-02-03 13:30:00.000023|MSFT  |51.95 |75      |51.95|51.96 |
|2019-02-03 13:30:00.000038|MSFT  |51.95 |155     |51.95|51.96 |
+--------------------------+------+------+--------+-----+------+
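Conceptually, the cogrouped apply just runs `asof_join` once per ticker and concatenates the results. A pure-pandas sketch of that per-group logic, using small illustrative frames rather than the data above:

```python
import pandas as pd

trades = pd.DataFrame({"time": [23, 48], "ticker": ["MSFT", "GOOG"],
                       "price": [51.95, 720.77]})
quotes = pd.DataFrame({"time": [20, 41], "ticker": ["MSFT", "GOOG"],
                       "bid": [51.97, 720.50]})

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="ticker")

# Emulate cogroup: pair up the per-ticker groups from both frames,
# apply the join to each pair, then concatenate the pieces.
parts = [asof_join(grp.sort_values("time"),
                   quotes[quotes["ticker"] == t].sort_values("time"))
         for t, grp in trades.groupby("ticker")]
result = pd.concat(parts, ignore_index=True)
```

Spark does the same thing, but shuffles each ticker's rows from both DataFrames to the same executor before calling the function, which is where the serialization cost of the UDF path comes from.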

NOTE

The UDF works and produces the correct result, but is there a more efficient way to do this in PySpark with window functions? I'm dealing with big data, and the UDF is the bottleneck.

Accepted answer

You can do a regular join first, then use `last` over a window:

from pyspark.sql import functions as F, Window as W

df = df2.join(df1, ['time', 'ticker'], 'left')
w = W.partitionBy('ticker').orderBy('time')
df = df.withColumn('bid', F.coalesce('bid', F.last('bid', True).over(w)))
df = df.withColumn('ask', F.coalesce('ask', F.last('ask', True).over(w)))

df.show(truncate=0)
# +--------------------------+------+------+--------+-----+------+
# |time                      |ticker|price |quantity|bid  |ask   |
# +--------------------------+------+------+--------+-----+------+
# |2019-02-03 13:30:00.000048|AAPL  |98.0  |100     |null |null  |
# |2019-02-03 13:30:00.000048|GOOG  |720.77|100     |720.5|720.93|
# |2019-02-03 13:30:00.000048|GOOG  |720.92|100     |720.5|720.93|
# |2019-02-03 13:30:00.000023|MSFT  |51.95 |75      |51.95|51.96 |
# |2019-02-03 13:30:00.000038|MSFT  |51.95 |155     |51.95|51.96 |
# +--------------------------+------+------+--------+-----+------+
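The window approach is, in effect, an exact-key left join followed by a per-ticker forward fill of the quote columns (`F.last(..., True)` ignores nulls, so each row picks up the most recent non-null quote at or before its own time). A pandas sketch of the same idea, on small illustrative frames:

```python
import pandas as pd

trades = pd.DataFrame({"time": [23, 38], "ticker": ["MSFT", "MSFT"],
                       "price": [51.95, 51.95], "quantity": [75, 155]})
quotes = pd.DataFrame({"time": [23], "ticker": ["MSFT"],
                       "bid": [51.95], "ask": [51.96]})

# Exact-time left join, then forward fill the quote columns per ticker
df = trades.merge(quotes, on=["time", "ticker"], how="left").sort_values("time")
df[["bid", "ask"]] = df.groupby("ticker")[["bid", "ask"]].ffill()
print(df["bid"].tolist())  # [51.95, 51.95]
```

One caveat worth checking on your data: because the first step is an exact join on `time`, quotes whose timestamps never appear in `df2` contribute no rows, so this can differ from `merge_asof` when quote and trade timestamps don't line up.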
