Scala Actor 多线程并发读取文件数量的案例
1.读取文件夹下文件数量的Actor代码
我们想要计算该目录层次结构之下的所有文件的数目。我们可以将该问题划分为查找给定目录下的每个子目录中的文件总数,然后再归并结果。反过来,这也告诉我们,我们有两个主要部分:一是探索文件,二是归并结果。
对于一个给定目录下的多个子目录,查找子目录可以并发进行。因为在任何给定时刻,对于一个Actor,我们只能得到一个计算单元。因为我们需要同时执行并发任务,所以我们将需要多个Actor。在我们的设计中,FileExplorer 是一个无状态的Actor—我们将使用由Akka 提供的称为RoundRobinPool 的路由器,它由这个类的几个实例支撑。顾名思义,发送到这个路由器的消息将会被均匀地路由到支撑这个路由器的多个Actor。我们还会使用另一个Actor,其只是一个FilesCounter 的实例。这个Actor 将是有状态的,这里便是被隔离的可变状态所存在的地方,并会记录文件的个数。让我们先创建无状态的 FileExplorer Actor。
class FileExplorer extends Actor { def receive: Receive = { case dirName: String => val file = new File(dirName) val children = file.listFiles() var filesCount = 0 if (children != null) { children.filter { _.isDirectory } .foreach { sender ! _.getAbsolutePath } filesCount = children.count { !_.isDirectory } } sender ! filesCount } }
在 receive()方法中,我们只查找包含该目录名的字符串。当接收到消息之后,我们将在给定的目录下查找文件和子目录。我们简单地将每个子目录发送给该消息的发送者(一个监管Actor),这样它便可以让其他FileExplorer 着手遍历该子目录。之后,我们还将在这个目录下查找到的文件数发送给该发送者。
下面让我们看一看有状态的 FilesCounter Actor。
2.统一监管统计的线程Actor代码
import akka.actor._ import akka.routing._ class FilesCounter extends Actor { val start: Long = System.nanoTime var filesCount = 0L var pending = 0 val fileExplorers: ActorRef = context.actorOf(RoundRobinPool(100).props(Props[FileExplorer])) def receive: Receive = { case dirName: String => pending = pending + 1 fileExplorers ! dirName case count: Int => filesCount = filesCount + count pending = pending - 1 if (pending == 0) { val end = System.nanoTime println(s"Files count: $filesCount") println(s"Time taken: ${(end - start) / 1.0e9} seconds") context.system.terminate() } } }
这个 Actor 维护了几个字段。start 字段记录了该Actor 被激活的时间。filesCount和pending 字段是可变变量,分别记录了已经发现的文件数以及目前正在进行的尚未完成的文件遍历数。最后一个字段fileExplorers 持有一个RoundRobinPool 实例的引用,该路由器本身也是一个Actor,它持有了一个具有100 个FileExplorer Actor 的实例。
我们希望 fileExplorers 引用的路由器以及我们创建的100 个Actor,都能够存在于同一个ActorSystem 中,并和创建它们的Actor 共享相同的线程池。为此,我们需要访问FileCounter Actor 正在运行的上下文ActorSystem—这时可以使用context()方法。此外,我们需要指示路由器创建FileExplorer Actor 的实例。为此,我们使用了Props类—可以将这等同于在Java 中提供FileExplorer.class。
我们在 receive()方法中匹配两种类型的消息:一个表示将要探索的目录名的String,另一个表示到目前为止找到的文件计数的Int。当接收到具有目录名的消息时,我们增加pending 的值以表示正在遍历文件,并使用路由器将目录的遍历调度给FileExplorer Actor。当计数作为消息被接收时,我们将该计数添加到隔离的可变字段filesCount 中,并递减pending 的值来表示已经结束了对一个子目录的遍历。
在 receive()方法中还剩下最后一项任务。如果pending 变量的值降为0,则表示已经遍历完了所有的子目录。在这个时候,我们将报告文件计数以及所耗费的时间,并调用ActorSystem 上的方法来关闭ActorSystem。
3.初始化调用Actor多线程代码
已经有这两个就绪的Actor 了。我们还需要引导代码来创建ActorSystem 以及一个FilesCounter 实例。我们来编写下面的代码。
import akka.actor._ object CountFiles extends App { val system = ActorSystem("sample") val filesCounter = system.actorOf(Props[FilesCounter]) filesCounter ! args(0) }
是时候看一下效果了。使用下面的命令编译这 3 个文件,并运行CountFiles 单例:
scalac -d classes FilesCounter.scala FileExplorer.scala CountFiles.scala scala -classpath classes CountFiles /Users/venkats/agility
现在让我们看一下输出结果:
Files count: 479758 Time taken: 5.609851764 seconds
输出结果表明,与我们一开始的顺序执行版本相比,这个版本在速度上有了可观的改进。
我们不需要大量的代码来实现这一点。此外,也没有杂乱的线程创建和同步代码。这段代码所产生的清晰度令人相当欣慰。在使用JDK 的解决方案时,我们通常不知道代码是否正确,与之不同的是,这段代码非常容易理解,并且易于改进。