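I have the following problem:
- A producer sends Protobuf messages as binary data (byte arrays).
- This binary data enters a misconfigured Kafka cluster, which deserializes the byte arrays as strings.
- The cluster then serializes the data as strings and sends it on to the consumer.
- The unsuspecting consumer expects a binary byte array but receives a UTF-8 encoded mess instead.
I tried to reproduce this in a JUnit test.
Suppose we have the following proto file: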
syntax = "proto3";
import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";
option java_package = "com.mycompany.proto";
option java_multiple_files = true;
package com.mycompany;
enum MessageType {
NOT_SET = 0;
TYPE_A = 1;
TYPE_B = 2;
}
message MyMessagePart {
string someValue = 1;
}
message MyMessage {
// Numeric (integer) variable
int32 myNumber = 1;
// Text value
string myText = 2;
// Enum value
MessageType mType = 3;
// Message parts
repeated MyMessagePart messagePart = 4;
// Uint32 value
google.protobuf.UInt32Value uint32Value = 5;
// Timestamp
google.protobuf.Timestamp timestamp = 6;
}
Then I wrote the following test:
public class EncodingTest {
@Test
public void dealWithCorruptedBinaryData() throws InvalidProtocolBufferException {
// 1. Create a Protobuf message
final MyMessage msg = MyMessage.newBuilder()
.setMyNumber(42)
.setMyText("Hello")
.setMType(MessageType.TYPE_A)
.setUint32Value(UInt32Value.newBuilder()
.setValue(2067)
.build())
.addMessagePart(MyMessagePart.newBuilder()
.setSomeValue("message part value")
.build())
.build();
// 2. Convert it to bytes
final byte[] bytesSentByProducer = msg.toByteArray();
// 3. Now bytesSentByProducer enter misconfigured Kafka
// where they are deserialized using StringDeserializer
final StringDeserializer deserializer = new StringDeserializer();
final String dataReceivedInsideMisconfiguredKafka = deserializer.deserialize("inputTopic",
bytesSentByProducer);
// 4. Then, misconfigured Kafka serializes the data as String
final StringSerializer serializer = new StringSerializer();
final byte[] dataSentToConsumer = serializer.serialize("outputTopic", dataReceivedInsideMisconfiguredKafka);
// Because dataSentToConsumer has been corrupted during deserialization
// or serialization as a string, conversion back to Protobuf does not work.
final MyMessage receivedMessage = MyMessage.parseFrom(dataSentToConsumer);
}
}
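The producer creates the Protobuf message msg and encodes it as the byte array bytesSentByProducer.
The misconfigured Kafka cluster receives that byte array, deserializes it into the string dataReceivedInsideMisconfiguredKafka, serializes that string into the byte array dataSentToConsumer, and sends it to the consumer.
Because the UTF-8 round trip corrupted the binary data, the call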
final MyMessage receivedMessage = MyMessage.parseFrom(dataSentToConsumer);
results in an exception:
com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:107)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawByte(CodedInputStream.java:1245)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64SlowPath(CodedInputStream.java:1130)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint32(CodedInputStream.java:1024)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readUInt32(CodedInputStream.java:954)
at com.google.protobuf.UInt32Value.<init>(UInt32Value.java:58)
at com.google.protobuf.UInt32Value.<init>(UInt32Value.java:14)
Converting the byte array back into a message does work with the uncorrupted byte array bytesSentByProducer (MyMessage.parseFrom(bytesSentByProducer)).
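To make the corruption concrete, here is a minimal sketch that needs neither Kafka nor Protobuf on the classpath. It assumes that the string deserializer and serializer behave like new String(bytes, UTF_8) and String.getBytes(UTF_8), which I have not verified against the Kafka sources. The five input bytes are my own hand-computed wire encoding of field 5 (a UInt32Value with value 2067), and the class and method names are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class Utf8RoundTripDemo {

    // Assumption: mimics a String deserializer followed by a String serializer,
    // both using UTF-8 with Java's default replacement of malformed input.
    static byte[] roundTripThroughString(byte[] raw) {
        String asText = new String(raw, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    // Formats a byte array as space-separated upper-case hex.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02X ", b));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // Hand-computed: tag 0x2A (field 5, length-delimited), length 3,
        // inner tag 0x08 (field 1, varint), varint 2067 = 0x93 0x10.
        byte[] original = {0x2A, 0x03, 0x08, (byte) 0x93, 0x10};
        byte[] received = roundTripThroughString(original);

        System.out.println(hex(original)); // 2A 03 08 93 10
        System.out.println(hex(received)); // 2A 03 08 EF BF BD 10

        // Two different original bytes end up as the same three bytes.
        byte[] a = roundTripThroughString(new byte[] {(byte) 0x93});
        byte[] b = roundTripThroughString(new byte[] {(byte) 0x94});
        System.out.println(Arrays.equals(a, b)); // true
    }
}
```

The byte 0x93 is not a valid UTF-8 lead byte, so it comes back as EF BF BD, the encoding of the replacement character U+FFFD. The declared length of 3 then covers only 08 EF BF, which would explain the "input ended unexpectedly" error inside UInt32Value. The last line suggests that the mapping is not one-to-one, at least under the assumptions above.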
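Questions:
- Is it possible to convert dataSentToConsumer back into bytesSentByProducer?
- If yes, how can I fix this problem if the only part I control is the consumer? How can I undo the UTF-8 encoding that happens in the misconfigured Kafka cluster?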
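Note: The obvious solution is to configure the Kafka cluster properly. The same consumer works fine in another environment with a normal Kafka cluster that does not perform any weird conversions. For bureaucratic reasons, this obvious and simplest solution is not available.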
What I tried
Approach 1
private byte[] convertToOriginalBytes(final byte[] bytesAfter) throws CharacterCodingException {
final Charset charset = StandardCharsets.UTF_8;
final CharsetDecoder decoder = charset.newDecoder();
final CharsetEncoder encoder = charset.newEncoder();
final ByteBuffer byteBuffer = ByteBuffer.wrap(bytesAfter);
final CharBuffer charBuffer = CharBuffer.allocate(bytesAfter.length);
final CoderResult result = decoder.decode(byteBuffer, charBuffer, true);
result.throwException();
final ByteBuffer reversedByteBuffer = encoder.encode(charBuffer);
final byte[] reversedBytes = new byte[reversedByteBuffer.remaining()];
reversedByteBuffer.get(reversedBytes);
return reversedBytes;
}
This results in an exception:
java.nio.BufferUnderflowException
at java.base/java.nio.charset.CoderResult.throwException(CoderResult.java:272)
at com.mycompany.EncodingTest.convertToOriginalBytes(EncodingTest.java:67)
at com.mycompany.EncodingTest.dealWithCorruptedBinaryData(EncodingTest.java:54)
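A note on this exception, as far as I understand the NIO charset API: CoderResult.UNDERFLOW is the normal outcome when the decoder has consumed all of its input, and calling throwException() on it raises BufferUnderflowException, so the exception by itself does not indicate malformed data. The char buffer is also never flipped before being encoded. Below is a sketch of a stricter variant that uses the one-shot CharsetDecoder.decode(ByteBuffer) method instead; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class StrictUtf8RoundTrip {

    // Decodes the bytes as UTF-8 and encodes the result again.
    // Malformed input is reported as an exception instead of being replaced.
    static byte[] decodeAndReencode(byte[] bytesAfter) throws CharacterCodingException {
        CharBuffer chars = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .decode(ByteBuffer.wrap(bytesAfter)); // returns a buffer ready for reading
        ByteBuffer encoded = StandardCharsets.UTF_8.newEncoder().encode(chars);
        byte[] result = new byte[encoded.remaining()];
        encoded.get(result);
        return result;
    }

    public static void main(String[] args) throws CharacterCodingException {
        // Illustrative bytes that already contain EF BF BD (U+FFFD).
        byte[] corrupted = {0x08, (byte) 0xEF, (byte) 0xBF, (byte) 0xBD, 0x10};
        System.out.println(Arrays.equals(corrupted, decodeAndReencode(corrupted))); // true
    }
}
```

Even with those fixes, the method only returns the bytes it was given: data that went through a string serializer is already valid UTF-8, so decoding and re-encoding it as UTF-8 changes nothing. This approach therefore does not look like it can recover the original bytes.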
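Approach 2
As far as I know, UTF-8 uses several byte patterns:
- 0xxxxxxx for single-byte characters.
- 110xxxxx 10xxxxxx for two-byte characters, and so on.
I assume that StringDeserializer and/or StringSerializer modified some part of the binary data so that it conforms to these UTF-8 rules.
As long as this conversion is reversible, one could obtain the original message by manipulating the bits.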
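To check that assumption, the byte patterns can be expressed as a small classifier, together with a scan for the sequence EF BF BD (the UTF-8 encoding of U+FFFD). This is only a sketch: the names are hypothetical, the classifier looks at bit patterns alone and ignores overlong or out-of-range encodings, and it assumes that bytes which do not fit were replaced by U+FFFD, which is how Java's String(byte[], Charset) constructor handles malformed input.

```java
class Utf8PatternCheck {

    // Returns the sequence length implied by a lead byte's bit pattern,
    // or -1 if the byte cannot start a UTF-8 sequence.
    static int sequenceLength(byte lead) {
        int b = lead & 0xFF;
        if ((b & 0x80) == 0x00) return 1; // 0xxxxxxx
        if ((b & 0xE0) == 0xC0) return 2; // 110xxxxx
        if ((b & 0xF0) == 0xE0) return 3; // 1110xxxx
        if ((b & 0xF8) == 0xF0) return 4; // 11110xxx
        return -1;                        // 10xxxxxx or 11111xxx
    }

    // Counts non-overlapping occurrences of EF BF BD in the data.
    static int countReplacementSequences(byte[] data) {
        int count = 0;
        for (int i = 0; i + 2 < data.length; i++) {
            if ((data[i] & 0xFF) == 0xEF
                    && (data[i + 1] & 0xFF) == 0xBF
                    && (data[i + 2] & 0xFF) == 0xBD) {
                count++;
                i += 2; // skip the rest of this sequence
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(sequenceLength((byte) 0x2A)); // 1
        System.out.println(sequenceLength((byte) 0x93)); // -1
        byte[] received = {0x2A, 0x03, 0x08, (byte) 0xEF, (byte) 0xBF, (byte) 0xBD, 0x10};
        System.out.println(countReplacementSequences(received)); // 1
    }
}
```

If countReplacementSequences returns a non-zero value for dataSentToConsumer, each hit marks a position where the original byte value is probably no longer present in the data, so bit manipulation alone would have nothing to reconstruct it from. A string field that legitimately contains U+FFFD would produce the same three bytes, so the count is an upper bound rather than an exact figure.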