我有以下问题:

  1. 某个producer将Protobuf消息作为二进制数据(字节数组)发送.

  2. 这些二进制数据进入一个misconfigured Kafka cluster,它将字节数组反序列化为字符串.

  3. 然后,该集群将数据序列化为字符串,并将其发送到consumer.

  4. 毫无戒心的消费者期望收到一个二进制字节数组,但得到的却是一个UTF-8编码的MISH.

我试图在JUnit测试中重现它.

假设我们有以下原型文件:

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

option java_package = "com.mycompany.proto";
option java_multiple_files = true;

package com.mycompany;

enum MessageType {
    NOT_SET = 0;
    TYPE_A = 1;
    TYPE_B = 2;
}

message MyMessagePart {
    string someValue = 1;
}

message MyMessage {
  // Numeric (integer) variable
  int32 myNumber = 1;

  // Text value
  string myText = 2;

  // Enum value
  MessageType mType = 3;

  // Message parts
  repeated MyMessagePart messagePart = 4;

  // Uint32 value
  google.protobuf.UInt32Value uint32Value = 5;

  // Timestamp
  google.protobuf.Timestamp timestamp = 6;
}

然后,我编写了以下测试.

public class EncodingTest {
    @Test
    public void dealWithCorruptedBinaryData() throws InvalidProtocolBufferException {
        // 1. Create a Protobuf message
        final MyMessage msg = MyMessage.newBuilder()
                .setMyNumber(42)
                .setMyText("Hello")
                .setMType(MessageType.TYPE_A)
                .setUint32Value(UInt32Value.newBuilder()
                        .setValue(2067)
                        .build())
                .addMessagePart(MyMessagePart.newBuilder()
                        .setSomeValue("message part value")
                        .build())
                .build();

        // 2. Convert it to bytes
        final byte[] bytesSentByProducer = msg.toByteArray();

        // 3. Now bytesSentByProducer enter misconfigured Kafka
        // where they are deserialized using StringDeserializer
        final StringDeserializer deserializer = new StringDeserializer();
        final String dataReceivedInsideMisconfiguredKafka = deserializer.deserialize("inputTopic",
                bytesSentByProducer);

        // 4. Then, misconfigured Kafka serializes the data as String
        final StringSerializer serializer = new StringSerializer();
        final byte[] dataSentToConsumer = serializer.serialize("outputTopic", dataReceivedInsideMisconfiguredKafka);

        // Because dataSentToConsumer have been corrupted during deserialization
        // or serialization as string, conversion back to Protobuf does not work.

        final MyMessage receivedMessage = MyMessage.parseFrom(dataSentToConsumer);

    }
}

生产者创建ProtoBuf消息msg并将其编码为字节数组bytesSentByProducer.

错误配置的Kafka集群接收该字节数组,将其反序列化为字符串dataReceivedInsideMisconfiguredKafka,将其序列化为字符串dataSentToConsumer,并将其发送给消费者.

由于UTF-8编码损坏了二进制数据,因此调用

final MyMessage receivedMessage = MyMessage.parseFrom(dataSentToConsumer);

导致异常:

com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.

    at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:107)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawByte(CodedInputStream.java:1245)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64SlowPath(CodedInputStream.java:1130)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint32(CodedInputStream.java:1024)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readUInt32(CodedInputStream.java:954)
    at com.google.protobuf.UInt32Value.<init>(UInt32Value.java:58)
    at com.google.protobuf.UInt32Value.<init>(UInt32Value.java:14)

字节数组向回消息的转换与未被 destruct 的字节数组bytesSentByProducer(MyMessage.parseFrom(bytesSentByProducer))一起工作.

问题:

  1. 能把dataSentToConsumer换成bytesSentByProducer吗?

  2. 如果是,如果我控制的唯一部分是消费者,我如何解决这个问题?我如何才能撤销在错误配置的Kafka集群中发生的UTF-8编码?

注意:显而易见的解决方案是正确配置Kafka集群.同样的消费者在另一个环境中工作得很好,那里有一个正常的Kafka集群,不会进行任何奇怪的转换.由于官僚机构的原因,这个显而易见且最简单的解决方案是不可用的.

What I tried

Approach 1

private byte[] convertToOriginalBytes(final byte[] bytesAfter) throws CharacterCodingException {
  final Charset charset = StandardCharsets.UTF_8;
  final CharsetDecoder decoder = charset.newDecoder();
  final CharsetEncoder encoder = charset.newEncoder();
  final ByteBuffer byteBuffer = ByteBuffer.wrap(bytesAfter);
  final CharBuffer charBuffer = CharBuffer.allocate(bytesAfter.length);
  final CoderResult result = decoder.decode(byteBuffer, charBuffer, true);
  result.throwException();
  final ByteBuffer reversedByteBuffer = encoder.encode(charBuffer);

  final byte[] reversedBytes = new byte[reversedByteBuffer.remaining()];
  reversedByteBuffer.get(reversedBytes);
  return reversedBytes;
}

这一结果是一个例外.

java.nio.BufferUnderflowException
    at java.base/java.nio.charset.CoderResult.throwException(CoderResult.java:272)
    at com.mycompany.EncodingTest.convertToOriginalBytes(EncodingTest.java:67)
    at com.mycompany.EncodingTest.dealWithCorruptedBinaryData(EncodingTest.java:54)

Approach 2

据我所知,UTF-8有多种字节模式:

  1. 0xxxxxxx表示单字节字符.
  2. 对于双字节字符等,为110xxxxx 10xxxxxx.

我假设修改了StringDeserializer和/或StringSerializer个二进制数据中的某个部分,以符合这样的UTF-8规则.

只要这种转换是可逆的,人们就可以通过操纵比特来获得原始消息.

推荐答案

我不想成为坏消息的传播者,但你想要的是不可能的.

关键点位是completeness.是否存在从一个域(这里是原始字节)到目标域(这里是UTF_8)的complete映射,反之亦然.

换句话说:这是一个挑战:给定一个任意 Select 的字节序列,生成一些文本,如果您使用UTF-8字符集编码序列化该文本,它将生成与之完全相同的字节.是否有一个字节序列可供 Select ,使此作业(job)为not possible

不幸的是,答案是yes,从而简单地证明了bytes -> text-via-UTF_8 -> bytesfatal,除非你非常非常幸运,而且字节恰好不包括UTF8无法呈现的任何内容.

许多解码器将采用无效的UTF8(因为,如果使用UTF8将文本转换为字节时某些字节序列不可能出现,这通常意味着存在某些字节序列,如果通过UTF8转换为文本,则这些字节序列是无效的)-并且只是try 它,或者丢弃那里的"损坏的数据"字形,而不是出错.所以,无论是谁管理Kafka服务器,从来没有出现过错误.这一行为(将无效的UTF-8,因为它不是UTF-8)变成了‘UHH,Wha?’符号)为destructive.

一些字符集编码确实实现了这一点.最常用的无疑是ISO-8859-1.这个值是complete--因为它只是一个简单的映射,将从0到255的每个字节值映射到某个唯一的字符.因此,你可以在这一点上一整天都是双向的.

因此,我们得到了一些修复:

  • Base64几乎经得起一切考验,这就是它设计的目的.它的效率为33%(Base64将3字节转换为4字节;输入3MB大容量转换为4MB大输出).把你的Base64编码形式的字节交给Kafka Thing,或者让Kafkago 做.
  • 在应用字符集编码的链接中设置every链(因此everywhere字节转换为字符,反之亦然)以使用ISO-8859-1.这是老生常谈和奇怪的,并不推荐,但可能是一个‘快速’的词的定义的‘快速’修复.
  • 正确地修复它-在这一点上,我相信您已经知道如何做到这一点,您只是在寻求更快的解决方案和/或方法来处理已经损坏的数据.这就是这个答案的第一句话:(

只要这种转变是可逆的

是的.您正确地确定了所有这一切的关键要求,即它是可逆的.不幸的是,事实并非如此.

Java相关问答推荐

如何让TaskView总是添加特定的列来进行排序?

Java Streams在矩阵遍历中的性能影响

在Java中将Charsequence数组更改为String数组或List String<>

无法在org. openjfx:javafx—fxml:21的下列变体之间进行 Select

给定Java枚举类,通过值查找枚举

现场观看Android Studio中的变化

所有 case 一起输入时输出错误,而单独放置时输出正确

为什么JAVA&S清洁器使用链表而不是并发HashSet?

从Spring5迁移到Spring6:无法在雅加达包中找到类

如何使用值中包含与号的查询参数创建一个java.net.URI

我如何解释这个错误?必需类型:供应商R,提供:收集器对象,捕获?,java.util.List java.lang.Object>>

Arrays.hashcode(int[])为不同的元素提供相同的散列

Java.time.OffsetDateTime的SQL Server数据库列类型是什么?

为什么没有加载java.se模块?

Java CDI:@Singleton@Startup@Inject无法实现接口

在Spring Boot中使用咖啡因进行缓存-根据输出控制缓存

没有Google Play服务,Firebase Auth无法工作

URI构造函数错误?

什么是;u〃;平均值;jdku;在java开发工具包中?

如何在 Android Studio 中删除 ImageView 和屏幕/父级边缘之间的额外空间?