我有10年的数据(下面的KF)在下面的格式,我试图汇总每个部门的每个月的数据.2023年4月的数据快照(在下面的示例中)应该会给出SALEL_1作为2023年4月的SALE_AMT,SALE_2作为之前1个月或2023年3月的SALES_AMT,依此类推.对于没有值,我填充NULL. 为了从这10年的数据创建主表,我在10年中使用了下面的PySpark代码10次,但我不知道如何转置或垂直堆叠计算,以便如果有人查询我的结果,他们可以根据他们过滤的月份和年份获得快照.(例如)对于在PySpark中更高效地完成这项工作,有什么建议吗?蒂娅!

kf=main file with 10 years data
mf=kf.dropDuplicates(['dept_ID'])
gf=kf
month_list={'1','2','3','4','5','6','7','8','9','10','11','12'}
window = Winddow().partitionBy("dept_ID")
for i in month_list:
   df = gf.filter(gf.month==i).withColumn("sale_"+i, sum(coalesce('sale_amt'), lit(0))).over(window))
   df = df.dropDuplicates(['dept_ID'])
   mf = mf.join(df, mf.dept_ID==df.dept_ID, 'left').drop(df.dept_ID)

kf:

dept_ID sale_amt    sale_date   sale_month  sale_year
1   10  4/1/2023    4   2023
1   60  4/1/2023    4   2023
1   30  3/1/2023    3   2023
1   15  3/1/2023    3   2023
1   12  2/1/2023    2   2023
1   10  1/1/2023    1   2023
1   90  1/1/2023    1   2023
1   40  12/1/2022   12  2022
1   40  11/1/2022   11  2022
1   75  10/1/2022   10  2022
1   30  9/1/2022    9   2022
1   50  9/1/2022    9   2022
1   25  8/1/2022    8   2022
1   40  8/1/2022    8   2022
1   70  7/1/2022    7   2022
1   80  5/1/2022    5   2022
1   10  5/1/2022    5   2022
1   45  4/1/2022    4   2022
1   15  4/1/2022    4   2022
2   10  4/1/2023    4   2023
2   60  4/1/2023    4   2023
2   30  3/1/2023    3   2023
2   15  3/1/2023    3   2023
2   12  2/1/2023    2   2023
2   10  1/1/2023    1   2023
2   90  1/1/2023    1   2023
2   40  12/1/2022   12  2023
2   40  11/1/2022   11  2023
2   80  10/1/2022   10  2023
2   30  9/1/2022    9   2023
3   50  9/1/2022    9   2023
3   25  8/1/2022    8   2023
3   40  8/1/2022    8   2023
3   70  7/1/2022    7   2023
3   80  5/1/2022    5   2023
3   10  5/1/2022    5   2023


预期结果:

enter image description here

推荐答案

这是另一个Spark-SQL解决方案.

假设op希望将给定Year_MONTER_NUMBER的12个月快照作为输入.

创建一个参考视图,其中包含从2010年开始到2025年的所有年月组合的空额值,即12*15.

val df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("sale.csv")
val df2=df.withColumn("dt2",to_date(col("sale_date"),"MM/dd/yy"))
df2.createOrReplaceTempView("sale")

val ref_df = spark.sql(" select add_months('2010-01-01',id) yyyymm, cast(null as decimal(15,3)) amt from range(12*15)  order by 1 ")
ref_df.createOrReplaceTempView("ref")

spark.conf.set("spark.sql.crossJoin.enabled","true") // to allow cross join 
// Inputs
val year_month=202304
val month=year_month.toString.drop(4)
val year=year_month.toString.take(4)
val yearp=year.toInt-1
val dfs = spark.sql(s""" with t1 (select dept_id, date_format(dt2,'yyyyMM') yyyymm,  sum(sale_amt) sale_amt 
                                            from sale where year(dt2) in (${year},${yearp}) group by 1,2
                                   union all 
                                    select dept_id, date_format(yyyymm,'yyyyMM') yyyymm, amt from ref , (select distinct(dept_id) dept_id from sale) where year(yyyymm) in (${year},${yearp})
                                 )
 select dept_id, yyyymm, sale_amt amt from t1 
 where months_between(to_date('${year_month}01','yyyyMMdd'),to_date(yyyymm||'01','yyyyMMdd')) < 12
 and   months_between(to_date('${year_month}01','yyyyMMdd'),to_date(yyyymm||'01','yyyyMMdd')) >= 0   
""")

dfs.show(50,false)

dfs.createOrReplaceTempView("sale2")

val vl_ym = spark.sql("select collect_set(yyyymm) from sale2").as[Seq[String]].first.map( x => "'"+x+"'").sorted.reverse.mkString(",")
val dfs2 = spark.sql(s"""
select * from (select dept_id, yyyymm, sum(amt) amt from sale2 group by 1,2 ) t
 PIVOT ( 
  sum(amt) as amt
  FOR yyyymm IN ( ${vl_ym} )
  ) 
""")

dfs2.withColumn("year",lit(year)).withColumn("month",lit(month)).orderBy("dept_id").show(false)

最后一个输出是

+-------+------+------+------+-------+------+------+------+------+------+------+------+------+----+-----+
|dept_id|202304|202303|202302|202301 |202212|202211|202210|202209|202208|202207|202206|202205|year|month|
+-------+------+------+------+-------+------+------+------+------+------+------+------+------+----+-----+
|1      |70.000|45.000|12.000|100.000|40.000|40.000|75.000|80.000|65.000|70.000|null  |90.000|2023|04   |
|2      |70.000|45.000|12.000|100.000|40.000|40.000|80.000|30.000|null  |null  |null  |null  |2023|04   |
|3      |null  |null  |null  |null   |null  |null  |null  |50.000|65.000|70.000|null  |90.000|2023|04   |
+-------+------+------+------+-------+------+------+------+------+------+------+------+------+----+-----+

您可以重命名202304,202303.销售_1,销售_2..等

用于连接的字符串的spark 源部分:

>>> seqString=spark.sql("select array_sort(collect_set(yyyymm)) from ( select date_format(add_months(date'2023-01-01',id),'yyyyMM') yyyymm from range(12)) ").collect()[0][0]
>>> sorted_seq = sorted(seqString, reverse=True)
>>> addPrefix=''.join(map(lambda x: "'" + x + "'",sorted_seq))
>>> addPrefix
"'202312''202311''202310''202309''202308''202307''202306''202305''202304''202303''202302''202301'"
>>>

Python相关问答推荐

使用scipy. optimate.least_squares()用可变数量的参数匹配两条曲线

如何在箱形图中添加绘制线的传奇?

如何在Django基于类的视图中有效地使用UTE和RST HTIP方法?

Python虚拟环境的轻量级使用

无法使用requests或Selenium抓取一个href链接

无法定位元素错误404

如何获取numpy数组的特定索引值?

如何禁用FastAPI应用程序的Swagger UI autodoc中的application/json?

Matplotlib中的字体权重

计算空值

我对这个简单的异步者的例子有什么错误的理解吗?

使用Python异步地持久跟踪用户输入

如何将一组组合框重置回无 Select tkinter?

如何将相同组的值添加到嵌套的Pandas Maprame的倒数第二个索引级别

获取git修订版中每个文件的最后修改时间的最有效方法是什么?

如何训练每一个pandaprame行的线性回归并生成斜率

在不中断格式的情况下在文件的特定部分插入XML标签

函数()参数';代码';必须是代码而不是字符串

来自任务调度程序的作为系统的Python文件

如何计算Pandas 中具有特定条件的行之间的天差