Delen via


Overzicht: Een gegevensstroompijplijn maken

Hoewel u de DataflowBlock.Receive, DataflowBlock.ReceiveAsync en DataflowBlock.TryReceive methoden kunt gebruiken om berichten van bronblokken te ontvangen, kunt u ook berichtblokken verbinden om een gegevenspijplijn te vormen. Een gegevensstroom is een reeks componenten ofwel gegevensstroomblokken, waarvan elk een specifieke taak uitvoert die bijdraagt aan een groter doel. Elk gegevensstroomblok in een gegevensstroompijplijn werkt wanneer er een bericht wordt ontvangen van een ander gegevensstroomblok. Een analogie hiervan is een assemblagelijn voor de productie van auto's. Wanneer elk voertuig door de montagelijn gaat, wordt het frame door één station gemonteerd, de volgende installeert de motor, enzovoort. Omdat met een montagelijn meerdere voertuigen tegelijkertijd kunnen worden gemonteerd, biedt deze een betere doorvoer dan het monteren van complete voertuigen één voor één.

In dit document wordt een gegevensstroompijplijn gedemonstreerd die het boek The Iliad of Homer van een website downloadt en de tekst doorzoekt om afzonderlijke woorden te koppelen aan woorden die de letters van het eerste woord omdraaien. De vorming van de gegevensstroompijplijn in dit document bestaat uit de volgende stappen:

  1. Creëer de dataflow-blokken die deelnemen aan de pijplijn.

  2. Verbind elk gegevensstroomblok met het volgende blok in de pijplijn. Elk blok ontvangt als invoer de uitvoer van het vorige blok in de pijplijn.

  3. Maak voor elk gegevensstroomblok een vervolgtaak waarmee het volgende blok wordt ingesteld op de voltooide status nadat het vorige blok is voltooid.

  4. Plaats gegevens op het hoofd van de pijplijn.

  5. Markeer de kop van de pijplijn als voltooid.

  6. Wacht totdat de pijplijn al het werk heeft voltooid.

Vereiste voorwaarden

Lees Dataflow voordat u aan deze handleiding begint.

Een consoletoepassing maken

Maak in Visual Studio een Visual C# of Visual Basic Console Application-project. Installeer het NuGet-pakket System.Threading.Tasks.Dataflow.

Notitie

De TPL-gegevensstroombibliotheek (de System.Threading.Tasks.Dataflow naamruimte) wordt niet gedistribueerd met .NET. Als u de System.Threading.Tasks.Dataflow-naamruimte in Visual Studio wilt installeren, opent u uw project, kiest u NuGet-pakketten beheren in het menu Project en zoekt u online naar het System.Threading.Tasks.Dataflow-pakket. Om deze te installeren met behulp van de .NET Core CLI, voert u dotnet add package System.Threading.Tasks.Dataflowuit.

Voeg de volgende code toe aan uw project om de basistoepassing te maken.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

De gegevensstroomblokken maken

Voeg de volgende code toe aan de Main-methode om de datastroomblokken te maken die deelnemen aan de pijplijn. De volgende tabel bevat een overzicht van de rol van elk lid van de pijplijn.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine($"Downloading '{uri}'...");

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Lid Typ Beschrijving
downloadString TransformBlock<TInput,TOutput> Hiermee downloadt u de boektekst van het web.
createWordList TransformBlock<TInput,TOutput> Scheidt de boektekst in een matrix met woorden.
filterWordList TransformBlock<TInput,TOutput> Hiermee verwijdert u korte woorden en duplicaten uit de woordmatrix.
findReversedWords TransformManyBlock<TInput,TOutput> Hiermee vindt u alle woorden in de gefilterde woordmatrixverzameling waarvan de omgekeerde ook voorkomt in de woordmatrix.
printReversedWords ActionBlock<TInput> Geeft woorden en de bijbehorende omgekeerde woorden weer op de console.

Hoewel u in dit voorbeeld meerdere stappen in de gegevensstroompijplijn in één stap kunt combineren, illustreert het voorbeeld het concept van het opstellen van meerdere onafhankelijke gegevensstroomtaken om een grotere taak uit te voeren. In het voorbeeld wordt #D0 gebruikt om elk lid van de pijplijn in staat te stellen een bewerking uit te voeren op de invoergegevens en de resultaten te verzenden naar de volgende stap in de pijplijn. Het #D0 lid van de pijplijn is een #D1 object omdat er meerdere onafhankelijke uitvoer voor elke invoer wordt geproduceerd. De staart van de pijplijn, printReversedWords, is een ActionBlock<TInput> object omdat het een actie uitvoert op de invoer en geen resultaat produceert.

De pijplijn vormen

Voeg de volgende code toe om elk blok te verbinden met het volgende blok in de pijplijn.

Wanneer u de #D0 methode aanroept om een brongegevensstroomblok te verbinden met een doelgegevensstroomblok, worden gegevens door het brongegevensstroomblok doorgegeven aan het doelblok zodra er gegevens beschikbaar komen. Als u ook #D0 met #D1 ingesteld op waar aanbiedt, zal het geslaagd of mislukt voltooien van één blok in de pijplijn leiden tot de voltooiing van het volgende blok in de pijplijn.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Gegevens naar de pijplijn sturen

Voeg de volgende code toe om de URL van het boek The Iliad of Homer naar het begin van de gegevenspijplijn te verzenden.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

In dit voorbeeld wordt DataflowBlock.Post gebruikt om gegevens synchroon naar het hoofd van de pijplijn te verzenden. Gebruik de DataflowBlock.SendAsync methode wanneer u asynchroon gegevens moet verzenden naar een datastroomknooppunt.

Pijplijnactiviteit voltooien

Voeg de volgende code toe om het hoofd van de pijplijn te markeren als voltooid. De kop van de pijplijn geeft zijn voltooiing door nadat alle gebufferde berichten zijn verwerkt.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

In dit voorbeeld wordt één URL verzonden via de gegevensstroompijplijn die moet worden verwerkt. Als u meer dan één invoer via een pijplijn verzendt, roept u de IDataflowBlock.Complete methode aan nadat u alle invoer hebt ingediend. U kunt deze stap weglaten als uw toepassing geen goed gedefinieerd punt heeft waarop gegevens niet meer beschikbaar zijn of als de toepassing niet hoeft te wachten tot de pijplijn is voltooid.

Wachten tot de pijplijn is voltooid

Voeg de volgende code toe om te wachten tot de pijplijn is voltooid. De algehele bewerking is voltooid wanneer de staart van de pijplijn is voltooid.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

U kunt wachten op voltooiing van de gegevensstroom vanuit een thread of van meerdere threads tegelijk.

Het volledige voorbeeld

In het volgende voorbeeld ziet u de volledige code voor dit scenario.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine($"Downloading '{uri}'...");

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Volgende stappen

In dit voorbeeld wordt één URL verzonden die moet worden verwerkt via de gegevensstroompijplijn. Als u meer dan één invoerwaarde via een pijplijn verzendt, kunt u een vorm van parallelle uitvoering in uw toepassing introduceren die lijkt op hoe onderdelen door een autofabriek kunnen worden verplaatst. Wanneer het eerste lid van de pijplijn het resultaat naar het tweede lid verzendt, kan het een ander item parallel verwerken terwijl het tweede lid het eerste resultaat verwerkt.

Het parallellisme dat wordt bereikt met behulp van datastroompijplijnen wordt grof parallellisme genoemd, omdat het doorgaans uit minder, grotere taken bestaat. U kunt ook een meer fijner gestructureerde parallelle verwerking van kleinere, kortlopende taken in een dataflowpijplijn gebruiken. In dit voorbeeld gebruikt het findReversedWords-lid van de pijplijn PLINQ om meerdere items in de werklijst parallel te verwerken. Het gebruik van fijnmazige parallelle uitvoering in een grof korrelige pijplijn kan de totale doorvoer verbeteren.

U kunt ook een brongegevensstroomblok verbinden met meerdere doelblokken om een dataflownetwerk te maken. De overbelaste versie van de methode #D0 gebruikt een #D1 object dat bepaalt of het doelblok elk bericht accepteert op basis van de waarde. De meeste typen gegevensstroomblokken die fungeren als bronnen bieden berichten aan alle verbonden doelblokken, in de volgorde waarin ze zijn verbonden, totdat een van de blokken dat bericht accepteert. Met behulp van dit filtermechanisme kunt u systemen van verbonden gegevensstroomblokken maken die bepaalde gegevens door het ene pad en andere gegevens door een ander pad leiden. Zie Stapsgewijze instructie: Gegevensstroom gebruiken in een Windows Forms-toepassing voor een voorbeeld waarin het gebruik van filters wordt getoond om een gegevensstroomnetwerk te creëren.

Zie ook