最近,我将我的云数据流应用程序从Java 11升级到Java 17及其相应的依赖项.应用程序运行得很好,甚至测试用例也运行得很好.我还将我的ApacheBEAM版本从2.35.0升级到2.49.0.

但是,在其中一个定制类RedisWriteIO中,有一些更改,现在测试没有传入新的代码覆盖率.

RedisWriteIO

package com.example.dataflow.io.redis;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class RedisWriteIO {

    public static Write write() {
        return (new AutoValue_RedisWriteIO_Write.Builder())
                .setConnectionConfiguration(CustomRedisConfigurations.create()).build();
    }

    @AutoValue
    public abstract static class Write extends PTransform<PCollection<KV<String,String>>, PDone> {
        public Write() {
        }

        @Nullable
        abstract CustomRedisConfigurations connectionConfiguration();

        @Nullable
        abstract Long expireTime();

        abstract Builder toBuilder();

        public Write withEndpoint(String host, int port) {
            Preconditions.checkArgument(host != null, "host can not be null");
            Preconditions.checkArgument(port > 0, "port can not be negative or 0");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
        }

        public Write withAuth(String auth) {
            Preconditions.checkArgument(auth != null, "auth can not be null");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
        }

        public Write withTimeout(int timeout) {
            Preconditions.checkArgument(timeout >= 0, "timeout can not be negative");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
        }

        public Write withConnectionConfiguration(CustomRedisConfigurations connection) {
            Preconditions.checkArgument(connection != null, "connection can not be null");
            return this.toBuilder().setConnectionConfiguration(connection).build();
        }

        public Write withExpireTime(Long expireTimeMillis) {
            Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis can not be null");
            Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis can not be negative or 0");
            return this.toBuilder().setExpireTime(expireTimeMillis).build();
        }

        public PDone expand(PCollection<KV<String, String>> input) {
            Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
            input.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(input.getPipeline());
        }

        private static class WriteFn extends DoFn<KV<String, String>, Void>{
            private static final int DEFAULT_BATCH_SIZE = 1000;
            private final RedisWriteIO.Write spec;
            private transient Jedis jedis;
            private transient @Nullable Transaction transaction;

            private int batchCount;

            public WriteFn(RedisWriteIO.Write spec) {
                this.spec = spec;
            }

            @Setup
            public void setup() {
                jedis = spec.connectionConfiguration().connect();
            }

            @StartBundle
            public void startBundle() {
                transaction = jedis.multi();
                batchCount = 0;
            }
            @ProcessElement
            public void processElement(DoFn<KV<String, String>, Void>.ProcessContext c) {

                KV<String, String> record = c.element();

                String fieldKey = record.getKey();
                String fieldValue = record.getValue();

                transaction.sadd(fieldKey,fieldValue);

                batchCount++;

                if (batchCount >= DEFAULT_BATCH_SIZE) {
                    transaction.exec();
                    transaction.multi();
                    batchCount = 0;
                }
            }

            @FinishBundle
            public void finishBundle() {
                if (batchCount > 0) {
                    transaction.exec();
                }
                if (transaction != null) {
                    transaction.close();
                }
                transaction = null;
                batchCount = 0;
            }

            @Teardown
            public void teardown() {
                jedis.close();
            }
        }

        @AutoValue.Builder
        abstract static class Builder {
            Builder() {
            }

            abstract Builder setConnectionConfiguration(CustomRedisConfigurations connectionConfiguration);

            abstract Builder setExpireTime(Long expireTimeMillis);

            abstract Write build();

        }
    }
}

测试类如下:

package com.example.dataflow.io.redis;

import com.github.fppt.jedismock.RedisServer;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.junit.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;

import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.*;


public class RedisWriteIOTest {

    private static final String REDIS_HOST = "localhost";
    private static final String[] INPUT_DATA = new String[]{
            "123456789",
            "Bruce",
            "Wayne"
    };

    @Mock
    static SSLSocketFactory socketFactory;
    private static RedisServer server;
    private static int port;

    @Mock
    private static Jedis jedis;

    @Mock
    private Transaction transaction;

    private int batchCount;

    @Rule
    public TestPipeline pipeline = TestPipeline.create();
    @Mock
    CustomRedisConfigurations connection;

    @Mock
    DoFn.OutputReceiver<KV<String, String>> out;

    @Before
    public void setUp() {
        MockitoAnnotations.openMocks(this);
        when(connection.connect()).thenReturn(jedis);
        when(jedis.multi()).thenReturn(transaction);
        batchCount = 0;
    }


    @BeforeClass
    public static void beforeClass() throws Exception {
        server = RedisServer.newRedisServer(8000);
        server.start();
        port = server.getBindPort();
        jedis = new Jedis(server.getHost(), server.getBindPort());
    }

    @AfterClass
    public static void afterClass() throws IOException {
        jedis.close();
        server.stop();
    }

    @Test
    public void WriteMemoryStoreWithEmptyAuth() {
        RedisWriteIO.write()
                .withEndpoint(REDIS_HOST, port).withAuth("");
    }

    @Test
    public void WriteMemoryStoreWithAuth() {
        RedisWriteIO.write()
                .withAuth("AuthString");
    }

    @Test
    public void WriteTimeOut() {
        RedisWriteIO.write()
                .withTimeout(10);
    }

    @Test
    public void WriteMemoryStoreWithExpireTime() {
        RedisWriteIO.Write write = RedisWriteIO.write();
        write = write.withExpireTime(1000L);
        assertNotNull(write);
    }

    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoreWithoutExpireTime() {
        RedisWriteIO.write()
                .withExpireTime(0L);
    }


    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoreWithNegativeExpireTime() {
        RedisWriteIO.write()
                .withExpireTime(-10L);
    }

    @Test
    public void WriteMemoryStoryWithConnectionConfiguration() {
        connection = CustomRedisConfigurations.create().withHost(REDIS_HOST).withPort(port);
        RedisWriteIO.Write write = RedisWriteIO.write()
                .withConnectionConfiguration(connection);
        assertNotNull(write);
    }

    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoryWithNullConnectionConfiguration() {
        RedisWriteIO.Write write = RedisWriteIO.write()
                .withConnectionConfiguration(null);
    }


    @Test
    public void testBatchProcessingWithTransactionExecuted() {
        RedisWriteIO.Write spec = RedisWriteIO.write().withConnectionConfiguration(connection);
        PCollection<String> flushFlag = pipeline.apply("Read File", TextIO.read().from("files/fileHavingFiveThousandRecords.txt"));

        List<KV<String, String>> recordEntries = new ArrayList<>();
        for (int i = 0; i <= 10000; i++) {
            // adding unique entries 10000 times
            recordEntries.add(KV.of("Bruce:Wayne" + i, "123456789" + i));
        }

        // outputData will be written to Redis (memorystore)
        PCollection<KV<String, String>> outputData = pipeline.apply(Create.of(recordEntries));

        outputData.apply("Waiting until clearing Redis database", Wait.on(flushFlag))
               .apply("Writing the data into Redis database", RedisWriteIO.write()
                    .withConnectionConfiguration(CustomRedisConfigurations
                            .create(REDIS_HOST, port)
                            .withTimeout(100)
                            .withAuth("credentials")
                            .enableSSL()));
        pipeline.run();

    }

}

RedisWriteIO是一个将文件中的数据写入Redis数据库的实用程序类.它按预期工作,编写的测试用例也按预期工作.然而,SonarQube没有涵盖下面的代码块.

if (batchCount >= DEFAULT_BATCH_SIZE) {
     transaction.exec();
     transaction.multi();
     batchCount = 0;
}

当文件有testBatchProcessingWithTransactionExecuted()0条以上的记录时,应该执行上面的块.它在测试课上不起作用.我try 用一个有5000条记录的测试文件来覆盖testBatchProcessingWithTransactionExecuted()方法中的这段代码,但仍然没有执行这段代码.

我需要帮助来编写涵盖所有行的测试用例.

推荐答案

我能够编写涵盖所有行的测试用例.我刚刚将列表的大小增加到20000,通过这样做,RedisWriteIO类的功能可以处理更大的数据集.

批次计数DEFAULT_BATCH_SIZE0用作由DEFAULT_BATCH_SIZE指定的阈值,当达到该阈值时,则执行事务(transaction.exec())并开始新事务(transaction.multi()).

    @Test
    public void testBatchProcessingWithTransactionExecuted() {
        RedisWriteIO.Write spec = RedisWriteIO.write().withConnectionConfiguration(connection);
        PCollection<String> flushFlag = pipeline.apply("Read File", TextIO.read().from("files/fileHavingFiveThousandRecords.txt"));

        List<KV<String, String>> recordEntries = new ArrayList<>();
        for (int i = 0; i <= 20000; i++) {
            // adding unique entries 20000 times
            recordEntries.add(KV.of("Bruce:Wayne" + i, "123456789" + i));
        }

        // outputData will be written to Redis (memorystore)
        PCollection<KV<String, String>> outputData = pipeline.apply(Create.of(recordEntries));

        outputData.apply("Waiting until clearing Redis database", Wait.on(flushFlag))
               .apply("Writing the data into Redis database", RedisWriteIO.write()
                    .withConnectionConfiguration(CustomRedisConfigurations
                            .create(REDIS_HOST, port)
                            .withTimeout(100)
                            .withAuth("credentials")
                            .enableSSL()));
        pipeline.run();

    }

Java相关问答推荐

ittext pdf延迟签名,签名无效

嵌入式ActiveMQ Artemis Web控制台加载错误

AssertJ Java:多条件断言

我不能再在Android Studio Hedgehog上用Java语言创建新项目了吗?

Jolt变换JSON数组问题

从ActiveMQ Classic迁移到ActiveMQ Artemis需要进行哪些客户端更改?

Sack()步骤中的合并运算符未按预期工作

Java构造函数分支

有效的公式或值列表必须少于或等于255个字符

是否有一个Java Future实现可以在池繁忙时在调用者线程中执行?

插入中的JOOQ序列,设置为VS值

如何处理两个几乎相同的XSD文件?

在线程Java中调用Interrupt()之后调用Join()

在权限列表中找不到我的应用程序

Spring Mapstruct如何获取Lazy初始化实体字段的信息?

将Optionals/null安全添加到嵌套的flatMap/流

具有 DayOfWeek 列表的 JPA 实体

一条Java记录可以保存多少个字段?

找不到 jar 文件系统提供程序try 使用 jdeps 和 jlink 创建收缩 Java 映像来运行 Minecraft

如何使用 JDBC 更改 Postgres Enum 类型