问题是DART插座split messages bigger than 1024 bytes into multiple packets of 1024 bytes.因此,您可以使用一些方法在客户端中将它们组合在一起:
By extending Socket
class
我不认为这是一个正确的解决方案:
- 很难扩展,因为它是一个平台实现(您可以看到SDK实现dart:io,几乎任何类方法都是external).
- 很难维护.
- 因为它依赖于定制的平台实现,所以您需要在多个平台上进行.
- 很容易造成未经记录的内存泄漏.
如果你仍然喜欢这个方法,让我知道,我会做进一步的研究.
By using Stream<T>.reduce
function
在您的上下文中,这种方法的问题在于,当使用socket.write('Your message')
发送消息时,套接字不会发出done事件.
因此,除非您使用套接字发送单个消息,否则该函数无法帮助您,因为它将返回一个永远不会完成的Future<T>
(仅当套接字连接关闭时).
通过从服务器发出EOF消息
这是一个我发现甚至不那么优雅的解决方案,改进是受欢迎的.
其 idea 是将所有客户端接收到的包连接成一个单独的包,并在收到预定终止(EOF)字符串时停止接收.
Implementation
下面是一个服务器实现,它在每次连接新的客户端时发出一条5MB的消息,后跟一个message:end
字符串.
import 'dart:io';
Future<void> main() async {
final ServerSocket serverSocket =
await ServerSocket.bind(InternetAddress.anyIPv4, 5050);
final Stream<Socket> serverStream = serverSocket.asBroadcastStream();
serverStream.listen((client) async {
print(
'New client connected: ${client.address}:${client.port} ${client.done} Remote address: ${client.remoteAddress}');
const int k1byte = 8;
const int k2bytes = k1byte * 2;
const int k1kb = k1byte * 1000;
const int k1mb = k1kb * 1000;
const int k5mb = k1mb * 5;
// Create a 5mb string that follows: '1000.....0001'
final String k1mbMessage = '1${createStringOf(k5mb - k2bytes, '0')}1';
client.write(k1mbMessage);
client.write('message:end');
});
print('Listening on: ${serverSocket.address} ${serverSocket.port}');
}
String createStringOf(int size, [String char = ' ']) {
// https://api.dart.dev/stable/2.17.3/dart-core/String-class.html it says:
// > A sequence of UTF-16 code units.
// And from https://www.ibm.com/docs/en/db2-for-zos/12?topic=unicode-utfs says:
// > UTF-16 is based on 16-bit code units. Each character is encoded as at least 2 bytes.
int dartStringEncodingSize = 2;
assert(size >= dartStringEncodingSize && size.isEven,
'''Dart char contains 2 bytes so we can only create Strings (with exact size) of even N bytes''');
assert(char.length == 1, '''[char] must be a single char String''');
int charCount = size ~/ dartStringEncodingSize;
return char * charCount;
}
这里我们可以看到一个客户端实现,其中我们使用"我们自己的reduce"函数,在没有找到终止字符串的情况下组合所有数据包.
import 'dart:io';
Future<void> main() async {
final Socket server = await Socket.connect('localhost', 5050);
final Stream<String> serverSocket =
server.asBroadcastStream().map(String.fromCharCodes); // Map to String by default
const kMessageEof = 'message:end';
String message = '';
await for (String packet in serverSocket) {
// If you are using [message] as a List of bytes (Uint8List):
// message = [...Uint8List.fromList(message), ...Uint8List(packet)]
message += packet;
// Do not compare directly packet == kMessageEof
// cause it can be 'broken' into multiple packets:
// -> 00000 (packet 1)
// -> 00000 (packet 2)
// -> 00mes (packet 3)
// -> sage: (packet 4)
// -> end (packet 5)
if (message.endsWith(kMessageEof)) {
// remove termination string
message = message.replaceRange(
message.length - kMessageEof.length,
message.length,
'',
);
}
print('Received: $message'); // Prints '1000000......0000001'
}
}
如果需要,您可以通过使用扩展名使其更通用:
import 'dart:io';
/// This was created since the native [reduce] says:
/// > When this stream is done, the returned future is completed with the value at that time.
///
/// The problem is that socket connections does not emits the [done] event after
/// each message but after the socket disconnection.
///
/// So here is a implementation that combines [reduce] and [takeWhile].
extension ReduceWhile<T> on Stream<T> {
Future<T> reduceWhile({
required T Function(T previous, T element) combine,
required bool Function(T) combineWhile,
T? initialValue,
}) async {
T initial = initialValue ?? await first;
await for (T element in this) {
initial = combine(initial, element);
if (!combineWhile(initial)) break;
}
return initial;
}
}
Future<void> main() async {
final Socket server = await Socket.connect('localhost', 5050);
final Stream<String> serverSocket =
server.asBroadcastStream().map(String.fromCharCodes);
const kMessageEof = 'message:end';
// Reduce with a condition [combineWhile]
String message = await serverSocket.reduceWhile(
combine: (previous, element) => '$previous$element',
combineWhile: (message) => !message.endsWith(kMessageEof),
);
// Remove termination string
message = message.replaceRange(
message.length - kMessageEof.length,
message.length,
'',
);
print('Received: $message');
}
因为套接字本身不发送完成事件,所以我发现将所有包减少到一个单独的包的方法是通过发出"我们自己的完成事件".