F# 编程/异步工作流
F#:异步工作流 |
异步工作流 允许程序员将单线程代码转换为多线程代码,而只需进行最小的代码更改。
异步工作流使用 计算表达式表示法 定义。
async { comp-exprs }
以下是一个使用 fsi 的示例
> let asyncAdd x y = async { return x + y };;
val asyncAdd : int -> int -> Async<int>
注意 asyncAdd
的返回值类型。它实际上并没有运行函数;相反,它返回一个 async<int>
,它是一种围绕我们函数的特殊包装器。
Async 模块 用于对 async<'a>
对象进行操作。它包含几个有用的方法,其中最重要的是
member RunSynchronously : computation:Async<'T> * ?timeout:int -> 'T
- 运行异步计算并等待其结果。如果异步计算中发生异常,则此函数会重新引发异常。作为默认 AsyncGroup 的一部分运行。
member Parallel : computationList:seq<Async<'T>> -> Async<'T array>
- 指定一个异步计算,当运行时,会执行所有给定的异步计算,最初将每个计算排队到线程池中。如果任何一个引发异常,则整个计算将引发异常,并尝试取消其他计算。所有子计算都属于 AsyncGroup,该组是外部计算的 AsyncGroup 的子组。
member Start : computation:Async<unit> -> unit
- 在线程池中启动异步计算。不要等待其结果。作为默认 AsyncGroup 的一部分运行
Async.RunSynchronously
用于运行 async<'a>
块并等待它们返回,Run.Parallel
自动在 CPU 拥有的所有处理器上运行每个 async<'a>
,而 Async.Start
则在不等待操作完成的情况下运行。为了使用规范示例,下载网页,我们可以在 fsi 中编写如下异步下载网页的代码
> let extractLinks url =
async {
let webClient = new System.Net.WebClient()
printfn "Downloading %s" url
let html = webClient.DownloadString(url : string)
printfn "Got %i bytes" html.Length
let matches = System.Text.RegularExpressions.Regex.Matches(html, @"http://\S+")
printfn "Got %i links" matches.Count
return url, matches.Count
};;
val extractLinks : string -> Async<string * int>
> Async.RunSynchronously (extractLinks "http://www.msn.com/");;
Downloading http://www.msn.com/
Got 50742 bytes
Got 260 links
val it : string * int = ("http://www.msn.com/", 260)
async<'a>
对象由 AsyncBuilder 构建,它具有以下重要成员
member Bind : p:Async<'a> * f:('a -> Async<'b>) -> Async<'b>
/ let!
- 指定一个异步计算,当运行时,会运行 'p',当 'p' 生成一个结果 'res' 时,会运行 'f res'。
member Return : v:'a -> Async<'a>
/ return
- 指定一个异步计算,当运行时,会返回结果 'v'
换句话说,let!
执行一个异步工作流并将它的返回值绑定到一个标识符,return
只是返回一个结果,而 return!
执行一个异步工作流并将它的返回值作为一个结果返回。
这些原语允许我们在彼此之间组合异步块。例如,我们可以通过异步下载网页并异步提取它的 URL 来改进上面的代码
let extractLinksAsync html =
async {
return System.Text.RegularExpressions.Regex.Matches(html, @"http://\S+")
}
let downloadAndExtractLinks url =
async {
let webClient = new System.Net.WebClient()
let html = webClient.DownloadString(url : string)
let! links = extractLinksAsync html
return url, links.Count
}
注意 let!
接收一个 async<'a>
并将它的返回值绑定到一个类型为 'a
的标识符。我们可以在 fsi 中测试这段代码
> let links = downloadAndExtractLinks "http://www.wordpress.com/";;
val links : Async<string * int>
> Async.Run links;;
val it : string * int = ("http://www.wordpress.com/", 132)
let!
是做什么的?
let!
在它自己的线程上运行一个 async<'a>
对象,然后它立即将当前线程释放回线程池。当 let!
返回时,工作流的执行将在新线程上继续,该线程可能与工作流开始运行的线程相同,也可能不同。因此,异步工作流往往会在线程之间“跳跃”,这是一个有趣的现象,这里 明确展示了这一点,但这通常并不被认为是一件坏事。
考虑函数 Seq.map
。此函数是同步的,但是它没有真正的理由需要 同步,因为每个元素都可以并行映射(假设我们不共享任何可变状态)。使用 模块扩展,我们可以用最小的努力编写 Seq.map
的并行版本
module Seq =
let pmap f l =
seq { for a in l -> async { return f a } }
|> Async.Parallel
|> Async.Run
并行映射可能会对映射操作的速度产生重大影响。我们可以使用以下方法直接比较串行和并行映射
open System.Text.RegularExpressions
open System.Net
let download url =
let webclient = new System.Net.WebClient()
webclient.DownloadString(url : string)
let extractLinks html = Regex.Matches(html, @"http://\S+")
let downloadAndExtractLinks url =
let links = (url |> download |> extractLinks)
url, links.Count
let urls =
[@"http://www.craigslist.com/";
@"http://www.msn.com/";
@"https://wikibooks.cn/wiki/Main_Page";
@"http://www.wordpress.com/";
@"http://news.google.com/";]
let pmap f l =
seq { for a in l -> async { return f a } }
|> Async.Parallel
|> Async.Run
let testSynchronous() = List.map downloadAndExtractLinks urls
let testAsynchronous() = pmap downloadAndExtractLinks urls
let time msg f =
let stopwatch = System.Diagnostics.Stopwatch.StartNew()
let temp = f()
stopwatch.Stop()
printfn "(%f ms) %s: %A" stopwatch.Elapsed.TotalMilliseconds msg temp
let main() =
printfn "Start..."
time "Synchronous" testSynchronous
time "Asynchronous" testAsynchronous
printfn "Done."
main()
此程序具有以下类型
val download : string -> string
val extractLinks : string -> MatchCollection
val downloadAndExtractLinks : string -> string * int
val urls : string list
val pmap : ('a -> 'b) -> seq<'a> -> 'b array
val testSynchronous : unit -> (string * int) list
val testAsynchronous : unit -> (string * int) array
val time : string -> (unit -> 'a) -> unit
val main : unit -> unit
此程序输出以下内容
Start... (4276.190900 ms) Synchronous: [("http://www.craigslist.com/", 185); ("http://www.msn.com/", 262); ("https://wikibooks.cn/wiki/Main_Page", 190); ("http://www.wordpress.com/", 132); ("http://news.google.com/", 296)] (1939.117900 ms) Asynchronous: [|("http://www.craigslist.com/", 185); ("http://www.msn.com/", 261); ("https://wikibooks.cn/wiki/Main_Page", 190); ("http://www.wordpress.com/", 132); ("http://news.google.com/", 294)|] Done.
使用 pmap
的代码运行速度快了大约 2.2 倍,因为网页是并行下载的,而不是串行下载的。
在软件开发的最初 50 年中,程序员可以欣慰地认为,计算机硬件的性能大约每 18 个月翻一番。如果一个程序今天很慢,你只需等待几个月,该程序就会在没有任何源代码更改的情况下以两倍的速度运行。这种趋势一直持续到 2000 年代初,2003 年的商用台式机拥有比 1993 年最快的超级计算机更强大的处理能力。然而,在 Herb Sutter 发表了一篇著名的文章 免费午餐结束了:软件中并发性的根本性转变 之后,处理器在 2005 年左右达到了大约 3.7 GHz 的峰值。计算速度的理论上限受到光速和物理定律的限制,我们已经非常接近这个极限了。由于 CPU 设计人员无法设计出更快的 CPU,他们转向设计具有多个内核和对多线程更好支持的处理器。程序员不再拥有应用程序随着硬件改进而以两倍速度运行的奢侈条件——免费午餐结束了。
时钟频率没有变得更快,但是企业每年处理的数据量呈指数级增长(通常每年增长 10-20%)。为了满足不断增长的业务处理需求,所有软件开发的未来都趋向于开发高度并行、多线程的应用程序,这些应用程序利用多核处理器、分布式系统和云计算。
多线程编程以难以正确实现并且学习曲线陡峭而闻名。为什么它有这样的声誉?简单地说,可变共享状态使程序难以推理。当两个线程修改相同的变量时,很容易使变量处于无效状态。
竞争条件
作为一个演示,以下是如何使用共享状态(非线程版本)来增加全局变量
let test() =
let counter = ref 0m
let IncrGlobalCounter numberOfTimes =
for i in 1 .. numberOfTimes do
counter := !counter + 1m
IncrGlobalCounter 1000000
IncrGlobalCounter 1000000
!counter // returns 2000000M
这有效,但一些程序员可能会注意到,对 IncrGlobalCounter
的两次调用可以并行计算,因为没有真正的原因需要等待一个调用完成才能执行另一个调用。使用 System.Threading 命名空间中的 .NET 线程原语,程序员可以将此重写为以下代码
open System.Threading
let testAsync() =
let counter = ref 0m
let IncrGlobalCounter numberOfTimes =
for i in 1 .. numberOfTimes do
counter := !counter + 1m
let AsyncIncrGlobalCounter numberOfTimes =
new Thread(fun () -> IncrGlobalCounter(numberOfTimes))
let t1 = AsyncIncrGlobalCounter 1000000
let t2 = AsyncIncrGlobalCounter 1000000
t1.Start() // runs t1 asyncronously
t2.Start() // runs t2 asyncronously
t1.Join() // waits until t1 finishes
t2.Join() // waits until t2 finishes
!counter
此程序应该 与前面的程序做同样的事情,只是它应该在 ~1/2 的时间内运行。以下是在 fsi 中进行 5 次测试运行的结果
> [for a in 1 .. 5 -> testAsync()];;
val it : decimal list = [1498017M; 1509820M; 1426922M; 1504574M; 1420401M]
该程序在计算上是合理的,但它每次运行都会产生不同的结果。发生了什么?
增加一个十进制值需要多个机器指令。特别是,用于增加十进制值的 .NET IL 如下所示
// pushes static field onto evaluation stack
L_0004: ldsfld valuetype [mscorlib]System.Decimal ConsoleApplication1.Program::i
// executes Decimal.op_Increment method
L_0009: call valuetype [mscorlib]System.Decimal [mscorlib]System.Decimal::op_Increment(valuetype [mscorlib]System.Decimal)
// replaces static field with value from evaluation stack
L_000e: stsfld valuetype [mscorlib]System.Decimal ConsoleApplication1.Program::i
假设有两个线程调用此代码(线程 1 和线程 2 进行的调用是交错的)
Thread1: Loads value "100" onto its evaluation stack. Thread1: Call add with "100" and "1" Thread2: Loads value "100" onto its evaluation stack. Thread1: Writes "101" back out to static variable Thread2: Call add with "100" and "1" Thread2: Writes "101" back out to static variable (Oops, we've incremented an old value and wrote it back out) Thread1: Loads value "101" onto its evaluation stack. Thread2: Loads value "101" onto its evaluation stack. (Now we let Thread1 get a little further ahead of Thread2) Thread1: Call add with "101" and "1" Thread1: Writes "102" back out to static variable. Thread1: Loads value "102" to evaluation stack Thread1: Call add with "102" and "1" Thread1: Writes "103" back out to static variable. Thread2: Call add with "101" and "1 Thread2: Writes "102" back out to static variable (Oops, now we've completely overwritten work done by Thread1!)
这种类型的错误称为竞争条件,它在多线程应用程序中经常发生。与普通错误不同,竞争条件通常是非确定性的,这使得它们非常难以追踪。
通常,程序员通过引入锁来解决竞争条件。当一个对象被“锁定”时,所有其他线程都必须等待,直到该对象被“解锁”才能继续执行。我们可以使用块访问 counter
变量来重写上面的代码,而每个线程都写入该变量
open System.Threading
let testAsync() =
let counter = ref 0m
let IncrGlobalCounter numberOfTimes =
for i in 1 .. numberOfTimes do
lock counter (fun () -> counter := !counter + 1m)
(* lock is a function in F# library. It automatically unlocks "counter" when 'fun () -> ...' completes *)
let AsyncIncrGlobalCounter numberOfTimes =
new Thread(fun () -> IncrGlobalCounter(numberOfTimes))
let t1 = AsyncIncrGlobalCounter 1000000
let t2 = AsyncIncrGlobalCounter 1000000
t1.Start() // runs t1 asyncronously
t2.Start() // runs t2 asyncronously
t1.Join() // waits until t1 finishes
t2.Join() // waits until t2 finishes
!counter
锁保证每个线程对共享状态的独占访问,并强制每个线程在代码 counter := !counter + 1m
运行完成之前等待另一个线程。此函数现在会产生预期结果。
死锁
锁强制线程等待,直到对象被解锁。但是,锁经常会导致一个新的问题:假设有两个线程 ThreadA 和 ThreadB 操作两个相应的共享状态 StateA 和 StateB。ThreadA 锁定 StateA 和 StateB,ThreadB 锁定 StateB 和 StateA。如果时间安排得当,当 ThreadA 需要访问 StateB 时,它会等待 ThreadB 解锁 StateB;当 ThreadB 需要访问 StateA 时,它也无法继续,因为 StateA 被 ThreadA 锁定。这两个线程相互阻塞,无法继续执行。这被称为死锁。
以下是一些演示死锁的简单代码
open System.Threading
let testDeadlock() =
let stateA = ref "Shared State A"
let stateB = ref "Shared State B"
let threadA = new Thread(fun () ->
printfn "threadA started"
lock stateA (fun () ->
printfn "stateA: %s" !stateA
Thread.Sleep(100) // pauses thread for 100 ms. Simimulates busy processing
lock stateB (fun () -> printfn "stateB: %s" !stateB))
printfn "threadA finished")
let threadB = new Thread(fun () ->
printfn "threadB started"
lock stateB (fun () ->
printfn "stateB: %s" !stateB
Thread.Sleep(100) // pauses thread for 100 ms. Simimulates busy processing
lock stateA (fun () -> printfn "stateA: %s" !stateA))
printfn "threadB finished")
printfn "Starting..."
threadA.Start()
threadB.Start()
threadA.Join()
threadB.Join()
printfn "Finished..."
这些类型的错误在多线程代码中经常发生,尽管它们通常不像上面显示的代码那样明确。
简单地说,可变状态是多线程代码的敌人。函数式编程通常极大地简化了多线程:由于值默认情况下是不可变的,程序员不需要担心一个线程在两个线程之间修改共享状态的值,因此它消除了与竞争条件相关的整类多线程错误。由于不存在竞争条件,因此也没有理由使用锁,因此不可变性也消除了与死锁相关的另一整类错误。