我有一个简单的Java/Spark应用程序,我try 使用Iceberg的NESSIE目录将Iceberg格式的CSV数据推送到我的本地运行存储(MinIO).

这是我的完整代码(阅读csv-&>;创建表格-&>写入表格):

package com.comarch;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SparkIcebergNessie {
    private static final String WAREHOUSE_PATH = "s3a://data/test";
    private static final String NAMESPACE_NAME = "default";
    private static final String TABLE_NAME = "twamp";
    private static final String TABLE_PATH = WAREHOUSE_PATH + "/" + NAMESPACE_NAME + "/" + TABLE_NAME;

    public static void main(String[] args) {
        SparkSession spark = createSparkSession();

        Dataset<Row> csv = spark.read()
                .option("inferSchema","true")
                .option("delimiter",",")
                .option("header","true")
                .csv("src/main/resources/csv/twamp.csv");

        try {
            // Create table
            HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
            Schema tableSchema = SparkSchemaUtil.convert(csv.schema());
            PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema).build();
            tables.create(tableSchema, partitionSpec, TABLE_PATH);

            // Write data to table
            csv.write().format("iceberg").mode(SaveMode.Append).save(TABLE_PATH);
            System.out.println("END");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static SparkSession createSparkSession() {
        return SparkSession.builder()
                .appName("Java Spark Iceberg Example")
                .master("local")
                .config("spark.ui.enabled", "false")

                //Filesystem config
                .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("fs.s3a.endpoint", "http://127.0.0.1:9000")
                .config("fs.s3a.access.key", "VUtRVIf0hg7szCp3k0Pz")
                .config("fs.s3a.secret.key", "lHzYClEjh2AH5mEfRdPS720pMl3UZl7riR3uL4pL")
                .config("spark.sql.warehouse.dir", WAREHOUSE_PATH)

                //Nessie catalog config
                .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.3.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.13:0.77.1")
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
                .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
                .config("spark.sql.catalog.nessie.authentication.type", "NONE")
                .config("spark.sql.catalog.nessie.uri", "http://localhost:19120/api/v1")
                .config("spark.sql.catalog.nessie.ref", "main")
                .config("spark.sql.defaultCatalog", "nessie")
                .config("spark.sql.catalog.nessie.ref", "main")
                .config("spark.sql.catalog.nessie.warehouse", "/test")
                .getOrCreate();
    }
}

代码成功执行(我可以在MinIO中看到数据),但在Nessie中什么都没有发生(没有新分支,没有提交,什么都没有).

我试着try 使用尼西目录属性,但都不起作用.例如,当我删除最后一个"spk.sql.Catalog.Nesssie.Warehouse"属性时,我收到一个错误:

线程"main"java.lang.IlLegalStateException中出现异常:未设置参数"WAREARY",Nessie无法存储数据.

所以感觉Spark确实在try 使用(或使用)这个目录,但是为什么在代码执行后我看不到任何元数据?

推荐答案

试试这个:

spark.sql("CREATE DATABASE IF NOT EXISTS default");

Dataset<Row> csv =
    spark
      .read()
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .option("header", "true")
      .csv("src/main/resources/csv/twamp.csv");
csv.writeTo("default.twamp").createOrReplace();

Java相关问答推荐

Gmail Javi API批量处理太多请求

将具有多个未知字段的SON映射到Java POJO

如何使用CSS为选定但未聚焦的表格行设置背景 colored颜色 ?

在Java 8之后,HashMap的最坏情况下时间复杂度仍然是O(n)而不是O(log n)?

为什么Java的代码工作(if condition内部的实例)

为什么JAVA&S清洁器使用链表而不是并发HashSet?

Hibernate EmptyInterceptor可以工作,但不能拦截器

Spring data JPA/Hibernate根据id获取一个列值

Mapstruct不能正确/完全映射属性

安装Java Jar应用程序的Install4j遇到ClassNotFoundException的运行时错误

通过移动一个类解决了潜在的StubbingProblem.它怎麽工作?

与不同顺序的组进行匹配,不重复组但分开

Java构造函数分支

如何在JavaFX中制作鼠标透明stage

无法播放音频:从资源加载库GStreamer-Lite失败

try 使用预准备语句占位符获取信息时出现Try-With-Resources错误

使用for循环时出现堆栈溢出错误,但如果使用if块执行相同的操作,则不会产生错误

如何在运行docker的应用程序中获取指定的配置文件

在Java中将对象&转换为&q;HashMap(&Q)

将Optionals/null安全添加到嵌套的flatMap/流