Disclaimer: this question is part of a broader topic covered by these two SO questions (q1, q2).

The data is similar to the movie ratings in the ratings.csv file (~891 MB) of the MovieLens latest dataset.

First, I read the CSV file with the polars library:

import os
import numpy as np
import polars as pl
from numba import jit

movie_ratings = pl.read_csv(os.path.join(application_path + data_directory, "ratings.csv"))
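As a side note (my suggestion, not part of the original question): since the file is ~891 MB, polars can also scan it lazily and materialize only the needed columns, which keeps peak memory lower. A minimal sketch, assuming the same file layout:

# a sketch: lazy scan, selecting only the columns used below
movie_ratings = (
    pl.scan_csv(os.path.join(application_path + data_directory, "ratings.csv"))
    .select("userId", "movieId", "rating")
    .collect()
)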

Let's assume we want to compute the similarity between the movies seen by user 1 (for example, 62 movies) and the rest of the movies in the dataset. FYI, the dataset has ~83,000 movies, so for each other_movie (82,938 of them) we compute a similarity with each movie seen by user 1 (62 movies). That is 62 × 82,938 ≈ 5.1 million similarity computations.

In this example, the reported benchmark covers only 400 of the 82,938 other_movies.

To do this, I created two polars dataframes: one with the other_movies (~82,938 rows) and a second with only the movies seen by the user (62 rows).

user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1 (data related to user 1)
user_rated_movies = list(user_ratings.select(pl.col("movieId")).to_numpy().ravel()) #movies seen by user1
potential_movies_to_recommend = list(
    movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)

items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
    )
    .group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
    ).group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

The result is two polars dataframes with one row per movie, each holding the list of users who saw the movie and their ratings:

[screenshot: preview of items_metadata and target_items_metadata, with columns movieId, users_seen_movie (list of userIds) and user_ratings (list of ratings)]

The first dataframe contains only the other_movies that we can potentially recommend to user 1, since he/she has not seen them yet.

The second dataframe contains only the movies seen by the user.

Next, my approach is to iterate over each row of the first dataframe and apply a UDF:

item_metadata_similarity = (
    items_metadata.with_columns(
        similarity_score=pl.struct(pl.all()).map_elements(
            lambda row: item_compute_similarity_scoring_V2(row, target_items_metadata),
            return_dtype=pl.List(pl.List(pl.Float64)),  # each inner list is a [movieId, score] pair, so movieId is cast to Float64
            strategy="threading"
        )
    )
)

where item_compute_similarity_scoring_V2 is defined as:

def item_compute_similarity_scoring_V2(
    row,
    target_movies_metadata:pl.DataFrame
):
    users_item1 = np.asarray(row["users_seen_movie"])
    ratings_item1 = np.asarray(row["user_ratings"])
    computed_similarity: list=[]
    for row2 in target_movies_metadata.iter_rows(named=True): #iter over each row from the second dataframe with the movies seen by the user.
        users_item2=np.asarray(row2["users_seen_movie"])
        ratings_item2=np.asarray(row2["user_ratings"])
        r1, r2 = item_ratings(users_item1, ratings_item1, users_item2, ratings_item2)
        if r1.shape[0] != 0 and r2.shape[0] != 0:
            similarity_score = compute_similarity_score(r1, r2)
            if similarity_score > 0.0: #filter out negative or zero similarity scores
                computed_similarity.append((row2["movieId"], similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)
    return most_similar_pairs
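For context, pl.struct(pl.all()) in the map_elements call above packs each row into a struct, so the UDF receives row as a plain dict keyed by column name. Roughly (the values here are made up for illustration):

# illustrative shape of `row` inside the UDF (hypothetical values)
row = {
    "movieId": 2,                      # the other_movie being scored
    "users_seen_movie": [5, 11, 42],   # unique userIds who rated it
    "user_ratings": [3.5, 4.0, 2.5],   # the ratings aggregated for that movie
}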

item_ratings and compute_similarity_score are defined as:

def item_ratings(u1: np.ndarray, r1: np.ndarray, u2: np.ndarray, r2: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    # keep only the ratings of users who rated both movies
    common_elements, indices1, indices2 = np.intersect1d(u1, u2, return_indices=True)
    sr1 = r1[indices1]
    sr2 = r2[indices2]
    assert len(sr1) == len(sr2), "ratings don't have same lengths"
    return sr1, sr2
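For illustration, on toy inputs (hypothetical data) the function keeps only the ratings of users common to both movies:

u1 = np.array([1, 3, 5]); r1 = np.array([4.0, 3.5, 5.0])
u2 = np.array([3, 4, 5]); r2 = np.array([2.0, 1.0, 4.5])
item_ratings(u1, r1, u2, r2)  # -> (array([3.5, 5. ]), array([2. , 4.5]))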

@jit(nopython=True, parallel=True)
def compute_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
    assert(array1.shape[0] == array2.shape[0])
    a1a2 = 0
    a1a1 = 0
    a2a2 = 0
    for i in range(array1.shape[0]):
        a1a2 += array1[i]*array2[i]
        a1a1 += array1[i]*array1[i]
        a2a2 += array2[i]*array2[i]
    cos_theta = 1.0
    if a1a1!=0 and a2a2!=0:
        cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
    return cos_theta
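For reference, this loop computes plain cosine similarity, cos θ = (a · b) / (‖a‖ ‖b‖). A vectorized NumPy equivalent (a sketch for comparison, not the code that was benchmarked) would be:

def compute_similarity_score_np(array1: np.ndarray, array2: np.ndarray) -> float:
    # vectorized cosine similarity; returns 1.0 if either norm is zero, matching the loop above
    denom = np.linalg.norm(array1) * np.linalg.norm(array2)
    return float(np.dot(array1, array2) / denom) if denom != 0 else 1.0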

The function basically iterates over each row of the second dataframe and, for each row, computes the similarity between the other_movie and a movie seen by the user. Thus, for 400 movies we perform 400 × 62 iterations, each other_movie yielding up to 62 similarity scores.

The result of each computation is an array of [movieId, score] pairs like [[1, 0.20], [110, 0.34]], ... (up to 62 pairs per other_movie).


Benchmark for 400 movies:

  1. INFO - ITEM-ITEM: similarity scores computed for 400 movies in 0:01:49.887032
  2. ~2 minutes of total runtime.
  3. ~5 GB of RAM used.

I would like to know how to improve the computation by using native polars commands or by exploiting parallelism through the numba framework.

Update - 2nd approach using to_numpy() operations without iter_rows() and map_elements()

user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1
user_rated_movies = user_ratings.select(pl.col("movieId")).to_numpy().ravel()
potential_movies_to_recommend = list(
    movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies)
    )
)
# print(items_metadata.head(5))
target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies)
    )
)
# print(target_items_metadata.head(5))

For this second approach, items_metadata and target_items_metadata are the two large polars tables shown above.

My next step was to convert both tables to numpy.ndarrays with the to_numpy() command.

items_metadata_array = items_metadata.to_numpy()
target_items_metadata_array = target_items_metadata.to_numpy()
computed_similarity_scores:dict = {}
for i, other_movie in enumerate(potential_movies_to_recommend[:400]): #take the first 400 unseen movies by user 1
    mask = items_metadata_array[:, 1] == other_movie
    other_movies_chunk = items_metadata_array[mask]
    u1 = other_movies_chunk[:,0].astype(np.int32)
    r1 = other_movies_chunk[:,2].astype(np.float32)
    computed_similarity: list=[]
    for user_movie in user_rated_movies:
        mask = target_items_metadata_array[:, 1] == user_movie
        target_movie_chunk = target_items_metadata_array[mask]
        u2 = target_movie_chunk[:,0].astype(np.int32)
        r2 = target_movie_chunk[:,2].astype(np.float32)
        common_r1, common_r2 = item_ratings(u1, r1, u2, r2)
        if common_r1.shape[0] != 0 and common_r2.shape[0] != 0:
            similarity_score = compute_similarity_score(common_r1, common_r2)
            if similarity_score > 0.0:
                computed_similarity.append((user_movie, similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]
    computed_similarity_scores[str(other_movie)] = most_similar_pairs

Benchmark of the second approach:

  • Item-Item: similarity scores computed for 400 movies in 0:08:50.537102

Accepted Answer

I'm not too familiar with Numba, so before trying to compare timings, the first thing I would do is create a "fully native" Polars approach:

This is a direct translation of the current approach (i.e., it still contains the "double for loop"), so it is just a baseline attempt.

Because it uses the Lazy API, nothing is actually computed inside the loop.

Everything runs when .collect() is called (which allows Polars to parallelize the work).

The > 0.0 filtering on similarity_score can be performed after the results are collected.

input_id = 1

is_user_rating = pl.col("userId") == input_id

can_recommend = (
    pl.col("movieId").is_in(pl.col("movieId").filter(is_user_rating)).not_()
)

cosine_similarity = (
    pl.col('rating').dot('rating_right') /  
    ( pl.col('rating').pow(2).sum().sqrt() * 
      pl.col('rating_right').pow(2).sum().sqrt() ) 
)

user_rated_movies = movie_ratings.filter(is_user_rating).select("movieId").to_series()

potential_movies_to_recommend = (
    movie_ratings.filter(can_recommend).select(pl.col("movieId").unique().sort())
)

# use the Lazy API so we can compute in parallel
df = movie_ratings.lazy()

computed_similarity_scores = []
for other_movie in potential_movies_to_recommend.head(1).to_series(): # .head(N) potential movies
    for user_movie in user_rated_movies:
        score = (
            df.filter(pl.col("movieId") == user_movie)
              .join(
                 df.filter(pl.col("movieId") == other_movie),
                 on = "userId"
              )
              .select(cosine = cosine_similarity)
              .select(user_movie=user_movie, other_movie=other_movie, similarity_score="cosine")
        )
        computed_similarity_scores.append(score)
        
# All scores are computed in parallel
computed_similarity_scores_polars = pl.concat(computed_similarity_scores).collect()
shape: (62, 3)
┌────────────┬─────────────┬──────────────────┐
│ user_movie ┆ other_movie ┆ similarity_score │
│ ---        ┆ ---         ┆ ---              │
│ i32        ┆ i32         ┆ f64              │
╞════════════╪═════════════╪══════════════════╡
│ 1          ┆ 2           ┆ 0.95669          │
│ 110        ┆ 2           ┆ 0.950086         │
│ 158        ┆ 2           ┆ 0.957631         │
│ 260        ┆ 2           ┆ 0.945542         │
│ …          ┆ …           ┆ …                │
│ 49647      ┆ 2           ┆ 0.9411           │
│ 52458      ┆ 2           ┆ 0.955353         │
│ 53996      ┆ 2           ┆ 0.930388         │
│ 54259      ┆ 2           ┆ 0.95469          │
└────────────┴─────────────┴──────────────────┘
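As noted above, the > 0.0 filter on similarity_score can then be applied to the collected frame:

positive_scores = computed_similarity_scores_polars.filter(
    pl.col("similarity_score") > 0.0
)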

Testing with .head(100), I got a 58s runtime compared to a 111s runtime for your approach; memory consumption was the same.

DuckDB

For comparison, DuckDB ran the .head(400) case in 5s:

import duckdb

df = duckdb.sql("""
with 
   df     as (from 'imdb.parquet'),
   user   as (from df where movieId in (from df select movieId where userId = 1)),
   movies as (from df where movieId not in (from df select movieId where userId = 1)),
   other  as (from df where movieId in (from movies select distinct movieId order by movieId limit 400))
   
from
   user join other using (userId)
   
select   
   user.movieId user_movie,
   other.movieId other_movie,
   list_cosine_similarity(
      list(user.rating), list(other.rating)
   ) similarity_score
   
group by 
   user_movie, other_movie   
order by 
   user_movie, other_movie
""").pl()
shape: (24_764, 3)
┌────────────┬─────────────┬──────────────────┐
│ user_movie ┆ other_movie ┆ similarity_score │
│ ---        ┆ ---         ┆ ---              │
│ i64        ┆ i64         ┆ f64              │
╞════════════╪═════════════╪══════════════════╡
│ 1          ┆ 2           ┆ 0.95669          │
│ 1          ┆ 3           ┆ 0.941348         │
│ 1          ┆ 4           ┆ 0.92169          │
│ 1          ┆ 5           ┆ 0.943999         │
│ …          ┆ …           ┆ …                │
│ 54259      ┆ 407         ┆ 0.941241         │
│ 54259      ┆ 408         ┆ 0.934745         │
│ 54259      ┆ 409         ┆ 0.937361         │
│ 54259      ┆ 410         ┆ 0.94937          │
└────────────┴─────────────┴──────────────────┘
Elapsed time: 5.02638 seconds
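The DuckDB query reads from 'imdb.parquet'; assuming that file contains the same ratings data, it could be produced once from the original CSV with polars (a sketch):

# one-off conversion of the ratings CSV to Parquet (assumed to match the file queried above)
pl.read_csv("ratings.csv").write_parquet("imdb.parquet")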
