Dela via


Metodtips i biblioteket för asynkrona agenter

I det här dokumentet beskrivs hur du effektivt använder biblioteket asynkrona agenter. Agentbiblioteket höjer upp en aktörsbaserad programmeringsmodell och en processbaserad meddelandeöverföring för grovkorniga dataflöden och pipelining-uppgifter.

Mer information om agentbiblioteket finns i Asynkront agentbibliotek.

Sektioner

Det här dokumentet innehåller följande avsnitt:

Använd agenter för att isolera tillstånd

Agentbiblioteket tillhandahåller alternativ till delat tillstånd genom att du kan ansluta isolerade komponenter via en asynkron mekanism för meddelandeöverföring. Asynkrona agenter är mest effektiva när de isolerar sitt interna tillstånd från andra komponenter. Genom att isolera tillstånd agerar flera komponenter vanligtvis inte på delade data. Tillståndsisolering kan göra det möjligt för ditt program att skala eftersom det minskar konkurrensen om delat minne. Tillståndsisolering minskar också risken för dödläge och konkurrensvillkor eftersom komponenter inte behöver synkronisera åtkomst till delade data.

Du isolerar vanligtvis tillstånd i en agent genom att hålla datamedlemmar i agentklassens private eller protected avsnitten och genom att använda meddelandebuffertar för att kommunicera tillståndsändringar. I följande exempel visas klassen basic_agent, som härleds från concurrency::agent. Klassen basic_agent använder två meddelandebuffertar för att kommunicera med externa komponenter. En meddelandebuffert innehåller inkommande meddelanden. den andra meddelandebufferten innehåller utgående meddelanden.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Fullständiga exempel på hur du definierar och använder agenter finns i Genomgång: Skapa ett Agent-Based-program och genomgång: Skapa en dataflödesagent.

[Topp]

Använd en begränsningsmekanism för att begränsa antalet meddelanden i en datapipeline

Många typer av meddelandebuffertar, till exempel samtidighet::unbounded_buffer, kan innehålla ett obegränsat antal meddelanden. När en meddelandeproducent skickar meddelanden till en datapipeline snabbare än konsumenten kan bearbeta dessa meddelanden kan programmet ange ett tillstånd med lite minne eller slut på minne. Du kan använda en begränsningsmekanism, till exempel en semafor, för att begränsa antalet meddelanden som samtidigt är aktiva i en datapipeline.

Följande grundläggande exempel visar hur du använder en semafor för att begränsa antalet meddelanden i en datapipeline. Datapipelinen använder funktionen concurrency::wait för att simulera en åtgärd som tar minst 100 millisekunder. Eftersom avsändaren genererar meddelanden snabbare än konsumenten kan bearbeta dessa meddelanden definierar semaphore det här exemplet klassen så att programmet kan begränsa antalet aktiva meddelanden.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

Objektet semaphore begränsar pipelinen till att bearbeta högst två meddelanden samtidigt.

Producenten i det här exemplet skickar relativt få meddelanden till konsumenten. Därför visar det här exemplet inte på ett potentiellt lågminnes- eller minnesbristtillstånd. Den här mekanismen är dock användbar när en datapipeline innehåller ett relativt stort antal meddelanden.

Mer information om hur du skapar semaforklassen som används i det här exemplet finns i How to: Use the Context Class to Implement a Cooperative Semaphore (Använda kontextklassen för att implementera en kooperativ semafor).

[Topp]

Utför inte Fine-Grained-arbete i en datapipeline

Agentbiblioteket är mest användbart när det arbete som utförs av en datapipeline är ganska grovt kornigt. En programkomponent kan till exempel läsa data från en fil eller en nätverksanslutning och ibland skicka dessa data till en annan komponent. Protokollet som agentbiblioteket använder för att sprida meddelanden gör att mekanismen för meddelandeöverföring har mer omkostnader än de parallella aktivitetskonstruktioner som tillhandahålls av PPL ( Parallel Patterns Library ). Se därför till att det arbete som utförs av en datapipeline är tillräckligt långt för att kompensera för den här kostnaden.

Även om en data-pipeline är mest effektiv när dess uppgifter är grovkorniga kan varje steg i data-pipelinen använda PPL-konstruktioner såsom aktivitetsgrupper och parallella algoritmer för att genomföra mer finkornigt arbete. Ett exempel på ett grovt kornigt datanätverk som använder detaljerad parallellitet i varje bearbetningssteg finns i Genomgång: Skapa ett Image-Processing nätverk.

[Topp]

Skicka inte stora meddelandenyttolaster efter värde

I vissa fall skapar programkörningen en kopia av varje meddelande som skickas från en meddelandebuffert till en annan meddelandebuffert. Till exempel erbjuder klassen concurrency::overwrite_buffer en kopia av varje meddelande som den tar emot till vart och ett av dess mål. Körmiljön skapar också en kopia av meddelandedata när du använder funktioner för meddelandeöverföring, som concurrency::send och concurrency::receive, för att skriva och läsa meddelanden från en meddelandebuffert. Även om den här mekanismen hjälper till att eliminera risken för att samtidigt skriva till delade data kan det leda till sämre minnesprestanda när meddelandets nyttolast är relativt stor.

Du kan använda pekare eller referenser för att förbättra minnesprestanda när du skickar meddelanden som har en stor nyttolast. I följande exempel jämförs överföring av stora meddelanden efter värde med att skicka pekare till samma meddelandetyp. Exemplet definierar två agenttyper, producer och consumer, som agerar på message_data-objekt. I exemplet jämförs den tid som krävs för att producenten ska skicka flera message_data objekt till konsumenten med den tid som krävs för att producentagenten ska skicka flera pekare till message_data objekt till konsumenten.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

I det här exemplet skapas följande exempelutdata:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

Den version som använder pekare presterar bättre eftersom den eliminerar kravet på att körtid ska skapa en fullständig kopia av varje objekt message_data som skickas från producenten till konsumenten.

[Topp]

Använda shared_ptr i ett datanätverk när ägarskapet är odefinierat

När du skickar meddelanden med pekare via en pipeline eller ett nätverk som skickar meddelanden allokerar du vanligtvis minnet för varje meddelande längst fram i nätverket och frigör det minnet i slutet av nätverket. Även om den här mekanismen ofta fungerar bra finns det fall där det är svårt eller inte möjligt att använda den. Tänk till exempel på det fall då datanätverket innehåller flera slutnoder. I det här fallet finns det ingen tydlig plats för att frigöra minnet för meddelandena.

För att lösa det här problemet kan du använda en mekanism, till exempel std::shared_ptr, som gör att en pekare kan ägas av flera komponenter. När det sista shared_ptr objektet som äger en resurs förstörs frigörs även resursen.

I följande exempel visas hur du använder shared_ptr för att dela pekarvärden mellan flera meddelandebuffertar. Exemplet ansluter ett concurrency::overwrite_buffer-objekt till tre concurrency::call-objekt. Klassen overwrite_buffer erbjuder meddelanden till vart och ett av sina mål. Eftersom det finns flera ägare av data i slutet av datanätverket används shared_ptr det här exemplet för att göra det möjligt för varje call objekt att dela ägarskapet för meddelandena.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

I det här exemplet skapas följande exempelutdata:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Se även

Bästa praxis för samtidighetskörning
Asynkront agentbibliotek
Genomgång: Skapa ett Agent-Based-program
Genomgång: Skapa en dataflödesagent
Genomgång: Skapa ett Image-Processing nätverk
Bästa praxis i Biblioteket för parallella mönster
Allmänna metodtips i Concurrency Runtime