在本例中，尝试使用内置的 pyspark 函数，例如 stack 和 unpivot；
将 struct 展开后，其字段将作为新列添加。
# Explicit imports instead of a wildcard import (a wildcard from
# pyspark.sql.functions shadows builtins such as `sum` and `max`).
# `lit` is imported here because it is used later in this file.
from pyspark.sql.functions import col, expr, lit

# Sample nested JSON: two struct columns (labels1, labels2),
# each holding keys A/B/C.  Renamed from `json` to avoid shadowing
# the stdlib `json` module.
json_str = """{"labels1":{"A":1,"B":2, "C":3},"labels2":{"A":1,"B":2, "C":3}}"""
df = spark.read.json(sc.parallelize([json_str]), multiLine=True)

# 1) stack the two struct columns into rows of (col0=name, col1=struct),
# 2) expand the struct fields (A, B, C) into top-level columns,
# 3) stack the A/B/C columns into (key, value) rows.
df.select(expr("stack(2,'labels1',labels1,'labels2',labels2)")).\
    select(col("col0").alias("tagname"), col("col1.*")).\
    select("tagname", expr("stack(3,'A',A,'B',B,'C',C) as (key,value)")).show()
#+-------+---+-----+
#|tagname|key|value|
#+-------+---+-----+
#|labels1| A| 1|
#|labels1| B| 2|
#|labels1| C| 3|
#|labels2| A| 1|
#|labels2| B| 2|
#|labels2| C| 3|
#+-------+---+-----+
使用 unpivot 函数的另一种方式:
# Alternative using DataFrame.unpivot: add a constant helper column "n",
# unpivot the two struct columns into (new=tagname, new1=struct), expand
# the struct's fields, then unpivot A/B/C into (key, value) rows.
result = (
    df.withColumn("n", lit(1))
      .unpivot("n", ["labels1", "labels2"], "new", "new1")
      .select(col("new").alias("tagname"), col("new1.*"))
      .unpivot("tagname", ["A", "B", "C"], "key", "value")
)
result.show()