我想数一数这current month and the previous 2 months封邮箱的不同数量.我希望语法使用的是PySpark,而不是SQL.

示例输入:

df = spark.createDataFrame(
    [('2022-01-01', 'A'),
     ('2022-01-01', 'A'),
     ('2022-01-01', 'A'),
     ('2022-01-01', 'B'),
     ('2022-01-01', 'Z'),
     ('2022-01-01', 'Z'),
     ('2022-02-01', 'A'),
     ('2022-02-01', 'B'),
     ('2022-02-01', 'C'),
     ('2022-02-01', 'D'),
     ('2022-02-01', 'Z'),
     ('2022-02-01', 'A'),
     ('2022-02-01', 'F'),
     ('2022-03-01', 'A'),
     ('2022-03-01', 'B'),
     ('2022-03-01', 'B'),
     ('2022-03-01', 'C'),
     ('2022-04-01', 'G'),
     ('2022-04-01', 'H'),
     ('2022-05-01', 'G'),
     ('2022-05-01', 'H'),
     ('2022-05-01', 'I'),
     ('2022-06-01', 'I'),
     ('2022-06-01', 'J'),
     ('2022-06-01', 'K')],    
    ['yyyy_mm_dd', 'email']
)

请注意,情况并非如此.

yyyy_mm_dd count_distinct_email
2022-01-01 3
2022-02-01 6
2022-03-01 6
2022-04-01 8
2022-05-01 6
2022-06-01 5

由于rangeBetweet不支持月份,我被迫使用SQL语法来实现滚动DISTINCT计数.我试过这个

users_base.createOrReplaceTempView('users_base')

users_unique = spark.sql(
                         'SELECT \
                             yyyy_mm_dd, \
                             COUNT(DISTINCT soylent_booker_email_id_orig) OVER ( \
                                 ORDER BY CAST(yyyy_mm_dd AS timestamp) ASC\
                                 RANGE BETWEEN INTERVAL 2 MONTHS PRECEDING AND CURRENT ROW \
                                ) AS users_unique_count \
                         FROM \
                             users_base'
                         )

但它不起作用,我发现窗口函数不支持Count DISTINCT.有人知道如何才能生成我想要的输出吗?谢谢!

推荐答案

users_unique = spark.sql("""
    SELECT yyyy_mm_dd,
           size(array_distinct(flatten(collect_set(emails) over (order by cast (yyyy_mm_dd as timestamp) asc
                                                                 range between interval 2 months preceding AND current row)))) as count_distinct_email
      FROM (SELECT yyyy_mm_dd, collect_set(email) as emails
              FROM users_base
             GROUP BY yyyy_mm_dd)
""")

这里的子查询进行分组和过滤重复项,然后外部查询运行窗口函数.现在我们需要表演一些技巧:

  • emails是一个集合,因此collect_set(emails)返回集合的集合
  • flatten返回平面化数组(未设置-因此包括重复项)
  • array_distinct过滤掉重复项
  • size,最后给出最终的非重复计数

Sql相关问答推荐

用于匹配红旗和绿旗的SQL查询

删除MariaDB数据库中的JSON数据

当一个视图在Postgres中失效时?

解析SQL Server中的嵌套JSON

SQL:如何查找聚合满足条件的连续日期

返回UPSERT中的旧行值

Oracle分层查询-两条路径在末尾合并为一条

每个分组最多 Select 最后 2 个值并并排显示它们

根据标识符将两行合并为一行

将具有嵌套 XML 的列转换为 SQL 中的表格格式?

如何判断小数点后千位是否不为0

获取多个开始-结束时间戳集之间经过的时间

如何根据 SQL Server 中 1 条语句中 SELECT 的结果进行 INSERT 或 UPDATE

SQL for Smarties 类型问题:从表中 Select 记录,并对某些值进行分组

SQL:无重复项的两个聚合函数

比使用NOT EXISTS更高效的SQL删除方法是什么?

在 SQL 中使用循环遍历按时间顺序排列的数据

如何根据某个值在where子句中添加某个条件

如何在 RavenDB Studio (RQL) 中插入更新文档

聚合 Athena 中的列