我从包含CRLF的流中读取每个X字节(对于我来说,X是2033,因为文件是用bcp生成的,所以我在样例代码中将X设为4).我想把这个流转换成另一个没有这个CRLF的流.新流将被反序列化为XML.

同样,我可以很容易地做到这一点,并以这种方式优雅地运行:

using System.IO;
using System.Text;
using System.Xml.Linq;

public class CREliminator
{
    public const int BcpChunkSize = 4;
    public Stream Run(StreamReader reader)
    {
        var auxstream = new MemoryStream();
        var auxwriter = new StreamWriter(auxstream);
        var chunk = new char[BcpChunkSize];
        do
        {
            var n_bytes =
                reader
               .ReadBlock(chunk, 0, BcpChunkSize);

            auxwriter.Write(chunk[..n_bytes]);
            auxwriter.Flush();

            if (n_bytes == BcpChunkSize)
            {
                char[] chunk2 = new char[2];
                n_bytes = reader.ReadBlock(chunk2, 0, 2);
            }

        } while (!reader.EndOfStream);
        auxstream.Position = 0;
        return auxstream;
    }
}

public class UnitTest1
{
    [Fact]
    public void Test1()
    {
        var CRLF="\r\n";
        var string_data = $"<doc{CRLF}umen{CRLF}t>A<{CRLF}/doc{CRLF}umen{CRLF}t>";
        var expected = string_data.Replace(CRLF, "");
        // to stream
        var memory = new MemoryStream(Encoding.UTF8.GetBytes(string_data));
        var data = new StreamReader(memory);
        // act
        var result = new CREliminator().Run(data);
        // assert
        var x = XDocument.Load(result);
        Assert.Equal(expected, x.ToString());
    }
}

但此代码在loads all stream in memory之前要返回新的流.

我的问题是,如何在懒惰模式下做到这一点?我的意思是,当某个进程正在从新流中读取时处理流.

谢谢.

推荐答案

只需要实现Stream.Read个方法:

  • 以区块为单位从源流读取
  • 在每个区块后跳过2个字节
using System.Buffers;
using System.Text;

var sourceString = string.Concat(
    Enumerable.Range(1, 10).Select(_ => "Foo \r\nBar \r\nBaz!\r\n"));
Console.WriteLine("Source: " + sourceString);

var encoding = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
var sourceBytes = encoding.GetBytes(sourceString);

using var stream = new MemoryStream(sourceBytes);
using var filter = new CrLfFilteringStream(stream, 4);
using var reader = new StreamReader(filter, encoding);

var res = reader.ReadToEnd();
Console.WriteLine("Result: " + res);

public class CrLfFilteringStream : Stream
{
    private readonly Stream _stream;
    private readonly int _chunkSize;
    private readonly byte[] _chunk;
    private int _chunkPosition;
    private int _chunkLength;

    public CrLfFilteringStream(Stream stream, int chunkSize)
    {
        _stream = stream ?? throw new ArgumentNullException(nameof(stream));
        _chunkSize = chunkSize;
        _chunkPosition = chunkSize;
        _chunkLength = chunkSize;
        _chunk = ArrayPool<byte>.Shared.Rent(chunkSize);
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        var toRead = count;
        var bufferPosition = 0;
        Span<byte> sink = stackalloc byte[2];

        while (toRead > 0 && _chunkLength > 0)
        {
            if (_chunkPosition >= _chunkSize)
            {
                _chunkPosition = 0;
                _chunkLength = _stream.Read(_chunk, 0, _chunkSize);

                // Skip CR LF.
                _stream.Read(sink);
            }

            var currentRead = Math.Min(_chunkLength, toRead);
            Array.Copy(_chunk, _chunkPosition, buffer, bufferPosition, currentRead);
            toRead -= currentRead;
            bufferPosition += currentRead;
            _chunkPosition += currentRead;
        }

        return count - toRead;
    }

    public override void Flush() => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    protected override void Dispose(bool disposing)
    {
        ArrayPool<byte>.Shared.Return(_chunk);

        base.Dispose(disposing);
    }
}

这段代码从ArrayPool租用一个区块大小的缓冲区,并且不分配任何其他内容(除了CrLfFilteringStream个实例).

Csharp相关问答推荐

MongoDB将JS查询转换为C#的问题

将列表字符串映射为逗号分隔字符串<>

使用LINQ to XML获取元素值列表是不起作用的

图形API基于appid列表检索多个应用程序

在C#中,DirectoryEntry返回空AuditRules集合,即使审计规则确实存在

有没有办法在WPF文本框中添加复制事件的处理程序?

如何将ASP.NET Core 2.1(在.NET框架上运行)更新到较新的版本?

Razor视图Razor页面指向同一端点时的优先级

Rx.Net窗口内部可观测数据提前完成

Automapper 12.x将GUID映射到字符串

为什么AggregateException的Catch块不足以处理取消?

如何在用户在线时限制令牌生成?

正在寻找新的.NET8 Blazor Web应用程序.如何将.js添加到.razor页面?

我可以查看我们向应用程序洞察发送了多少数据吗?

在C#.NET项目中启动时,如何等待提升的PowerShell进程退出?

如何将%{v_扩展}转换为%{v_扩展}>>

用于获取字符串的最后12个字符的正则表达式(空格除外)

如何将默认区域性更改为fr-FR而不是en-US?

如何在Cake脚本中设置MSBuild.exe的绝对路径

使用postman 测试配置了身份的.NET 6应用程序