标签:art 讲解 while 机制 current getline 汇总 并发 执行
1.对于上一篇讲解的scala的一些补充
val files = Array[String]("a.txt","b.txt","c.txt")
for(f <- files){xxxx}
目标一:熟悉Scala Actor并发编程
目标二:为学习Akka做准备
注:我们现在学的Scala Actor是scala 2.10.x版本及以前版本的Actor。
Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor,
老版本的Actor已经废弃
2.概念
Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的
并发机制,
Scala是运用消息(message)的发送、接收来实现多线程的。
使用Scala能够更容易地实现多线程应用的开发
3.Actor方法执行顺序
1.首先调用start()方法执行Actor
2.调用start()方法后其act()方法会被执行
3.向Actor发送消息
4.wordCount的Actor的计算方法,虽然现在不用,但是思路还是有用的
package main.cn.wj.test
import scala.actors.{Actor, Future}
import scala.collection.immutable.HashSet
import scala.io.Source
import scala.collection.mutable.ListBuffer
/**
* Created by WJ on 2016/12/22.
*/
class Task extends Actor{
override def act(): Unit = {
loop{
react{
case SubmitTask(filename) =>{
val result = Source.fromFile(filename).getLines().flatMap(_.split(" ")).map((_,1)).toList.groupBy(_._1).mapValues(_.size)
sender ! ResultTask(result)
}
case StopTask =>{
exit()
}
}
}
}
}
case class SubmitTask(filename:String)
case class ResultTask (result:Map[String,Int])
case object StopTask
object ActorWordCount {
def main(args: Array[String]): Unit = {
var replySet = new HashSet[Future[Any]]()
val resultList = new ListBuffer[ResultTask]
val files = Array[String]("E://Test/words.log", "E://Test/words.txt")
for (f <- files) {
val actor = new Task
val reply = actor.start() !! SubmitTask(f) //<reply 等同于Future>
replySet += reply
}
while(replySet.size > 0 ){
val toCompute = replySet.filter(_.isSet)
for(f <- toCompute) {
val result = f.apply().asInstanceOf[ResultTask]
resultList += result
replySet -= f
}
Thread.sleep(100)
}
// reduce功能 ,汇总
//List
val fr = resultList.flatMap(_.result).groupBy((_._1)).mapValues(_.foldLeft(0)(_+_._2))
println(fr)
}
}
5.看了上面的关于多线程相关的知识点,看看我们的线程池的代码
package main.cn.wj.test import java.util.concurrent.{Executor, Executors} /** * Created by WJ on 2016/12/22. */ object ThreadDemo { def main(args: Array[String]): Unit = { val pool = Executors.newFixedThreadPool(5); for (i <- 1 to 10){ pool.execute(new Runnable { override def run(): Unit = { println(Thread.currentThread().getName) Thread.sleep(1000) } }) } } }
标签:art 讲解 while 机制 current getline 汇总 并发 执行
原文地址:http://www.cnblogs.com/wnbahmbb/p/6213423.html