Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Hoewel u de DataflowBlock.Receive, DataflowBlock.ReceiveAsync en DataflowBlock.TryReceive methoden kunt gebruiken om berichten van bronblokken te ontvangen, kunt u ook berichtblokken verbinden om een gegevenspijplijn te vormen. Een gegevensstroom is een reeks componenten ofwel gegevensstroomblokken, waarvan elk een specifieke taak uitvoert die bijdraagt aan een groter doel. Elk gegevensstroomblok in een gegevensstroompijplijn werkt wanneer er een bericht wordt ontvangen van een ander gegevensstroomblok. Een analogie hiervan is een assemblagelijn voor de productie van auto's. Wanneer elk voertuig door de montagelijn gaat, wordt het frame door één station gemonteerd, de volgende installeert de motor, enzovoort. Omdat met een montagelijn meerdere voertuigen tegelijkertijd kunnen worden gemonteerd, biedt deze een betere doorvoer dan het monteren van complete voertuigen één voor één.
In dit document wordt een gegevensstroompijplijn gedemonstreerd die het boek The Iliad of Homer van een website downloadt en de tekst doorzoekt om afzonderlijke woorden te koppelen aan woorden die de letters van het eerste woord omdraaien. De vorming van de gegevensstroompijplijn in dit document bestaat uit de volgende stappen:
Creëer de dataflow-blokken die deelnemen aan de pijplijn.
Verbind elk gegevensstroomblok met het volgende blok in de pijplijn. Elk blok ontvangt als invoer de uitvoer van het vorige blok in de pijplijn.
Maak voor elk gegevensstroomblok een vervolgtaak waarmee het volgende blok wordt ingesteld op de voltooide status nadat het vorige blok is voltooid.
Plaats gegevens op het hoofd van de pijplijn.
Markeer de kop van de pijplijn als voltooid.
Wacht totdat de pijplijn al het werk heeft voltooid.
Vereiste voorwaarden
Lees Dataflow voordat u aan deze handleiding begint.
Een consoletoepassing maken
Maak in Visual Studio een Visual C# of Visual Basic Console Application-project. Installeer het NuGet-pakket System.Threading.Tasks.Dataflow.
Notitie
De TPL-gegevensstroombibliotheek (de System.Threading.Tasks.Dataflow naamruimte) wordt niet gedistribueerd met .NET. Als u de System.Threading.Tasks.Dataflow-naamruimte in Visual Studio wilt installeren, opent u uw project, kiest u NuGet-pakketten beheren in het menu Project en zoekt u online naar het System.Threading.Tasks.Dataflow-pakket. Om deze te installeren met behulp van de .NET Core CLI, voert u dotnet add package System.Threading.Tasks.Dataflowuit.
Voeg de volgende code toe aan uw project om de basistoepassing te maken.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
static void Main()
{
}
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
End Sub
End Module
De gegevensstroomblokken maken
Voeg de volgende code toe aan de Main-methode om de datastroomblokken te maken die deelnemen aan de pijplijn. De volgende tabel bevat een overzicht van de rol van elk lid van de pijplijn.
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
| Lid | Typ | Beschrijving |
|---|---|---|
downloadString |
TransformBlock<TInput,TOutput> | Hiermee downloadt u de boektekst van het web. |
createWordList |
TransformBlock<TInput,TOutput> | Scheidt de boektekst in een matrix met woorden. |
filterWordList |
TransformBlock<TInput,TOutput> | Hiermee verwijdert u korte woorden en duplicaten uit de woordmatrix. |
findReversedWords |
TransformManyBlock<TInput,TOutput> | Hiermee vindt u alle woorden in de gefilterde woordmatrixverzameling waarvan de omgekeerde ook voorkomt in de woordmatrix. |
printReversedWords |
ActionBlock<TInput> | Geeft woorden en de bijbehorende omgekeerde woorden weer op de console. |
Hoewel u in dit voorbeeld meerdere stappen in de gegevensstroompijplijn in één stap kunt combineren, illustreert het voorbeeld het concept van het opstellen van meerdere onafhankelijke gegevensstroomtaken om een grotere taak uit te voeren. In het voorbeeld wordt #D0 gebruikt om elk lid van de pijplijn in staat te stellen een bewerking uit te voeren op de invoergegevens en de resultaten te verzenden naar de volgende stap in de pijplijn. Het #D0 lid van de pijplijn is een #D1 object omdat er meerdere onafhankelijke uitvoer voor elke invoer wordt geproduceerd. De staart van de pijplijn, printReversedWords, is een ActionBlock<TInput> object omdat het een actie uitvoert op de invoer en geen resultaat produceert.
De pijplijn vormen
Voeg de volgende code toe om elk blok te verbinden met het volgende blok in de pijplijn.
Wanneer u de #D0 methode aanroept om een brongegevensstroomblok te verbinden met een doelgegevensstroomblok, worden gegevens door het brongegevensstroomblok doorgegeven aan het doelblok zodra er gegevens beschikbaar komen. Als u ook #D0 met #D1 ingesteld op waar aanbiedt, zal het geslaagd of mislukt voltooien van één blok in de pijplijn leiden tot de voltooiing van het volgende blok in de pijplijn.
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
Gegevens naar de pijplijn sturen
Voeg de volgende code toe om de URL van het boek The Iliad of Homer naar het begin van de gegevenspijplijn te verzenden.
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
In dit voorbeeld wordt DataflowBlock.Post gebruikt om gegevens synchroon naar het hoofd van de pijplijn te verzenden. Gebruik de DataflowBlock.SendAsync methode wanneer u asynchroon gegevens moet verzenden naar een datastroomknooppunt.
Pijplijnactiviteit voltooien
Voeg de volgende code toe om het hoofd van de pijplijn te markeren als voltooid. De kop van de pijplijn geeft zijn voltooiing door nadat alle gebufferde berichten zijn verwerkt.
// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()
In dit voorbeeld wordt één URL verzonden via de gegevensstroompijplijn die moet worden verwerkt. Als u meer dan één invoer via een pijplijn verzendt, roept u de IDataflowBlock.Complete methode aan nadat u alle invoer hebt ingediend. U kunt deze stap weglaten als uw toepassing geen goed gedefinieerd punt heeft waarop gegevens niet meer beschikbaar zijn of als de toepassing niet hoeft te wachten tot de pijplijn is voltooid.
Wachten tot de pijplijn is voltooid
Voeg de volgende code toe om te wachten tot de pijplijn is voltooid. De algehele bewerking is voltooid wanneer de staart van de pijplijn is voltooid.
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
U kunt wachten op voltooiing van de gegevensstroom vanuit een thread of van meerdere threads tegelijk.
Het volledige voorbeeld
In het volgende voorbeeld ziet u de volledige code voor dit scenario.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
static void Main()
{
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
}
}
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
' Mark the head of the pipeline as complete.
downloadString.Complete()
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
Volgende stappen
In dit voorbeeld wordt één URL verzonden die moet worden verwerkt via de gegevensstroompijplijn. Als u meer dan één invoerwaarde via een pijplijn verzendt, kunt u een vorm van parallelle uitvoering in uw toepassing introduceren die lijkt op hoe onderdelen door een autofabriek kunnen worden verplaatst. Wanneer het eerste lid van de pijplijn het resultaat naar het tweede lid verzendt, kan het een ander item parallel verwerken terwijl het tweede lid het eerste resultaat verwerkt.
Het parallellisme dat wordt bereikt met behulp van datastroompijplijnen wordt grof parallellisme genoemd, omdat het doorgaans uit minder, grotere taken bestaat. U kunt ook een meer fijner gestructureerde parallelle verwerking van kleinere, kortlopende taken in een dataflowpijplijn gebruiken. In dit voorbeeld gebruikt het findReversedWords-lid van de pijplijn PLINQ om meerdere items in de werklijst parallel te verwerken. Het gebruik van fijnmazige parallelle uitvoering in een grof korrelige pijplijn kan de totale doorvoer verbeteren.
U kunt ook een brongegevensstroomblok verbinden met meerdere doelblokken om een dataflownetwerk te maken. De overbelaste versie van de methode #D0 gebruikt een #D1 object dat bepaalt of het doelblok elk bericht accepteert op basis van de waarde. De meeste typen gegevensstroomblokken die fungeren als bronnen bieden berichten aan alle verbonden doelblokken, in de volgorde waarin ze zijn verbonden, totdat een van de blokken dat bericht accepteert. Met behulp van dit filtermechanisme kunt u systemen van verbonden gegevensstroomblokken maken die bepaalde gegevens door het ene pad en andere gegevens door een ander pad leiden. Zie Stapsgewijze instructie: Gegevensstroom gebruiken in een Windows Forms-toepassing voor een voorbeeld waarin het gebruik van filters wordt getoond om een gegevensstroomnetwerk te creëren.