Dela via


Genomgång: Skapa en dataflödesagent

Det här dokumentet visar hur du skapar agentbaserade program som baseras på dataflöde i stället för kontrollflöde.

Kontrollflödet refererar till körningsordningen för åtgärder i ett program. Kontrollflödet regleras med hjälp av kontrollstrukturer som villkorssatser, loopar och så vidare. Alternativt refererar dataflödet till en programmeringsmodell där beräkningar endast görs när alla nödvändiga data är tillgängliga. Programmeringsmodellen för dataflöde är relaterad till begreppet meddelandeöverföring, där oberoende komponenter i ett program kommunicerar med varandra genom att skicka meddelanden.

Asynkrona agenter stöder både programmeringsmodellerna för kontrollflöde och dataflöde. Även om kontrollflödesmodellen är lämplig i många fall är dataflödesmodellen lämplig i andra, till exempel när en agent tar emot data och utför en åtgärd som baseras på nyttolasten för dessa data.

Förutsättningar

Läs följande dokument innan du påbörjar den här genomgången:

Sektioner

Den här genomgången innehåller följande avsnitt:

Skapa en grundläggande Control-Flow-agent

Tänk på följande exempel som definierar control_flow_agent klassen. Klassen control_flow_agent fungerar på tre meddelandebuffertar: en indatabuffert och två utdatabuffertar. Metoden run läser från källmeddelandebufferten i en loop och använder en villkorssats för att dirigera flödet av programkörning. Agenten ökar en räknare för negativa värden som inte är noll och ökar en annan räknare för positiva värden som inte är noll. När agenten har fått sentinel-värdet noll skickar den värdena för räknarna till utdatameddelandebuffertarna. Med negatives metoderna och positives kan programmet läsa antalet negativa och positiva värden från agenten.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Även om det här exemplet använder kontrollflödet i en agent på ett grundläggande sätt, visar det den seriella karaktären hos kontrollflödesbaserad programmering. Varje meddelande måste bearbetas sekventiellt, även om flera meddelanden kan vara tillgängliga i indatameddelandebufferten. Dataflödesmodellen gör att båda grenarna i villkorssatsen kan utvärderas samtidigt. Med dataflödesmodellen kan du också skapa mer komplexa meddelandenätverk som fungerar på data när de blir tillgängliga.

[Topp]

Skapa en Basic Dataflow-agent

Det här avsnittet visar hur du konverterar control_flow_agent klassen till att använda dataflödesmodellen för att utföra samma uppgift.

Dataflödesagenten fungerar genom att skapa ett nätverk av meddelandebuffertar, som var och en har ett specifikt syfte. Vissa meddelandeblock använder en filterfunktion för att acceptera eller avvisa ett meddelande på grundval av dess nyttolast. En filterfunktion säkerställer att ett meddelandeblock endast tar emot vissa värden.

Konvertera kontrollflödesagenten till en dataflödesagent

  1. Kopiera texten i control_flow_agent klassen till en annan klass, till exempel dataflow_agent. Du kan också byta namn på control_flow_agent klassen.

  2. Ta bort brödtexten i loopen som anropar receive från run -metoden.

void run()
{
   // Counts the number of negative and positive values that
   // the agent receives.
   size_t negative_count = 0;
   size_t positive_count = 0;

   // Write the counts to the message buffers.
   send(_negatives, negative_count);
   send(_positives, positive_count);

   // Set the agent to the completed state.
   done();
}
  1. run I -metoden lägger du till ett negative_count objekt som spårar antalet aktiva åtgärder efter initieringen av variablerna positive_count och countdown_event.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;

Klassen countdown_event visas senare i det här avsnittet.

  1. Skapa de meddelandebuffertobjekt som ska ingå i dataflödesnätverket.
 //
 // Create the members of the dataflow network.
 //

 // Increments the active counter.
 transformer<int, int> increment_active(
    [&active](int value) -> int {
       active.add_count();
       return value;
    });

 // Increments the count of negative values.
 call<int> negatives(
    [&](int value) {
       ++negative_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value < 0;
    });

 // Increments the count of positive values.
 call<int> positives(
    [&](int value) {
       ++positive_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value > 0;
    });

 // Receives only the sentinel value of 0.
 call<int> sentinel(
    [&](int value) {            
       // Decrement the active counter.
       active.signal();
       // Set the sentinel event.
       received_sentinel.set();
    },
    [](int value) -> bool { 
       return value == 0; 
    });

 // Connects the _source message buffer to the rest of the network.
 unbounded_buffer<int> connector;
  1. Anslut meddelandebuffertarna för att skapa ett nätverk.
//
// Connect the network.
//

// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);

// Connect the _source buffer to the internal network to 
// begin data flow.
_source.link_target(&increment_active);
  1. Vänta tills objekten event och countdown event har angetts. Dessa händelser signalerar att agenten har tagit emot sentinel-värdet och att alla operationer har slutförts.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();

Följande diagram visar hela dataflödesnätverket för dataflow_agent klassen:

Dataflödesnätverket.

I följande tabell beskrivs medlemmarna i nätverket.

Medlem Beskrivning
increment_active Ett samtidighetsobjekt::transformeringsobjekt som ökar den aktiva händelseräknaren och skickar indatavärdet till resten av nätverket.
negatives, positives concurrency::call objekt som ökar antalet nummer och minskar räknaren för aktiva händelser. Objekten använder ett filter för att acceptera antingen negativa tal eller positiva tal.
sentinel Ett samtidighet::anrop-objekt som endast accepterar sentinelvärdet noll och minskar räknaren för aktiva händelser.
connector Ett samtidighet::unbounded_buffer-objekt som ansluter källmeddelandebufferten till det interna nätverket.

run Eftersom metoden anropas på en separat tråd kan andra trådar skicka meddelanden till nätverket innan nätverket är helt anslutet. Datamedlemmen _source är ett unbounded_buffer objekt som buffrar alla indata som skickas från programmet till agenten. För att säkerställa att nätverket bearbetar alla indatameddelanden länkar agenten först de interna noderna i nätverket och länkar sedan starten av nätverket, , connectortill _source datamedlemmen. Detta garanterar att meddelanden inte bearbetas när nätverket skapas.

Eftersom nätverket i det här exemplet baseras på dataflöde, i stället för på kontrollflöde, måste nätverket kommunicera med agenten att det har slutfört bearbetningen av varje indatavärde och att sentinel-noden har tagit emot sitt värde. I det här exemplet används ett countdown_event objekt för att signalera att alla indatavärden har bearbetats och ett samtidighetsobjekt::händelseobjekt för att indikera att sentinel-noden har tagit emot sitt värde. Klassen countdown_event använder ett event objekt för att signalera när ett räknarvärde når noll. Chefen för dataflödesnätverket ökar räknaren varje gång den tar emot ett värde. Varje terminalnod i nätverket minskar räknaren när den har bearbetat indatavärdet. När agenten har skapat dataflödesnätverket väntar den på att sentinel-noden ska ange event objektet och att countdown_event objektet ska signalera att dess räknare har nått noll.

I följande exempel visas klasserna control_flow_agent, dataflow_agentoch countdown_event . Funktionen wmain skapar ett control_flow_agent och ett dataflow_agent objekt och använder send_values funktionen för att skicka en serie slumpmässiga värden till agenterna.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

I det här exemplet skapas följande exempelutdata:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Kompilera koden

Kopiera exempelkoden och klistra in den i ett Visual Studio-projekt, eller klistra in den i en fil med namnet dataflow-agent.cpp och kör sedan följande kommando i ett Visual Studio-kommandotolkfönster.

cl.exe /EHsc dataflow-agent.cpp

[Topp]

Skapa en Message-Logging-agent

I följande exempel visas log_agent klassen, som liknar dataflow_agent klassen. Klassen log_agent implementerar en asynkron loggningsagent som skriver loggmeddelanden till en fil och till konsolen. Klassen log_agent gör det möjligt för programmet att kategorisera meddelanden som information, varning eller fel. Det gör också att programmet kan ange om varje loggkategori skrivs till en fil, konsolen eller båda. Det här exemplet skriver alla loggmeddelanden till en fil och endast felmeddelanden till konsolen.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

I det här exemplet skrivs följande utdata till konsolen.

error: This is a sample error message.

I det här exemplet skapas även filen log.txt, som innehåller följande text.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Kompilera koden

Kopiera exempelkoden och klistra in den i ett Visual Studio-projekt, eller klistra in den i en fil med namnet log-filter.cpp och kör sedan följande kommando i ett Visual Studio-kommandotolkfönster.

cl.exe /EHsc log-filter.cpp

[Topp]

Se även

Genomgång av samtidighetskörning