我有一个方法可以在Dataset行上执行多个验证判断.

public boolean validationEtablissement(HistoriqueExecution historique, Row etablissement, boolean actifsSeulement, boolean nomenclaturesNAF2Valides, Map<String, Integer> indexs) {
   String siret = getString(indexs, etablissement, "siret");
   String siren = getString(indexs, etablissement, "siren");
   Boolean diffusable = getBoolean(indexs, etablissement, "diffusable");
   String nomenclatureAPE = getString(indexs, etablissement, "nomenclatureActivitePrincipale");
   String code = getString(indexs, etablissement, "activitePrincipale");
   String typeVoie = getString(indexs, etablissement, "typeDeVoie");
   String typeVoieSecondaire = getString(indexs, etablissement, "typeDeVoieSecondaire");
   String dateCreation = getString(indexs, etablissement, "dateCreationEtablissement");
   String dateDernierTraitement = getString(indexs, etablissement, "dateDernierTraitement");
   String dateDebutHistorisation = getString(indexs, etablissement, "dateDebutHistorisation");

   List<BooleanSupplier> validationsDemandees = new ArrayList<>();

   // Vérifier que le SIRET de l'établissement est correct.
   validationsDemandees.add(() -> valider(historique, etablissement, e -> getString(indexs, etablissement, "siret") == null,
      "etablissement sans SIRET, peut être étranger : '{}', écarté.", () -> new Object[]{getString(indexs, etablissement, "nomPaysEtranger")}));

   validationsDemandees.add(() -> valider(historique, etablissement, e -> new SIRET(siret).valide() == false,
      "etablissement au SIRET '{}' invalide, écarté.", () -> new Object[]{siret}));

   // Vérifier que le SIREN de l'entreprise mentionnée par l'établissement est correct.
   validationsDemandees.add(() -> valider(historique, etablissement, e -> siren == null,
      "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté.", () -> new Object[]{getString(indexs, etablissement, "nomPaysEtranger"), siret}));

   validationsDemandees.add(() -> valider(historique, etablissement, e -> new SIREN(siren).valide() == false,
      "etablissement au SIREN d'entreprise '{}' invalide, écarté.", () -> new Object[]{siren}));

   if (nomenclaturesNAF2Valides) {
      // La nomenclature du code APE doit être NAFRev2.
      validationsDemandees.add(() -> valider(historique, etablissement, e -> nomenclatureAPE == null,
         "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.", () -> new Object[]{siret}));

      validationsDemandees.add(() -> valider(historique, etablissement, e -> "NAFRev2".equals(nomenclatureAPE) == false,
         "établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.", () -> new Object[]{siret, nomenclatureAPE}));

      // Le code APE doit être valide.
      validationsDemandees.add(() -> valider(historique, etablissement, e -> code == null,
         "établissement de SIRET {} écarté : il n'a pas de code APE.", () -> new Object[]{siret}));

      validationsDemandees.add(() -> valider(historique, etablissement, e -> new CodeAPE(code).valide(false) == false,
         "établissement de SIRET {} écarté : son code APE {} est invalide.", () -> new Object[]{siret, code}));
   }

   // Si le type de voie des adresses sont alimentés, vérifier qu'ils sont valides.
   validationsDemandees.add(() -> valider(historique, etablissement, e -> typeDeVoieInvalide(typeVoie, diffusable),
      "établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide", () -> new Object[]{siret, typeVoie}));

   validationsDemandees.add(() -> valider(historique, etablissement, e -> typeDeVoieInvalide(typeVoieSecondaire, diffusable),
      "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide", () -> new Object[]{siret, typeVoieSecondaire}));

   validationsDemandees.add(() -> valider(historique, etablissement, e -> dateInvalide(dateCreation) != null,
      "établissement de SIRET {} écarté : sa date de création, {}, est invalide", () -> new Object[]{siret, dateCreation}));

   // Contrôler le format des dates
   validationsDemandees.add(() -> valider(historique, etablissement, e -> dateTimeInvalide(dateDernierTraitement) != null && dateInvalide(dateDernierTraitement) != null,
      "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {}", () -> new Object[]{siret, dateDernierTraitement, dateInvalide(dateDernierTraitement)}));

   validationsDemandees.add(() -> valider(historique, etablissement, e -> dateInvalide(dateDebutHistorisation) != null,
      "établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide", () -> new Object[]{siret, dateDebutHistorisation}));

   return validationsDemandees.stream().allMatch(BooleanSupplier::getAsBoolean);
}

For each checking that fails, I send a warning.
But I also wish to increase, in a longAccumulator, the number of problems of that kind I've encountered during the validation.

private boolean valider(HistoriqueExecution historique, Row row, Predicate<Row> conditionEchec, String warningFormat, Supplier<Object[]> arguments) {
   if (conditionEchec.test(row)) {
      LOGGER.warn(warningFormat, arguments.get());

      if (historique != null) {
         historique.incrementerOccurrences(warningFormat, true);
      }

      return false;
   }

   return true;
}

为此,我使用了HistoriqueExecution个类:

public class HistoriqueExecution implements Serializable {
   /** Accumulateurs d'erreurs associés. */
   private final Map<String, LongAccumulator> accumulators = new HashMap<>();

   /** Spark session */
   private SparkSession session;

   /**
    * Construire un historique d'exécution.
    */
   public HistoriqueExecution() {
   }

   /**
    * Construire un historique d'exécution.
    * @param session Session Spark.
    */
   public HistoriqueExecution(SparkSession session) {
      this.session = session;
   }

   /**
    * Construire un historique d'exécution.
    * @param session Session Spark.
    * @param codesMessages Code des messages qui pourront être émis.
    */
   public HistoriqueExecution(SparkSession session, String... codesMessages) {
      this(session);

      for(String codeMessage : codesMessages) {
         accumulators.put(codeMessage, session.sparkContext().longAccumulator(codeMessage));
      }
   }

   /**
    * Incrémenter l'occurence d'un code ou format de message.
    * @param session Session Spark.
    * @param codeOuFormatMessage Code ou Format de message
    * @param creerSiAbsent true s'il faut le créer s'il est absent, dans l'accumulateur.
    */
   public void incrementerOccurrences(SparkSession session, String codeOuFormatMessage, boolean creerSiAbsent) {
      LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);

      if (accumulator == null && creerSiAbsent) {
         accumulator = accumulators.put(codeOuFormatMessage, session.sparkContext().longAccumulator(codeOuFormatMessage));
      }

      if (accumulator != null) {
         accumulator.add(1);
      }
   }

   /**
    * Incrémenter l'occurence d'un code ou format de message.
    * @param codeOuFormatMessage Code ou Format de message
    * @param creerSiAbsent true s'il faut le créer s'il est absent, dans l'accumulateur.
    */
   public void incrementerOccurrences(String codeOuFormatMessage, boolean creerSiAbsent) {
      LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);

      if (accumulator == null && creerSiAbsent && this.session != null) {
         accumulator = accumulators.put(codeOuFormatMessage, this.session.sparkContext().longAccumulator(codeOuFormatMessage));
      }

      if (accumulator != null) {
         accumulator.add(1);
      }
   }

   /**
    * Fixer les codes messages qui pourront être émis.
    * @param session Session Spark.
    * @param codesMessages Code des messages qui pourront être émis.
    */
   public void setCodesMessages(SparkSession session, String... codesMessages) {
      setSparkSession(session);

      for(String codeMessage : codesMessages) {
         accumulators.put(codeMessage, session.sparkContext().longAccumulator(codeMessage));
      }
   }

   /**
    * Dumper en log les notifications accumulées.
    * @param log Logger.
    */
   public void dumpNotifications(Logger log) {
      if (this.accumulators.isEmpty()) {
         log.info("Il n'y a aucune notification dans l'historique d'exécution");
      }

      for(Map.Entry<String, LongAccumulator> rapport : getNotifications().entrySet()) {
         log.info("{} : {}", rapport.getKey(), rapport.getValue().count());
      }
   }
}

但我有个问题.

如果我用一组可接受的消息代码初始化HistoriqueExecution,它将在数据集提交给Spark时计数,

我的意思是:对于Spark来说,接收数据集并将其视为一堆懒惰的事情,但它还没有try 构建它,它可以工作.

然后,当它try 构建它时,它会传递以下语句:

LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);

始终查找每个消息代码,并增加其计数器.

public Dataset<Row> rowEtablissements(OptionsCreationLecture optionsCreationLecture, HistoriqueExecution historiqueExecution, int anneeCOG, int anneeSIRENE, boolean actifsSeulement, boolean communesValides, boolean nomenclaturesNAF2Valides) {
   if (historiqueExecution != null) {
      historiqueExecution.setCodesMessages(this.session,
         "etablissement sans SIRET, peut être étranger : '{}'",
         "etablissement au SIRET '{}' invalide, écarté.",
         "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté.",
         "etablissement au SIREN d'entreprise '{}' invalide, écarté.",
         "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.",
         "établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.",
         "établissement de SIRET {} écarté : il n'a pas de code APE.",
         "établissement de SIRET {} écarté : son code APE {} est invalide.",
         "établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide",
         "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide",
         "établissement de SIRET {} écarté : sa date de création, {}, est invalide",
         "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {}",
         "établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide");
   }

   Supplier<Dataset<Row>> worker = () -> {
      // [...Creating the dataset...]
      etablissements = etablissements.filter(
         (FilterFunction<Row>) etablissement -> this.validator.validationEtablissement(historiqueExecution, etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs));

      return etablissements;
   };

   return constitutionStandard(options, () -> worker.get()
         .withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)),
      new CacheParqueteur<>(options, this.session,
         "etablissements", "annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", DEPARTEMENT_SIREN_SIRET,
         anneeSIRENE, anneeCOG, actifsSeulement, communesValides));
}

和日志(log)(如果我使用dumpNotifications):

etablissement au SIRET '{}' invalide, écarté. : 0
établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0
etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté. : 0
etablissement sans SIRET, peut être étranger : '{}' : 0
établissement de SIRET {} écarté : son code APE {} est invalide. : 0
établissement de SIRET {} écarté : il n'a pas de code APE. : 0
établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide : 0
établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide : 1
établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {} : 0
etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0
établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0
établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide : 0
établissement de SIRET {} écarté : sa date de création, {}, est invalide : 0

但如果我不用消息代码初始化我的HistoriqueExecution

historiqueExecution.setCodesMessages(this.session);

Spark必须在这Dataset个计数器的实际创建过程中,在需要时通过以下语句请求创建每个缺失计数器:

accumulator = accumulators.put(codeOuFormatMessage, this.session.sparkContext().longAccumulator(codeOuFormatMessage));

但后来它在NullPointerException分的时候失败了.

Cannot invoke "org.apache.spark.SparkContext.longAccumulator(String)" because the return value of "org.apache.spark.sql.SparkSession.sparkContext()" is null

org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 57.0 failed 1 times, most recent failure: Lost task 12.0 in stage 57.0 (TID 2913) (192.168.1.153 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.SparkContext.longAccumulator(String)" because the return value of "org.apache.spark.sql.SparkSession.sparkContext()" is null
    at fr.ecoemploi.adapters.outbound.spark.dataset.core.HistoriqueExecution.incrementerOccurrences(HistoriqueExecution.java:169)
    at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementRowValidator.valider(EtablissementRowValidator.java:137)
    at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementRowValidator.lambda$validationEtablissement$26(EtablissementRowValidator.java:91)
    at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
    at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
    at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementRowValidator.validationEtablissement(EtablissementRowValidator.java:107)
    at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementDataset.lambda$rowEtablissements$420108c5$1(EtablissementDataset.java:160)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:385)
    at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

SparkSession不是空的,从一开始就没有改变,但提供了null上下文()?

Fearing an hidden serialization issue, I've made a try, removing the private SparkSession session; member variable of HistoriqueExecution (in case it wouldn't be serializable)
and passing that SparkSession as a parameter all along the methods calls.
But the problem is still here.

推荐答案

SparkContext/SparkSession不存在于执行器上,您的代码假设驱动程序上有一个可从执行器 node 访问的实例,但事实并非如此.

(侧面注意,像这样验证每一行都会对性能产生影响,您希望尽可能多地推送到SPARK SQL/DSL层-Quality可以做到这一点)

为了计算失败次数,您必须执行两次遍历(尽管您可以缓存结果数据集--如果结果对您有价值,我建议将其写出来),一次是创建一个PASS/FAIL列,另一次是对结果求和.

您还可以 for each 行验证创建将消息发送回驱动程序的插件,但执行两次传递要容易得多.

Java相关问答推荐

使用json参数通过单击jSP文件中的按钮来运行server时出现问题

基于仅存在于父级中的字段查询子文档?

int Array Stream System. out. print方法在打印Java8时在末尾添加% sign

如何打印本系列的第n项y=-(1)-(1+2)+(1+2+3)+(1+2+3+4)-(1+2+3+4+5)...Java中的(1+2+3+4...+n)

连接Quarkus中的两个异步操作

Spark忽略Iceberg Nessie目录

Java流传输一个列表并创建单个对象

当Volatile关键字真的是必要的时候?

通过Spring Security公开Spring Boot执行器端点

WebSockets和Spring Boot安全性出现错误401

Domino Designer 14中的保存代理添加了重影库

在macOS上读取文件会导致FileNotFound,即使文件存在(并且具有权限)

如何在运行docker的应用程序中获取指定的配置文件

泛型与泛型问题的完美解决方案?

判断重复的两个二维表算法?

如何在Maven Central上部署?

TinyDB问题,无法解析符号';上下文&

如何在JSP中从select中获取值并将其放入另一个select

如何转换Vector<;对象>;转换为int?

睡眠在 Spring Boot 中