我有一个方法可以在Dataset
行上执行多个验证判断.
public boolean validationEtablissement(HistoriqueExecution historique, Row etablissement, boolean actifsSeulement, boolean nomenclaturesNAF2Valides, Map<String, Integer> indexs) {
String siret = getString(indexs, etablissement, "siret");
String siren = getString(indexs, etablissement, "siren");
Boolean diffusable = getBoolean(indexs, etablissement, "diffusable");
String nomenclatureAPE = getString(indexs, etablissement, "nomenclatureActivitePrincipale");
String code = getString(indexs, etablissement, "activitePrincipale");
String typeVoie = getString(indexs, etablissement, "typeDeVoie");
String typeVoieSecondaire = getString(indexs, etablissement, "typeDeVoieSecondaire");
String dateCreation = getString(indexs, etablissement, "dateCreationEtablissement");
String dateDernierTraitement = getString(indexs, etablissement, "dateDernierTraitement");
String dateDebutHistorisation = getString(indexs, etablissement, "dateDebutHistorisation");
List<BooleanSupplier> validationsDemandees = new ArrayList<>();
// Vérifier que le SIRET de l'établissement est correct.
validationsDemandees.add(() -> valider(historique, etablissement, e -> getString(indexs, etablissement, "siret") == null,
"etablissement sans SIRET, peut être étranger : '{}', écarté.", () -> new Object[]{getString(indexs, etablissement, "nomPaysEtranger")}));
validationsDemandees.add(() -> valider(historique, etablissement, e -> new SIRET(siret).valide() == false,
"etablissement au SIRET '{}' invalide, écarté.", () -> new Object[]{siret}));
// Vérifier que le SIREN de l'entreprise mentionnée par l'établissement est correct.
validationsDemandees.add(() -> valider(historique, etablissement, e -> siren == null,
"etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté.", () -> new Object[]{getString(indexs, etablissement, "nomPaysEtranger"), siret}));
validationsDemandees.add(() -> valider(historique, etablissement, e -> new SIREN(siren).valide() == false,
"etablissement au SIREN d'entreprise '{}' invalide, écarté.", () -> new Object[]{siren}));
if (nomenclaturesNAF2Valides) {
// La nomenclature du code APE doit être NAFRev2.
validationsDemandees.add(() -> valider(historique, etablissement, e -> nomenclatureAPE == null,
"établissement de SIRET {} écarté : il n'a pas de nomemclature APE.", () -> new Object[]{siret}));
validationsDemandees.add(() -> valider(historique, etablissement, e -> "NAFRev2".equals(nomenclatureAPE) == false,
"établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.", () -> new Object[]{siret, nomenclatureAPE}));
// Le code APE doit être valide.
validationsDemandees.add(() -> valider(historique, etablissement, e -> code == null,
"établissement de SIRET {} écarté : il n'a pas de code APE.", () -> new Object[]{siret}));
validationsDemandees.add(() -> valider(historique, etablissement, e -> new CodeAPE(code).valide(false) == false,
"établissement de SIRET {} écarté : son code APE {} est invalide.", () -> new Object[]{siret, code}));
}
// Si le type de voie des adresses sont alimentés, vérifier qu'ils sont valides.
validationsDemandees.add(() -> valider(historique, etablissement, e -> typeDeVoieInvalide(typeVoie, diffusable),
"établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide", () -> new Object[]{siret, typeVoie}));
validationsDemandees.add(() -> valider(historique, etablissement, e -> typeDeVoieInvalide(typeVoieSecondaire, diffusable),
"établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide", () -> new Object[]{siret, typeVoieSecondaire}));
validationsDemandees.add(() -> valider(historique, etablissement, e -> dateInvalide(dateCreation) != null,
"établissement de SIRET {} écarté : sa date de création, {}, est invalide", () -> new Object[]{siret, dateCreation}));
// Contrôler le format des dates
validationsDemandees.add(() -> valider(historique, etablissement, e -> dateTimeInvalide(dateDernierTraitement) != null && dateInvalide(dateDernierTraitement) != null,
"établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {}", () -> new Object[]{siret, dateDernierTraitement, dateInvalide(dateDernierTraitement)}));
validationsDemandees.add(() -> valider(historique, etablissement, e -> dateInvalide(dateDebutHistorisation) != null,
"établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide", () -> new Object[]{siret, dateDebutHistorisation}));
return validationsDemandees.stream().allMatch(BooleanSupplier::getAsBoolean);
}
For each checking that fails, I send a warning.
But I also wish to increase, in a longAccumulator
, the number of problems of that kind I've encountered during the validation.
private boolean valider(HistoriqueExecution historique, Row row, Predicate<Row> conditionEchec, String warningFormat, Supplier<Object[]> arguments) {
if (conditionEchec.test(row)) {
LOGGER.warn(warningFormat, arguments.get());
if (historique != null) {
historique.incrementerOccurrences(warningFormat, true);
}
return false;
}
return true;
}
为此,我使用了HistoriqueExecution
个类:
public class HistoriqueExecution implements Serializable {
/** Accumulateurs d'erreurs associés. */
private final Map<String, LongAccumulator> accumulators = new HashMap<>();
/** Spark session */
private SparkSession session;
/**
* Construire un historique d'exécution.
*/
public HistoriqueExecution() {
}
/**
* Construire un historique d'exécution.
* @param session Session Spark.
*/
public HistoriqueExecution(SparkSession session) {
this.session = session;
}
/**
* Construire un historique d'exécution.
* @param session Session Spark.
* @param codesMessages Code des messages qui pourront être émis.
*/
public HistoriqueExecution(SparkSession session, String... codesMessages) {
this(session);
for(String codeMessage : codesMessages) {
accumulators.put(codeMessage, session.sparkContext().longAccumulator(codeMessage));
}
}
/**
* Incrémenter l'occurence d'un code ou format de message.
* @param session Session Spark.
* @param codeOuFormatMessage Code ou Format de message
* @param creerSiAbsent true s'il faut le créer s'il est absent, dans l'accumulateur.
*/
public void incrementerOccurrences(SparkSession session, String codeOuFormatMessage, boolean creerSiAbsent) {
LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);
if (accumulator == null && creerSiAbsent) {
accumulator = accumulators.put(codeOuFormatMessage, session.sparkContext().longAccumulator(codeOuFormatMessage));
}
if (accumulator != null) {
accumulator.add(1);
}
}
/**
* Incrémenter l'occurence d'un code ou format de message.
* @param codeOuFormatMessage Code ou Format de message
* @param creerSiAbsent true s'il faut le créer s'il est absent, dans l'accumulateur.
*/
public void incrementerOccurrences(String codeOuFormatMessage, boolean creerSiAbsent) {
LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);
if (accumulator == null && creerSiAbsent && this.session != null) {
accumulator = accumulators.put(codeOuFormatMessage, this.session.sparkContext().longAccumulator(codeOuFormatMessage));
}
if (accumulator != null) {
accumulator.add(1);
}
}
/**
* Fixer les codes messages qui pourront être émis.
* @param session Session Spark.
* @param codesMessages Code des messages qui pourront être émis.
*/
public void setCodesMessages(SparkSession session, String... codesMessages) {
setSparkSession(session);
for(String codeMessage : codesMessages) {
accumulators.put(codeMessage, session.sparkContext().longAccumulator(codeMessage));
}
}
/**
* Dumper en log les notifications accumulées.
* @param log Logger.
*/
public void dumpNotifications(Logger log) {
if (this.accumulators.isEmpty()) {
log.info("Il n'y a aucune notification dans l'historique d'exécution");
}
for(Map.Entry<String, LongAccumulator> rapport : getNotifications().entrySet()) {
log.info("{} : {}", rapport.getKey(), rapport.getValue().count());
}
}
}
但我有个问题.
如果我用一组可接受的消息代码初始化HistoriqueExecution
,它将在数据集提交给Spark时计数,
我的意思是:对于Spark来说,接收数据集并将其视为一堆懒惰的事情,但它还没有try 构建它,它可以工作.
然后,当它try 构建它时,它会传递以下语句:
LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);
始终查找每个消息代码,并增加其计数器.
public Dataset<Row> rowEtablissements(OptionsCreationLecture optionsCreationLecture, HistoriqueExecution historiqueExecution, int anneeCOG, int anneeSIRENE, boolean actifsSeulement, boolean communesValides, boolean nomenclaturesNAF2Valides) {
if (historiqueExecution != null) {
historiqueExecution.setCodesMessages(this.session,
"etablissement sans SIRET, peut être étranger : '{}'",
"etablissement au SIRET '{}' invalide, écarté.",
"etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté.",
"etablissement au SIREN d'entreprise '{}' invalide, écarté.",
"établissement de SIRET {} écarté : il n'a pas de nomemclature APE.",
"établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.",
"établissement de SIRET {} écarté : il n'a pas de code APE.",
"établissement de SIRET {} écarté : son code APE {} est invalide.",
"établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide",
"établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide",
"établissement de SIRET {} écarté : sa date de création, {}, est invalide",
"établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {}",
"établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide");
}
Supplier<Dataset<Row>> worker = () -> {
// [...Creating the dataset...]
etablissements = etablissements.filter(
(FilterFunction<Row>) etablissement -> this.validator.validationEtablissement(historiqueExecution, etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs));
return etablissements;
};
return constitutionStandard(options, () -> worker.get()
.withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)),
new CacheParqueteur<>(options, this.session,
"etablissements", "annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", DEPARTEMENT_SIREN_SIRET,
anneeSIRENE, anneeCOG, actifsSeulement, communesValides));
}
和日志(log)(如果我使用dumpNotifications
):
etablissement au SIRET '{}' invalide, écarté. : 0
établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0
etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté. : 0
etablissement sans SIRET, peut être étranger : '{}' : 0
établissement de SIRET {} écarté : son code APE {} est invalide. : 0
établissement de SIRET {} écarté : il n'a pas de code APE. : 0
établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide : 0
établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide : 1
établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {} : 0
etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0
établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0
établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide : 0
établissement de SIRET {} écarté : sa date de création, {}, est invalide : 0
但如果我不用消息代码初始化我的HistoriqueExecution
,
historiqueExecution.setCodesMessages(this.session);
Spark必须在这Dataset
个计数器的实际创建过程中,在需要时通过以下语句请求创建每个缺失计数器:
accumulator = accumulators.put(codeOuFormatMessage, this.session.sparkContext().longAccumulator(codeOuFormatMessage));
但后来它在NullPointerException
分的时候失败了.
Cannot invoke "org.apache.spark.SparkContext.longAccumulator(String)" because the return value of "org.apache.spark.sql.SparkSession.sparkContext()" is null
个
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 57.0 failed 1 times, most recent failure: Lost task 12.0 in stage 57.0 (TID 2913) (192.168.1.153 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.SparkContext.longAccumulator(String)" because the return value of "org.apache.spark.sql.SparkSession.sparkContext()" is null
at fr.ecoemploi.adapters.outbound.spark.dataset.core.HistoriqueExecution.incrementerOccurrences(HistoriqueExecution.java:169)
at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementRowValidator.valider(EtablissementRowValidator.java:137)
at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementRowValidator.lambda$validationEtablissement$26(EtablissementRowValidator.java:91)
at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementRowValidator.validationEtablissement(EtablissementRowValidator.java:107)
at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementDataset.lambda$rowEtablissements$420108c5$1(EtablissementDataset.java:160)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:385)
at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
SparkSession
不是空的,从一开始就没有改变,但提供了null
上下文()?
Fearing an hidden serialization issue, I've made a try, removing the private SparkSession session;
member variable of HistoriqueExecution
(in case it wouldn't be serializable)
and passing that SparkSession
as a parameter all along the methods calls.
But the problem is still here.