跳转至内容

F# 编程/邮箱处理器

来自维基教科书,开放的书籍,开放的世界
上一个:异步工作流 索引 下一个:词法分析和解析
F#:MailboxProcessor 类

F# 的MailboxProcessor 类本质上是一个专门的的基于其自身逻辑控制线程运行的消息队列。任何线程都可以异步或同步地向 MailboxProcessor 发送消息,允许线程通过消息传递进行相互通信。这种"线程"的控制实际上是一个轻量级的模拟线程,通过对消息的异步反应来实现。这种消息传递并发风格的灵感来自 Erlang 编程语言。

定义邮箱处理器

[编辑 | 编辑源代码]

邮箱处理器是使用 MailboxProcessor.Start 方法创建的,该方法的类型为 Start : initial:(MailboxProcessor<'msg> -> Async<unit>) * ?asyncGroup:AsyncGroup -> MailboxProcessor<'msg>

let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async { do printfn "n = %d, waiting..." n
                    let! msg = inbox.Receive()
                    return! loop(n+msg) }
        loop 0)

inbox 的类型为 MailboxProcessor<'msg>,表示消息队列。方法 inbox.Receive() 从消息队列中取出第一个消息,并将其绑定到 msg 标识符。如果队列中没有消息,Receive 会将线程释放回线程池并等待更多消息。在 Receive 等待更多消息时,没有线程会被阻塞。

我们可以使用 fsi 对 counter 进行实验

> let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async { do printfn "n = %d, waiting..." n
                    let! msg = inbox.Receive()
                    return! loop(n+msg) }
        loop 0);;

val counter : MailboxProcessor<int>

n = 0, waiting...
> counter.Post(5);;
n = 5, waiting...
val it : unit = ()
> counter.Post(20);;
n = 25, waiting...
val it : unit = ()
> counter.Post(10);;
n = 35, waiting...
val it : unit = ()

邮箱处理器方法

[编辑 | 编辑源代码]

MailboxProcessor 类中有一些有用的方法

static member Start : initial:(MailboxProcessor<'msg> -> Async<unit>) * ?asyncGroup:AsyncGroup -> MailboxProcessor<'msg>

创建并启动 MailboxProcessor 实例。处理器执行的异步计算是 'initial' 函数返回的计算。

member Post : message:'msg -> unit

异步地将消息发布到 MailboxProcessor 的消息队列中。

member PostAndReply : buildMessage:(AsyncReplyChannel<'reply> -> 'msg) * ?timeout:int * ?exitContext:bool -> 'reply

将消息发布到 MailboxProcessor 的消息队列中,并在通道上等待回复。消息由对第一个函数的单个调用生成,该函数必须构建一个包含回复通道的消息。接收 MailboxProcessor 必须处理此消息,并精确地对回复通道调用一次 Reply 方法。

member Receive : ?timeout:int -> Async<'msg>

返回一个异步计算,该计算将按到达顺序使用第一个消息。在等待更多消息时,没有线程会被阻塞。如果超时,则引发 TimeoutException。

双向通信

[编辑 | 编辑源代码]

就像我们可以轻松地向 MailboxProcessor 发送消息一样,MailboxProcessor 也可以向消费者发送回复。例如,我们可以使用 PostAndReply 方法查询 MailboxProcessor 的值,如下所示

type msg =
    | Incr of int
    | Fetch of AsyncReplyChannel<int>

let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async { let! msg = inbox.Receive()
                    match msg with
                    | Incr(x) -> return! loop(n + x)
                    | Fetch(replyChannel) ->
                        replyChannel.Reply(n)
                        return! loop(n) }
        loop 0)

msg 联合体封装了两种类型的消息:我们可以告诉 MailboxProcessor 进行增量,或者让它将它的内容发送到一个回复通道。类型 AsyncReplyChannel<'a> 公开了单个方法,member Reply : 'reply -> unit。我们可以使用 fsi 中的以下内容

> counter.Post(Incr 7);;
val it : unit = ()
> counter.Post(Incr 50);;
val it : unit = ()
> counter.PostAndReply(fun replyChannel -> Fetch replyChannel);;
val it : int = 57

请注意 PostAndReply 是一个同步方法。

用对象封装邮箱处理器

[编辑 | 编辑源代码]

通常,我们不想将类的实现细节公开给消费者。例如,我们可以将上面的示例重写为一个类,该类公开了一些选择方法

type countMsg =
    | Die
    | Incr of int
    | Fetch of AsyncReplyChannel<int>
    
type counter() =
    let innerCounter =
        MailboxProcessor.Start(fun inbox ->
            let rec loop n =
                async { let! msg = inbox.Receive()
                        match msg with
                        | Die -> return ()
                        | Incr x -> return! loop(n + x)
                        | Fetch(reply) ->
                            reply.Reply(n);
                            return! loop n }
            loop 0)
            
    member this.Incr(x) = innerCounter.Post(Incr x)
    member this.Fetch() = innerCounter.PostAndReply((fun reply -> Fetch(reply)), timeout = 2000)
    member this.Die() = innerCounter.Post(Die)

邮箱处理器示例

[编辑 | 编辑源代码]

素数筛

[编辑 | 编辑源代码]

Rob Pike 在 Google TechTalk 上关于 NewSqueak 编程语言的精彩演讲中介绍了它。NewSqueak 对并发的处理方式使用了通道,类似于 MailboxProcessor,用于线程间通信。在演讲的最后,他展示了如何使用这些通道实现素数筛。以下是基于 Pike 的 NewSqueak 代码的素数筛的实现

type 'a seqMsg =   
    | Die   
    | Next of AsyncReplyChannel<'a>   
  
type primes() =   
    let counter(init) =   
        MailboxProcessor.Start(fun inbox ->   
            let rec loop n =   
                async { let! msg = inbox.Receive()   
                        match msg with   
                        | Die -> return ()   
                        | Next(reply) ->   
                            reply.Reply(n)   
                            return! loop(n + 1) }   
            loop init)   
      
    let filter(c : MailboxProcessor<'a seqMsg>, pred) =   
        MailboxProcessor.Start(fun inbox ->   
            let rec loop() =   
                async {   
                    let! msg = inbox.Receive()   
                    match msg with   
                    | Die ->   
                        c.Post(Die)   
                        return()   
                    | Next(reply) ->   
                        let rec filter' n =   
                            if pred n then async { return n }   
                            else  
                                async {let! m = c.PostAndAsyncReply(Next)   
                                       return! filter' m }   
                        let! testItem = c.PostAndAsyncReply(Next)   
                        let! filteredItem = filter' testItem   
                        reply.Reply(filteredItem)   
                        return! loop()   
                }   
            loop()   
        )

    let processor = MailboxProcessor.Start(fun inbox ->   
        let rec loop (oldFilter : MailboxProcessor<int seqMsg>) prime =   
            async {   
                let! msg = inbox.Receive()   
                match msg with   
                | Die ->   
                    oldFilter.Post(Die)   
                    return()   
                | Next(reply) ->   
                    reply.Reply(prime)   
                    let newFilter = filter(oldFilter, (fun x -> x % prime <> 0))   
                    let! newPrime = oldFilter.PostAndAsyncReply(Next)   
                    return! loop newFilter newPrime   
            }   
        loop (counter(3)) 2)   
  
    member this.Next() = processor.PostAndReply( (fun reply -> Next(reply)), timeout = 2000)
    
    interface System.IDisposable with
        member this.Dispose() = processor.Post(Die)

    static member upto max =   
        [ use p = new primes()
          let lastPrime = ref (p.Next())
          while !lastPrime <= max do
            yield !lastPrime
            lastPrime := p.Next() ]

counter 代表一个从 n 到无穷大的无限列表。

filter 只是一个用于另一个 MailboxProcessor 的过滤器。它类似于 Seq.filter

processor 本质上是一个迭代过滤器:我们用第一个素数 2 和一个从 3 到无穷大的无限列表来播种素数列表。每次我们处理一条消息时,我们都会返回素数,然后用一个新的列表替换我们的无限列表,该列表过滤掉所有能被我们的素数整除的数字。每个新过滤列表的头部是下一个素数。

因此,我们第一次调用 Next 时,我们得到一个 2,并将我们的无限列表替换为所有不能被 2 整除的数字。我们再次调用 next,我们得到下一个素数 3,并再次过滤列表,过滤掉所有能被 3 整除的数字。我们再次调用 next,我们得到下一个素数 5(我们跳过 4,因为它能被 2 整除),并过滤掉所有能被 5 整除的数字。这个过程无限重复。最终结果是一个素数筛,它的实现与埃拉托斯特尼筛法完全相同。

我们可以在 fsi 中测试这个类

> let p = new primes();;

val p : primes

> p.Next();;
val it : int = 2
> p.Next();;
val it : int = 3
> p.Next();;
val it : int = 5
> p.Next();;
val it : int = 7
> p.Next();;
val it : int = 11
> p.Next();;
val it : int = 13
> p.Next();;
val it : int = 17
> primes.upto 100;;
val it : int list
= [2; 3; 5; 7; 11; 13; 17; 19; 23; 29; 31; 37; 41; 43; 47; 53; 59; 61; 67; 71;
   73; 79; 83; 89; 97]
上一个:异步工作流 索引 下一个:词法分析和解析
华夏公益教科书