Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Även om du kan använda metoderna DataflowBlock.Receive, DataflowBlock.ReceiveAsync och DataflowBlock.TryReceive för att ta emot meddelanden från källblock kan du också ansluta meddelandeblock för att bilda en dataflödesrörledning. En dataflödespipeline är en serie komponenter, eller dataflödesblock, som var och en utför en specifik uppgift som bidrar till ett större mål. Varje dataflödesblock i en dataflödespipeline utför arbete när det tar emot ett meddelande från ett annat dataflödesblock. En analogi till detta är en monteringslinje för biltillverkning. När varje fordon passerar genom monteringslinjen monterar en station ramen, nästa installerar motorn och så vidare. Eftersom en monteringslinje gör att flera fordon kan monteras samtidigt ger den bättre dataflöde än att montera kompletta fordon en i taget.
Det här dokumentet visar en dataflödespipeline som laddar ned boken Homeros Iliaden från en webbplats och söker i texten för att matcha enskilda ord med ord som är det första ordets tecken omvända. Bildandet av dataflödespipelinen i det här dokumentet består av följande steg:
Skapa de dataflödesblock som ingår i pipelinen.
Anslut varje dataflödesblock till nästa block i pipelinen. Varje block får som indata utdata från föregående block i pipelinen.
För varje dataflödesblock skapar du en fortsättningsaktivitet som anger nästa block till slutfört tillstånd när föregående block har slutförts.
Skicka data i början av pipelinen.
Markera pipelinens huvud som slutfört.
Vänta tills pipelinen har slutfört allt sitt arbete.
Förutsättningar
Läs Dataflöde innan du påbörjar den här genomgången.
Skapa ett konsolprogram
Skapa ett Visual C#- eller Visual Basic Console Application-projekt i Visual Studio. Installera NuGet-paketet System.Threading.Tasks.Dataflow.
Anmärkning
TPL-dataflödesbiblioteket (System.Threading.Tasks.Dataflow-namnområdet) distribueras inte med .NET. Om du vill installera System.Threading.Tasks.Dataflow-namnområdet i Visual Studio öppnar du projektet, väljer Hantera NuGet-paket från menyn Project och söker online efter System.Threading.Tasks.Dataflow-paketet. Alternativt, för att installera det med hjälp av .NET Core CLI, kör dotnet add package System.Threading.Tasks.Dataflow.
Lägg till följande kod i projektet för att skapa det grundläggande programmet.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
static void Main()
{
}
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
End Sub
End Module
Skapa dataflödesblocken
Lägg till följande kod i Main-metoden för att skapa de dataflödesblock som ingår i pipelinen. Tabellen som följer sammanfattar rollen för varje medlem i pipelinen.
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
| Medlem | Typ | Beskrivning |
|---|---|---|
downloadString |
TransformBlock<TInput,TOutput> | Laddar ned boktexten från webben. |
createWordList |
TransformBlock<TInput,TOutput> | Separerar boktexten i en matris med ord. |
filterWordList |
TransformBlock<TInput,TOutput> | Tar bort korta ord och dubbletter från ordmatrisen. |
findReversedWords |
TransformManyBlock<TInput,TOutput> | Hittar alla ord i den filtrerade samlingen av ordmatriser vars omvända också finns i ordmatrisen. |
printReversedWords |
ActionBlock<TInput> | Visar ord och motsvarande omvända ord i konsolen. |
Även om du kan kombinera flera steg i dataflödespipelinen i det här exemplet i ett steg, illustrerar exemplet begreppet att skapa flera oberoende dataflödesuppgifter för att utföra en större uppgift. I exemplet används TransformBlock<TInput,TOutput> för att möjliggöra för varje medlem i pipelinen att utföra en operation på sina indata och skicka resultatet till nästa steg i pipelinen. Den #D0 medlemmen i pipelinen är ett #D1 objekt eftersom det genererar flera oberoende utdata för varje indata. Pipelinens slut, printReversedWords, är ett ActionBlock<TInput>-objekt eftersom det utför en åtgärd på dess indata och inte ger något resultat.
Skapa pipelinen
Lägg till följande kod för att ansluta varje block till nästa block i pipelinen.
När du anropar metoden #D0 för att ansluta ett källdataflödesblock till ett måldataflödesblock, sprider källdataflödesblocket data till målblocket när data blir tillgängliga. Om du även anger DataflowLinkOptions med PropagateCompletion inställt på sant, kommer lyckat eller misslyckat slutförande av ett block i pipelinen att göra att nästa block i pipelinen slutförs.
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
Publicera data till pipelinen
Lägg till följande kod för att publicera URL:en för boken Homers Iliad till huvudet av dataflödespipelinen.
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
I det här exemplet används DataflowBlock.Post för att synkront skicka data till toppen av pipelinen. Använd metoden DataflowBlock.SendAsync när du måste skicka data asynkront till en dataflödesnod.
Slutförande av pipelineaktivitet
Lägg till följande kod för att markera pipelinens huvud som slutfört. Huvudet av pipelinen meddelar att den är klar när den har bearbetat alla buffrade meddelanden.
// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()
Det här exemplet skickar en URL via dataflödespipelinen som ska bearbetas. Om du skickar fler än en indata via en pipeline anropar du metoden IDataflowBlock.Complete när du har skickat alla indata. Du kan utelämna det här steget om ditt program inte har någon väldefinierad punkt där data inte längre är tillgängliga eller om programmet inte behöver vänta tills pipelinen har slutförts.
Väntar på att pipelinen ska slutföras
Lägg till följande kod för att vänta tills rörledningen har slutförts. Den övergripande processen är klar när slutet av pipelinen är nådd.
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
Du kan vänta på att dataflödet ska slutföras från valfri tråd eller från flera trådar samtidigt.
Det fullständiga exemplet
I följande exempel visas den fullständiga koden för den här genomgången.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
static void Main()
{
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
}
}
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
' Mark the head of the pipeline as complete.
downloadString.Complete()
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
Nästa steg
Det här exemplet skickar en URL för att bearbeta via dataflödespipelinen. Om du skickar fler än ett indatavärde via en pipeline kan du introducera en form av parallellitet i ditt program som liknar hur delar kan röra sig genom en bilfabrik. När den första medlemmen i pipelinen skickar sitt resultat till den andra medlemmen kan den bearbeta ett annat objekt parallellt eftersom den andra medlemmen bearbetar det första resultatet.
Parallelliteten som uppnås med hjälp av dataflödespipelines kallas grovkornig parallellism eftersom den vanligtvis består av färre, större uppgifter. Du kan också använda en mer finare uppdelad parallellism av mindre, kortvariga uppgifter i en dataflödespipeline. I det här exemplet använder findReversedWords-medlemmen i pipelinen PLINQ för att bearbeta flera objekt i arbetslistan parallellt. Användningen av finfördelad parallellitet i en grovkornig pipeline kan förbättra den övergripande genomströmningen.
Du kan också ansluta ett källdataflödesblock till flera målblock för att skapa ett dataflödesnätverk. Den överlagrade versionen av metoden #D0 tar ett #D1 objekt som definierar om målblocket accepterar varje meddelande baserat på dess värde. De flesta typer av dataflödesblock som fungerar som källor erbjuder meddelanden till alla anslutna målblock, i den ordning de var anslutna, tills något av blocken accepterar meddelandet. Genom att använda den här filtreringsmekanismen kan du skapa system med anslutna dataflödesblock som dirigerar vissa data via en sökväg och andra data via en annan sökväg. För ett exempel som använder filtrering för att skapa ett dataflödesnätverk, se Genomgång: Använda dataflöde i ett Windows Forms-program.