码迷,mamicode.com
首页 > 其他好文 > 详细

066 基于checkpoint的HA机制实现

时间:2018-08-12 21:28:08      阅读:144      评论:0      收藏:0      [点我收藏+]

标签:rgs   min   contex   通过   对象   class   场景   storage   nbsp   

1.说明

  针对需要恢复的应用场景,提供了HA的的机制

  内部实现原理:基于checkpoint的

  当程序被kill的时候,下次恢复的时候,会从checkpoint对用的文件中进行数据的恢复

 

2.注意点

  SparkStreaming 的HA和updateStateByKey来记录历史数据的API不能一起使用

 

二:程序

1.程序

 1 package com.stream.it
 2 
 3 import kafka.serializer.StringDecoder
 4 import org.apache.spark.storage.StorageLevel
 5 import org.apache.spark.streaming.kafka.KafkaUtils
 6 import org.apache.spark.streaming.{Seconds, StreamingContext}
 7 import org.apache.spark.{SparkConf, SparkContext}
 8 
 9 object HAKafkaWordcount {
10   def main(args: Array[String]): Unit = {
11     val conf=new SparkConf()
12         .setAppName("spark-streaming-wordcount")
13           .setMaster("local[*]")
14     val sc=SparkContext.getOrCreate(conf)
15     val checkpointDir = "hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir02"
16 
17 
18     /**
19       * 构造StreamingContext对象
20       *
21       * @return
22       */
23     def createStreamingContextFunc(): StreamingContext = {
24       val ssc = new StreamingContext(sc, Seconds(5))
25       ssc.checkpoint(checkpointDir)
26       val kafkaParams=Map("group.id"->"stream-sparking-0",
27         "zookeeper.connect"->"linux-hadoop01.ibeifeng.com:2181/kafka",
28         "auto.offset.reset"->"smallest"
29       )
30       val topics=Map("beifeng"->1)
31       val dStream=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
32         ssc,             //给定sparkStreaming的上下文
33         kafkaParams,     //kafka的参数信息,通过kafka HightLevelComsumerApi连接
34         topics,          //给定读取对应的topic的名称以及读取数据的线程数量
35         StorageLevel.MEMORY_AND_DISK_2     //数据接收器接收到kafka的数据后的保存级别
36       ).map(_._2)
37 
38 
39       val resultWordcount=dStream
40         .filter(line=>line.nonEmpty)
41         .flatMap(line=>line.split(" ").map((_,1)))
42         .reduceByKey(_+_)
43       resultWordcount.foreachRDD(rdd=>{
44         rdd.foreachPartition(iter=>iter.foreach(println))
45       })
46       ssc
47     }
48 
49     val ssc = StreamingContext.getOrCreate(
50       checkpointPath = checkpointDir,
51       creatingFunc = createStreamingContextFunc
52     )
53 
54     //启动
55     ssc.start()
56     //等到
57     ssc.awaitTermination()
58   }
59 }

 

2.注意点

  HA第一次执行后,以后如果代码进行改动(创建StreamingContext的代码改动),不会得到反应(会直接从checkpoint中读取数据进行StreamingContext的恢复) ===> 解决SparkStreaming和Kafka集成的时候offset偏移量管理的问题

066 基于checkpoint的HA机制实现

标签:rgs   min   contex   通过   对象   class   场景   storage   nbsp   

原文地址:https://www.cnblogs.com/juncaoit/p/9464277.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!