我有一个与apache 电光和PostgreSQL的jdbc连接,我想将一些数据插入到我的数据库中.当我使用append模式时,我需要 for each DataFrame.Row指定id.电光有没有办法创建主键?

推荐答案

Scala:

如果您只需要唯一的数字,您可以使用zipWithUniqueId并重新创建DataFrame.首先是一些导入和虚拟数据:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

提取架构以供进一步使用:

val schema = df.schema

添加ID字段:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

创建数据帧:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

同样的事情在Python年里也是如此:

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

如果你更喜欢连续数字,你可以用zipWithIndex代替zipWithUniqueId,但这要贵一点.

Directly with 100 API:

(universal Scala, Python, Java, R with pretty much the same syntax)

以前我错过了monotonicallyIncreasingId函数,只要不需要连续的数字,它就应该可以很好地工作:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

而有用的monotonicallyIncreasingId是非确定性的.当后续操作包含筛选器时,不仅ID可能因执行而不同,而且没有其他技巧,也不能用来标识行.

Note:

也可以使用rowNumber窗口函数:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

不幸的是:

警告窗口:没有为窗口操作定义分区!将所有数据移动到单个分区可能会导致严重的性能下降.

因此,除非您有一种对数据进行分区并确保唯一性的自然方法,否则此时不会特别有用.

Database相关问答推荐

删除Postgres中的JSONB列并在不停机的情况下回收空间

KUST查询指定时间跨度内里程表&值的差值,并将其滚动到0

DynamoDB 扫描 - 具有相同分区键的项目按顺序返回

使用mongoose 在嵌套对象中查找特定字段

mongodb聚合从另一个查询中获取值

管理数据库中的产品计数

我们如何使用 Hibernate 和 JPA 调用存储过程?

从 DbDataReader 读取数据的最快方法是什么?

如何将新架构添加到现有的 Visual Studio 数据库项目?

如何避免使用 LINQ-To-SQL 的内存泄漏?

用于 sql server 的免费国家、城市数据库

ODBC 与 JDBC 与 ADO.NET

将 XML 存储在数据库中是否不好?

单父实体的核心数据性能

在服务器上排序还是在客户端排序?

如何更改 MySQL DB 中所有表的前缀?

MySQL 整数 0 与 NULL

多币种 - 存储什么以及何时转换?

包含 8000 万条记录并添加索引的表需要超过 18 小时(或永远)!怎么办?

使用 oracle sql developer 从一个数据库复制到另一个数据库 - 连接失败