Scala Actor 线程安全

2019-06-03 09:18:19 | 编辑

1.Actor 和线程调度关系

线程之于 Actor 类似于客服经理之于消费者。当你拨打客户服务热线时,任何有空的客服经理都会接听你的热线。如果你挂掉之前的电话并重新拨通热线,此时上一位客服经理已经在处理别的客服电话了,那么另一位完全随机的客服经理现在将会回答你的疑问。只有在极端巧合下(在这两次热线电话的过程中),你才可能和同一位客户经理谈话。线程池中的线程对于Actor 来说非常像客服经理。。为了观察到这一点,我们稍微修改一下调用代码,接着上节的案例稍微改一下:

depp ! "Wonka"
hanks ! "Gump"
Thread.sleep(100)//在发送给 Actor 的两组消息之间,我们添加了一个小小的100 ms 的延迟
depp ! "Sparrow"
hanks ! "Phillips"

输出结果:

Wonka - Thread[sample-akka.actor.default-dispatcher-3,5,main]
Gump - Thread[sample-akka.actor.default-dispatcher-4,5,main]
Sparrow - Thread[sample-akka.actor.default-dispatcher-4,5,main]
Phillips - Thread[sample-akka.actor.default-dispatcher-3,5,main]
Calling from Thread[main,5,main]

一旦两个线程帮助 Actor 处理完它们的第一组消息,它们便跑一旁边休息去了,。但当下一组消息到达时,尽职尽责的线程便又会马上回归到它们的工作任务中。它们对曾经服务过的线程并没有什么亲和力。这也是在短暂的延迟之后,线程交换了它们所服务的Actor 的原因。

每次运行代码时,你都可能会看到不同的线程与Actor 之间的配对。实质上,这也表明了线程并不和Actor 绑定———一个线程池服务于多个Actor。

Akka 提供了大量的工具来配置线程池的大小、消息队列的大小以及许多其他参数,包括与远程Actor 进行交互。


2.Actor 线程安全

在是使用多线程时,首先必须得考虑使用同一变量或者资源时的共享资源的线程安全问题,程序员通常都会创建共享的可变变量,并使用同步原语sychonrized来提供线程安全性。但你得对锁和线程得关系学习得非常清楚,否则很容易出现灾难性得问题。


但在scala中就不一样,那么Actor 是如何消除共享的可变性所带来的痛苦的。因为Actor 一次最多只会处理一条消息,所以在Actor 中保存的任何字段都是自动线程安全的。它是可变的,但却没有共享可变性。一个Actor 的非final 字段具备自动隔离的可变性。

继续修改一下前面的案例,以便它追踪接收到的消息的数目。这将会为Actor引入状态—Actor 可以选择性地存储状态。

import akka.actor._
import scala.collection._
case class Play(role: String)
case class ReportCount(role: String)
class HollywoodActor() extends Actor {
  val messagesCount: mutable.Map[String, Int] = mutable.Map()
  def receive: Receive = {
    case Play(role) =>
      val currentCount = messagesCount.getOrElse(role, 0)
      messagesCount.update(role, currentCount + 1)
      println(s"Playing $role")
    case ReportCount(role) =>
      sender ! messagesCount.getOrElse(role, 0)
  }
}



新版本的 Actor 将会接收两种类型的消息。第一种消息类型用于告诉Actor 来扮演某一个角色,而第二种消息类型则用于查询Actor 已经扮演过某一个角色的次数,即它收到相同消息的次数。对于消息类型来说,我们创建了两个case 类,即play 和ReportCount。case 类非常适合这种场景,因为其简洁、持有不可变数据,并且能够很好地和Scala 的模式匹配设施一起工作。


我们将会向两个Actor 发送一些消息,并询问每条消息被Actor 接收到的次数。

除了发送,ReportCount则需要一些额外的工作。消息的发送者希望从Actor 接收到响应。为此,Akka 提供了一个询问(ask)模式。因为发送一条消息并等待响应可能会导致潜在的活锁—消息可能永远也不会到达,所以这个模式强制使用一个超时时间。


1.png

这段代码创建了两个Actor,并发送了一些Play 类型的消息给它们。到目前为止,发送的消息都是“发送并忘记”模式,即非阻塞的。

在代码的第 22 到24 行,我们发送了3 条ReportCount 类型的消息。这些消息都需要响应,因此,我们使用了?()方法而不是!()方法。

还记得!()方法代表名为tell()的更加具体的方法,同样,这个神秘的?()也代表名为ask()的更加具体的方法。要使用这个方法,我们需要import akka.pattern.ask。为了防止活锁,ask()方法需要一个超时时间,但该参数使用的是在第21 行中定义的隐式变量。

不同于什么也不返回的!()方法,?()方法返回一个Future。我们将3 次调用返回的Future,并将其分别保存在变量wonkaFuture、sparrowFuture 和gumpFuture 中。现在,消息已经发送,是时候等待并接收响应了。我们使用了Await 类的result()方法来做到这一点。这个方法接受我们等待的Future,以及我们愿意耐心等待响应到达的最长时长作为参数。最后,如果响应在超时时间之前到达,那么我们将打印出这些结果。


Sent roles to play
Playing Wonka
Playing Gump
Playing Wonka
Playing Sparrow
Depp played Wonka 2 time(s)
Depp played Sparrow 1 time(s)
Hanks played Gump 1 time(s)


该输出表明:Actor 在它们收到消息时处理了它们的消息,此外,还正确地追踪了每种消息被接收到的次数。

每当一条消息传递给了一个Actor 时,Actor 将会在任意的时间选择一条消息进行处理,它们各自的线程将会自动跨越内存栅栏。有了清晰的设计、更少的代码量、线程的自动切换以及由Actor 模型提供的线程安全,你便可以享受内心的平静并获得生产力上的提升。