I have a simple Java/Spark application in which I try to push CSV data, in Iceberg format, to my locally running object storage (MinIO) using Iceberg's Nessie catalog.
Here is my complete code (read CSV -> create table -> write to table):
package com.comarch;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SparkIcebergNessie {
    private static final String WAREHOUSE_PATH = "s3a://data/test";
    private static final String NAMESPACE_NAME = "default";
    private static final String TABLE_NAME = "twamp";
    private static final String TABLE_PATH = WAREHOUSE_PATH + "/" + NAMESPACE_NAME + "/" + TABLE_NAME;

    public static void main(String[] args) {
        SparkSession spark = createSparkSession();
        Dataset<Row> csv = spark.read()
                .option("inferSchema", "true")
                .option("delimiter", ",")
                .option("header", "true")
                .csv("src/main/resources/csv/twamp.csv");
        try {
            // Create table
            HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
            Schema tableSchema = SparkSchemaUtil.convert(csv.schema());
            PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema).build();
            tables.create(tableSchema, partitionSpec, TABLE_PATH);
            // Write data to table
            csv.write().format("iceberg").mode(SaveMode.Append).save(TABLE_PATH);
            System.out.println("END");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static SparkSession createSparkSession() {
        return SparkSession.builder()
                .appName("Java Spark Iceberg Example")
                .master("local")
                .config("spark.ui.enabled", "false")
                // Filesystem config
                .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("fs.s3a.endpoint", "http://127.0.0.1:9000")
                .config("fs.s3a.access.key", "VUtRVIf0hg7szCp3k0Pz")
                .config("fs.s3a.secret.key", "lHzYClEjh2AH5mEfRdPS720pMl3UZl7riR3uL4pL")
                .config("spark.sql.warehouse.dir", WAREHOUSE_PATH)
                // Nessie catalog config
                .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.3.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.13:0.77.1")
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
                .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
                .config("spark.sql.catalog.nessie.authentication.type", "NONE")
                .config("spark.sql.catalog.nessie.uri", "http://localhost:19120/api/v1")
                .config("spark.sql.catalog.nessie.ref", "main")
                .config("spark.sql.defaultCatalog", "nessie")
                .config("spark.sql.catalog.nessie.warehouse", "/test")
                .getOrCreate();
    }
}
The code executes successfully (I can see the data in MinIO), but nothing happens in Nessie (no new branch, no commit, nothing).
I have tried tweaking the Nessie catalog properties, but none of that helped. For example, when I remove the last property, "spark.sql.catalog.nessie.warehouse", I get an error:
Exception in thread "main" java.lang.IllegalStateException: Parameter 'warehouse' not set, Nessie can't store data.
So it feels like Spark really is trying to use (or actually using) this catalog, but then why don't I see any metadata after the code runs?
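For reference, this is the kind of write I assumed would go through the configured "nessie" catalog and therefore produce a Nessie commit. It's only a sketch (not verified end-to-end): it reuses the `spark` session and `csv` Dataset from my code above, and the identifier `nessie.default.twamp` is just my NAMESPACE_NAME and TABLE_NAME qualified with the catalog name:

```java
// Sketch: write via the catalog identifier instead of a raw s3a:// path.
// Assumes the "nessie" catalog configured in createSparkSession() above.
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie." + NAMESPACE_NAME);

// DataFrameWriterV2: create (or replace) the Iceberg table through the catalog,
// so the table is registered in the catalog rather than written path-only.
csv.writeTo("nessie." + NAMESPACE_NAME + "." + TABLE_NAME)
        .using("iceberg")
        .createOrReplace();
```

Is this the direction I should be going, i.e. is my current HadoopTables/path-based write simply bypassing the Nessie catalog?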