关于Spark的小问题.
我想要实现的目标非常简单:
我的期望是: Select 10行,结果是第1、2、3、4、5、6、7、8、9、10行.
注意,我的操作OP有正确的日志(log)记录和正确的KPI.
因此,我try 了以下方法:
public static void main(String[] args) {
final String query = "SELECT TOP(10) id, last_name, first_name FROM mytable WHERE ...";
final SparkSession sparkSession = SparkSession.builder().getOrCreate();
final Properties dbConnectionProperties = new Properties();
dbConnectionProperties.putAll([...]);
final Dataset<Row> topTenDataSet = sparkSession.read().jdbc(someUrl, query, dbConnectionProperties);
topTenDataSet.show();
final Dataset<String> topTenDataSetAfterMap = topTenDataSet.repartition(10).map((MapFunction<Row, String>) row -> performOperationWithLogAndKPI(row), Encoders.STRING());
LOGGER.info("the count is expected to be 10 " + topTenDataSetAfterMap.count() + topTenDataSetAfterMap.showString(100000, 1000000, false));
sparkSession.stop();
}
有了这段代码,会有一个奇怪的结果.
topTenDataSet.show();
和topTenDataSetAfterMap.count()
都显示了10行,快乐.
但我看看Op performOperationWithLogAndKPI
行动的日志(log),我可以看到超过10个日志(log),超过10个指标.意思是,我可以看到执行器1执行10次操作,但执行器2也执行10次操作,以此类推.
似乎每个执行器都会运行自己的"从数据库中 Select TOP(10)",并在每个数据集上应用map函数.
我可以问一下:我在代码中犯了什么错误吗?
我的理解不对吗?
如何实现预期的结果,查询一次,并让每个执行者对结果集的一部分应用一个函数?
非常感谢.