重要编辑

在进一步调查后,我发现包的大小实际上比所说的1024字节要大得多,1024字节只是我使用的标准输出(Android Studio/Ffltter)的限制.

一些接收到的信息包现在最大可达27,000字节,但与实际传输的超过10倍的大小相go 甚远.

我正在try 通过DART中的套接字连接发送长度最大为5MB的单个包.为此,我使用以下代码:

Socket socket = await Socket.connect(globals.serverUrl, globals.serverPort);
Stream<Uint8List> stream = socket?.asBroadcastStream();
Uint8List? response = await stream?.first;
String responseString = String.fromCharCodes(response);

请注意,我的服务器正在运行Java,而客户端正在使用DART.

从服务器向客户端发送数据包后,成功接收到包的前1024个字节,其余的都找不到了,即使在读了stream.first次之后,他们也继续读取新发送的包,而不是旧包的剩余字节.

所以我的问题是,我如何要求套接字流读取包的所有字节直到完成,而不仅仅是前1024个字节?

编辑:

使用以下命令解析客户端上收到的数据包:

String? decrypt(String cipherText, String keyString) {
  final key = Key.fromUtf8(keyString);
  final iv = IV.fromBase64(cipherText.split(":")[1]);
  final encrypter = Encrypter(AES(key, mode: AESMode.cbc, padding: null));
  final encrypted = Encrypted.fromBase64(cipherText.split(":")[0]);

  final decrypted = encrypter.decrypt(encrypted, iv: iv);

  globals.log.i("DECRYPTED: $decrypted");
  return decrypted;
}

我收到的错误源于获取IV,因为消息在1024字节处被截断,并且":"出现在字符串中很晚的位置.

推荐答案

问题是DART插座split messages bigger than 1024 bytes into multiple packets of 1024 bytes.因此,您可以使用一些方法在客户端中将它们组合在一起:

By extending Socket class

我不认为这是一个正确的解决方案:

  • 很难扩展,因为它是一个平台实现(您可以看到SDK实现dart:io,几乎任何类方法都是external).
  • 很难维护.
  • 因为它依赖于定制的平台实现,所以您需要在多个平台上进行.
  • 很容易造成未经记录的内存泄漏.

如果你仍然喜欢这个方法,让我知道,我会做进一步的研究.

By using Stream<T>.reduce function

在您的上下文中,这种方法的问题在于,当使用socket.write('Your message')发送消息时,套接字不会发出done事件.

因此,除非您使用套接字发送单个消息,否则该函数无法帮助您,因为它将返回一个永远不会完成的Future<T>(仅当套接字连接关闭时).

通过从服务器发出EOF消息

这是一个我发现甚至不那么优雅的解决方案,改进是受欢迎的.

其 idea 是将所有客户端接收到的包连接成一个单独的包,并在收到预定终止(EOF)字符串时停止接收.

Implementation

下面是一个服务器实现,它在每次连接新的客户端时发出一条5MB的消息,后跟一个message:end字符串.

import 'dart:io';

Future<void> main() async {
  final ServerSocket serverSocket =
      await ServerSocket.bind(InternetAddress.anyIPv4, 5050);

  final Stream<Socket> serverStream = serverSocket.asBroadcastStream();

  serverStream.listen((client) async {
    print(
        'New client connected: ${client.address}:${client.port} ${client.done} Remote address: ${client.remoteAddress}');

    const int k1byte = 8;
    const int k2bytes = k1byte * 2;
    const int k1kb = k1byte * 1000;
    const int k1mb = k1kb * 1000;
    const int k5mb = k1mb * 5;

    // Create a 5mb string that follows: '1000.....0001'
    final String k1mbMessage = '1${createStringOf(k5mb - k2bytes, '0')}1';

    client.write(k1mbMessage);
    client.write('message:end');
  });

  print('Listening on: ${serverSocket.address} ${serverSocket.port}');
}

String createStringOf(int size, [String char = ' ']) {
  // https://api.dart.dev/stable/2.17.3/dart-core/String-class.html it says:
  // > A sequence of UTF-16 code units.
  // And from https://www.ibm.com/docs/en/db2-for-zos/12?topic=unicode-utfs says:
  // > UTF-16 is based on 16-bit code units. Each character is encoded as at least 2 bytes.
  int dartStringEncodingSize = 2;

  assert(size >= dartStringEncodingSize && size.isEven,
      '''Dart char contains 2 bytes so we can only create Strings (with exact size) of even N bytes''');
  assert(char.length == 1, '''[char] must be a single char String''');

  int charCount = size ~/ dartStringEncodingSize;

  return char * charCount;
}

这里我们可以看到一个客户端实现,其中我们使用"我们自己的reduce"函数,在没有找到终止字符串的情况下组合所有数据包.

import 'dart:io';

Future<void> main() async {
  final Socket server = await Socket.connect('localhost', 5050);

  final Stream<String> serverSocket =
      server.asBroadcastStream().map(String.fromCharCodes); // Map to String by default

  const kMessageEof = 'message:end';

  String message = '';

  await for (String packet in serverSocket) {
    // If you are using [message] as a List of bytes (Uint8List):
    // message = [...Uint8List.fromList(message), ...Uint8List(packet)]
    message += packet;

    // Do not compare directly packet == kMessageEof
    // cause it can be 'broken' into multiple packets:
    // -> 00000 (packet 1)
    // -> 00000 (packet 2)
    // -> 00mes (packet 3)
    // -> sage: (packet 4)
    // -> end   (packet 5)
    if (message.endsWith(kMessageEof)) {
      // remove termination string
      message = message.replaceRange(
        message.length - kMessageEof.length,
        message.length,
        '',
      );
    }

    print('Received: $message'); // Prints '1000000......0000001'
  }
}

如果需要,您可以通过使用扩展名使其更通用:

import 'dart:io';

/// This was created since the native [reduce] says:
/// > When this stream is done, the returned future is completed with the value at that time.
///
/// The problem is that socket connections does not emits the [done] event after
/// each message but after the socket disconnection.
///
/// So here is a implementation that combines [reduce] and [takeWhile].
extension ReduceWhile<T> on Stream<T> {
  Future<T> reduceWhile({
    required T Function(T previous, T element) combine,
    required bool Function(T) combineWhile,
    T? initialValue,
  }) async {
    T initial = initialValue ?? await first;

    await for (T element in this) {
      initial = combine(initial, element);
      if (!combineWhile(initial)) break;
    }

    return initial;
  }
}

Future<void> main() async {
  final Socket server = await Socket.connect('localhost', 5050);

  final Stream<String> serverSocket =
      server.asBroadcastStream().map(String.fromCharCodes);

  const kMessageEof = 'message:end';

  // Reduce with a condition [combineWhile]
  String message = await serverSocket.reduceWhile(
    combine: (previous, element) => '$previous$element',
    combineWhile: (message) => !message.endsWith(kMessageEof),
  );

  // Remove termination string
  message = message.replaceRange(
    message.length - kMessageEof.length,
    message.length,
    '',
  );

  print('Received: $message');
}

因为套接字本身不发送完成事件,所以我发现将所有包减少到一个单独的包的方法是通过发出"我们自己的完成事件".

Dart相关问答推荐

如何找到这个Currate函数的返回类型?

如何使用 dart 将 Uint8list 转换为 List

在Dart中进行系统调用?

Task 'app:processDebugResources'的Flutter执行失败

GPS是否激活

如何在 Flutter 中忽略整个文件的 lint 规则?

为http POST请求从Flutter上的TextFormField捕获数据

Flutter 以编程方式触发 FutureBuilder

Flutter url_launcher 未在发布模式下启动 url

Flutter 错误:The _ScaffoldLayout custom multichild layout delegate forgot to lay out the following child

从Dart中的另一个文件导入扩展名方法

有可用的 Dart VM 吗?

如何在 timeLimit 之后使future 的计算超时?

如何使用 Dart 和 web 以 60fps 的速度驱动动画循环?

Dart如何将逗号添加到字符串编号

使用 Dart 解析 URI 以提取查询参数

Dart MD5 字符串

你如何命名一个 Dart 类?

如何初始化构造函数主体中的最终字段?

List firstWhere Bad state: No element