码迷,mamicode.com
首页 > 编程语言 > 详细

混合使用 ForkJoin, Akka, Future 实现一千万个不重复整数的排序

时间:2016-05-29 23:08:43      阅读:261      评论:0      收藏:0      [点我收藏+]

标签:

 

    本来只是想写一个 ForkJoin 的示例,但写着写着就加入了 akka, future 的元素, 是在解决问题的过程中逐渐引入的。我觉得这种学习的方式很好,就是在解决一个问题的过程中,可以综合地探索和学习到很多不同的东西。传统的学习讲究"循序渐进"的方式,但是"跳跃式+快速试错"也许是学习新技术的更好的方法。 :)

  原本是想实现十亿个不重复整数的排序, 由于文件外排序没有解决,因此,暂时实现的是一千万个不重复数,可以一次性加载到 2G 的内存里。

 

   一、 任务拆分

      首先要进行任务拆分。要实现一千万个不重复整数的排序, 可以拆分为三个子任务: (1)  生成一千万的不重复整数并写入文件 NumberGeneratorTask; (2) 从文件读取并检测确实生成的是一千万个不重复的整数 CheckUnduplicatedNumbersActor; (3)  从文件读取整数进行排序和排序检测 BigfileSortActor。接下来逐一实现这些子任务。入口如下。这里使用了 Akka 的框架及 Java ForkJoin 线程池实例。    

package scalastudy.concurrent.forkjoin

import java.util.concurrent.{TimeUnit, ForkJoinPool}

import akka.actor.{Props, ActorSystem}

import scalastudy.concurrent.actors.{BigFileSortActor, CheckUnduplicatedNumbersActor, StatWordActor}
import scalastudy.concurrent.config.ActorSystemFactory

/**
  * Created by shuqin on 16/5/18.
  */
object BillionNumberSort extends App {

    val numbers = 10000000    // 在 [0, 2^31-1] 生成 numbers 个不重复的整数

    launch()

    def launch(): Unit = {
        val system:ActorSystem = ActorSystemFactory.newInstance()
        val bigfileSortActor = system.actorOf(Props(new BigFileSortActor(numbers)))
        val checkNumberActor = system.actorOf(Props(new CheckUnduplicatedNumbersActor(numbers, bigfileSortActor)), name="checkNumberActor")
        val numGenTask = new NumberGeneratorTask(numbers, 0, Integer.MAX_VALUE, checkNumberActor)

        val pool = new ForkJoinPool()
        pool.execute(numGenTask)
        pool.shutdown;
        pool.awaitTermination(420, TimeUnit.SECONDS);
        pool.shutdownNow
    }

}

 

  二、 生成一千万个不重复整数 NumberGeneratorTask

  显然,这个子任务是可以采用 ForkJoin 来完成的。 ForkJoin 是分治思想的框架性实现, 将原问题分解为同样性质的多个子问题,然后将子问题的解组合起来得到原问题的解。通常采用二分法。实现上,通常会采用递归结构, 注意递归不要太深。 NumberGeneratorTask 的实现如下:

package scalastudy.concurrent.forkjoin

import java.util.concurrent.RecursiveAction

import akka.actor.ActorRef
import zzz.study.algorithm.select.RandomSelector

/**
  *  Created by shuqin on 16/5/19.
  *
  * 在 [start, end] 选出 num 个不重复的整数
  *
  */
class NumberGeneratorTask(num:Int, start:Int, end:Int, checkNumberActor: ActorRef)  extends RecursiveAction {

    // 每次生成不超过 threshold 个不重复的整数数组; 
    // 该值不能过小, 否则会因递归层次过深导致内存不足.
    private val threshold = 500

    override def compute(): Unit = {

        // println("Select: " + num  + " unduplicated numbers from [" + start + " " + end + ")");

        if (num <= threshold) {

            if (num > end - start+1) {
                checkNumberActor ! start.to(end).toList
            }
            else {
                val randInts = RandomSelector.selectMDisorderedRandInts2(num, end-start+1)
                checkNumberActor ! randInts.map(i=>i+start).toList
            }
        }
        else {
            val middle = start/2 + end/2
            val leftTask = new NumberGeneratorTask(num/2, start, middle, checkNumberActor)
            val rightTask = new NumberGeneratorTask((num+1)/2, middle+1, end, checkNumberActor)
            //println("Left: [" + start + "-" + middle + "," + num/2 + "]")
            //println("Right: [" + (middle+1) + "-" + end + "," + (num+1)/2 + "]")

            leftTask.fork
            rightTask.fork
            leftTask.join
            rightTask.join
            checkNumberActor ! (start, end)
        }
    }

}

 

  三、 检测生成的一千万个整数不重复 CheckUnduplicatedNumbersActor

     嗯,看上去没有什么特别的技巧。有三点值得注意: (1)  这里涉及到 Actor 通信; 怎样判断整数生成任务完成从而可以开始检测了呢?在 NumberGeneratorTask 生成最后一组整数时并回退到最开始的调用层时,就会发送 (0, Integer.MAX_VALUE) 作为信号, 而 CheckUnduplicatedNumbersActor 则通过 case (0, Integer.MAX_VALUE) 可以匹配到这一点; (2) 在这里也使用了策略模式。对于一千万个整数来说,内存占用 40M 左右, 2G 内存是装滴下的, 若是十亿个整数,那么就需要 4G,就不能一次性加载了。因此这里定义了个接口并实现了一次性加载策略。 读者感兴趣可以实现多次加载策略,以应对内存不够的情形; (3) Source.fromFile(filename).getLines 这里返回的是迭代器, 如果内存不够用的话,就必须使用这个方法,而不是 Source.fromFile(filename).getLines.toList , 后者会将所有行全部加载到内存中从而导致 OutOfMemoryError .

package scalastudy.concurrent.actors

import java.io.{File, PrintWriter}

import akka.actor.{ActorRef, Actor}

import scala.io.Source
import scala.collection.immutable.{List}
import scala.collection.mutable.Set

import scalastudy.utils.{PathConstants}


/**
  * Created by shuqin on 16/5/19.
  */
class CheckUnduplicatedNumbersActor(numbers:Int, bigfileSortActor: ActorRef) extends Actor {

    val filename = PathConstants.projPath + "/data/"+numbers+".txt"
    val fwResult = new PrintWriter(new File(filename))

    var count = 0;

    def checkUnduplicatedNumbers(): Unit = {
        println("Expected: " + numbers + " , Actual Received: " + count)
        assert(count == numbers)
        assert(new OnceLoadStrategy().checkUnduplicatedNumbersInFile(filename) == true)
        println("checkUnduplicatedNumbers passed.")
    }

    override def receive: Receive = {

        case numberList: List[Int] =>
            fwResult.write(numberList.mkString(" ") + "\n");
            count += numberList.length

        case (0, Integer.MAX_VALUE) =>
            println("Reach End.")
            fwResult.flush
            fwResult.close
            checkUnduplicatedNumbers
            bigfileSortActor ! filename
    }

    /**
     * 一次性加载所有数到内存, 适用于内存可以装下所有数的情况
     * 比如 10000000 个整数占用 40M 空间, 2G 内存是绰绰有余的, 但十亿占用 4G 空间失效
     */
    class OnceLoadStrategy extends CheckUnduplicatedStrategy {
        def checkUnduplicatedNumbersInFile(filename:String):Boolean = {
            var numbersInFile = 0
            val unDupNumberSet = Set[Int]()
            Source.fromFile(filename).getLines.
                foreach { line =>
                    val numbersInLine = line.split("\\s+").map(Integer.parseInt(_)).toSet
                    numbersInFile += numbersInLine.size;
                    unDupNumberSet ++= numbersInLine
                }
            println("Expected: " + numbers + " , Actual In File: " + numbersInFile)
            println("Unduplicated numbers in File: " + unDupNumberSet.size)
            unDupNumberSet.size == numbers
        }
    }

    trait CheckUnduplicatedStrategy {
        def checkUnduplicatedNumbersInFile(filename:String):Boolean
    }
}

 

    四、 大文件排序 BigfileSortActor

      Oh, 终于进入正题了。大文件排序当然采用归并排序了。 在这个实现里,值得注意的是采用了 Future 全异步框架。 可以看到:

  (1) def produceFuture(line:String): Future[List[List[Int]]] 将文件的每一行(包含 threshold 个整数)转化为一个对行内整数排序的 Future, 可以在后续获取结果; 对于一个文件,就是获得了 futureTasks =  List[Future[List[List[Int]]]] ;  List[List[Int]] 是为了让后面的 Reduce 语法上走得通。 

   (2)   val sortedListsFuture = Future.reduce(futureTasks)(cat(_,_)) 将 List[Future[List[List[Int]]]] 整合成一个 TotalFuture, 这个 TotalFuture 的结果是 futureTasks 里面的所有 Future 结果的连接; 每一个 Future 的结果是一个已排序的列表; 那么 TotalFuture 的结果是一个已排序列表的列表。

   (3)   注意到下面这行代码: 是将一个 Future A 转化为另一个 Future B. 其中 B 的结果是基于 A. 在本例中,即是将已排序列表的列表合并为最终列表,但仍然返回的是 Future 而不是最终列表。为什么要这么写, 而不是将 sortedListsFuture 的结果取出来再合并呢? 这是由于之前的所有动作都是异步的。 如果应用只是取排序的结果,那么也没什么; 但如果应用要将 sortedListsFuture 的结果写入文件呢? 进而还要做一下排序检测? 那么, 就不得不在后面加入 TimeUnit.SECONDS.sleep(n) 的代码, 让主线程休息一会了(因为前面整个是异步的, 在 sortedListsFuture 还没完成时,后面的代码就会被执行了)! 而且你得不断估计前面的排序/合并操作究竟大约需要多少时间从而不断调整休眠的时间! 之前就是这样实现的! 但这样并不符合 Future 异步框架的初衷! 因此后面,我突然觉得要写成全异步的, 也体验到了写成全异步应用的滋味~~ :) 要求确实是有点高,需要不断从 Future 转换成新的 Future ~~ 同时你也发现, Scala Future 也提供了一个帮助编写全异步框架的 API ~~

 

sortedListsFuture map {
      value:List[List[Int]] =>
            CollectionUtil.mergeKOrderedList(value)
}

 

   (4) 由于后面将排序结果写入文件以及从文件检测排序是否 OK 都是同步的,因此,可以在排序 Future 完成后执行。 注意到 Future 的非阻塞写法: f.onComplete { case Success(result) => doWith(result) ;  case Failure(ex) => doWith(ex) }   

    (5)  为了将列表链接起来,也试错了好几次:  (x :: y :: Nil).flatten ; 如果写成 reduce(_ :: _ :: Nil) 是会报错的; 写成 reduce(_.flatten :: _.flatten :: Nil) 最终会合并成两个列表不符合预期。 

package scalastudy.concurrent.actors

import java.io.{File, PrintWriter}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{TimeUnit, Callable, Executors}

import akka.actor.{Props, ActorSystem, Actor}
import akka.dispatch.Futures

import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future}
import scala.io.Source
import scala.util.{Failure, Success}
import scalastudy.utils.{CollectionUtil, PathConstants, DefaultFileUtil}
import scala.concurrent.ExecutionContext.Implicits.global


/**
  * Created by shuqin on 16/5/20.
  */
class BigFileSortActor(numbers: Int) extends Actor {

    override def receive: Receive = {

        case filename:String =>
            println("Received File: " + filename)
            sortFile(filename)
    }

    def produceFuture(line:String): Future[List[List[Int]]] = {
        val origin = line.split("\\s+").map( s => Integer.parseInt(s)).toList
        Future {
            List(origin.sorted)
        }
    }

    def cat(x: List[List[Int]],y:List[List[Int]]): List[List[Int]] = {
        return (x :: y :: Nil).flatten
    }

    def obtainSortedFuture(futureTasks:List[Future[List[List[Int]]]]):Future[List[Int]] = {
        val sortedListsFuture = Future.reduce(futureTasks)(cat(_,_))

        sortedListsFuture map {
            value:List[List[Int]] =>
                CollectionUtil.mergeKOrderedList(value)
        }
    }

    def checkSorted(filename:String): Unit = {
        var last = 0
        var count = 0
        Source.fromFile(filename + ".sorted.txt").getLines().toList.foreach {
            line =>
                val number = Integer.parseInt(line.trim)
                assert(number > last)
                count += 1
                last = number
        }
        assert(count == numbers)
        println("test sorted passed.")
    }

    def sortFile(filename:String):Unit = {

        val futureTasks = DefaultFileUtil.readFileLines(filename).map(produceFuture(_))
        println("task numbers: " + futureTasks.size)

        val allNumberSortedFuture = obtainSortedFuture(futureTasks)

        allNumberSortedFuture.onComplete {
            case Success(value:List[Int]) =>
                println("sort finished.")
                writeSorted(value, filename)
                checkSorted(filename)
            case Failure(ex) =>
                println("Sort failed: " + ex.getMessage)
        }
    }

    def writeSorted(allNumberSorted: List[Int], filename: String): Unit = {
        val fwResult = new PrintWriter(new File(filename + ".sorted.txt"))
        fwResult.write(allNumberSorted.mkString("\n"))
        fwResult.flush
        fwResult.close
    }
}

object BigFileSortActorTest {

    def main(args:Array[String]):Unit = {

        val numbers = 10000000
        val system = ActorSystem("BigFileSortActorTest")
        val bigFileSortActor = system.actorOf(Props(new BigFileSortActor(numbers)),name="bigFileSortActor")
        bigFileSortActor ! PathConstants.projPath + "/data/" + numbers +".txt"

        TimeUnit.SECONDS.sleep(640)
        system.shutdown

    }

}

 

  五、 辅助类

      (1)  CollectionUtil 实现了一个二路有序列表合并和多路有序列表合并。 其中多路有序列表合并后续可以优化成并行的。Scala 的 List 是一个列表 (head::(tail::Nil)), 空列表可以用 List(), Nil 来表示; 取元素可以用 List(index) 或 List.head , List.tail ; 

      (2) 从 N 个数中选出不重复的 M 个数参见 RandomSelector 的实现。 

package scalastudy.utils

import scala.collection.mutable
import scala.collection.mutable.{ListBuffer, Map}

/**
 * Created by lovesqcc on 16-4-2.
 */
object CollectionUtil {

  def main(args: Array[String]): Unit = {

    testMerge
    val map = Map("shuqin" -> 31, "fanfan" -> 26, "yanni" -> 28)
    sortByValue(map).foreach { println }
  }

  def testMerge(): Unit = {
    assert(CollectionUtil.merge(Nil, Nil) == List())
    assert(CollectionUtil.merge(List(), Nil) == List())
    assert(CollectionUtil.merge(List(), List()) == List())
    assert(CollectionUtil.merge(List(), List(1,3)) == List(1,3))
    assert(CollectionUtil.merge(List(4,2), List()) == List(4,2))
    assert(CollectionUtil.merge(List(4,2), Nil) == List(4,2))
    assert(CollectionUtil.merge(List(2,4), List(1,3)) == List(1,2,3,4))
    assert(CollectionUtil.merge(List(2,4), List(1,3,5)) == List(1,2,3,4,5))
    assert(CollectionUtil.merge(List(2,4,6), List(1,3)) == List(1,2,3,4,6))

    assert(CollectionUtil.mergeKOrderedList(Nil) == List())
    assert(CollectionUtil.mergeKOrderedList(List()) == List())
    assert(CollectionUtil.mergeKOrderedList(List(List())) == List())
    assert(CollectionUtil.mergeKOrderedList(List(List(1,2))) == List(1,2))
    assert(CollectionUtil.mergeKOrderedList(List(List(), List())) == List())
    assert(CollectionUtil.mergeKOrderedList(List(List(), List(1,3))) == List(1,3))
    assert(CollectionUtil.mergeKOrderedList(List(List(2,4), List())) == List(2,4))
    assert(CollectionUtil.mergeKOrderedList(List(List(2,4), List(1,3))) == List(1,2,3,4))
    assert(CollectionUtil.mergeKOrderedList(List(List(2,4), List(1,3,5))) == List(1,2,3,4,5))
    assert(CollectionUtil.mergeKOrderedList(List(List(2,4,6), List(1,3))) == List(1,2,3,4,6))
    assert(CollectionUtil.mergeKOrderedList(List(List(2,4,7), List(1,6), List(3,5))) == List(1,2,3,4,5,6,7))
    assert(CollectionUtil.mergeKOrderedList(List(List(2,4,9), List(1,7), List(3,6), List(5,8))) == List(1,2,3,4,5,6,7,8,9))
    println("test merge list passed.")
  }

  /**
   * 对指定 Map 按值排序
   */
  def sortByValue(m: Map[String,Int]): Map[String,Int] = {
    val sortedm = new mutable.LinkedHashMap[String,Int]
    m.toList.sortWith{case(kv1,kv2) => kv1._2 > kv2._2}.foreach { t =>
      sortedm(t._1) = t._2
    }
    return sortedm
  }

  /**
   * 合并两个有序列表
   */
  def merge(xList: List[Int], yList: List[Int]): List[Int] = {
    if (xList.isEmpty) {
      return yList
    }
    if (yList.isEmpty) {
      return xList
    }
    val result = ListBuffer[Int]()
    var xListC = xList
    var yListC = yList
    while (!xListC.isEmpty && !yListC.isEmpty ) {
      if (xListC.head < yListC.head) {
        result.append(xListC.head)
        xListC = xListC.tail
      }
      else {
        result.append(yListC.head)
        yListC = yListC.tail
      }
    }
    if (xListC.isEmpty) {
      result.appendAll(yListC)
    }
    if (yListC.isEmpty) {
      result.appendAll(xListC)
    }

    result.toList
  }

  /**
   * 合并k个有序列表
   */
  def mergeKOrderedList(klists: List[List[Int]]): List[Int] = {
      if (klists.isEmpty) {
        return List[Int]()
      }
      var nlist = klists.size
      if (nlist == 1) {
        return klists.head
      }
      var klistp = klists;
      val kbuf = ListBuffer[List[Int]]()
      while (nlist > 1) {
        for (i <- 0 to nlist/2-1) {
          kbuf.insert(i, merge(klistp(2*i), klistp(2*i+1)))
          if (nlist%2 == 1) {
            kbuf.append(klistp(nlist-1))
          }
        }
        nlist = nlist - nlist/2
        klistp = kbuf.toList
      }

      kbuf.toList.head
  }

}
package zzz.study.algorithm.select;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

public class RandomSelector {
    
    private RandomSelector() {
        
    }
    
    private static Random rand = new Random(47);
    
    /**
     * bigRandInt: 返回一个非常大的随机整数,该整数的二进制位数不小于 bits
     */
    public static int bigRandInt(int bits)
    { 
         if (bits >= 32 || bits <= 0) {
             throw new IllegalArgumentException("参数 " + bits + " 错误,必须为小于 32 的正整数!");
         }
         int baseNum = 1 << (bits - 1);
         return rand.nextInt(Integer.MAX_VALUE - baseNum) + baseNum;
    }
    
    /**
     * randRange: 生成给定范围的随机整数
     * @param low  范围下限
     * @param high 范围上限(不包含)
     * @return 给定范围的随机整数
     */
    public static int randRange(int low, int high)
    {
        if (high <= low) {
            throw new IllegalArgumentException("参数 [" + low + "," + high + "] 错误,第一个参数必须小于第二个参数!");
        }
        return bigRandInt(30) % (high-low) + low;    
    }
    
    /**
     * selectMOrderedRandInts : 从指定集合中随机选择指定数目的整数,并以有序输出
     * @param m 需要选取的整数数目
     * @param n 指定整数集合 [0:n-1]
     * @return 随机选取的有序整数列表
     */
    public static int[] selectMOrderedRandInts(int m, int n)
    {
        checkParams(m, n);
        int[] result = new int[m];
        int remaining = n;
        int selector = m;    
        for (int k=0, i=0; i < n; i++) {
            if ((bigRandInt(30) % remaining) < selector) {
                result[k++] = i;
                selector--;
            }
            remaining--;    
        }
        return result;    
    }
    
    /**
     * selectMOrderedRandInts2 : 从指定集合中随机选择指定数目的整数,并以有序输出
     * @param m 需要选取的整数数目
     * @param n 指定整数集合 [0:n-1]
     * @return 随机选取的有序整数列表
     */
    public static int[] selectMOrderedRandInts2(int m, int n)
    {
        checkParams(m, n);
        Set<Integer> holder = new TreeSet<Integer>();
        while (holder.size() < m) {
            holder.add(bigRandInt(30) % n);
        }
        return collectionToArray(holder);
    }
    
    /**
     * selectMOrderedRandInts3 : 从指定集合中随机选择指定数目的整数,并以有序输出
     * @param m 需要选取的整数数目
     * @param n 指定整数集合 [0:n-1]
     * @return 随机选取的有序整数列表
     */
    public static int[] selectMOrderedRandInts3(int m, int n)
    {
        checkParams(m, n);
        int[] arr = selectMDisorderedRandInts3(m, n);
        Arrays.sort(arr);
        return arr;
    }
    
    /**
     * selectMDisorderedRandInts2: 从指定整数集合中随机选择指定数目的整数,并以无序输出
     * @param m 需要选取的整数数目
     * @param n 指定整数集合 [0:n-1]
     * @return 随机选取的无序整数列表
     */
    public static int[] selectMDisorderedRandInts2(int m, int n)
    {
        checkParams(m, n);
        Set<Integer> intSet = new HashSet<Integer>();
        while (intSet.size() < m) {
            intSet.add(bigRandInt(30) % n);
        }
        return collectionToArray(intSet);
    }
    
    /**
     * selectMDisorderedRandInts3: 从指定整数集合中随机选择指定数目的整数,并以无序输出
     * @param m 需要选取的整数数目
     * @param n 指定整数集合 [0:n-1]
     * @return 随机选取的无序整数列表
     */
    public static int[] selectMDisorderedRandInts3(int m, int n)
    {
        checkParams(m, n);
        int[] arr = new int[n];
        for (int i=0; i < n; i++) {
            arr[i] = i;
        }
        for (int k=0; k < m; k++) {
            int j = randRange(k, n);
            int tmp = arr[k];
            arr[k] = arr[j];
            arr[j] = tmp;
        }
        return Arrays.copyOf(arr, m);
    }
    
    public static void checkParams(int m, int n)
    {
        if (m > n || m <= 0 || n <= 0 ) {
            throw new IllegalArgumentException("参数 [" + m + "," + n + "] 错误,必须均为正整数,且第一个参数必须小于或等于第二个参数!");
        }
    }
    
    /**
     * collectionToArray : 将指定整数集合转化为整型数组列表
     * @param collection 指定整数集合
     * @return 要返回的整型数组列表,若给定集合为空,则返回 null
     */
    public static int[] collectionToArray(Collection<Integer> collection)
    {
        if (collection == null || collection.size() == 0) {
            return null;
        }
        int[] result = new int[collection.size()];
        int k = 0;
        for (Integer integer : collection) {
            result[k] = integer;
            k++;
        }
        return result;
    }
    
    /**
     * printArray: 打印数组的便利方法,每打印十个数换行 
     * @param arr 指定要打印的数组
     */
    public static void printArray(int[] arr)
    {
        for (int i=0; i < arr.length; i++) {
            System.out.printf("%d%c", arr[i], i%10==9 ? ‘\n‘ : ‘ ‘);
        }
    }
    
}   

 

  本文原创, 转载请注明出处,谢谢! :)

 

混合使用 ForkJoin, Akka, Future 实现一千万个不重复整数的排序

标签:

原文地址:http://www.cnblogs.com/lovesqcc/p/5540415.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!