标签:
2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over” 中揭露了行业中最不可告人的一个小秘密,他明确论证了处理器在速度上的发展已经走到了尽头,并且将由全新的单芯片上的并行 “内核”(虚拟 CPU)所取代。这一发现对编程社区造成了不小的冲击,因为正确创建线程安全的代码,在理论而非实践中,始终会提高高性能开发人员的身价,而让各公司难以聘用他们。看上去,仅有少数人充分理解了 Java 的线程模型、并发 API 以及 “同步” 的含义,以便能够编写同时提供安全性和吞吐量的代码 —— 并且大多数人已经明白了它的困难所在。
据推测,行业的其余部分将自力更生,这显然不是一个理想的结局,至少不是 IT 部门努力开发软件所应得的回报。
与 Scala 在 .NET 领域中的姐妹语言 F# 相似,Scala 是针对 “并发性问题” 的解决方案之一。在本期文章中,我讨论了 Scala 的一些属性,这些属性使它更加胜任于编写线程安全的代码,比如默认不可修改的对象,并讨论了一种返回对象副本而不是修改它们内容的首选设计方案。Scala 对并发性的支持远比此深远;现在,我们有必要来了解一下 Scala 的各种库。
在深入研究 Scala 的并发性支持之前,有必要确保您具备了对 Java 基本并发性模型的良好理解,因为 Scala 的并发性支持,从某种程度上说,建立在 JVM 和支持库所提供的特性和功能的基础之上。为此,清单 1 中的代码包含了一个已知的 Producer/Consumer并发性问题(详见 Sun Java Tutorial 的 “Guarded Blocks” 小节)。注意,Java Tutorial 版本并未在其解决方案中使用 java.util.concurrent类,而是择优使用了 java.lang.Object中的较旧的 wait()/notifyAll()方法:
package com.tedneward.scalaexamples.notj5;
class Producer implements Runnable
{
private Drop drop;
private String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
};
public Producer(Drop drop) { this.drop = drop; }
public void run()
{
for (int i = 0; i < importantInfo.length; i++)
{
drop.put(importantInfo[i]);
}
drop.put("DONE");
}
}
class Consumer implements Runnable
{
private Drop drop;
public Consumer(Drop drop) { this.drop = drop; }
public void run()
{
for (String message = drop.take(); !message.equals("DONE");
message = drop.take())
{
System.out.format("MESSAGE RECEIVED: %s%n", message);
}
}
}
class Drop
{
//Message sent from producer to consumer.
private String message;
//True if consumer should wait for producer to send message,
//false if producer should wait for consumer to retrieve message.
private boolean empty = true;
//Object to use to synchronize against so as to not "leak" the
//"this" monitor
private Object lock = new Object();
public String take()
{
synchronized(lock)
{
//Wait until message is available.
while (empty)
{
try
{
lock.wait();
}
catch (InterruptedException e) {}
}
//Toggle status.
empty = true;
//Notify producer that status has changed.
lock.notifyAll();
return message;
}
}
public void put(String message)
{
synchronized(lock)
{
//Wait until message has been retrieved.
while (!empty)
{
try
{
lock.wait();
} catch (InterruptedException e) {}
}
//Toggle status.
empty = false;
//Store message.
this.message = message;
//Notify consumer that status has changed.
lock.notifyAll();
}
}
}
public class ProdConSample
{
public static void main(String[] args)
{
Drop drop = new Drop();
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}
注意: 我在此处展示的代码对 Sun 教程解决方案做了少许修改;它们提供的代码存在一个很小的设计缺陷(参见 Java 教程 “缺陷”)。
Producer/Consumer 问题的核心非常容易理解:一个(或多个)生产者实体希望将数据提供给一个(或多个)使用者实体供它们使用和操作(在本例中,它包括将数据打印到控制台)。Producer和 Consumer类是相应直观的 Runnable- 实现类:Producer从数组中获取String,并通过 put将它们放置到 Consumer的缓冲区中,并根据需要执行 take。
问题的难点在于,如果 Producer运行过快,则数据在覆盖时可能会丢失;如果 Consumer运行过快,则当 Consumer读取相同的数据两次时,数据可能会得到重复处理。缓冲区(在 Java Tutorial 代码中称作 Drop)将确保不会出现这两种情况。数据破坏的可能性就更不用提了(在 String 引用的例子中很困难,但仍然值得注意),因为数据会由 put放入缓冲区,并由 take取出。
关于此主题的全面讨论请阅读 Brian Goetz 的 Java Concurrency in Practice或 Doug Lea 的Concurrent Programming in Java(参见 参考资料),但是,在应用 Scala 之前有必要快速了解一下此代码的运行原理。
当 Java 编译器看到 synchronized关键字时,它会在同步块的位置生成一个 try/finally块,其顶部包括一个 monitorenter操作码,并且 finally块中包括一个 monitorexit操作码,以确保监控程序(Java 的原子性基础)已经发布,而与代码退出的方式无关。因此,Drop中的 put代码将被重写,如清单 2 所示:
// This is pseudocode
public void put(String message)
{
try
{
monitorenter(lock)
//Wait until message has been retrieved.
while (!empty)
{
try
{
lock.wait();
} catch (InterruptedException e) {}
}
//Toggle status.
empty = false;
//Store message.
this.message = message;
//Notify consumer that status has changed.
lock.notifyAll();
}
finally
{
monitorexit(lock)
}
}
wait()方法将通知当前线程进入非活动状态,并等待另一个线对该对象调用 notifyAll()。然后,通知的线程必须在能够继续执行的时候尝试再次获取监控程序。从本质上说,wait()和 notify()/notifyAll()允许一种简单的信令机制,它允许 Drop在 Producer和 Consumer线程之间进行协调,每个 put都有相应的 take。
本文的 代码下载部分使用 Java5 并发性增强(Lock和 Condition接口以及 ReentrantLock锁定实现)提供 清单 2的基于超时的版本,但基本代码模式仍然相同。这就是问题所在:编写清单 2 这样的代码的开发人员需要过度专注于线程和锁定的细节以及低级实现代码,以便让它们能够正确运行。此外,开发人员需要对每一行代码刨根知底,以确定是否需要保护它们,因为过度同步与过少同步同样有害。
现在,我们来看到 Scala 替代方案。
开始应用 Scala 并发性的一种方法是将 Java 代码直接转换为 Scala,以便利用 Scala 的语法优势来简化代码(至少能简化一点):
object ProdConSample
{
class Producer(drop : Drop)
extends Runnable
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
);
override def run() : Unit =
{
importantInfo.foreach((msg) => drop.put(msg))
drop.put("DONE")
}
}
class Consumer(drop : Drop)
extends Runnable
{
override def run() : Unit =
{
var message = drop.take()
while (message != "DONE")
{
System.out.format("MESSAGE RECEIVED: %s%n", message)
message = drop.take()
}
}
}
class Drop
{
var message : String = ""
var empty : Boolean = true
var lock : AnyRef = new Object()
def put(x: String) : Unit =
lock.synchronized
{
// Wait until message has been retrieved
await (empty == true)
// Toggle status
empty = false
// Store message
message = x
// Notify consumer that status has changed
lock.notifyAll()
}
def take() : String =
lock.synchronized
{
// Wait until message is available.
await (empty == false)
// Toggle status
empty=true
// Notify producer that staus has changed
lock.notifyAll()
// Return the message
message
}
private def await(cond: => Boolean) =
while (!cond) { lock.wait() }
}
def main(args : Array[String]) : Unit =
{
// Create Drop
val drop = new Drop();
// Spawn Producer
new Thread(new Producer(drop)).start();
// Spawn Consumer
new Thread(new Consumer(drop)).start();
}
}
Producer和 Consumer类几乎与它们的 Java 同类相同,再一次扩展(实现)了 Runnable接口并覆盖了 run()方法,并且 —对于 Producer的情况 —分别使用了内置迭代方法来遍历 importantInfo数组的内容。(实际上,为了让它更像 Scala,importantInfo可能应该是一个List而不是 Array,但在第一次尝试时,我希望尽可能保证它们与原始 Java 代码一致。)
Drop类同样类似于它的 Java 版本。但 Scala 中有一些例外,“synchronized” 并不是关键字,它是针对 AnyRef类定义的一个方法,即 Scala “所有引用类型的根”。这意味着,要同步某个特定的对象,您只需要对该对象调用同步方法;在本例中,对 Drop上的 lock 字段中所保存的对象调用同步方法。
注意,我们在 await()方法定义的 Drop类中还利用了一种 Scala 机制:cond参数是等待计算的代码块,而不是在传递给该方法之前进行计算。在 Scala 中,这被称作 “call-by-name”;此处,它是一种实用的方法,可以捕获需要在 Java 版本中表示两次的条件等待逻辑(分别用于put和 take)。
最后,在 main()中,创建 Drop实例,实例化两个线程,使用 start()启动它们,然后在 main()的结束部分退出,相信 JVM 会在 main()结束之前启动这两个线程。(在生产代码中,可能无法保证这种情况,但对于这样的简单的例子,99.99 % 没有问题。)
但是,已经说过,仍然存在相同的基本问题:程序员仍然需要过分担心两个线程之间的通信和协调问题。虽然一些 Scala 机制可以简化语法,但这目前为止并没有相当大的吸引力。
Scala Library Reference 中有一个有趣的包:scala.concurrency。这个包包含许多不同的并发性结构,包括我们即将利用的 MailBox类。
顾名思义,MailBox从本质上说就是 Drop,用于在检测之前保存数据块的单槽缓冲区。但是,MailBox最大的优势在于它将发送和接收数据的细节完全封装到模式匹配和 case 类中,这使它比简单的 Drop(或 Drop的多槽数据保存类 java.util.concurrent.BoundedBuffer)更加灵活。
package com.tedneward.scalaexamples.scala.V2
{
import concurrent.{MailBox, ops}
object ProdConSample
{
class Producer(drop : Drop)
extends Runnable
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
);
override def run() : Unit =
{
importantInfo.foreach((msg) => drop.put(msg))
drop.put("DONE")
}
}
class Consumer(drop : Drop)
extends Runnable
{
override def run() : Unit =
{
var message = drop.take()
while (message != "DONE")
{
System.out.format("MESSAGE RECEIVED: %s%n", message)
message = drop.take()
}
}
}
class Drop
{
private val m = new MailBox()
private case class Empty()
private case class Full(x : String)
m send Empty() // initialization
def put(msg : String) : Unit =
{
m receive
{
case Empty() =>
m send Full(msg)
}
}
def take() : String =
{
m receive
{
case Full(msg) =>
m send Empty(); msg
}
}
}
def main(args : Array[String]) : Unit =
{
// Create Drop
val drop = new Drop()
// Spawn Producer
new Thread(new Producer(drop)).start();
// Spawn Consumer
new Thread(new Consumer(drop)).start();
}
}
}
此处,v2 和 v1 之间的惟一区别在于 Drop的实现,它现在利用 MailBox类处理传入以及从 Drop中删除的消息的阻塞和信号事务。(我们可以重写 Producer和 Consumer,让它们直接使用 MailBox,但考虑到简单性,我们假定希望保持所有示例中的 DropAPI 相一致。)使用MailBox与使用典型的 BoundedBuffer(Drop)稍有不同,因此我们来仔细看看其代码。
MailBox有两个基本操作:send和 receive。receiveWithin 方法仅仅是基于超时的 receive。MailBox接收任何类型的消息。send()方法将消息放置到邮箱中,并立即通知任何关心该类型消息的等待接收者,并将它附加到一个消息链表中以便稍后检索。receive()方法将阻塞,直到接收到对于功能块合适的消息。
因此,在这种情况下,我们将创建两个 case 类,一个不包含任何内容(Empty),这表示 MailBox为空,另一个包含消息数据(Full。
put方法,由于它会将数据放置在 Drop中,对 MailBox调用 receive()以查找 Empty实例,因此会阻塞直到发送 Empty。此时,它发送一个 Full实例给包含新数据的 MailBox。take方法,由于它会从 Drop中删除数据,对 MailBox调用 receive()以查找 Full实例,提取消息(再次得益于模式匹配从 case 类内部提取值并将它们绑到本地变量的能力)并发送一个 Empty 实例给 MailBox。不需要明确的锁定,并且不需要考虑监控程序。
事实上,我们可以显著缩短代码,只要 Producer 和 Consumer不需要功能全面的类(此处便是如此) —两者从本质上说都是Runnable.run()方法的瘦包装器,Scala 可以使用 scala.concurrent.ops对象的 spawn方法来实现,如清单 5 所示:
package com.tedneward.scalaexamples.scala.V3
{
import concurrent.MailBox
import concurrent.ops._
object ProdConSample
{
class Drop
{
private val m = new MailBox()
private case class Empty()
private case class Full(x : String)
m send Empty() // initialization
def put(msg : String) : Unit =
{
m receive
{
case Empty() =>
m send Full(msg)
}
}
def take() : String =
{
m receive
{
case Full(msg) =>
m send Empty(); msg
}
}
}
def main(args : Array[String]) : Unit =
{
// Create Drop
val drop = new Drop()
// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
);
importantInfo.foreach((msg) => drop.put(msg))
drop.put("DONE")
}
// Spawn Consumer
spawn
{
var message = drop.take()
while (message != "DONE")
{
System.out.format("MESSAGE RECEIVED: %s%n", message)
message = drop.take()
}
}
}
}
}
spawn方法(通过包块顶部的 ops对象导入)接收一个代码块(另一个 by-name 参数示例)并将它包装在匿名构造的线程对象的 run()方法内部。事实上,并不难理解 spawn的定义在 ops类的内部是什么样的:
def spawn(p: => Unit) = {
val t = new Thread() { override def run() = p }
t.start()
}
……这再一次强调了 by-name 参数的强大之处。
ops.spawn方法的一个缺点在于,它是在 2003 年 Java 5 concurrency 类还不可用的时候编写的。特别是,java.util.concurrent.Executor及其同类的作用是让开发人员更加轻松地生成线程,而不需要实际处理直接创建线程对象的细节。幸运的是,在您自己的自定义库中重新创建 spawn的定义是相当简单的,这需要利用 Executor(或 ExecutorService或ScheduledExecutorService)来执行线程的实际启动任务。
事实上,Scala 的并发性支持超越了 MailBox和 ops类;Scala 还支持一个类似的 “Actors” 概念,它使用了与 MailBox所采用的方法相类似的消息传递方法,但应用更加全面并且灵活性也更好。但是,这部分内容将在下期讨论。
Scala 为并发性提供了两种级别的支持,这与其他与 Java 相关的主题极为类似:
wait()/notifyAll())。MailBox类以及将在本系列下一篇文章中讨论的 Actors 库。两个例子中的目标是相同的:让开发人员能够更加轻松地专注于问题的实质,而不用考虑并发编程的低级细节(显然,第二种方法更好地实现了这一目标,至少对于没有过多考虑低级细节的人来说是这样的。)
但是,当前 Scala 库的一个明显的缺陷就是缺乏 Java 5 支持;scala.concurrent.ops类应该具有 spawn这样的利用新的 Executor接口的方法。它还应该支持利用新的 Lock接口的各种版本的 synchronized。幸运的是,这些都是可以在 Scala 生命周期中实现的库增强,而不会破坏已有代码;它们甚至可以由 Scala 开发人员自己完成,而不需要等待 Scala 的核心开发团队提供给他们(只需要花费少量时间)。
“actor” 实现在称为 actor 的执行实体之间使用消息传递进行协作(注意,这里有意避免使用 “进程”、“线程” 或 “机器” 等词汇)。尽管它听起来与 RPC 机制有点儿相似,但是它们是有区别的。RPC 调用(比如 Java RMI 调用)会在调用者端阻塞,直到服务器端完成处理并发送回某种响应(返回值或异常),而消息传递方法不会阻塞调用者,因此可以巧妙地避免死锁。
仅仅传递消息并不能避免错误的并发代码的所有问题。另外,这种方法还有助于使用 “不共享任何东西” 编程风格,也就是说不同的 actor 并不访问共享的数据结构(这有助于促进封装 actor,无论 actor 是 JVM 本地的,还是位于其他地方) — 这样就完全不需要同步了。毕竟,如果不共享任何东西,并发执行就不涉及任何需要同步的东西。
这不算是对 actor 模型的正规描述,而且毫无疑问,具有更正规的计算机科学背景的人会找到各种更严谨的描述方法,能够描述 actor 的所有细节。但是对于本文来说,这个描述已经够了。在网上可以找到更详细更正规的描述,还有一些学术文章详细讨论了 actor 背后的概念(请您自己决定是否要深入学习这些概念)。现在,我们来看看 Scala actors API。
使用 actor 根本不困难,只需使用 Actor 类的 actor 方法创建一个 actor,见清单 1:
import scala.actors._, Actor._
package com.tedneward.scalaexamples.scala.V4
{
object Actor1
{
def main(args : Array[String]) =
{
val badActor =
actor
{
receive
{
case msg => System.out.println(msg)
}
}
badActor ! "Do ya feel lucky, punk?"
}
}
}
这里同时做了两件事。
首先,我们从 Scala Actors 库的包中导入了这个库,然后从库中直接导入了 Actor 类的成员;第二步并不是完全必要的,因为在后面的代码中可以使用 Actor.actor 替代 actor,但是这么做能够表明 actor 是语言的内置结构并(在一定程度上)提高代码的可读性。
下一步是使用 actor 方法创建 actor 本身,这个方法通过参数接收一个代码块。在这里,代码块执行一个简单的 receive(稍后讨论)。结果是一个 actor,它被存储在一个值引用中,供以后使用。
请记住,除了消息之外,actor 不使用其他通信方法。使用 ! 的代码行实际上是一个向 badActor 发送消息的方法,这可能不太直观。Actor 内部还包含另一个 MailBox 元素(已讨论);! 方法接收传递过来的参数(在这里是一个字符串),把它发送给邮箱,然后立即返回。
消息交付给 actor 之后,actor 通过调用它的 receive 方法来处理消息;这个方法从邮箱中取出第一个可用的消息,把它交付给一个模式匹配块。注意,因为这里没有指定模式匹配的类型,所以任何消息都是匹配的,而且消息被绑定到 msg 名称(为了打印它)。
一定要注意一点:对于可以发送的类型,没有任何限制 — 不一定要像前面的示例那样发送字符串。实际上,基于 actor 的设计常常使用 Scala case 类携带实际消息本身,这样就可以根据 case 类的参数/成员的类型提供隐式的 “命令” 或 “动作”,或者向动作提供数据。
例如,假设希望 actor 用两个不同的动作来响应发送的消息;新的实现可能与清单 2 相似:
object Actor2
{
case class Speak(line : String);
case class Gesture(bodyPart : String, action : String);
case class NegotiateNewContract;
def main(args : Array[String]) =
{
val badActor =
actor
{
receive
{
case NegotiateNewContract =>
System.out.println("I won‘t do it for less than $1 million!")
case Speak(line) =>
System.out.println(line)
case Gesture(bodyPart, action) =>
System.out.println("(" + action + "s " + bodyPart + ")")
case _ =>
System.out.println("Huh? I‘ll be in my trailer.")
}
}
badActor ! NegotiateNewContract
badActor ! Speak("Do ya feel lucky, punk?")
badActor ! Gesture("face", "grimaces")
badActor ! Speak("Well, do ya?")
}
}
到目前为止,看起来似乎没问题,但是在运行时,只协商了新合同;在此之后,JVM 终止了。初看上去,似乎是生成的线程无法足够快地响应消息,但是要记住在 actor 模型中并不处理线程,只处理消息传递。这里的问题其实非常简单:一次接收使用一个消息,所以无论队列中有多少个消息正在等待处理都无所谓,因为只有一次接收,所以只交付一个消息。
纠正这个问题需要对代码做以下修改,见清单 3:
receive 块放在一个接近无限的循环中。 object Actor2
{
case class Speak(line : String);
case class Gesture(bodyPart : String, action : String);
case class NegotiateNewContract;
case class ThatsAWrap;
def main(args : Array[String]) =
{
val badActor =
actor
{
var done = false
while (! done)
{
receive
{
case NegotiateNewContract =>
System.out.println("I won‘t do it for less than $1 million!")
case Speak(line) =>
System.out.println(line)
case Gesture(bodyPart, action) =>
System.out.println("(" + action + "s " + bodyPart + ")")
case ThatsAWrap =>
System.out.println("Great cast party, everybody! See ya!")
done = true
case _ =>
System.out.println("Huh? I‘ll be in my trailer.")
}
}
}
badActor ! NegotiateNewContract
badActor ! Speak("Do ya feel lucky, punk?")
badActor ! Gesture("face", "grimaces")
badActor ! Speak("Well, do ya?")
badActor ! ThatsAWrap
}
}
这下行了!使用 Scala actor 就这么容易。
上面的代码没有反映出并发性 — 到目前为止给出的代码更像是另一种异步的方法调用形式,您看不出区别。(从技术上说,在第二个示例中引入接近无限循环之前的代码中,可以猜出有一定的并发性存在,但这只是偶然的证据,不是明确的证明)。
为了证明在幕后确实有多个线程存在,我们深入研究一下前一个示例:
object Actor3
{
case class Speak(line : String);
case class Gesture(bodyPart : String, action : String);
case class NegotiateNewContract;
case class ThatsAWrap;
def main(args : Array[String]) =
{
def ct =
"Thread " + Thread.currentThread().getName() + ": "
val badActor =
actor
{
var done = false
while (! done)
{
receive
{
case NegotiateNewContract =>
System.out.println(ct + "I won‘t do it for less than $1 million!")
case Speak(line) =>
System.out.println(ct + line)
case Gesture(bodyPart, action) =>
System.out.println(ct + "(" + action + "s " + bodyPart + ")")
case ThatsAWrap =>
System.out.println(ct + "Great cast party, everybody! See ya!")
done = true
case _ =>
System.out.println(ct + "Huh? I‘ll be in my trailer.")
}
}
}
System.out.println(ct + "Negotiating...")
badActor ! NegotiateNewContract
System.out.println(ct + "Speaking...")
badActor ! Speak("Do ya feel lucky, punk?")
System.out.println(ct + "Gesturing...")
badActor ! Gesture("face", "grimaces")
System.out.println(ct + "Speaking again...")
badActor ! Speak("Well, do ya?")
System.out.println(ct + "Wrapping up")
badActor ! ThatsAWrap
}
}
运行这个新示例,就会非常明确地发现确实有两个不同的线程:
main 线程(所有 Java 程序都以它开始)Thread-2 线程,它是 Scala Actors 库在幕后生成的因此,在启动第一个 actor 时,本质上已经开始了多线程执行。
但是,习惯这种新的执行模型可能有点儿困难,因为这是一种全新的并发性考虑方式。例如,请考虑 前一篇文章 中的 Producer/Consumer 模型。那里有大量代码,尤其是在 Drop 类中,我们可以清楚地看到线程之间,以及线程与保证所有东西同步的监视器之间有哪些交互活动。为了便于参考,我在这里给出前一篇文章中的 V3 代码:
package com.tedneward.scalaexamples.scala.V3
{
import concurrent.MailBox
import concurrent.ops._
object ProdConSample
{
class Drop
{
private val m = new MailBox()
private case class Empty()
private case class Full(x : String)
m send Empty() // initialization
def put(msg : String) : Unit =
{
m receive
{
case Empty() =>
m send Full(msg)
}
}
def take() : String =
{
m receive
{
case Full(msg) =>
m send Empty(); msg
}
}
}
def main(args : Array[String]) : Unit =
{
// Create Drop
val drop = new Drop()
// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
);
importantInfo.foreach((msg) => drop.put(msg))
drop.put("DONE")
}
// Spawn Consumer
spawn
{
var message = drop.take()
while (message != "DONE")
{
System.out.format("MESSAGE RECEIVED: %s%n", message)
message = drop.take()
}
}
}
}
}
尽管看到 Scala 如何简化这些代码很有意思,但是它实际上与原来的 Java 版本没有概念性差异。现在,看看如果把 Producer/Consumer 示例的基于 actor 的版本缩减到最基本的形式,它会是什么样子:
object ProdConSample1
{
case class Message(msg : String)
def main(args : Array[String]) : Unit =
{
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("Received message! -> " + msg)
done = (msg == "DONE")
}
}
}
consumer ! "Mares eat oats"
consumer ! "Does eat oats"
consumer ! "Little lambs eat ivy"
consumer ! "Kids eat ivy too"
consumer ! "DONE"
}
}
第一个版本确实简短多了,而且在某些情况下可能能够完成所需的所有工作;但是,如果运行这段代码并与以前的版本做比较,就会发现一个重要的差异 — 基于 actor 的版本是一个多位置缓冲区,而不是我们以前使用的单位置缓冲。这看起来是一项改进,而不是缺陷,但是我们要通过对比确认这一点。我们来创建 Drop 的基于 actor 的版本,在这个版本中所有对 put() 的调用必须由对 take() 的调用进行平衡。
幸运的是,Scala Actors 库很容易模拟这种功能。希望让 Producer 一直阻塞,直到 Consumer 接收了消息;实现的方法很简单:让 Producer 一直阻塞,直到它从 Consumer 收到已经接收消息的确认。从某种意义上说,这就是以前的基于监视器的代码所做的,那个版本通过对锁对象使用监视器发送这种信号。
在 Scala Actors 库中,最容易的实现方法是使用 !? 方法而不是 ! 方法(这样就会一直阻塞到收到确认时)。(在 Scala Actors 实现中,每个 Java 线程都是一个 actor,所以回复会发送到与 main 线程隐式关联的邮箱)。这意味着 Consumer 需要发送某种确认;这要使用隐式继承的reply(它还继承 receive 方法),见清单 7:
object ProdConSample2
{
case class Message(msg : String)
def main(args : Array[String]) : Unit =
{
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("Received message! -> " + msg)
done = (msg == "DONE")
reply("RECEIVED")
}
}
}
System.out.println("Sending....")
consumer !? "Mares eat oats"
System.out.println("Sending....")
consumer !? "Does eat oats"
System.out.println("Sending....")
consumer !? "Little lambs eat ivy"
System.out.println("Sending....")
consumer !? "Kids eat ivy too"
System.out.println("Sending....")
consumer !? "DONE"
}
}
如果喜欢使用 spawn 把 Producer 放在 main() 之外的另一个线程中(这非常接近最初的代码),那么代码可能像清单 8 这样:
object ProdConSampleUsingSpawn
{
import concurrent.ops._
def main(args : Array[String]) : Unit =
{
// Spawn Consumer
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("MESSAGE RECEIVED: " + msg)
done = (msg == "DONE")
reply("RECEIVED")
}
}
}
// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too",
"DONE"
);
importantInfo.foreach((msg) => consumer !? msg)
}
}
}
无论从哪个角度来看,基于 actor 的版本都比原来的版本简单多了。读者只要让 actor 和隐含的邮箱自己发挥作用即可。
但是,这并不简单。actor 模型完全颠覆了考虑并发性和线程安全的整个过程;在以前的模型中,我们主要关注共享的数据结构(数据并发性),而现在主要关注操作数据的代码本身的结构(任务并发性),尽可能少共享数据。请注意 Producer/Consumer 示例的不同版本的差异。在以前的示例中,并发功能是围绕 Drop 类(有界限的缓冲区)显式编写的。在本文中的版本中,Drop 甚至没有出现,重点在于两个 actor(线程)以及它们之间的交互(通过不共享任何东西的消息)。
当然,仍然可以用 actor 构建以数据为中心的并发构造;只是必须采用稍有差异的方式。请考虑一个简单的 “计数器” 对象,它使用 actor 消息传达 “increment” 和 “get” 操作,见清单 9:
object CountingSample
{
case class Incr
case class Value(sender : Actor)
case class Lock(sender : Actor)
case class UnLock(value : Int)
class Counter extends Actor
{
override def act(): Unit = loop(0)
def loop(value: int): Unit = {
receive {
case Incr() => loop(value + 1)
case Value(a) => a ! value; loop(value)
case Lock(a) => a ! value
receive { case UnLock(v) => loop(v) }
case _ => loop(value)
}
}
}
def main(args : Array[String]) : Unit =
{
val counter = new Counter
counter.start()
counter ! Incr()
counter ! Incr()
counter ! Incr()
counter ! Value(self)
receive { case cvalue => Console.println(cvalue) }
counter ! Incr()
counter ! Incr()
counter ! Value(self)
receive { case cvalue => Console.println(cvalue) }
}
}
为了进一步扩展 Producer/Consumer 示例,清单 10 给出一个在内部使用 actor 的 Drop 版本(这样,其他 Java 类就可以使用这个 Drop,而不需要直接调用 actor 的方法):
object ActorDropSample
{
class Drop
{
private case class Put(x: String)
private case object Take
private case object Stop
private val buffer =
actor
{
var data = ""
loop
{
react
{
case Put(x) if data == "" =>
data = x; reply()
case Take if data != "" =>
val r = data; data = ""; reply(r)
case Stop =>
reply(); exit("stopped")
}
}
}
def put(x: String) { buffer !? Put(x) }
def take() : String = (buffer !? Take).asInstanceOf[String]
def stop() { buffer !? Stop }
}
def main(args : Array[String]) : Unit =
{
import concurrent.ops._
// Create Drop
val drop = new Drop()
// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
);
importantInfo.foreach((msg) => { drop.put(msg) })
drop.put("DONE")
}
// Spawn Consumer
spawn
{
var message = drop.take()
while (message != "DONE")
{
System.out.format("MESSAGE RECEIVED: %s%n", message)
message = drop.take()
}
drop.stop()
}
}
}
可以看到,这需要更多代码(和更多的线程,因为每个 actor 都在一个线程池内部起作用),但是这个版本的 API 与以前的版本相同,它把所有与并发性相关的代码都放在 Drop 内部,这正是 Java 开发人员所期望的。
actor 还有更多特性。
在规模很大的系统中,让每个 actor 都由一个 Java 线程支持是非常浪费资源的,尤其是在 actor 的等待时间比处理时间长的情况下。在这些情况下,基于事件的 actor 可能更合适;这种 actor 实际上放在一个闭包中,闭包捕捉 actor 的其他动作。也就是说,现在并不通过线程状态和寄存器表示代码块(函数)。当一个消息到达 actor 时(这时显然需要活动的线程),触发闭包,闭包在它的活动期间借用一个活动的线程,然后通过回调本身终止或进入 “等待” 状态,这样就会释放线程。(请参见 参考资料 中 Haller/Odersky 的文章)。
在 Scala Actors 库中,这要使用 react 方法而不是前面使用的 receive。使用 react 的关键是在形式上 react 不能返回,所以 react 中的实现必须重复调用包含 react 块的代码块。简便方法是使用 loop 结构创建一个接近无限的循环。这意味着 清单 10 中的 Drop 实现实际上只通过借用调用者的线程执行操作,这会减少执行所有操作所需的线程数。(在实践中,我还没有见过在简单的示例中出现这种效果,所以我想我们只能暂且相信 Scala 设计者的说法)。
在某些情况下,可能选择通过派生基本的 Actor 类(在这种情况下,必须定义 act 方法,否则类仍然是抽象的)创建一个新类,它隐式地作为 actor 执行。尽管这是可行的,但是这种思想在 Scala 社区中不受欢迎;在一般情况下,我在这里描述的方法(使用 Actor 对象中的 actor 方法)是创建 actor 的首选方法。
因为 actor 编程需要与 “传统” 对象编程不同的风格,所以在使用 actor 时要记住几点。
首先,actor 的主要能力来源于消息传递风格,而不采用阻塞-调用风格,这是它的主要特点。(有意思的是,也有使用消息传递作为核心机制的面向对象语言。最知名的两个例子是 Objective-C 和 Smalltalk,还有 ThoughtWorker 的 Ola Bini 新创建的 Ioke)。如果创建直接或间接扩展Actor 的类,那么要确保对对象的所有调用都通过消息传递进行。
第二,因为可以在任何时候交付消息,而且更重要的是,在发送和接收之间可能有相当长的延迟,所以一定要确保消息携带正确地处理它们所需的所有状态。这种方式会:
第三,actor 应该不会阻塞,您从前面的内容应该能够看出这一点。从本质上说,阻塞是导致死锁的原因;代码可能产生的阻塞越少,发生死锁的可能性就越低。
很有意思的是,如果您熟悉 Java Message Service (JMS) API,就会发现我给出的这些建议在很大程度上也适用于 JMS — 毕竟,actor 消息传递风格只是在实体之间传递消息,JMS 消息传递也是在实体之间传递消息。它们的差异在于,JMS 消息往往比较大,在层和进程级别上操作;而 actor 消息往往比较小,在对象和线程级别上操作。如果您掌握了 JMS,actor 也不难掌握。
actor 并不是解决所有并发性问题的万灵药,但是它们为应用程序或库代码的建模提供了一种新的方式,所用的构造相当简单明了。尽管它们的工作方式有时与您预期的不一样,但是一些行为正是我们所熟悉的 — 毕竟,我们在最初使用对象时也有点不习惯,只要经过努力,您也会掌握并喜欢上 actor。
标签:
原文地址:http://www.cnblogs.com/scala/p/4385658.html