我来自Pandas 的背景,习惯于将CSV文件中的数据读取到数据框中,然后使用简单的命令将列名更改为有用的内容:

df.columns = new_column_name_list

但是,在使用sqlContext创建的pyspark数据帧中,这一点不起作用.

df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', inferschema='true', delimiter='\t').load("data.txt")
oldSchema = df.schema
for i,k in enumerate(oldSchema.fields):
  k.name = new_column_name_list[i]
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', delimiter='\t').load("data.txt", schema=oldSchema)

这基本上是两次定义变量,首先推断模式,然后重命名列名,然后用更新的模式再次加载数据帧.

有没有更好、更有效的方法来做这件事,就像我们在Pandas 身上做的那样?

我的spark版本是1.5.0

推荐答案

有很多方法可以做到这一点:

  • 选项1.使用selectExpr.

     data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], 
                                       ["Name", "askdaosdka"])
     data.show()
     data.printSchema()
    
     # Output
     #+-------+----------+
     #|   Name|askdaosdka|
     #+-------+----------+
     #|Alberto|         2|
     #| Dakota|         2|
     #+-------+----------+
    
     #root
     # |-- Name: string (nullable = true)
     # |-- askdaosdka: long (nullable = true)
    
     df = data.selectExpr("Name as name", "askdaosdka as age")
     df.show()
     df.printSchema()
    
     # Output
     #+-------+---+
     #|   name|age|
     #+-------+---+
     #|Alberto|  2|
     #| Dakota|  2|
     #+-------+---+
    
     #root
     # |-- name: string (nullable = true)
     # |-- age: long (nullable = true)
    
  • Select 2.使用withColumnRenamed,请注意,此方法允许"覆盖"同一列.对于Python3,将xrange替换为range.

     from functools import reduce
    
     oldColumns = data.schema.names
     newColumns = ["name", "age"]
    
     df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), data)
     df.printSchema()
     df.show()
    
  • 备选方案3.使用 alias,在Scala中也可以使用as.

     from pyspark.sql.functions import col
    
     data = data.select(col("Name").alias("name"), col("askdaosdka").alias("age"))
     data.show()
    
     # Output
     #+-------+---+
     #|   name|age|
     #+-------+---+
     #|Alberto|  2|
     #| Dakota|  2|
     #+-------+---+
    
  • Select 4.使用sqlContext.sql,可以对注册为表的DataFrames使用SQL查询.

     sqlContext.registerDataFrameAsTable(data, "myTable")
     df2 = sqlContext.sql("SELECT Name AS name, askdaosdka as age from myTable")
    
     df2.show()
    
     # Output
     #+-------+---+
     #|   name|age|
     #+-------+---+
     #|Alberto|  2|
     #| Dakota|  2|
     #+-------+---+
    

Python相关问答推荐

连接两个具有不同标题的收件箱

为什么tkinter框架没有被隐藏?

在Polars(Python库)中将二进制转换为具有非UTF-8字符的字符串变量

如何让Flask 中的请求标签发挥作用

当从Docker的--env-file参数读取Python中的环境变量时,每个\n都会添加一个\'.如何没有额外的?

try 将一行连接到Tensorflow中的矩阵

使用Python从URL下载Excel文件

让函数调用方程

使用Python查找、替换和调整PDF中的图像'

Python全局变量递归得到不同的结果

numpy.unique如何消除重复列?

在不同的帧B中判断帧A中的子字符串,每个帧的大小不同

基于Scipy插值法的三次样条系数

不允许 Select 北极滚动?

freq = inject在pandas中做了什么?''它与freq = D有什么不同?''

使用polars. pivot()旋转一个框架(类似于R中的pivot_longer)

获取git修订版中每个文件的最后修改时间的最有效方法是什么?

为什么我只用exec()函数运行了一次文件,而Python却运行了两次?

启动线程时,Python键盘模块冻结/不工作

极点用特定值替换前n行