Dela via


Anvisningar: Implementera olika Producer-Consumer mönster

Det här avsnittet beskriver hur du implementerar producent-konsument-mönstret i ditt program. I det här mönstret skickar producenten meddelanden till ett meddelandeblock och konsumenten läser meddelanden från det blocket.

Ämnet visar två scenarier. I det första scenariot måste konsumenten ta emot varje meddelande som producenten skickar. I det andra scenariot söker konsumenten regelbundet efter data och behöver därför inte ta emot varje meddelande.

Båda exemplen i det här avsnittet använder agenter, meddelandeblock och meddelandeöverföringsfunktioner för att överföra meddelanden från producenten till konsumenten. Producentagenten använder funktionen concurrency::send för att skriva meddelanden till ett concurrency::ITarget objekt. Konsumentagenten använder funktionen concurrency::receive för att läsa meddelanden från ett concurrency::ISource-objekt. Båda agenterna har ett kontrollvärde för att samordna slutet av bearbetningen.

Mer information om asynkrona agenter finns i Asynkrona agenter. Mer information om meddelandeblock och funktioner för meddelandeöverföring finns i Asynkrona meddelandeblock och funktioner för meddelandeöverföring.

Exempel: Skicka nummerserier till konsumentagenten

I det här exemplet skickar producentagenten en serie tal till konsumentagenten. Konsumenten tar emot vart och ett av dessa tal och beräknar sitt genomsnitt. Programmet skriver medelvärdet till konsolen.

I det här exemplet används ett samtidighetsobjekt::unbounded_buffer för att göra det möjligt för producenten att köa meddelanden. Klassen unbounded_buffer implementerar ITarget och ISource så att producenten och konsumenten kan skicka och ta emot meddelanden till och från en delad buffert. Funktionerna send och receive samordnar uppgiften att sprida data från producenten till konsumenten.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Det här exemplet genererar följande utdata.

The average is 50.

Exempel: Skicka en serie med aktiekurser till konsumentagenten

I det här exemplet skickar producentagenten en serie aktiekurser till konsumentagenten. Konsumentagenten läser regelbundet det aktuella citatet och skriver ut det till konsolen.

Det här exemplet liknar det föregående, förutom att det använder ett samtidighetsobjekt::overwrite_buffer för att göra det möjligt för producenten att dela ett meddelande med konsumenten. Precis som i föregående exempel tillämpar overwrite_buffer-klassen ITarget och ISource så att producenten och konsumenten kan arbeta med en delad meddelandebuffert.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Det här exemplet genererar följande exempelutdata.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

Till skillnad från ett unbounded_buffer objekt receive tar funktionen inte bort meddelandet från objektet overwrite_buffer . Om konsumenten läser från meddelandebufferten mer än en gång innan producenten skriver över meddelandet får mottagaren samma meddelande varje gång.

Kompilera koden

Kopiera exempelkoden och klistra in den i ett Visual Studio-projekt, eller klistra in den i en fil med namnet producer-consumer.cpp och kör sedan följande kommando i ett Visual Studio-kommandotolkfönster.

cl.exe /EHsc producer-consumer.cpp

Se även

Asynkront agentbibliotek
Asynkrona agenter
Asynkrona meddelandeblock
Funktioner för meddelandeöverföring