你的方法出了什么问题?
忽略了对pythonUDF的性能影响,在您的方法中有两个地方出了问题.
apply
在您try 使用的上下文中现在是map_rows
,它期望输出是一个元组,其中元组的每个元素都是一个输出列.您的函数不会输出元组.如果将返回行更改为return (ratings_for_common_movies,)
,则它将输出一个元组并将起作用.
您不能使用方括号表示法向极点数据帧添加列.唯一可以在=
的左边的是一辆东风,而不是df['new_column']=<something>
.如果您使用的旧版本确实允许这样做,那么您就不应该这样做,部分原因是新版本不允许这样做.这意味着你必须做一些类似df.with_columns(new_column=<some_expression>)
的事情
在使用map_rows
向现有DF添加列的情况下,您可以使用hstack
,如下所示:
df=df.hstack(df.map_rows(get_common_movie_ratings)).rename({'column_0':'common_movie_ratings'})
上面的方法实际上是一种反模式,因为当本机方法可以工作时,使用map_rows
、map_elements
等中的任何一种都会变得更慢、效率更低.滚动到底部以获得map_elements
方法.
本机解决方案前言
如果我们假设列表总是3长,那么你可以这样做…
# this is the length of the user_movies lists
n_count=3
df.with_columns(
# first gather the items from user_movies based on (yet to be created)
# indices list
pl.col('user_movies').list.gather(
# use walrus operator to create new list which is the indices where
# user_movies are in common_movies this works by looping through
# each element and checking if it's in common_movies. When it is in common_movies
# then it stores its place in the loop n variable. The n_count is the list size
(indices:=pl.concat_list(
pl.when(
pl.col('user_movies').list.get(n).is_in(pl.col('common_movies'))
)
.then(pl.lit(n))
for n in range(n_count)
).list.drop_nulls())
),
# use the same indicies list to gather the corresponding elements from user_ratings
pl.col('user_ratings').list.gather(indices)
)
注意,我们通过循环从0到列表长度为n
的范围来生成索引列表,并且当与user_movies
的第n
个位置相关联的项在common_movies
中时,则将该n
放入索引列表中.不幸的是,对于list
个类型的列,在Polars中没有类似于.index
的方法,因此,在不分解列表的情况下,这是我能想到的创建索引列表的最好方法.
本机解决方案答案
极点本身不能递归地设置n_count
,所以我们需要手动设置.通过使用延迟求值,这比其他方法更快,因为它可以并行计算每n_count
个 case .
(
pl.concat([ # this is a list comprehension
# From here to the "for n_count..." line is the same as the previous code
# snippet except that, here, it's called inner_df and it's being
# made into a lazy frame
inner_df.lazy().with_columns(
pl.col('user_movies').list.gather(
(indices:=pl.concat_list(
pl.when(
pl.col('user_movies').list.get(n).is_in(pl.col('common_movies'))
)
.then(pl.lit(n))
for n in range(n_count)
).list.drop_nulls())
),
pl.col('user_ratings').list.gather(indices)
)
# this is the iterating part of the list comprehension
# it takes the original df, creates a column which is
# a row index, then it creates a column which is the
# length of the list, it then partitions up the df into
# multiple dfs where each of the inner_dfs only has rows
# where the list length is the same. By using as_dict=True
# and .items(), it gives a convenient way to unpack the
# n_count (length of the list) and the inner_df
for n_count, inner_df in (
df
.with_row_count('i') # original row position
.with_columns(n_count=pl.col('user_movies').list.len())
.partition_by('n_count', as_dict=True, include_key=False)
.items())
])
.sort('i') # sort by original row position
.drop('i') # drop the row position column
.collect() # run all of the queries in parallel
)
shape: (5, 3)
┌───────────────┬──────────────┬───────────────┐
│ user_movies ┆ user_ratings ┆ common_movies │
│ --- ┆ --- ┆ --- │
│ list[i64] ┆ list[f64] ┆ list[i64] │
╞═══════════════╪══════════════╪═══════════════╡
│ [7064, 7153] ┆ [5.0, 5.0] ┆ [7064, 7153] │
│ [7] ┆ [2.0] ┆ [7] │
│ [110, 3927] ┆ [4.0, 3.0] ┆ [110, 3927] │
│ [2] ┆ [3.5] ┆ [2] │
│ [260, 195627] ┆ [1.0, 0.5] ┆ [260, 195627] │
└───────────────┴──────────────┴───────────────┘
通过在concat
的第一部分转换为lazy
,它允许并行计算每个帧,其中每个帧是基于列表长度的子集.它还允许indices
成为CSER,这意味着它只计算一次,即使有2个引用.
顺便说一句,对于更少的代码但更多的处理/时间,您可以简单地将前导码部分中的n_counts
设置为n_count=df.select(n_count=pl.col('user_movies').list.len().max()).item()
,然后只运行该部分中的其余部分.这种方法将比这种方法慢得多,因为对于每一行,它都会迭代元素,直到最大列表长度,这会增加不必要的判断.它也没有得到同样的并行性.换句话说,它用更少的CPU内核做更多的工作.
基准
虚假数据创建
n=10_000_000
df = (
pl.DataFrame({
'user':np.random.randint(1,int(n/10),size=n),
'user_movies':np.random.randint(1,50,n),
'user_ratings':np.random.uniform(1,5, n),
'keep':np.random.randint(1,100,n)
})
.group_by('user')
.agg(
pl.col('user_movies'),
pl.col('user_ratings').round(1),
common_movies=pl.col('user_movies').filter(pl.col('keep')>75)
)
.filter(pl.col('common_movies').list.len()>0)
.drop('user')
)
print(df.head(10))
shape: (5, 3)
┌────────────────┬───────────────────┬───────────────┐
│ user_movies ┆ user_ratings ┆ common_movies │
│ --- ┆ --- ┆ --- │
│ list[i64] ┆ list[f64] ┆ list[i64] │
╞════════════════╪═══════════════════╪═══════════════╡
│ [23, 35, … 22] ┆ [3.4, 1.6, … 4.0] ┆ [35] │
│ [30, 18, … 26] ┆ [4.9, 1.9, … 2.3] ┆ [10] │
│ [25, 19, … 29] ┆ [1.7, 1.7, … 1.1] ┆ [18, 40, 38] │
│ [31, 15, … 42] ┆ [2.9, 1.8, … 4.3] ┆ [31, 4, … 42] │
│ [36, 16, … 49] ┆ [1.0, 2.0, … 4.2] ┆ [36] │
└────────────────┴───────────────────┴───────────────┘
我的方法(16个线程):每循环1.92 s ± 195 ms(平均值±标准差).dev.共7次运行,每次1个循环)
我的方法(8个线程):每循环2.31 s ± 175 ms(平均值±标准差).dev.共7次运行,每次1个循环)
我的方法(4个线程):3.14 S±221ms/循环(平均值±标准.戴夫.共7次运行,每次1次循环)
@jqurn:2.73 S±130ms/环(平均值±标准戴夫.共7次运行,每次1次循环)
S:每循环9.12ms±195ms(平均值±标准戴夫.共7次运行,每次1次循环)
大n_计数前导:9·77 S±1·61 S(平均值±标准戴夫.共7次运行,每次1次循环)
我的方法和map_row各自使用大约1 GB的RAM,但爆炸性的方法更接近3 GB.
struct and map_elements
与其使用map_rows
,这是非常笨拙的,你可以使用map_elements
.它有自己的笨拙,因为您经常需要将输入包装在 struct 中,但您可以更干净地添加列,并且不必依赖于列位置.
例如,您可以按如下方式定义和使用您的函数:
def get_common_movie_ratings(row): #Each row is a tuple of arrays.
common_movies = row['common_movies'] #the index of the tuple denotes the 3rd array, which represents the common_movies column.
user_ratings = row['user_ratings']
ratings_for_common_movies= [user_ratings[list(row['user_movies']).index(movie)] for movie in common_movies]
return ratings_for_common_movies
df.with_columns(user_ratings=pl.struct(pl.all()).map_elements(get_common_movie_ratings))
这里发生的情况是,只能从单个列调用map_elements
,因此如果您的定制函数需要多个输入,您可以将它们包装在一个 struct 中.该 struct 将被转换为一个字典,其中键具有列名.与map_rows
相比,这种方法没有任何固有的性能优势,它只是更好的语法.
最后一点
正如@jqury在他的回答的 comments 中提到的那样,通过将这一逻辑与这些列表的形成结合起来,几乎肯定可以在语法和性能方面简化这一点.换句话说,你有第一步:_第二步:这个问题.虽然我只能猜测步骤1中发生了什么,但将这两个步骤结合起来很可能是一项值得的努力.