A message-processing agent which executes an asynchronous computation.
Namespace/Module Path: Microsoft.FSharp.Control
Assembly: FSharp.Core (in FSharp.Core.dll)
[<Sealed>]
[<AutoSerializable(false)>]
type MailboxProcessor<'Msg> =
 class
  interface IDisposable
  new MailboxProcessor : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.Post : 'Msg -> unit
  member this.PostAndAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> Async<'Reply>
  member this.PostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply
  member this.PostAndTryAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> Async<'Reply option>
  member this.Receive : ?int -> Async<'Msg>
  member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>
  member this.Start : unit -> unit
  static member Start : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.TryPostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply option
  member this.TryReceive : ?int -> Async<'Msg option>
  member this.TryScan : ('Msg -> Async<'T> option) * ?int -> Async<'T option>
  member this.add_Error : Handler<Exception> -> unit
  member this.CurrentQueueLength :  int
  member this.DefaultTimeout :  int with get, set
  member this.Error :  IEvent<Exception>
  member this.remove_Error : Handler<Exception> -> unit
 end
Remarks
The agent encapsulates a message queue that supports multiple writers and a single reader agent. Writers send messages to the agent by using the Post method and its variations. The agent may wait for messages by using the Receive or TryReceive methods, or scan through all available messages by using the Scan or TryScan methods.
This type is named FSharpMailboxProcessor in the .NET assembly. If accessing the type from a .NET language other than F#, or through reflection, use this name.
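The difference between Receive and Scan described above can be sketched as follows. This is a minimal illustration, not part of the library: the ScanMsg type, the scanAgent name, and the "!" prefix convention for urgent messages are assumptions made for this example. Messages are posted before the agent is started so that all three are already queued when the reader begins.

```fsharp
// Hypothetical message type for this sketch (not part of FSharp.Core).
type ScanMsg =
    | Text of string
    | Fetch of AsyncReplyChannel<string list>

let scanAgent =
    new MailboxProcessor<ScanMsg>(fun inbox ->
        async {
            // Scan: take the first message marked urgent, skipping over others.
            let! urgent =
                inbox.Scan(function
                    | Text s when s.StartsWith("!") -> Some(async { return s })
                    | _ -> None)

            // Receive: the skipped messages are still queued, in arrival order.
            let rec loop seen =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Text s -> return! loop (s :: seen)
                    | Fetch channel ->
                        channel.Reply(List.rev seen)
                        return! loop seen
                }
            return! loop [ urgent ]
        })

// Writers enqueue three messages before the single reader starts.
scanAgent.Post(Text "a")
scanAgent.Post(Text "!urgent")
scanAgent.Post(Text "b")
scanAgent.Start()

// Ask the agent for the order in which it handled the messages.
let processingOrder = scanAgent.PostAndReply(fun channel -> Fetch channel)
printfn "%A" processingOrder
```

In this sketch the urgent message is handled first, and the messages that Scan passed over are then delivered by Receive in their original order.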
Constructors
| Member | Description | 
|---|---|
| new | Creates an agent. The body function is used to generate the asynchronous computation executed by the agent. This function is not executed until Start is called. |
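Because the body function is not executed until Start is called, messages posted to a newly constructed agent wait in the queue. The following sketch illustrates this under the assumption of a simple string-based agent; the name lateAgent and the message contents are hypothetical.

```fsharp
// A minimal sketch; the agent name and messages are illustrative only.
let lateAgent =
    new MailboxProcessor<string>(fun inbox ->
        let rec loop () =
            async {
                let! msg = inbox.Receive()
                printfn "Processing: %s" msg
                return! loop ()
            }
        loop ())

// The body has not run yet, so posted messages accumulate in the queue.
lateAgent.Post("first")
lateAgent.Post("second")
let queuedBeforeStart = lateAgent.CurrentQueueLength
printfn "Queued before Start: %d" queuedBeforeStart

// Start runs the body, which then drains the queued messages in arrival order.
lateAgent.Start()
```

CurrentQueueLength is read here before Start, so both posted messages should still be counted as unprocessed.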
Instance Members
| Member | Description | 
|---|---|
| add_Error | Occurs when the execution of the agent results in an exception. |
| CurrentQueueLength | Returns the number of unprocessed messages in the message queue of the agent. |
| DefaultTimeout | Raises a timeout exception if a message is not received in this amount of time. By default, no timeout is used. |
| Error | Occurs when the execution of the agent results in an exception. |
| Post | Posts a message to the message queue of the MailboxProcessor, asynchronously. |
| PostAndAsyncReply | Posts a message to an agent and awaits a reply on the channel, asynchronously. |
| PostAndReply | Posts a message to an agent and awaits a reply on the channel, synchronously. |
| PostAndTryAsyncReply | Like PostAndAsyncReply, but returns None if there is no reply within the timeout period. |
| Receive | Waits for a message. This will consume the first message in arrival order. |
| remove_Error | Occurs when the execution of the agent results in an exception. |
| Scan | Scans for a message by looking through messages in arrival order until scanner returns a Some value. Other messages remain in the queue. |
| Start | Starts the agent. |
| TryPostAndReply | Like PostAndReply, but returns None if there is no reply within the timeout period. |
| TryReceive | Waits for a message. This will consume the first message in arrival order. Returns None if no message arrives within the timeout period. |
| TryScan | Scans for a message by looking through messages in arrival order until scanner returns a Some value. Other messages remain in the queue. Returns None if no matching message is found within the timeout period. |
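The timeout behavior of the Try variants listed above can be illustrated with a short sketch. The PingMsg type and pingAgent name are assumptions made for this example; the agent deliberately replies only when the flag in the message is true, so that one call receives a reply and the other runs out of time. Timeouts are given in milliseconds.

```fsharp
// Hypothetical message type: the agent replies only when the flag is true.
type PingMsg = Ping of bool * AsyncReplyChannel<string>

let pingAgent =
    MailboxProcessor<PingMsg>.Start(fun inbox ->
        let rec loop () =
            async {
                let! (Ping(shouldReply, channel)) = inbox.Receive()
                if shouldReply then channel.Reply("pong")
                return! loop ()
            }
        loop ())

// A reply arrives well within the 2000 ms timeout, so the result is Some.
let answered =
    pingAgent.TryPostAndReply((fun channel -> Ping(true, channel)), 2000)

// The agent never replies to this message, so the call gives up after 200 ms.
let unanswered =
    pingAgent.TryPostAndReply((fun channel -> Ping(false, channel)), 200)

printfn "answered = %A, unanswered = %A" answered unanswered
```

PostAndReply with the same timeout would raise a timeout exception instead of returning None, which is why the Try variants are usually the more convenient choice when a missing reply is an expected outcome.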
Static Members
| Member | Description | 
|---|---|
| Start | Creates and starts an agent. The body function is used to generate the asynchronous computation executed by the agent. |
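When there is no need to defer execution, the static Start member combines construction and startup in one call. The sketch below assumes a hypothetical CounterMsg type and counter agent; it keeps its state in the argument of a recursive loop rather than in a mutable variable.

```fsharp
// Hypothetical message type for a counter agent (not part of FSharp.Core).
type CounterMsg =
    | Increment of int
    | Get of AsyncReplyChannel<int>

let counter =
    MailboxProcessor<CounterMsg>.Start(fun inbox ->
        // The running total is carried as an argument of the recursive loop.
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Increment n -> return! loop (total + n)
                | Get channel ->
                    channel.Reply(total)
                    return! loop total
            }
        loop 0)

// Post returns immediately; the agent handles messages one at a time.
counter.Post(Increment 1)
counter.Post(Increment 2)

// PostAndReply blocks the caller until the agent replies on the channel.
let total = counter.PostAndReply(fun channel -> Get channel)
printfn "Total: %d" total
```

Because the agent processes messages in arrival order, the Get request is answered only after both increments have been applied.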
Example
The following example shows the basic use of the MailboxProcessor class.
open System
open Microsoft.FSharp.Control
type Message(id, contents) =
    static let mutable count = 0
    member this.ID = id
    member this.Contents = contents
    static member CreateMessage(contents) =
        count <- count + 1
        Message(count, contents)
let mailbox = new MailboxProcessor<Message>(fun inbox ->
    let rec loop count =
        async { printfn "Message count = %d. Waiting for next message." count
                let! msg = inbox.Receive()
                printfn "Message received. ID: %d Contents: %s" msg.ID msg.Contents
                return! loop (count + 1) }
    loop 0)
mailbox.Start()
mailbox.Post(Message.CreateMessage("ABC"))
mailbox.Post(Message.CreateMessage("XYZ"))
Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore
Sample Output
Press any key...
Message count = 0. Waiting for next message.
Message received. ID: 1 Contents: ABC
Message count = 1. Waiting for next message.
Message received. ID: 2 Contents: XYZ
Message count = 2. Waiting for next message.
The following example shows how to use MailboxProcessor to create a simple agent that accepts various types of messages and returns appropriate replies. This server agent represents a market maker, which is a buying and selling agent on a stock exchange that sets bid and ask prices for assets. Clients can query for prices, or buy and sell shares.
open System
type AssetCode = string
type Asset(code, bid, ask, initialQuantity) =
    let mutable quantity = initialQuantity
    member this.AssetCode = code
    member this.Bid = bid
    member this.Ask = ask
    member this.Quantity with get() = quantity and set(value) = quantity <- value
type OrderType =
    | Buy of AssetCode * int
    | Sell of AssetCode * int
type Message =
    | Query of AssetCode * AsyncReplyChannel<Reply>
    | Order of OrderType * AsyncReplyChannel<Reply>
and Reply =
    | Failure of string
    | Info of Asset
    | Notify of OrderType
let assets = [| new Asset("AAA", 10.0, 10.05, 1000000);
                new Asset("BBB", 20.0, 20.10, 1000000);
                new Asset("CCC", 30.0, 30.15, 1000000) |]
let codeAssetMap = assets
                   |> Array.map (fun asset -> (asset.AssetCode, asset))
                   |> Map.ofArray
let mutable totalCash = 00.00
let minCash = -1000000000.0
let maxTransaction = 1000000.0
let marketMaker = new MailboxProcessor<Message>(fun inbox ->
    let rec Loop() =
        async {
            let! message = inbox.Receive()
            match message with
            | Query(assetCode, replyChannel) ->
                match (Map.tryFind assetCode codeAssetMap) with
                | Some asset ->
                    printfn "Replying with Info for %s" (asset.AssetCode)
                    replyChannel.Reply(Info(asset))
                | None -> replyChannel.Reply(Failure("Asset code not found."))
            | Order(order, replyChannel) ->
                match order with
                | Buy(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (quantity < asset.Quantity) then
                            asset.Quantity <- asset.Quantity - quantity
                            totalCash <- totalCash + float quantity * asset.Ask
                            printfn "Replying with Notification:\nBought %d units of %s at price $%f. Total purchase $%f."
                                    quantity asset.AssetCode asset.Ask (asset.Ask * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Buy(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient shares to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient shares to fulfill order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
                | Sell(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (float quantity * asset.Bid <= maxTransaction && totalCash - float quantity * asset.Bid > minCash) then
                            asset.Quantity <- asset.Quantity + quantity
                            totalCash <- totalCash - float quantity * asset.Bid
                            printfn "Replying with Notification:\nSold %d units of %s at price $%f. Total sale $%f."
                                    quantity asset.AssetCode asset.Bid (asset.Bid * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Sell(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient cash to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient cash to cover order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
            // Use return! so the recursive call is in tail position.
            return! Loop()
        }
    Loop())
marketMaker.Start()
// Query price. 
let reply1 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for AAA"
    Query("AAA", replyChannel))
// Test Buy Order. 
let reply2 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for BBB"
    Order(Buy("BBB", 100), replyChannel))
// Test Sell Order. 
let reply3 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for CCC"
    Order(Sell("CCC", 100), replyChannel))
// Test incorrect code. 
let reply4 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for WrongCode"
    Order(Buy("WrongCode", 100), replyChannel))
// Test too large a number of shares. 
let reply5 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with large number of shares of AAA."
    Order(Buy("AAA", 1000000000), replyChannel))
// Too large an amount of money for one transaction. 
let reply6 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with too large a monetary amount."
    Order(Sell("AAA", 100000000), replyChannel))
let random = new Random()
let nextTransaction() =
    let buyOrSell = random.Next(2)
    let asset = assets.[random.Next(3)]
    let quantity = Array.init 3 (fun _ -> random.Next(1000)) |> Array.sum
    match buyOrSell with
    | n when n % 2 = 0 -> Buy(asset.AssetCode, quantity)
    | _ -> Sell(asset.AssetCode, quantity)
let simulateOne() =
   async {
       let! reply = marketMaker.PostAndAsyncReply(fun replyChannel ->
           let transaction = nextTransaction()
           match transaction with
           | Buy(assetCode, quantity) -> printfn "Posting BUY %s %d." assetCode quantity
           | Sell(assetCode, quantity) -> printfn "Posting SELL %s %d." assetCode quantity
           Order(transaction, replyChannel))
       printfn "%s" (reply.ToString())
    }
let simulate =
    async {
        while (true) do 
            do! simulateOne()
            // Insert a delay so that you can see the results more easily. 
            do! Async.Sleep(1000)
    }
Async.Start(simulate)
Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore
Sample Output
Posting message for AAA
Replying with Info for AAA
Posting message for BBB
Replying with Notification:
Bought 100 units of BBB at price $20.100000. Total purchase $2010.000000.
Marketmaker balance: $ 2010.00
Posting message for CCC
Replying with Notification:
Sold 100 units of CCC at price $30.000000. Total sale $3000.000000.
Marketmaker balance: $ -990.00
Posting message for WrongCode
Posting message with large number of shares of AAA.
Insufficient shares to fulfill order for 1000000000 units of AAA.
Posting message with too large a monetary amount.
Insufficient cash to fulfill order for 100000000 units of AAA.
Press any key...
Posting BUY CCC 1338.
Replying with Notification:
Bought 1338 units of CCC at price $30.150000. Total purchase $40340.700000.
Marketmaker balance: $ 39350.70
Program+Snippet3+Reply+Notify
Posting BUY BBB 1961.
Replying with Notification:
Bought 1961 units of BBB at price $20.100000. Total purchase $39416.100000.
Marketmaker balance: $ 78766.80
Platforms
Windows 8, Windows 7, Windows Server 2012, Windows Server 2008 R2
Version Information
F# Core Library Versions
Supported in: 2.0, 4.0, Portable