Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Det här dokumentet beskriver hur du skapar en anpassad blocktyp för meddelanden som beställer inkommande meddelanden efter prioritet.
Även om de inbyggda typerna av meddelandeblock ger en mängd olika funktioner kan du skapa en egen typ av meddelandeblock och anpassa den så att den uppfyller kraven för ditt program. En beskrivning av de inbyggda typerna av meddelandeblock som tillhandahålls av biblioteket Asynkrona agenter finns i Asynkrona meddelandeblock.
Förutsättningar
Läs följande dokument innan du påbörjar den här genomgången:
Sektioner
Den här genomgången innehåller följande avsnitt:
Utforma ett anpassat meddelandeblock
Meddelandeblock deltar i åtgärden att skicka och ta emot meddelanden. Ett meddelandeblock som skickar meddelanden kallas för ett källblock. Ett meddelandeblock som tar emot meddelanden kallas för ett målblock. Ett meddelandeblock som både skickar och tar emot meddelanden kallas för ett spridningsblock. Agentbiblioteket använder den abstrakta klassens samtidighet::ISource för att representera källblock och den abstrakta klassens samtidighet::ITarget för att representera målblock. Typer av meddelandeblock som fungerar som källor härleds från ISource; typer av meddelandeblock som fungerar som mål härleds från ITarget.
Även om du kan härleda meddelandeblocktypen direkt från ISource och ITargetdefinierar agentbiblioteket tre basklasser som utför mycket av de funktioner som är gemensamma för alla typer av meddelandeblock, till exempel att hantera fel och ansluta meddelandeblock tillsammans på ett samtidighetssäkert sätt. Klassen concurrency::source_block härleds från ISource och skickar meddelanden till andra block. Klassen concurrency::target_block härleds från ITarget och tar emot meddelanden från andra block. Klassen concurrency::propagator_block ärver från ISource och ITarget och skickar meddelanden till andra block och tar emot meddelanden från andra block. Vi rekommenderar att du använder dessa tre basklasser för att hantera infrastrukturinformation så att du kan fokusera på beteendet för meddelandeblocket.
Klasserna source_block, target_blockoch propagator_block är mallar som parametriseras på en typ som hanterar anslutningarna, eller länkarna, mellan käll- och målblock och på en typ som hanterar hur meddelanden bearbetas. Agentbiblioteket definierar två typer som utför hantering av länkar, concurrency::single_link_registry och concurrency::multi_link_registry. Klassen single_link_registry gör att ett meddelandeblock kan länkas till en källa eller till ett mål. Klassen multi_link_registry gör att ett meddelandeblock kan länkas till flera källor eller flera mål. Agentbiblioteket definierar en klass som utför meddelandehantering, konkurrens::ordered_message_processor. Klassen ordered_message_processor gör det möjligt för meddelandeblock att bearbeta meddelanden i den ordning de tar emot dem.
Tänk på följande exempel för att bättre förstå hur meddelandeblock relaterar till sina källor och mål. Det här exemplet visar deklarationen för klassen concurrency::transformer .
template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;
Klassen transformer härleds från propagator_blockoch fungerar därför både som ett källblock och som ett målblock. Den accepterar meddelanden av typen _Input och skickar meddelanden av typen _Output. Klassen transformer anger single_link_registry som länkhanterare för alla målblock och multi_link_registry som länkhanterare för alla källblock. Därför kan ett transformer objekt ha upp till ett mål och ett obegränsat antal källor.
En klass som härleds från source_block måste implementera sex metoder: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message och resume_propagation. En klass som härleds från target_block måste implementera metoden propagate_message och kan eventuellt implementera metoden send_message . Att härleda från propagator_block är funktionellt likvärdigt med härledning från både source_block och target_block.
Metoden propagate_to_any_targets anropas av körmiljön för att asynkront eller synkront bearbeta inkommande meddelanden och hantera utgående meddelanden. Metoden accept_message anropas av målblock för att acceptera meddelanden. Många typer av meddelandeblock, till exempel unbounded_buffer, skickar endast meddelanden till det första målet som skulle ta emot det. Därför överförs ägarskapet för meddelandet till målet. Andra typer av meddelandeblock, till exempel samtidighet::overwrite_buffer, erbjuder meddelanden till var och en av dess målblock. Därför overwrite_buffer skapar en kopia av meddelandet för vart och ett av dess mål.
Metoderna reserve_message, consume_message, release_messageoch resume_propagation aktiverar meddelandeblock för att delta i meddelandereservationen. Målblocken anropar metoden reserve_message när de erbjuds ett meddelande och behöver reservera det för senare användning. När ett målblock reserverar ett meddelande kan det anropa consume_message metoden för att använda meddelandet eller release_message metoden för att avbryta reservationen. Precis som accept_message med metoden kan implementeringen av consume_message antingen överföra ägarskapet för meddelandet eller returnera en kopia av meddelandet. När ett målblock antingen förbrukar eller släpper ett reserverat meddelande anropar körningen resume_propagation metoden. Den här metoden fortsätter vanligtvis meddelandespridningen, med början i nästa meddelande i kön.
Körningsmiljön anropar metoden propagate_message för att asynkront överföra ett meddelande från ett annat block till det aktuella. Metoden send_message liknar propagate_message, förutom att den synkront, i stället för asynkront, skickar meddelandet till målblocken. Standardimplementeringen av send_message avvisar alla inkommande meddelanden. Körmiljön anropar inte någon av dessa metoder om meddelandet inte passerar den valfria filterfunktionen som är associerad med målblocket. Mer information om meddelandefilter finns i Asynkrona meddelandeblock.
[Topp]
Definiera priority_buffer-klassen
Klassen priority_buffer är en typ av anpassat meddelandeblock som beställer inkommande meddelanden först efter prioritet och sedan efter den ordning i vilken meddelanden tas emot. Klassen priority_buffer liknar samtidighet::unbounded_buffer-klassen eftersom den innehåller en kö med meddelanden, och även eftersom den fungerar som både en källa och ett målmeddelandeblock och kan ha både flera källor och flera mål. Dock unbounded_buffer baserar meddelandets spridning endast på den ordning i vilken den tar emot meddelanden från sina källor.
Klassen priority_buffer tar emot meddelanden av typen std::tuppeln som innehåller PriorityType och Type element. 
              PriorityType refererar till den typ som har prioriteten för varje meddelande. Type refererar till datadelen av meddelandet. Klassen priority_buffer skickar meddelanden av typen Type. Klassen priority_buffer hanterar också två meddelandeköer: ett std::p riority_queue-objekt för inkommande meddelanden och ett std::queue-objekt för utgående meddelanden. Att beställa meddelanden efter prioritet är användbart när ett priority_buffer objekt tar emot flera meddelanden samtidigt eller när det tar emot flera meddelanden innan några meddelanden läss av konsumenter.
Förutom de sju metoder som en klass som härleds från propagator_block måste implementera, åsidosätter klassen priority_buffer även metoderna link_target_notification och send_message. Klassen priority_buffer definierar också två offentliga hjälpmetoder, enqueue och dequeue, och en privat hjälpmetod, propagate_priority_order.
Följande procedur beskriver hur du implementerar priority_buffer klassen.
Så här definierar du klassen priority_buffer
- Skapa en C++-huvudfil och ge den - priority_buffer.hnamnet . Du kan också använda en befintlig rubrikfil som ingår i projektet.
- I - priority_buffer.hlägger du till följande kod.- #pragma once #include <agents.h> #include <queue>
- stdI namnområde definierar du specialiseringar för std::less och std::greater som verkar på concurrency::message-objekt.- namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }- Klassen - priority_bufferlagrar- messageobjekt i ett- priority_queueobjekt. Med de här typspecialiseringarna kan prioritetskön sortera meddelanden efter prioritet. Prioriteten är det första elementet i- tupleobjektet.
- Deklarera klassen i - concurrencyex- priority_buffernamnområdet.- namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }- Klassen - priority_bufferhärleds från- propagator_block. Därför kan den både skicka och ta emot meddelanden. Klassen- priority_bufferkan ha flera mål som tar emot meddelanden av typen- Type. Det kan också ha flera källor som skickar meddelanden av typen- tuple<PriorityType, Type>.
- I avsnittet - privatei- priority_bufferklassen lägger du till följande medlemsvariabler.- // Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;- Objektet - priority_queueinnehåller inkommande meddelanden.- queueObjektet innehåller utgående meddelanden. Ett- priority_bufferobjekt kan ta emot flera meddelanden samtidigt.- critical_sectionObjektet synkroniserar åtkomsten till kön med indatameddelanden.
- I avsnittet - privatedefinierar du kopieringskonstruktorn och tilldelningsoperatorn. Detta förhindrar- priority_queueatt objekt kan tilldelas.- // Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
- I avsnittet - publicdefinierar du de konstruktorer som är gemensamma för många typer av meddelandeblock. Definiera även destruktor.- // Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
- I avsnittet - publicdefinierar du metoderna- enqueueoch- dequeue. De här hjälpmetoderna är ett alternativt sätt att skicka meddelanden till och ta emot meddelanden från ett- priority_bufferobjekt.- // Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
- I avsnittet - protecteddefinierar du- propagate_to_any_targetsmetoden.- // Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }- Metoden - propagate_to_any_targetsöverför meddelandet som finns längst fram i indatakön till utdatakön och sprider ut alla meddelanden i utdatakön.
- I avsnittet - protecteddefinierar du- accept_messagemetoden.- // Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }- När ett målblock anropar - accept_messagemetoden- priority_bufferöverför klassen ägarskapet för meddelandet till det första målblocket som accepterar det. (Detta liknar beteendet hos- unbounded_buffer.)
- I avsnittet - protecteddefinierar du- reserve_messagemetoden.- // Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }- Klassen - priority_buffertillåter att ett målblock reserverar ett meddelande när den angivna meddelandeidentifieraren matchar identifieraren för meddelandet som finns längst fram i kön. Med andra ord kan ett mål reservera meddelandet om- priority_bufferobjektet ännu inte har tagit emot ytterligare ett meddelande och ännu inte har spridit ut det aktuella.
- I avsnittet - protecteddefinierar du- consume_messagemetoden.- // Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }- Ett målblock anropar - consume_messageför att överföra ägarskapet för meddelandet som det reserverade.
- I avsnittet - protecteddefinierar du- release_messagemetoden.- // Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }- Ett målblock anropar - release_messageför att avbryta reservationen av ett meddelande.
- I avsnittet - protecteddefinierar du- resume_propagationmetoden.- // Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }- Körningsanropen - resume_propagationefter ett målblock förbrukar eller släpper ett reserverat meddelande. Den här metoden sprider ut alla meddelanden som finns i utdatakön.
- I avsnittet - protecteddefinierar du- link_target_notificationmetoden.- // Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }- Medlemsvariabeln - _M_pReservedFordefinieras av basklassen ,- source_block. Den här medlemsvariabeln pekar på det eventuella målblocket som har en reservation till meddelandet som finns längst fram i utdatakön. Körtiden anropar- link_target_notificationnär ett nytt mål länkas till objektet- priority_buffer. Den här metoden sprider ut alla meddelanden som finns i utdatakön om inget mål innehåller en reservation.
- I avsnittet - privatedefinierar du- propagate_priority_ordermetoden.- // Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }- Den här metoden sprider ut alla meddelanden från utdatakön. Varje meddelande i kön erbjuds till varje målblock tills något av målblocken accepterar meddelandet. Klassen - priority_bufferbevarar ordningen på de utgående meddelandena. Därför måste det första meddelandet i utdatakön accepteras av ett målblock innan den här metoden erbjuder andra meddelanden till målblocken.
- I avsnittet - protecteddefinierar du- propagate_messagemetoden.- // Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }- Metoden - propagate_messagemöjliggör för- priority_bufferklassen att fungera som meddelandemottagare eller som mål. Den här metoden tar emot det meddelande som erbjuds av det angivna källblocket och infogar meddelandet i prioritetskön. Metoden- propagate_messageskickar sedan asynkront alla utdatameddelanden till målblocken.- Körtiden anropar den här metoden när du använder funktionen concurrency::asend eller när meddelandeblocket är anslutet till andra meddelandeblock. 
- I avsnittet - protecteddefinierar du- send_messagemetoden.- // Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }- Metoden - send_messageliknar- propagate_message. Den skickar dock utdatameddelandena synkront i stället för asynkront.- Körtiden anropar den här metoden under en synkroniserad sändning, till exempel när du anropar funktionen concurrency::send. 
Klassen priority_buffer innehåller konstruktoröverlagringar som är typiska i många typer av meddelandeblock. Vissa konstruktoröverlagringar tar concurrency::Scheduler eller concurrency::ScheduleGroup-objekt, vilket gör att meddelandeblocket kan hanteras av en specifik schemaläggare. Andra konstruktoröverlagringar tar en filterfunktion. Med filterfunktioner kan meddelandeblock acceptera eller avvisa ett meddelande på grundval av nyttolasten. Mer information om meddelandefilter finns i Asynkrona meddelandeblock. Mer information om schemaläggare finns i Schemaläggaren.
              priority_buffer Eftersom klassen beställer meddelanden efter prioritet och sedan i den ordning som meddelanden tas emot är den här klassen mest användbar när den tar emot meddelanden asynkront, till exempel när du anropar funktionen samtidighet::asend eller när meddelandeblocket är anslutet till andra meddelandeblock.
[Topp]
Det fullständiga exemplet
I följande exempel visas den fullständiga definitionen av priority_buffer klassen.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };
    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}
namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }
        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }
        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }
        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }
        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }
        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }
        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }
        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }
        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }
    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);
            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }
                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }
        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);
            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }
                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }
        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;
            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }
            return message;
        }
        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }
        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }
        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }
        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }
        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }
            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }
        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }
            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);
                // Free the memory for the input message.
                delete input_message;
                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }
            // Propagate out the output messages.
            propagate_priority_order();
        }
    private:
        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }
            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();
                concurrency::message_status status = declined;
                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);
                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }
                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }
                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }
    private:
        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;
        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;
        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;
    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };
}
I följande exempel utförs ett antal asend- och concurrency::receive-operationer samtidigt på ett priority_buffer-objekt.
// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.
   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}
Det här exemplet genererar följande exempelutdata.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
Klassen priority_buffer beställer meddelanden först efter prioritet och sedan efter i vilken ordning den tar emot meddelanden. I det här exemplet infogas meddelanden med högre numerisk prioritet framför kön.
[Topp]
Kompilera koden
Kopiera exempelkoden och klistra in den i ett Visual Studio-projekt, eller klistra in definitionen av priority_buffer klassen i en fil med namnet priority_buffer.h och testprogrammet i en fil med namnet priority_buffer.cpp och kör sedan följande kommando i ett Visual Studio-kommandotolkfönster.
cl.exe /EHsc priority_buffer.cpp
Se även
              Genomgång av samtidighetskörning
              Asynkrona meddelandeblock
              Funktioner för meddelandeöverföring