F# 编程/邮箱处理器
F#:MailboxProcessor 类 |
F# 的MailboxProcessor 类本质上是一个专门的的基于其自身逻辑控制线程运行的消息队列。任何线程都可以异步或同步地向 MailboxProcessor 发送消息,允许线程通过消息传递进行相互通信。这种"线程"的控制实际上是一个轻量级的模拟线程,通过对消息的异步反应来实现。这种消息传递并发风格的灵感来自 Erlang 编程语言。
邮箱处理器是使用 MailboxProcessor.Start 方法创建的,该方法的类型为 Start : initial:(MailboxProcessor<'msg> -> Async<unit>) * ?asyncGroup:AsyncGroup -> MailboxProcessor<'msg>
let counter =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { do printfn "n = %d, waiting..." n
let! msg = inbox.Receive()
return! loop(n+msg) }
loop 0)
值 inbox
的类型为 MailboxProcessor<'msg>
,表示消息队列。方法 inbox.Receive()
从消息队列中取出第一个消息,并将其绑定到 msg
标识符。如果队列中没有消息,Receive
会将线程释放回线程池并等待更多消息。在 Receive
等待更多消息时,没有线程会被阻塞。
我们可以使用 fsi 对 counter
进行实验
> let counter =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { do printfn "n = %d, waiting..." n
let! msg = inbox.Receive()
return! loop(n+msg) }
loop 0);;
val counter : MailboxProcessor<int>
n = 0, waiting...
> counter.Post(5);;
n = 5, waiting...
val it : unit = ()
> counter.Post(20);;
n = 25, waiting...
val it : unit = ()
> counter.Post(10);;
n = 35, waiting...
val it : unit = ()
MailboxProcessor 类中有一些有用的方法
static member Start : initial:(MailboxProcessor<'msg> -> Async<unit>) * ?asyncGroup:AsyncGroup -> MailboxProcessor<'msg>
- 创建并启动 MailboxProcessor 实例。处理器执行的异步计算是 'initial' 函数返回的计算。
member Post : message:'msg -> unit
- 异步地将消息发布到 MailboxProcessor 的消息队列中。
member PostAndReply : buildMessage:(AsyncReplyChannel<'reply> -> 'msg) * ?timeout:int * ?exitContext:bool -> 'reply
- 将消息发布到 MailboxProcessor 的消息队列中,并在通道上等待回复。消息由对第一个函数的单个调用生成,该函数必须构建一个包含回复通道的消息。接收 MailboxProcessor 必须处理此消息,并精确地对回复通道调用一次 Reply 方法。
member Receive : ?timeout:int -> Async<'msg>
- 返回一个异步计算,该计算将按到达顺序使用第一个消息。在等待更多消息时,没有线程会被阻塞。如果超时,则引发 TimeoutException。
就像我们可以轻松地向 MailboxProcessor 发送消息一样,MailboxProcessor 也可以向消费者发送回复。例如,我们可以使用 PostAndReply
方法查询 MailboxProcessor 的值,如下所示
type msg =
| Incr of int
| Fetch of AsyncReplyChannel<int>
let counter =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { let! msg = inbox.Receive()
match msg with
| Incr(x) -> return! loop(n + x)
| Fetch(replyChannel) ->
replyChannel.Reply(n)
return! loop(n) }
loop 0)
msg
联合体封装了两种类型的消息:我们可以告诉 MailboxProcessor 进行增量,或者让它将它的内容发送到一个回复通道。类型 AsyncReplyChannel<'a>
公开了单个方法,member Reply : 'reply -> unit
。我们可以使用 fsi 中的以下内容
> counter.Post(Incr 7);;
val it : unit = ()
> counter.Post(Incr 50);;
val it : unit = ()
> counter.PostAndReply(fun replyChannel -> Fetch replyChannel);;
val it : int = 57
请注意 PostAndReply
是一个同步方法。
通常,我们不想将类的实现细节公开给消费者。例如,我们可以将上面的示例重写为一个类,该类公开了一些选择方法
type countMsg =
| Die
| Incr of int
| Fetch of AsyncReplyChannel<int>
type counter() =
let innerCounter =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { let! msg = inbox.Receive()
match msg with
| Die -> return ()
| Incr x -> return! loop(n + x)
| Fetch(reply) ->
reply.Reply(n);
return! loop n }
loop 0)
member this.Incr(x) = innerCounter.Post(Incr x)
member this.Fetch() = innerCounter.PostAndReply((fun reply -> Fetch(reply)), timeout = 2000)
member this.Die() = innerCounter.Post(Die)
Rob Pike 在 Google TechTalk 上关于 NewSqueak 编程语言的精彩演讲中介绍了它。NewSqueak 对并发的处理方式使用了通道,类似于 MailboxProcessor,用于线程间通信。在演讲的最后,他展示了如何使用这些通道实现素数筛。以下是基于 Pike 的 NewSqueak 代码的素数筛的实现
type 'a seqMsg =
| Die
| Next of AsyncReplyChannel<'a>
type primes() =
let counter(init) =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { let! msg = inbox.Receive()
match msg with
| Die -> return ()
| Next(reply) ->
reply.Reply(n)
return! loop(n + 1) }
loop init)
let filter(c : MailboxProcessor<'a seqMsg>, pred) =
MailboxProcessor.Start(fun inbox ->
let rec loop() =
async {
let! msg = inbox.Receive()
match msg with
| Die ->
c.Post(Die)
return()
| Next(reply) ->
let rec filter' n =
if pred n then async { return n }
else
async {let! m = c.PostAndAsyncReply(Next)
return! filter' m }
let! testItem = c.PostAndAsyncReply(Next)
let! filteredItem = filter' testItem
reply.Reply(filteredItem)
return! loop()
}
loop()
)
let processor = MailboxProcessor.Start(fun inbox ->
let rec loop (oldFilter : MailboxProcessor<int seqMsg>) prime =
async {
let! msg = inbox.Receive()
match msg with
| Die ->
oldFilter.Post(Die)
return()
| Next(reply) ->
reply.Reply(prime)
let newFilter = filter(oldFilter, (fun x -> x % prime <> 0))
let! newPrime = oldFilter.PostAndAsyncReply(Next)
return! loop newFilter newPrime
}
loop (counter(3)) 2)
member this.Next() = processor.PostAndReply( (fun reply -> Next(reply)), timeout = 2000)
interface System.IDisposable with
member this.Dispose() = processor.Post(Die)
static member upto max =
[ use p = new primes()
let lastPrime = ref (p.Next())
while !lastPrime <= max do
yield !lastPrime
lastPrime := p.Next() ]
counter
代表一个从 n 到无穷大的无限列表。
filter
只是一个用于另一个 MailboxProcessor 的过滤器。它类似于 Seq.filter
。
processor
本质上是一个迭代过滤器:我们用第一个素数 2
和一个从 3 到无穷大的无限列表来播种素数列表。每次我们处理一条消息时,我们都会返回素数,然后用一个新的列表替换我们的无限列表,该列表过滤掉所有能被我们的素数整除的数字。每个新过滤列表的头部是下一个素数。
因此,我们第一次调用 Next
时,我们得到一个 2,并将我们的无限列表替换为所有不能被 2 整除的数字。我们再次调用 next,我们得到下一个素数 3,并再次过滤列表,过滤掉所有能被 3 整除的数字。我们再次调用 next,我们得到下一个素数 5(我们跳过 4,因为它能被 2 整除),并过滤掉所有能被 5 整除的数字。这个过程无限重复。最终结果是一个素数筛,它的实现与埃拉托斯特尼筛法完全相同。
我们可以在 fsi 中测试这个类
> let p = new primes();;
val p : primes
> p.Next();;
val it : int = 2
> p.Next();;
val it : int = 3
> p.Next();;
val it : int = 5
> p.Next();;
val it : int = 7
> p.Next();;
val it : int = 11
> p.Next();;
val it : int = 13
> p.Next();;
val it : int = 17
> primes.upto 100;;
val it : int list
= [2; 3; 5; 7; 11; 13; 17; 19; 23; 29; 31; 37; 41; 43; 47; 53; 59; 61; 67; 71;
73; 79; 83; 89; 97]