Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
System.IO.Pipelines är ett bibliotek som är utformat för att göra det enklare att utföra högpresterande I/O i .NET. Det är ett bibliotek som är inriktat på .NET Standard som fungerar på alla .NET-implementeringar.
Biblioteket är tillgängligt i Nuget-paketet System.IO.Pipelines .
Vilket problem löser System.IO.Pipelines?
Appar som parsar strömmande data består av exempelkod med många specialiserade och ovanliga kodflöden. Pannplåten och specialfallskoden är komplexa och svåra att underhålla.
System.IO.Pipelines har konstruerats för att:
- Ha högpresterande parsning av strömmande data.
- Minska kodkomplexiteten.
Följande kod är typisk för en TCP-server som tar emot radavgränsade meddelanden (avgränsade av '\n') från en klient:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Föregående kod har flera problem:
- Hela meddelandet (radslutet) kanske inte tas emot i ett enda anrop till
ReadAsync. - Den ignorerar resultatet av
stream.ReadAsync.stream.ReadAsyncreturnerar hur mycket data som lästes. - Den hanterar inte det fall där flera rader läss i ett enda
ReadAsyncanrop. - Den allokerar en
bytematris med varje läsning.
För att åtgärda de föregående problemen krävs följande ändringar:
Buffring av inkommande data tills en ny rad hittas.
Parsa alla rader som returneras i bufferten.
Det är möjligt att linjen är större än 1 KB (1 024 byte). Koden måste ändra storlek på indatabufferten tills avgränsare hittas för att passa den fullständiga raden inuti bufferten.
- Om bufferten ändras görs fler buffertkopior eftersom längre rader visas i indata.
- För att minska slöseri med utrymme komprimerar du bufferten som används för att läsa linjer.
Överväg att använda buffertpooler för att undvika att allokera minne upprepade gånger.
Följande kod åtgärdar några av dessa problem:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Den tidigare koden är komplex och hanterar inte alla problem som identifierats. Nätverk med höga prestanda innebär vanligtvis att skriva komplex kod för att maximera prestanda.
System.IO.Pipelines har utformats för att göra det enklare att skriva den här typen av kod.
Pipa
Klassen Pipe kan användas för att skapa ett PipeWriter/PipeReader par. Alla data som skrivs PipeWriter till är tillgängliga i PipeReader:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
Grundläggande användning av rör
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Det finns två loopar:
-
FillPipeAsyncläser frånSocketoch skriver tillPipeWriter. -
ReadPipeAsyncläser frånPipeReaderoch parsar inkommande rader.
Det finns inga explicita buffertar allokerade. All bufferthantering delegeras till implementeringarna PipeReader och PipeWriter . Om du delegerar bufferthantering blir det enklare för användning av kod att fokusera enbart på affärslogik.
I den första loopen:
- PipeWriter.GetMemory(Int32) anropas för att hämta minne från den underliggande skrivaren.
-
PipeWriter.Advance(Int32) anropas för att berätta
PipeWriterhur mycket data som skrevs till bufferten. -
PipeWriter.FlushAsync anropas för att göra data tillgängliga för
PipeReader.
I den andra loopen PipeReader förbrukar den buffertar som skrivits av PipeWriter. Buffertarna kommer från socketen. Anropet till PipeReader.ReadAsync:
Returnerar en ReadResult som innehåller två viktiga informationsdelar:
- De data som lästes i form av
ReadOnlySequence<byte>. - Ett booleskt värde
IsCompletedsom anger om dataslutet (EOF) har nåtts.
- De data som lästes i form av
Efter att ha hittat slutet av linjens avgränsare (EOL) och parsat linjen:
- Logiken bearbetar bufferten för att hoppa över det som redan bearbetas.
-
PipeReader.AdvanceToanropas för att berättaPipeReaderhur mycket data som har förbrukats och undersökts.
Läsaren och skrivarslingorna slutar med att anropa Complete.
Complete låter den underliggande Pipe frigöra det minne som den allokerade.
Backpressure och flödeskontroll
Helst fungerar läsning och parsning tillsammans:
- Lästråden förbrukar data från nätverket och placerar dem i buffertar.
- Parsningstråden ansvarar för att konstruera lämpliga datastrukturer.
Vanligtvis tar parsning längre tid än att bara kopiera datablock från nätverket:
- Lästråden hamnar före parsningstråden.
- Lästråden måste antingen sakta ned eller allokera mer minne för att lagra data för parsningstråden.
För optimala prestanda finns det en balans mellan frekventa pauser och allokering av mer minne.
För att lösa föregående problem har de Pipe två inställningarna för att styra dataflödet:
- PauseWriterThreshold: Avgör hur mycket data som ska bufferas innan anropen pausas FlushAsync .
-
ResumeWriterThreshold: Avgör hur mycket data läsaren måste observera innan anropen återupptas
PipeWriter.FlushAsync.
- Returnerar en ofullständig
ValueTask<FlushResult>när mängden data iPipekorsarPauseWriterThreshold. - Slutförs
ValueTask<FlushResult>när den blir lägre änResumeWriterThreshold.
Två värden används för att förhindra snabb cykling, vilket kan inträffa om ett värde används.
Exempel
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
rörschemaläggare
Vanligtvis när du använder async och återupptas asynkron kod på antingen en await eller den aktuella TaskSchedulerSynchronizationContext.
När du gör I/O är det viktigt att ha detaljerad kontroll över var I/O utförs. Med den här kontrollen kan du dra nytta av CPU-cacheminnen på ett effektivt sätt. Effektiv cachelagring är avgörande för högpresterande appar som webbservrar. PipeScheduler ger kontroll över var asynkrona motringningar körs. Som standard:
- SynchronizationContext Strömmen används.
- Om det inte finns använder
SynchronizationContextden trådpoolen för att köra återanrop.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool är implementeringen PipeScheduler som köar återanrop till trådpoolen.
PipeScheduler.ThreadPool är standard och det bästa valet.
PipeScheduler.Inline kan orsaka oavsiktliga konsekvenser som dödlägen.
Röråterställning
Det är ofta effektivt att återanvända objektet Pipe . Om du vill återställa röret anropar du PipeReaderReset när både PipeReader och PipeWriter är klara.
PipeReader
PipeReader hanterar minnet åt anroparen.
efter att ha ringt PipeReader.AdvanceTo. Detta meddelar PipeReader när anroparen är klar med minnet så att det kan spåras. Den ReadOnlySequence<byte> returnerade från PipeReader.ReadAsync är endast giltig tills anropet till PipeReader.AdvanceTo. Det är olagligt att använda ReadOnlySequence<byte> efter att ha anropat PipeReader.AdvanceTo.
PipeReader.AdvanceTo tar två SequencePosition argument:
- Det första argumentet avgör hur mycket minne som förbrukades.
- Det andra argumentet avgör hur mycket av bufferten som observerades.
Att markera data som förbrukade innebär att röret kan returnera minnet till den underliggande buffertpoolen. Om data markeras som observerade styrs vad nästa anrop till PipeReader.ReadAsync gör. Att markera allt som observerats innebär att nästa anrop till PipeReader.ReadAsync inte returneras förrän mer data har skrivits till röret. Alla andra värden gör nästa anrop för att PipeReader.ReadAsync returnera omedelbart med observerade och oobserverade data, men inte data som redan har förbrukats.
Läsa scenarier för strömmande data
Det finns ett par typiska mönster som uppstår när du försöker läsa strömmande data:
- Med en dataström parsar du ett enda meddelande.
- Med en dataström parsar du alla tillgängliga meddelanden.
I följande exempel används TryParseLines metoden för att parsa meddelanden från en ReadOnlySequence<byte>.
TryParseLines parsar ett enda meddelande och uppdaterar indatabufferten för att trimma det parsade meddelandet från bufferten.
TryParseLines är inte en del av .NET, det är en användarskriven metod som används i följande avsnitt.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Läsa ett enda meddelande
Följande kod läser ett enda meddelande från en PipeReader och returnerar det till anroparen.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Koden ovan:
- Parsar ett enda meddelande.
- Uppdaterar den förbrukade
SequencePositionoch undersöktaSequencePositionså att den pekar på början av den trimmade indatabufferten.
De två SequencePosition argumenten uppdateras eftersom TryParseLines det tolkade meddelandet tas bort från indatabufferten. När du parsar ett enda meddelande från bufferten bör den undersökta positionen vanligtvis vara något av följande:
- Slutet av meddelandet.
- Slutet på den mottagna bufferten om inget meddelande hittades.
Det enskilda meddelandefallet har störst risk för fel. Om du skickar fel värden som ska undersökas kan det leda till ett undantag från minnet eller en oändlig loop. Mer information finns i avsnittet Om vanliga problem med PipeReader i den här artikeln.
Läsa flera meddelanden
Följande kod läser alla meddelanden från en och anropar PipeReader var och enProcessMessageAsync.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Annullering
PipeReader.ReadAsync:
- Har stöd för att skicka en CancellationToken.
- Utlöser ett OperationCanceledException om det
CancellationTokenavbryts medan det finns en väntande läsning. - Stöder ett sätt att avbryta den aktuella läsåtgärden via PipeReader.CancelPendingRead, vilket undviker att skapa ett undantag. Anrop
PipeReader.CancelPendingReadgör att det aktuella eller nästa anropet tillPipeReader.ReadAsyncreturnerar ett ReadResult medIsCanceledinställt påtrue. Detta kan vara användbart för att stoppa den befintliga läsloopen på ett icke-destruktivt och icke-exceptionellt sätt.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Vanliga problem med PipeReader
Om du skickar fel värden till eller
consumedkan det leda tillexaminedatt du läser redan lästa data.Att skicka
buffer.Endsom undersökt kan resultera i:- Data som har stoppats
- Eventuellt ett undantag från slut på minne (OOM) om data inte används. Till exempel
PipeReader.AdvanceTo(position, buffer.End)när du bearbetar ett enda meddelande i taget från bufferten.
Om du skickar fel värden till
consumedellerexaminedkan det resultera i en oändlig loop. OmPipeReader.AdvanceTo(buffer.Start)det till exempelbuffer.Startinte har ändrats kommer nästa anrop attPipeReader.ReadAsyncreturneras omedelbart innan nya data tas emot.Om fel värden skickas till
consumedellerexaminedkan det resultera i oändlig buffring (eventuell OOM).När du använder efteranropet
ReadOnlySequence<byte>PipeReader.AdvanceTokan det leda till minnesskada (använd efter kostnadsfritt).Om du inte anropar
PipeReader.Complete/CompleteAsynckan det leda till en minnesläcka.Att kontrollera ReadResult.IsCompleted och avsluta läslogik innan bufferten bearbetas resulterar i dataförlust. Loopavslutsvillkoret ska baseras på
ReadResult.Buffer.IsEmptyochReadResult.IsCompleted. Om du gör detta felaktigt kan det resultera i en oändlig loop.
Problematisk kod
❌ Dataförlust
ReadResult Kan returnera det sista datasegmentet när IsCompleted är inställt på true. Att inte läsa dessa data innan läsloopen avslutas resulterar i dataförlust.
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
❌ Oändlig loop
Följande logik kan resultera i en oändlig loop om Result.IsCompleted är true men det finns aldrig ett fullständigt meddelande i bufferten.
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
Här är en annan kod med samma problem. Den söker efter en buffert som inte är tom innan du kontrollerar ReadResult.IsCompleted. Eftersom den finns i en else ifloopar den för alltid om det aldrig finns ett fullständigt meddelande i bufferten.
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
❌ Program som inte svarar
Villkorslöst anrop PipeReader.AdvanceTo med buffer.End i examined positionen kan leda till att programmet inte svarar när ett enda meddelande parsas. Nästa anrop till PipeReader.AdvanceTo returnerar inte förrän:
- Mer data skrivs till röret.
- Och de nya data har inte undersökts tidigare.
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
❌ Slut på minne (OOM)
Med följande villkor fortsätter följande kod buffring tills en OutOfMemoryException inträffar:
- Det finns ingen maximal meddelandestorlek.
- De data som returneras från
PipeReadergör inte ett fullständigt meddelande. Det gör till exempel inte ett fullständigt meddelande eftersom den andra sidan skriver ett stort meddelande (till exempel ett 4 GB-meddelande).
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
❌ Minnesskada
När du skriver hjälppersonal som läser bufferten bör alla returnerade nyttolaster kopieras innan du anropar Advance. I följande exempel returneras minne som Pipe har tagits bort och kan återanvända det för nästa åtgärd (läsa/skriva).
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
PipeWriter
Hanterar PipeWriter buffertar för att skriva åt anroparen.
PipeWriter implementerar IBufferWriter<byte>.
IBufferWriter<byte> gör det möjligt att få åtkomst till buffertar för att utföra skrivningar utan extra buffertkopior.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Föregående kod:
- Begär en buffert på minst 5 byte från
PipeWriteratt använda GetMemory. - Skriver byte för ASCII-strängen
"Hello"till den returneradeMemory<byte>. - Anrop Advance för att ange hur många byte som skrevs till bufferten.
- Tömer
PipeWriter, som skickar byte till den underliggande enheten.
Den tidigare skrivmetoden använder buffertar som tillhandahålls av PipeWriter. Det kunde också ha använt PipeWriter.WriteAsync, vilket:
- Kopierar den befintliga bufferten
PipeWritertill . - Anropar
GetSpan,Advanceefter behov och anropar FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
Annullering
FlushAsync har stöd för att skicka en CancellationToken. Om du skickar ett CancellationToken resultat i ett OperationCanceledException om token avbryts medan en tömning väntar.
PipeWriter.FlushAsync stöder ett sätt att avbryta den aktuella tömningsåtgärden via PipeWriter.CancelPendingFlush utan att skapa ett undantag. Anrop PipeWriter.CancelPendingFlush gör att det aktuella eller nästa anropet till PipeWriter.FlushAsync eller PipeWriter.WriteAsync returnerar ett FlushResult med IsCanceled inställt på true. Detta kan vara användbart för att stoppa den resulterande tömningen på ett icke-destruktivt och icke-exceptionellt sätt.
Vanliga problem med PipeWriter
- GetSpan och GetMemory returnera en buffert med minst den begärda mängden minne. Anta inte exakta buffertstorlekar.
- Det finns ingen garanti för att efterföljande anrop returnerar samma buffert eller buffert i samma storlek.
- En ny buffert måste begäras efter anropet Advance för att kunna fortsätta skriva mer data. Det går inte att skriva till den tidigare hämtade bufferten.
- Det är inte säkert att ringa
GetMemoryellerGetSpannär det finns ett ofullständigt samtal tillFlushAsync. - Att anropa
CompleteellerCompleteAsyncnär det finns oupplösade data kan leda till minnesskada.
Tips för att använda PipeReader och PipeWriter
Följande tips hjälper dig att använda klasserna System.IO.Pipelines :
- Slutför alltid PipeReader och PipeWriter, inklusive ett undantag där det är tillämpligt.
- Ring PipeReader.AdvanceTo alltid efter att ha ringt PipeReader.ReadAsync.
- Skriv regelbundet
awaitPipeWriter.FlushAsync och kontrollera FlushResult.IsCompletedalltid . Avbryt skrivning omIsCompletedärtrue, eftersom det indikerar att läsaren har slutförts och inte längre bryr sig om vad som skrivs. - Anropa PipeWriter.FlushAsync när du har skrivit något som du vill
PipeReaderha åtkomst till. - Anropa
FlushAsyncinte om läsaren inte kan starta förränFlushAsyncden är klar, eftersom det kan orsaka ett dödläge. - Se till att endast en kontext "äger" en
PipeReaderellerPipeWritereller kommer åt dem. Dessa typer är inte trådsäkra. - Få aldrig åtkomst till en ReadResult.Buffer när du har anropat
AdvanceToeller slutförtPipeReader.
IDuplexPipe
IDuplexPipe är ett kontrakt för typer som stöder både läsning och skrivning. Till exempel skulle en nätverksanslutning representeras av en IDuplexPipe.
Till skillnad från Pipe, som innehåller en PipeReader och en PipeWriter, IDuplexPipe representerar en enda sida av en fullständig duplex-anslutning. Det innebär att det som skrivs till PipeWriter inte kommer att läsas från PipeReader.
Strömmar
När du läser eller skriver dataström läser du vanligtvis data med en de-serialiserare och skriver data med en serialiserare. De flesta av dessa läs- och skrivström-API:er har en Stream parameter. För att göra det enklare att integrera med dessa befintliga API:er PipeReader och PipeWriter exponera en AsStream metod.
AsStream returnerar en Stream implementering runt PipeReader eller PipeWriter.
Stream-exempel
PipeReader och PipeWriter instanser kan skapas med hjälp av statiska Create metoder givet ett Stream objekt och valfria motsvarande alternativ för skapande.
Tillåt StreamPipeReaderOptions kontroll över skapandet av instansen PipeReader med följande parametrar:
-
StreamPipeReaderOptions.BufferSize är den minsta buffertstorleken i byte som används när du hyr minne från poolen och standardvärdet är
4096. -
StreamPipeReaderOptions.LeaveOpen flagga avgör om den underliggande strömmen lämnas öppen efter slutföranden
PipeReaderoch standardvärdet ärfalse. -
StreamPipeReaderOptions.MinimumReadSize representerar tröskelvärdet för återstående byte i bufferten innan en ny buffert allokeras och standardvärdet
1024är . -
StreamPipeReaderOptions.Pool är den
MemoryPool<byte>som används vid allokering av minne och är standardvärdetnull.
Tillåt StreamPipeWriterOptions kontroll över skapandet av instansen PipeWriter med följande parametrar:
-
StreamPipeWriterOptions.LeaveOpen flagga avgör om den underliggande strömmen lämnas öppen efter slutföranden
PipeWriteroch standardvärdet ärfalse. -
StreamPipeWriterOptions.MinimumBufferSize representerar den minsta buffertstorlek som ska användas när du hyr minne från Pool, och standardvärdet är
4096. -
StreamPipeWriterOptions.Pool är den
MemoryPool<byte>som används vid allokering av minne och är standardvärdetnull.
Viktigt!
När du skapar PipeReader och PipeWriter instanser med hjälp av Create metoderna måste du överväga objektets Stream livslängd. Om du behöver åtkomst till dataströmmen när läsaren eller skrivaren är klar med den måste du ange LeaveOpen flaggan till true för skapandealternativen. Annars stängs strömmen.
Följande kod visar hur du skapar och PipeReader instanser med hjälp PipeWriter av Create metoderna från en dataström.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Programmet använder en StreamReader för att läsa lorem-ipsum.txt-filen som en ström, och den måste avslutas med en tom rad.
FileStream skickas till PipeReader.Create, som instansierar ett PipeReader objekt. Konsolprogrammet skickar sedan sin standardutdataström till PipeWriter.Create med hjälp av Console.OpenStandardOutput(). Exemplet stöder annullering.