关于Spark的小问题.

我想要实现的目标非常简单:

我的期望是: Select 10行,结果是第1、2、3、4、5、6、7、8、9、10行.

注意,我的操作OP有正确的日志(log)记录和正确的KPI.

因此,我try 了以下方法:

 public static void main(String[] args) {
        final String query = "SELECT TOP(10) id, last_name, first_name FROM mytable WHERE ...";
        final SparkSession sparkSession = SparkSession.builder().getOrCreate();
        final Properties   dbConnectionProperties   = new Properties();
        dbConnectionProperties.putAll([...]);

        final Dataset<Row> topTenDataSet = sparkSession.read().jdbc(someUrl, query, dbConnectionProperties);
        topTenDataSet.show();
        
        final Dataset<String> topTenDataSetAfterMap = topTenDataSet.repartition(10).map((MapFunction<Row, String>) row -> performOperationWithLogAndKPI(row), Encoders.STRING());

        LOGGER.info("the count is expected to be 10 " + topTenDataSetAfterMap.count() + topTenDataSetAfterMap.showString(100000, 1000000, false));
        sparkSession.stop();
    }

有了这段代码,会有一个奇怪的结果.

topTenDataSet.show();topTenDataSetAfterMap.count()都显示了10行,快乐.

但我看看Op performOperationWithLogAndKPI 行动的日志(log),我可以看到超过10个日志(log),超过10个指标.意思是,我可以看到执行器1执行10次操作,但执行器2也执行10次操作,以此类推.

似乎每个执行器都会运行自己的"从数据库中 Select TOP(10)",并在每个数据集上应用map函数.

我可以问一下:我在代码中犯了什么错误吗?

我的理解不对吗?

如何实现预期的结果,查询一次,并让每个执行者对结果集的一部分应用一个函数?

非常感谢.

推荐答案

如果试图在同一数据集上执行多个操作,请try cache it.这样," Select 前10名结果"查询只应执行一次:

final Dataset<Row> topTenDataSet = sparkSession.read().jdbc(someUrl, query, dbConnectionProperties);
topTenDataSet.cache();
topTenDataSet.show();
final Dataset<String> topTenDataSetAfterMap = topTenDataSet.repartition(10).map((MapFunction<Row, String>) row -> performOperationWithLogAndKPI(row), Encoders.STRING());

进一步信息here

Java相关问答推荐

Java Stream,需要更新列表对象列表

Java函数式编程中的双值单值映射

使用意向过滤器从另一个应用程序启动服务

如何使用Maven和Spring Boot将构建时初始化、跟踪类初始化正确传递到本机编译

暂停计时器

为什么Java编译器不区分不同类型的方法?

使用UTC时区将startDatetime转换为本地时间

如何将Pane的图像快照保存为BMP?

舰队运行配置Maven版本

根本不显示JavaFX阿拉伯字母

一对多关系和ID生成

如何根据配置动态创建N个bean

对角线填充二维数组

为什么相同的数据条码在视觉上看起来不同?

将BlockingQueue+守护程序线程替换为执行器

为什么使用lo索引来解决二进制搜索问题不同于使用hi索引?

如何使用Hibernate v6.2构建NamingStrategy,以表名作为所有列的前缀?

为什么我得到默认方法的值而不是被覆盖的方法的值?

如何转换Vector<;对象>;转换为int?

固定时钟和 LocalDateTime