I want to count the number of distinct emails over the current month and the previous 2 months. I'd like to do this with PySpark syntax rather than SQL.
Sample input:
df = spark.createDataFrame(
[('2022-01-01', 'A'),
('2022-01-01', 'A'),
('2022-01-01', 'A'),
('2022-01-01', 'B'),
('2022-01-01', 'Z'),
('2022-01-01', 'Z'),
('2022-02-01', 'A'),
('2022-02-01', 'B'),
('2022-02-01', 'C'),
('2022-02-01', 'D'),
('2022-02-01', 'Z'),
('2022-02-01', 'A'),
('2022-02-01', 'F'),
('2022-03-01', 'A'),
('2022-03-01', 'B'),
('2022-03-01', 'B'),
('2022-03-01', 'C'),
('2022-04-01', 'G'),
('2022-04-01', 'H'),
('2022-05-01', 'G'),
('2022-05-01', 'H'),
('2022-05-01', 'I'),
('2022-06-01', 'I'),
('2022-06-01', 'J'),
('2022-06-01', 'K')],
['yyyy_mm_dd', 'email']
)
Note that this is not a cumulative count — each month's value covers only that month plus the two before it. Expected output:
| yyyy_mm_dd | count_distinct_email |
|---|---|
| 2022-01-01 | 3 |
| 2022-02-01 | 6 |
| 2022-03-01 | 6 |
| 2022-04-01 | 8 |
| 2022-05-01 | 6 |
| 2022-06-01 | 5 |
Since rangeBetween doesn't support month intervals, I fell back to SQL syntax for the rolling DISTINCT count. I tried this:
users_base.createOrReplaceTempView('users_base')
users_unique = spark.sql("""
    SELECT
        yyyy_mm_dd,
        COUNT(DISTINCT soylent_booker_email_id_orig) OVER (
            ORDER BY CAST(yyyy_mm_dd AS timestamp) ASC
            RANGE BETWEEN INTERVAL 2 MONTHS PRECEDING AND CURRENT ROW
        ) AS users_unique_count
    FROM users_base
""")
But it doesn't work — it turns out window functions don't support COUNT(DISTINCT). Does anyone know how to produce the output I want? Thanks!