We've run into a compatibility issue with the Spark-CDM-Connector. For a bit of background, I have CDM data in ADLS that I'm trying to read into Databricks.

Databricks Runtime Version
12.1 (includes Apache Spark 3.3.1, Scala 2.12)

I have also installed com.microsoft.azure:spark-cdm-connector:0.19.1 on the cluster.

I ran the following code:

AccountName = "<AccountName>"
container = "<container>"
account_key = "<account_key>"
Storage_Account = f"account={AccountName};key={account_key}"


# Implicit write case
from pyspark.sql.types import *
from pyspark.sql import functions, Row
from decimal import Decimal
from datetime import datetime
# Write a CDM entity with Parquet data files, entity definition is derived from the dataframe schema
# EmployeeId values are strings so they match the StringType field declared in the schema below
data = [
  ["1", "Alex", "Lai", "alex.lai@adatis.co.uk", "Consultant", "Delivery", datetime.strptime("2018-07-03", '%Y-%m-%d'), datetime.now()],
  ["2", "James", "Russel", "james.russel@adatis.co.uk", "Senior Consultant", "Delivery", datetime.strptime("2014-05-14", '%Y-%m-%d'), datetime.now()]
]

schema = (StructType()
  .add(StructField("EmployeeId", StringType(), True))
  .add(StructField("FirstName", StringType(), True))
  .add(StructField("LastName", StringType(), True))
  .add(StructField("EmailAddress", StringType(), True))
  .add(StructField("Position", StringType(), True))
  .add(StructField("Department", StringType(), True))
  .add(StructField("HiringDate", DateType(), True))
  .add(StructField("CreatedDateTime", TimestampType(), True))
)

df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

display(df)


# Creates the CDM manifest and adds the entity to it with parquet partitions
# with both physical and logical entity definitions 
(df.write.format("com.microsoft.cdm")
  .option("storage", Storage_Account)
  .option("manifestPath", container + "<path to manifest.cdm.json file>")
  .option("entity", "Employee")
  .option("format", "parquet")
  .mode("overwrite")
  .save()
)
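
For reference, the read I ultimately want to run uses the same connector options as the write above (a sketch following the connector's documented read pattern; the manifest path placeholder is the same one used in the write call):

# Read the entity back from the CDM manifest (sketch only; reuses the storage,
# manifestPath and entity options from the write above)
readDf = (spark.read.format("com.microsoft.cdm")
  .option("storage", Storage_Account)
  .option("manifestPath", container + "<path to manifest.cdm.json file>")
  .option("entity", "Employee")
  .load()
)
display(readDf)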

It throws this error: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport.

Py4JJavaError                             Traceback (most recent call last)
File <command-2314057479770273>:3
      1 # Creates the CDM manifest and adds the entity to it with parquet partitions
      2 # with both physical and logical entity definitions 
----> 3 (df.write.format("com.microsoft.cdm")
      4   .option("storage", Storage_Account)
      5   .option("manifestPath", container + "<path to manifest.cdm.json file>")
      6   .option("entity", "Employee")
      7   .option("format", "parquet")
      8   .mode("overwrite")
      9   .save()
     10 )

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:1193, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
   1191     self.format(format)
   1192 if path is None:
-> 1193     self._jwrite.save()
   1194 else:
   1195     self._jwrite.save(path)

File /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /databricks/spark/python/pyspark/sql/utils.py:209, in capture_sql_exception.<locals>.deco(*a, **kw)
    207 def deco(*a: Any, **kw: Any) -> Any:
    208     try:
--> 209         return f(*a, **kw)
    210     except Py4JJavaError as e:
    211         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o464.save.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at com.databricks.backend.daemon.driver.ClassLoaders$ReplWrappingClassLoader.loadClass(ClassLoaders.scala:65)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:406)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:717)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:717)
    at scala.util.Failure.orElse(Try.scala:224)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:717)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:781)
    at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:988)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:293)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.ReadSupport
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    ... 36 more

Recommended answer

You are using the wrong version of the connector: 0.19.1 is for Spark 3.1, but you are running Spark 3.3. You need to try the spark3.3-1.19.5 release, but it may not be available on Maven Central.
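
As a quick sanity check before swapping libraries, printing the cluster's Spark version confirms which connector build you need (a minimal sketch; the spark3.3-1.19.5 release name comes from the answer above, and installing its JAR from the Azure/spark-cdm-connector GitHub releases page as a cluster library is an assumption rather than a documented step):

# Confirm the Spark version this notebook is running against; DBR 12.1 should report 3.3.x,
# which is why a spark3.3-* connector build is needed.
print(spark.version)

# If spark3.3-1.19.5 is not published to Maven Central, one possible route (an assumption,
# not an official instruction) is to download the release JAR from the
# Azure/spark-cdm-connector GitHub releases page, remove the 0.19.1 Maven library from the
# cluster, and attach the downloaded JAR as a cluster library instead.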
