我在配置单元中有一个表,我想在循环中的条件下查询它,并将结果动态存储在多个pyspark数据帧中.

Base Query

g1 = """
    select * from db.hive_table where group =  1
"""

group_1 = spk.sql(g1)
group_1.show(3)
group_1.printSchema()
print((group_1.count(), len(group_1.columns)))
group_1 = group_1.toPandas()

总共有80个组,目前分别为Group=2、Group=3等运行上述代码.

My useless iteration code

    # changes the geometry type to obj

df_list=[group_1,group_2,group_3,group_4,group_5,group_6,group_7,group_8,group_9,group_10,
         group_11,group_12,group_13,group_14,group_15,group_16,group_17,group_18,group_19,group_20,
         group_21,group_22,group_23,group_24,group_25,group_26,group_27,group_28,group_29,group_30,
         group_31,group_32,group_33,group_34,group_35,group_36,group_37,group_38,group_39,group_40,
         group_41,group_42,group_43,group_44,group_45,group_46,group_47,group_48,group_49,group_50,
         group_51,group_52,group_53,group_54,group_55,group_56,group_57,group_58,group_59,group_60,
         group_61,group_62,group_63,group_64,group_65,group_66,group_67,group_68,group_69,group_70,
         group_71,group_72,group_73,group_74,group_75,group_76,group_77,group_78,group_79,group_80,
         
# num_list=[1,2,3,4,5,5,6,6]

for d in df_list:
    for i in range(1,80):
         gi = """
        select * from db.hive_table where group =  $i
        """
    
        group_i = spk.sql(gi)
        print(group_i.show(3))
        print(group_i.printSchema())
        print((group_i.count(), len(group_i.columns)))
        return group_i = group_i.toPandas()

请求帮助以指导我解决此问题,并帮助我增加编码知识.

提前谢谢.

推荐答案

使用列表

python/pyspark不允许动态创建变量名.然而,您可以创建一个数据帧列表,可以像sdf_list[0].show()sdf_list[1].toPandas()一样使用.

sdf_list = []

for i in range(1, 81):
    filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
    sdf_list.append((i, filtered_sdf))  # (<filter/group identifier>, <spark dataframe>)
    del filtered_sdf

现在,sdf_list有一个可以使用列表索引访问的spark数据帧列表.e、 例如,可以使用[0]访问第一个数据帧,打印将验证它是否为数据帧.

print(sdf_list[0])
# (1, DataFrame[col1: bigint, dt: date, col3: bigint])
# (<filter/group identifier>, <spark dataframe>)

列表可以迭代,其中的所有数据帧可以单独使用.例如.,

for (i, sdf) in sdf_list[:2]:
    print("dataframe {0}'s count:".format(i), sdf.count())

# dataframe 1's count: 20
# dataframe 2's count: 30

随意使用它.

sdf_list[0][1].count()  # [0] returns the tuple - (<sdf identifier>, <sdf>)
# 20

sdf_list[0][1].show(2)
# etc...

假设您还希望所有spark数据帧都作为pandas数据帧.如果希望数据帧是动态的,则需要再次创建数据帧列表.或者只是使用索引访问spark数据帧.

# using indices
group1_pdf = sdf_list[0][1].toPandas()

# creating list of pandas dataframes
pdf_list = []

for (i, sdf) in sdf_list:
    pdf_list.append((i, sdf.toPandas()))  # (<filter/group identifier>, <pandas dataframe>)

type(pdf_list)
# list

type(pdf_list[0])
# tuple

type(pdf_list[0][1])
# pandas.core.frame.DataFrame

使用词典

我们还可以使用字典来存储数据帧,并使用键跟踪数据帧.因此,键可以用作数据帧名称.

sdf_dict = {}

for i in range(1, 81):
    filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
    sdf_dict['group'+str(i)] = filtered_sdf
    del filtered_sdf

字典将具有可以使用键访问的数据帧.我们只需打印前2个键,然后判断我们有哪些值.

list(sdf_dict.keys())[:2]
# ['group1', 'group2']

sdf_dict['group1']
# DataFrame[col1: bigint, dt: date, col3: bigint]

sdf_dict['group1'].count()
# 20

您可以 Select 迭代dict键并使用spark数据帧.

for sdf_key in list(sdf_dict.keys())[:2]:
    print(sdf_key+"'s record count:", sdf_dict[sdf_key].count())

# group1's record count: 20
# group2's record count: 30

您可以查看type()以更好地理解.

type(sdf_dict)
# dict

type(sdf_dict['group1'])
# pyspark.sql.dataframe.DataFrame

转换为Pandas 数据帧将很简单

# single df manually
group1_pdf = sdf_dict['group1'].toPandas()

# with iteration
pdf_dict = {}

for sdf_key in sdf_dict.keys():
    pdf_dict[sdf_key] = sdf_dict[sdf_key].toPandas()

type(pdf_dict)
# dict

type(pdf_dict['group1'])
# pandas.core.frame.DataFrame

Python相关问答推荐

在Polars(Python库)中将二进制转换为具有非UTF-8字符的字符串变量

切片包括面具的第一个实例在内的眼镜的最佳方法是什么?

为什么默认情况下所有Python类都是可调用的?

log 1 p numpy的意外行为

如何在Django基于类的视图中有效地使用UTE和RST HTIP方法?

将pandas Dataframe转换为3D numpy矩阵

无法使用DBFS File API路径附加到CSV In Datricks(OSError Errno 95操作不支持)

如何将多进程池声明为变量并将其导入到另一个Python文件

计算每个IP的平均值

如何根据一列的值有条件地 Select 前N个组,然后按两列分组?

使用NeuralProphet绘制置信区间时出错

SQLAlchemy bindparam在mssql上失败(但在mysql上工作)

与命令行相比,相同的Python代码在Companyter Notebook中运行速度慢20倍

在单次扫描中创建列表

为什么调用函数的值和次数不同,递归在代码中是如何工作的?

当单元测试失败时,是否有一个惯例会抛出许多类似的错误消息?

如何在Python Pandas中填充外部连接后的列中填充DDL值

当输入是字典时,`pandas. concat`如何工作?

在第一次调用时使用不同行为的re. sub的最佳方式

用考克斯回归的生存分析系列的真值是模棱两可的.