我在Python语言中有以下两极df

df = pl.DataFrame({
    "user_movies": [[7064, 7153, 78009], [6, 7, 1042], [99, 110, 3927], [2, 11, 152081], [260, 318, 195627]],
    "user_ratings": [[5.0, 5.0, 5.0], [4.0, 2.0, 4.0], [4.0, 4.0, 3.0], [3.5, 3.0, 4.0], [1.0, 4.5, 0.5]],
    "common_movies": [[7064, 7153], [7], [110, 3927], [2], [260, 195627]]
})
print(df.head())

我想创建一个名为"COMMOVE_RATIONS"的新列,它将只从每个评级列表中获取在普通电影中被评级的电影的索引.例如,对于第一行,我应该只返回电影的评级[7064,7153,],对于第二行,我应该返回电影的评级[7],依此类推.

为此,我创建了以下函数:

def get_common_movie_ratings(row): #Each row is a tuple of arrays.
    common_movies = row[2] #the index of the tuple denotes the 3rd array, which represents the common_movies column.
    user_ratings = row[1]
    ratings_for_common_movies= [user_ratings[list(row[0]).index(movie)] for movie in common_movies]
    return ratings_for_common_movies

最后,我将UDF函数应用于数据帧,如下所示

df["common_movie_ratings"] = df.apply(get_common_movie_ratings, return_dtype=pl.List(pl.Float64))

每次我应用该函数时,在第3次迭代/行时,我收到以下错误

预期的元组,已获取列表

Output after every 3rd iteration

我还try 了一种不同的UDF函数方法,比如

def get_common_movie_ratings(row):
   common_movies = row[2]
   user_ratings = row[1]
   ratings = [user_ratings[i] for i, movie in enumerate(row[0]) if movie in common_movies]
   return ratings

但是在第三次迭代中,我又收到了同样的错误.

推荐答案

你的方法出了什么问题?

忽略了对pythonUDF的性能影响,在您的方法中有两个地方出了问题.

  1. apply在您try 使用的上下文中现在是map_rows,它期望输出是一个元组,其中元组的每个元素都是一个输出列.您的函数不会输出元组.如果将返回行更改为return (ratings_for_common_movies,),则它将输出一个元组并将起作用.

  2. 您不能使用方括号表示法向极点数据帧添加列.唯一可以在=的左边的是一辆东风,而不是df['new_column']=<something>.如果您使用的旧版本确实允许这样做,那么您就不应该这样做,部分原因是新版本不允许这样做.这意味着你必须做一些类似df.with_columns(new_column=<some_expression>)的事情

在使用map_rows向现有DF添加列的情况下,您可以使用hstack,如下所示:

df=df.hstack(df.map_rows(get_common_movie_ratings)).rename({'column_0':'common_movie_ratings'})

上面的方法实际上是一种反模式,因为当本机方法可以工作时,使用map_rowsmap_elements等中的任何一种都会变得更慢、效率更低.滚动到底部以获得map_elements方法.

本机解决方案前言

如果我们假设列表总是3长,那么你可以这样做…

# this is the length of the user_movies lists
n_count=3

df.with_columns(
    # first gather the items from user_movies based on (yet to be created) 
    # indices list 
    pl.col('user_movies').list.gather(
        # use walrus operator to create new list which is the indices where 
        # user_movies are in common_movies this works by looping through 
        # each element and checking if it's in common_movies. When it is in common_movies
        # then it stores its place in the loop n variable. The n_count is the list size
        (indices:=pl.concat_list(
            pl.when(
                pl.col('user_movies').list.get(n).is_in(pl.col('common_movies'))
            )
            .then(pl.lit(n))
            for n in range(n_count)
        ).list.drop_nulls())
    ),
    # use the same indicies list to gather the corresponding elements from user_ratings
    pl.col('user_ratings').list.gather(indices)
)

注意,我们通过循环从0到列表长度为n的范围来生成索引列表,并且当与user_movies的第n个位置相关联的项在common_movies中时,则将该n放入索引列表中.不幸的是,对于list个类型的列,在Polars中没有类似于.index的方法,因此,在不分解列表的情况下,这是我能想到的创建索引列表的最好方法.

本机解决方案答案

极点本身不能递归地设置n_count,所以我们需要手动设置.通过使用延迟求值,这比其他方法更快,因为它可以并行计算每n_count个 case .

(
    pl.concat([ # this is a list comprehension
        # From here to the "for n_count..." line is the same as the previous code
        # snippet except that, here, it's called inner_df and it's being 
        # made into a lazy frame 
        inner_df.lazy().with_columns(
        pl.col('user_movies').list.gather(
            (indices:=pl.concat_list(
                pl.when(
                    pl.col('user_movies').list.get(n).is_in(pl.col('common_movies'))
                )
                .then(pl.lit(n))
                for n in range(n_count)
            ).list.drop_nulls())
        ),
        pl.col('user_ratings').list.gather(indices)
    )
    # this is the iterating part of the list comprehension
    # it takes the original df, creates a column which is
    # a row index, then it creates a column which is the
    # length of the list, it then partitions up the df into
    # multiple dfs where each of the inner_dfs only has rows
    # where the list length is the same. By using as_dict=True
    # and .items(), it gives a convenient way to unpack the
    # n_count (length of the list) and the inner_df  
    for n_count, inner_df in (
        df
        .with_row_count('i') # original row position
        .with_columns(n_count=pl.col('user_movies').list.len())
        .partition_by('n_count', as_dict=True, include_key=False)
        .items())
    ])
    .sort('i') # sort by original row position
    .drop('i') # drop the row position column
    .collect() # run all of the queries in parallel
    )
shape: (5, 3)
┌───────────────┬──────────────┬───────────────┐
│ user_movies   ┆ user_ratings ┆ common_movies │
│ ---           ┆ ---          ┆ ---           │
│ list[i64]     ┆ list[f64]    ┆ list[i64]     │
╞═══════════════╪══════════════╪═══════════════╡
│ [7064, 7153]  ┆ [5.0, 5.0]   ┆ [7064, 7153]  │
│ [7]           ┆ [2.0]        ┆ [7]           │
│ [110, 3927]   ┆ [4.0, 3.0]   ┆ [110, 3927]   │
│ [2]           ┆ [3.5]        ┆ [2]           │
│ [260, 195627] ┆ [1.0, 0.5]   ┆ [260, 195627] │
└───────────────┴──────────────┴───────────────┘

通过在concat的第一部分转换为lazy,它允许并行计算每个帧,其中每个帧是基于列表长度的子集.它还允许indices成为CSER,这意味着它只计算一次,即使有2个引用.

顺便说一句,对于更少的代码但更多的处理/时间,您可以简单地将前导码部分中的n_counts设置为n_count=df.select(n_count=pl.col('user_movies').list.len().max()).item(),然后只运行该部分中的其余部分.这种方法将比这种方法慢得多,因为对于每一行,它都会迭代元素,直到最大列表长度,这会增加不必要的判断.它也没有得到同样的并行性.换句话说,它用更少的CPU内核做更多的工作.

基准

虚假数据创建

n=10_000_000
df = (
    pl.DataFrame({
        'user':np.random.randint(1,int(n/10),size=n),
        'user_movies':np.random.randint(1,50,n),
        'user_ratings':np.random.uniform(1,5, n),
        'keep':np.random.randint(1,100,n)
    })
    .group_by('user')
    .agg(
        pl.col('user_movies'),
        pl.col('user_ratings').round(1),
        common_movies=pl.col('user_movies').filter(pl.col('keep')>75)
        )
    .filter(pl.col('common_movies').list.len()>0)
    .drop('user')
    )
print(df.head(10))
shape: (5, 3)
┌────────────────┬───────────────────┬───────────────┐
│ user_movies    ┆ user_ratings      ┆ common_movies │
│ ---            ┆ ---               ┆ ---           │
│ list[i64]      ┆ list[f64]         ┆ list[i64]     │
╞════════════════╪═══════════════════╪═══════════════╡
│ [23, 35, … 22] ┆ [3.4, 1.6, … 4.0] ┆ [35]          │
│ [30, 18, … 26] ┆ [4.9, 1.9, … 2.3] ┆ [10]          │
│ [25, 19, … 29] ┆ [1.7, 1.7, … 1.1] ┆ [18, 40, 38]  │
│ [31, 15, … 42] ┆ [2.9, 1.8, … 4.3] ┆ [31, 4, … 42] │
│ [36, 16, … 49] ┆ [1.0, 2.0, … 4.2] ┆ [36]          │
└────────────────┴───────────────────┴───────────────┘

我的方法(16个线程):每循环1.92 s ± 195 ms(平均值±标准差).dev.共7次运行,每次1个循环)

我的方法(8个线程):每循环2.31 s ± 175 ms(平均值±标准差).dev.共7次运行,每次1个循环)

我的方法(4个线程):3.14 S±221ms/循环(平均值±标准.戴夫.共7次运行,每次1次循环)

@jqurn:2.73 S±130ms/环(平均值±标准戴夫.共7次运行,每次1次循环)

S:每循环9.12ms±195ms(平均值±标准戴夫.共7次运行,每次1次循环)

大n_计数前导:9·77 S±1·61 S(平均值±标准戴夫.共7次运行,每次1次循环)

我的方法和map_row各自使用大约1 GB的RAM,但爆炸性的方法更接近3 GB.

struct and map_elements

与其使用map_rows,这是非常笨拙的,你可以使用map_elements.它有自己的笨拙,因为您经常需要将输入包装在 struct 中,但您可以更干净地添加列,并且不必依赖于列位置.

例如,您可以按如下方式定义和使用您的函数:

def get_common_movie_ratings(row): #Each row is a tuple of arrays.
    common_movies = row['common_movies'] #the index of the tuple denotes the 3rd array, which represents the common_movies column.
    user_ratings = row['user_ratings']
    ratings_for_common_movies= [user_ratings[list(row['user_movies']).index(movie)] for movie in common_movies]
    return ratings_for_common_movies
df.with_columns(user_ratings=pl.struct(pl.all()).map_elements(get_common_movie_ratings))

这里发生的情况是,只能从单个列调用map_elements,因此如果您的定制函数需要多个输入,您可以将它们包装在一个 struct 中.该 struct 将被转换为一个字典,其中键具有列名.与map_rows相比,这种方法没有任何固有的性能优势,它只是更好的语法.

最后一点

正如@jqury在他的回答的 comments 中提到的那样,通过将这一逻辑与这些列表的形成结合起来,几乎肯定可以在语法和性能方面简化这一点.换句话说,你有第一步:_第二步:这个问题.虽然我只能猜测步骤1中发生了什么,但将这两个步骤结合起来很可能是一项值得的努力.

Python相关问答推荐

Python tkinter关闭第一个窗口,同时打开第二个窗口

Pandas 密集排名具有相同值,按顺序排列

带有计数值的Pandas数据帧

我可以使用极点优化这个面向cpu的pandas代码吗?

Snap 7- read_Area用于类似地址的变量

使用Python OpenCV的文本检测分割

在Docker中运行HAProxy时无法获得503服务

如果我已经使用了time,如何要求Python在12秒后执行另一个操作.sleep

Python:在类对象内的字典中更改所有键的索引,而不是仅更改一个键

更改matplotlib彩色条的字体并勾选标签?

将DF中的名称与另一DF拆分并匹配并返回匹配的公司

可变参数数量的重载类型(args或kwargs)

将两只Pandas rame乘以指数

用Python解密Java加密文件

基于字符串匹配条件合并两个帧

如何在表中添加重复的列?

多指标不同顺序串联大Pandas 模型

Polars asof在下一个可用日期加入

Python Tkinter为特定样式调整所有ttkbootstrap或ttk Button填充的大小,适用于所有主题

为什么调用函数的值和次数不同,递归在代码中是如何工作的?