Tags: memory session sig detail ice replace second date cti
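Structured Streaming is a feature introduced in Spark 2.0, with the following characteristics:
Structured Streaming can greatly simplify the code you have to write.
The following example continuously receives data from localhost:9999 and counts the words: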
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
# Split the lines into words, name the new column as "word"
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()
First start a netcat server:
nc -lk 9999
Then submit the program:
spark-submit --master local ./spark_test.py
Type the following lines into netcat, one at a time:
hello world
apache spark
hello spark
Output of the Spark program:
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
| word|count|
+-----+-----+
|hello|    1|
|world|    1|
+-----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
| hello|    1|
|apache|    1|
| spark|    1|
| world|    1|
+------+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
| hello|    2|
|apache|    1|
| spark|    2|
| world|    1|
+------+-----+
Spark also keeps printing streaming progress information:
20/05/25 23:50:26 INFO streaming.StreamExecution: Streaming query made progress: {
  "id" : "606f3e4f-72a7-4c9a-b87e-be9f34320e47",
  "runId" : "26ce998c-3f59-475b-95a0-2022f6a7ccc2",
  "name" : null,
  "timestamp" : "2020-05-25T15:50:10.789Z",
  "numInputRows" : 1,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 0.06258997308631158,
  "durationMs" : {
    "addBatch" : 15628,
    "getBatch" : 263,
    "getOffset" : 0,
    "queryPlanning" : 31,
    "triggerExecution" : 15977,
    "walCommit" : 39
  },
  "stateOperators" : [ {
    "numRowsTotal" : 4,
    "numRowsUpdated" : 2
  } ],
  "sources" : [ {
    "description" : "TextSocketSource[host: localhost, port: 9999]",
    "startOffset" : 1,
    "endOffset" : 2,
    "numInputRows" : 1,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 0.06258997308631158
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@224c4764"
  }
}
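Although the data is processed in batches, every batch prints the full cumulative counts. This is because outputMode("complete") was specified. Structured Streaming has three output modes: Complete (the entire result table is output after every trigger), Update (only the rows updated since the last trigger are output) and Append (only the rows newly appended to the result table are output). For comparison, this is roughly what Batch 2 (the input "hello spark") looks like in each mode.
Append (illustrative only: Spark rejects Append mode for a streaming aggregation without a watermark):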
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
| hello|    1|
| spark|    1|
+------+-----+
Complete:
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
| hello|    2|
|apache|    1|
| spark|    2|
| world|    1|
+------+-----+
Update:
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
| hello|    2|
| spark|    2|
+------+-----+
Not every query supports all three modes; for details, see the official documentation:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
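Throughout this process Spark keeps only the intermediate result (here, the running counts); incoming data is discarded as soon as it has been processed.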
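To make the difference between Complete and Update mode concrete, here is a plain-Python simulation of the running word count above. It uses no Spark, and run_word_count is a hypothetical helper. Between batches it keeps only the aggregated counts and throws each batch's input away, mirroring the point that Spark keeps only the intermediate result. Append mode is left out because Spark does not accept it for this kind of aggregation without a watermark.

```python
from collections import Counter


def run_word_count(batches, mode):
    """Simulate a running word count over a sequence of micro-batches.

    Only the aggregated state (a Counter) survives from one batch to the next;
    the raw input lines of a batch are discarded once they have been counted.
    """
    state = Counter()
    outputs = []
    for lines in batches:
        batch_counts = Counter(word for line in lines for word in line.split(" "))
        state.update(batch_counts)
        if mode == "complete":
            # Complete mode: emit the whole result table after every batch
            outputs.append(dict(state))
        elif mode == "update":
            # Update mode: emit only the rows whose counts changed in this batch
            outputs.append({word: state[word] for word in batch_counts})
        else:
            raise ValueError("only 'complete' and 'update' are modelled here")
    return outputs


# The three lines typed into netcat, one micro-batch each
batches = [["hello world"], ["apache spark"], ["hello spark"]]
print(run_word_count(batches, "complete")[-1])  # {'hello': 2, 'world': 1, 'apache': 1, 'spark': 2}
print(run_word_count(batches, "update")[-1])    # {'hello': 2, 'spark': 2}
```

The last Complete output matches the full table of Batch 2, and the last Update output matches the two changed rows.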
Structured Streaming can run aggregations over windows based on the event time carried in the data itself.
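from pyspark.sql.functions import window

# Assume words has two columns: timestamp and word
# Count words per window based on timestamp
# Window size 10 min, sliding every 5 min,
# i.e. words are counted within the windows 0~10, 5~15, 10~20, 15~25, ...
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()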
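With a 10-minute window sliding every 5 minutes, each event falls into two overlapping windows. The plain-Python sketch below computes which windows contain a given event time, treating each window as half-open [start, end) with starts at multiples of the slide. sliding_windows is a hypothetical helper that models how I understand window() to assign rows; it is not a Spark function.

```python
def sliding_windows(event_time, window_size=10, slide=5):
    """Return the [start, end) windows that contain event_time (all values in minutes).

    Window starts are multiples of `slide`, i.e. 0~10, 5~15, 10~20, ... as described above.
    """
    windows = []
    # The latest window containing event_time starts at the last multiple of slide <= event_time
    start = (event_time // slide) * slide
    # Walk back one slide at a time while the window still covers event_time
    while start > event_time - window_size:
        windows.append((start, start + window_size))
        start -= slide
    return sorted(windows)


print(sliding_windows(12))  # [(5, 15), (10, 20)]
print(sliding_windows(10))  # [(5, 15), (10, 20)]: 10 is not inside [0, 10)
```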
A more complete example:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window

# host and port are assumed values; the original snippet used them without defining them
host = 'localhost'
port = 9999
windowDuration = '10 seconds'
slideDuration = '5 seconds'
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCountWindowed") \
    .getOrCreate()
lines = spark \
    .readStream \
    .format('socket') \
    .option('host', host) \
    .option('port', port) \
    .option('includeTimestamp', 'true') \
    .load()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, windowDuration, slideDuration),
    words.word
).count().orderBy('window')
# Start running the query that prints the windowed word counts to the console
query = windowedCounts \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .option('truncate', 'false') \
    .start()
query.awaitTermination()
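Window-based computation can receive late data: records that belong to a window keep arriving after that window has already been computed. Spark uses watermarking to specify how much lateness it will tolerate.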
from pyspark.sql.functions import window
# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
How the watermark is computed:
watermark = max_event_time_received - threshold
A late record is dropped for a window when:
watermark > window_end_time
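Suppose the window size is 10 minutes, the slide is 5 minutes, the latest event time among all data received so far is minute 31, and the threshold is 10 minutes. The current watermark is then 31 - 10 = 21. If a record with event time 14 arrives now, it is dropped: a record at minute 14 belongs to the windows [5, 15) and [10, 20), and both window end times, 15 and 20, are smaller than 21.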
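The worked example can be checked with a small plain-Python sketch. It encodes only the rule stated above (watermark = max event time - threshold; a window stops accepting data once the watermark is past its end), not Spark's actual implementation. is_too_late and sliding_windows are hypothetical helpers.

```python
def sliding_windows(event_time, window_size, slide):
    """[start, end) windows containing event_time, with starts at multiples of slide."""
    start = (event_time // slide) * slide
    windows = []
    while start > event_time - window_size:
        windows.append((start, start + window_size))
        start -= slide
    return sorted(windows)


def is_too_late(event_time, max_event_time, threshold, window_size=10, slide=5):
    """True if watermark > window_end_time for every window the record belongs to."""
    watermark = max_event_time - threshold
    return all(watermark > end for _, end in sliding_windows(event_time, window_size, slide))


print(is_too_late(14, max_event_time=31, threshold=10))  # True: windows end at 15 and 20, watermark is 21
print(is_too_late(16, max_event_time=31, threshold=10))  # False: [10, 20) is closed, but [15, 25) still accepts it
```

The second call shows a partially late record. It no longer counts toward [10, 20), but it can still update [15, 25).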
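In Update mode, late data that the watermark still allows is used to update the existing windows, and the updated windows are output.
In Append mode, a window is computed and output only after the watermark has passed its end time.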
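The difference between the two modes can be seen in a simplified plain-Python simulation. It uses the same 10/5/10-minute settings and the same drop rule as above. In this sketch the watermark only advances between batches, and Spark's state handling is much more involved, so treat the output as an illustration only. run and windows_for are hypothetical helpers.

```python
from collections import defaultdict

WINDOW, SLIDE, THRESHOLD = 10, 5, 10  # minutes, as in the example above


def windows_for(t):
    """[start, end) sliding windows that contain event time t."""
    start = (t // SLIDE) * SLIDE
    result = []
    while start > t - WINDOW:
        result.append((start, start + WINDOW))
        start -= SLIDE
    return result


def closed(window, wm):
    """A window is closed once the watermark is past its end time."""
    return wm is not None and wm > window[1]


def run(batches, mode):
    """Simulate windowed counting with a watermark in 'update' or 'append' mode."""
    counts = defaultdict(int)
    max_time = None
    emitted = set()
    outputs = []
    for events in batches:
        # The watermark used inside a batch comes from the data seen in earlier batches
        wm = None if max_time is None else max_time - THRESHOLD
        touched = set()
        for t in events:
            for w in windows_for(t):
                if closed(w, wm):
                    continue  # too late for this window: dropped
                counts[w] += 1
                touched.add(w)
            max_time = t if max_time is None else max(max_time, t)
        wm = None if max_time is None else max_time - THRESHOLD
        if mode == "update":
            # Update: every window changed in this batch is output, even if it may change again
            out = {w: counts[w] for w in sorted(touched)}
        else:
            # Append: each window is output exactly once, after the watermark has passed its end
            out = {w: counts[w] for w in sorted(counts) if closed(w, wm) and w not in emitted}
            emitted.update(out)
        outputs.append(out)
    return outputs


batches = [[12, 14], [31], [14, 22]]
print(run(batches, "update"))  # [{(5, 15): 2, (10, 20): 2}, {(25, 35): 1, (30, 40): 1}, {(15, 25): 1, (20, 30): 1}]
print(run(batches, "append"))  # [{}, {(5, 15): 2, (10, 20): 2}, {}]
```

In the third batch, the record at minute 14 is dropped, as in the worked example. Append mode emits the windows [5, 15) and [10, 20) only once the watermark (21) has passed their ends.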
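Fault tolerance with exactly-once results is only available for some sources and sinks:
The source must be able to track the position it last read, i.e. the offset, so that data can be re-read.
The sink must be idempotent: performing the same write several times does not change the result.
Spark then uses checkpointing and write-ahead logs to record the offsets and the intermediate state.
This makes failure recovery possible, and judging by the final result, every piece of data is processed exactly once (exactly-once).
(Strictly speaking, even this cannot be fully guaranteed, because it also depends on whether the user's code is idempotent. The simplest example is user code that uses random numbers, which produces a different result on every run. Another is user code that reads data from some external system, which may return different data each time.)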
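The mechanism described above (replayable source + offset log + idempotent sink) can be modelled in a few lines of plain Python. This is a toy model, not Spark's implementation. ReplayableSource, IdempotentSink, run_batch and recover are all hypothetical names, and the "transformation" is simply upper-casing each record.

```python
class ReplayableSource:
    """Hypothetical source that can re-read records from any offset (like a Kafka topic)."""

    def __init__(self, records):
        self.records = list(records)

    def read(self, start, end):
        return self.records[start:end]


class IdempotentSink:
    """Hypothetical sink keyed by batch id: rewriting a batch replaces it instead of duplicating it."""

    def __init__(self):
        self.batches = {}

    def write(self, batch_id, rows):
        self.batches[batch_id] = list(rows)  # overwrite, never append

    def all_rows(self):
        return [row for batch_id in sorted(self.batches) for row in self.batches[batch_id]]


def run_batch(source, sink, checkpoint, batch_id, start, end, crash=False):
    # Write-ahead log: record the batch's offset range before doing any work
    checkpoint["wal"][batch_id] = (start, end)
    sink.write(batch_id, [record.upper() for record in source.read(start, end)])
    if crash:
        raise RuntimeError("simulated crash after the sink write, before the commit")
    checkpoint["commits"].add(batch_id)


def recover(source, sink, checkpoint):
    # Re-run every logged but uncommitted batch with the same id and the same offsets
    for batch_id, (start, end) in sorted(checkpoint["wal"].items()):
        if batch_id not in checkpoint["commits"]:
            run_batch(source, sink, checkpoint, batch_id, start, end)


source = ReplayableSource(["a", "b", "c", "d"])
sink = IdempotentSink()
checkpoint = {"wal": {}, "commits": set()}
run_batch(source, sink, checkpoint, 0, 0, 2)
try:
    run_batch(source, sink, checkpoint, 1, 2, 4, crash=True)
except RuntimeError:
    recover(source, sink, checkpoint)
print(sink.all_rows())  # ['A', 'B', 'C', 'D']: batch 1 was written twice but appears once
```

If the transformation used random numbers instead of upper(), the replayed batch 1 could differ from the first attempt. That is exactly the caveat in the parenthetical above.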
aggDF.writeStream.outputMode("complete").option("checkpointLocation", "path/to/HDFS/dir").format("memory").start()
This checkpoint location has to be a path in an HDFS compatible file system
There are limitations on what changes in a streaming query are allowed between restarts from the same checkpoint location
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StructType
spark = SparkSession.builder.appName("Test").getOrCreate()
# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
socketDF.isStreaming    # True for DataFrames that have streaming sources (a property, not a method)
socketDF.printSchema()
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
# Create a DataFrame representing the stream of input lines from kafka
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topicA,topicB") \
    .load() \
    .selectExpr("CAST(value AS STRING)")
# Split the lines into words
words = lines.select(
    # explode turns each item in an array into a separate row
    explode(split(lines.value, ' ')).alias('word')
)
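There is also a Rate source (for testing). It generates data at a specified number of rows per second, and each row has a timestamp column and a value column.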
# Select the devices which have signal more than 10
df.select("device").where("signal > 10")
# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF
Most operations that work on batch DataFrames also work on streaming DataFrames.
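Joining streaming data with static (batch) data
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF
# A right outer join with the static DF on the right side is not supported for stream-static joins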
Joining streaming data with streaming data
from pyspark.sql.functions import expr
impressions = spark.readStream. ...
clicks = spark.readStream. ...
# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter"
)
For more details, see the official documentation:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations
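The time-range condition in the join above is what lets Spark bound the join state. The following plain-Python sketch is a simplified model of that reasoning, not Spark's state management. join_impressions_clicks and can_evict_impression are hypothetical helpers, and the constants mirror the "1 hour" condition and the 3-hour click watermark above. An impression only matches clicks up to one hour later. Once the click watermark has moved past that point, no acceptable future click can match the impression, so the impression could be removed from state.

```python
from datetime import datetime, timedelta

MAX_CLICK_DELAY = timedelta(hours=1)        # clickTime <= impressionTime + interval 1 hour
CLICK_WATERMARK_DELAY = timedelta(hours=3)  # clicks.withWatermark("clickTime", "3 hours")


def join_impressions_clicks(impressions, clicks):
    """Inner join on ad id with the same time-range condition as the expr(...) above."""
    return [
        (imp_ad, imp_time, click_time)
        for imp_ad, imp_time in impressions
        for click_ad, click_time in clicks
        if click_ad == imp_ad and imp_time <= click_time <= imp_time + MAX_CLICK_DELAY
    ]


def can_evict_impression(imp_time, max_click_time_seen):
    """True once the click watermark is past the last click time that could still match."""
    click_watermark = max_click_time_seen - CLICK_WATERMARK_DELAY
    return click_watermark > imp_time + MAX_CLICK_DELAY


t0 = datetime(2020, 5, 25, 10, 0)
impressions = [("ad1", t0)]
clicks = [("ad1", t0 + timedelta(minutes=30)), ("ad1", t0 + timedelta(minutes=90)), ("ad2", t0)]
print(len(join_impressions_clicks(impressions, clicks)))  # 1: only the click 30 minutes later matches
print(can_evict_impression(t0, t0 + timedelta(hours=4)))   # False: click watermark 11:00 is not past 11:00
print(can_evict_impression(t0, t0 + timedelta(hours=5)))   # True: click watermark 12:00 is past 11:00
```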
# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF.writeStream.queryName("aggregates").outputMode("complete").format("memory").start()
spark.sql("select * from aggregates").show()   # interactively query in-memory table
The name given to queryName becomes the table name, so the results can be queried with Spark SQL.
streamingDf = spark.readStream. ...
# Without watermark using guid column
streamingDf.dropDuplicates(["guid"])
# With watermark using guid and eventTime columns
streamingDf \
    .withWatermark("eventTime", "10 seconds") \
    .dropDuplicates(["guid", "eventTime"])
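Deduplication works both with and without a watermark. Without a watermark there is no bound on when a duplicate may arrive, so Spark keeps every past record as state. With a watermark, state older than the watermark can be dropped, and records that arrive later than the watermark are dropped as well.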
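Here is a plain-Python sketch of the two cases, under simplifying assumptions. Without a watermark, records are deduplicated by guid and every key is kept forever. With a watermark, they are deduplicated by (guid, event_time), and keys older than the watermark are forgotten after each batch. deduplicate is a hypothetical helper, not Spark's implementation.

```python
def deduplicate(batches, delay=None):
    """Deduplicate records (guid, event_time) across micro-batches.

    delay=None models dropDuplicates(["guid"]) without a watermark: every key is kept forever.
    With a delay, the key is (guid, event_time), records older than the watermark are dropped,
    and keys older than the watermark are removed from the state after each batch.
    """
    seen = {}  # key -> event_time
    max_time = None
    outputs = []
    for batch in batches:
        # Watermark from earlier batches (None means "no watermark")
        wm = None if delay is None or max_time is None else max_time - delay
        out = []
        for guid, event_time in batch:
            max_time = event_time if max_time is None else max(max_time, event_time)
            if wm is not None and event_time < wm:
                continue  # older than the watermark: dropped
            key = guid if delay is None else (guid, event_time)
            if key in seen:
                continue  # duplicate of a record already emitted
            seen[key] = event_time
            out.append((guid, event_time))
        if delay is not None:
            cutoff = max_time - delay
            seen = {k: t for k, t in seen.items() if t >= cutoff}  # state cleanup
        outputs.append(out)
    return outputs, seen


batches = [[("a", 1), ("a", 1), ("b", 2)], [("a", 1), ("c", 20)], [("b", 2), ("d", 19)]]
outputs, state = deduplicate(batches, delay=10)
print(outputs)     # [[('a', 1), ('b', 2)], [('c', 20)], [('d', 19)]]
print(len(state))  # 2: only keys at or after the watermark are kept
```

Without a watermark, the state grows with every distinct guid, which is why the watermarked variant matters for long-running queries.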
# File sink (format can be "parquet", "orc", "json", "csv", etc.)
df.writeStream \
    .format("parquet") \
    .option("path", "path/to/destination/dir") \
    .start()
# Kafka sink
df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "updates") \
    .start()
# Foreach sink
df.writeStream \
    .foreach(...) \
    .start()
# Console sink (for debugging)
df.writeStream \
    .format("console") \
    .start()
# Memory sink (for debugging)
df.writeStream \
    .format("memory") \
    .queryName("tableName") \
    .start()
Each sink supports different output modes and fault-tolerance guarantees; for details, see the official documentation:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
def foreach_batch_function(micro_batch_df, micro_batch_unique_id):
    # Transform and write micro_batch_df (a regular, non-streaming DataFrame)
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()

def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()

class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        # Return True to process this partition's rows; a falsy value (such as None) skips them.
        return True

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method is optional in Python.
        pass

query = streamingDF.writeStream.foreach(ForeachWriter()).start()
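By default, foreach and foreachBatch only guarantee at-least-once writes. With foreachBatch, the batch id passed to the function can be used to deduplicate the output and achieve exactly-once.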
# Default trigger (runs micro-batch as soon as it can)
# As soon as the previous micro-batch finishes, all newly arrived data is processed as the next micro-batch
df.writeStream \
    .format("console") \
    .start()
# ProcessingTime trigger with two-seconds micro-batch interval
# Compared with the default trigger:
#     1. If the previous micro-batch finishes within 2 seconds, Spark waits until the 2 seconds are up before triggering the next micro-batch
#     2. If the previous micro-batch takes longer than 2 seconds, the next micro-batch is triggered immediately
#     3. If there is no new data, nothing is computed even when the 2 seconds are up
df.writeStream \
    .format("console") \
    .trigger(processingTime='2 seconds') \
    .start()
# One-time trigger
# Processes all currently available data as a single micro-batch, then stops
df.writeStream \
    .format("console") \
    .trigger(once=True) \
    .start()
# Continuous trigger with one-second checkpointing interval
df.writeStream \
    .format("console") \
    .trigger(continuous='1 second') \
    .start()
Continuous mode is experimental.
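The three ProcessingTime rules from the trigger comments can be simulated in plain Python. This is a simplified sketch, not Spark's scheduler. It assumes that trigger times are aligned to multiples of the interval, which is my understanding of Spark's behavior. processing_time_trigger_starts is a hypothetical helper.

```python
def processing_time_trigger_starts(checks, interval):
    """Simplified simulation of trigger(processingTime=...) scheduling.

    checks: one (has_new_data, batch_duration_seconds) pair per trigger attempt.
    Returns the times at which a micro-batch actually started.
    """
    now = 0.0
    starts = []
    for has_new_data, duration in checks:
        # The next trigger is due at the next multiple of the interval
        next_trigger = (now // interval) * interval + interval
        if has_new_data:
            starts.append(now)
            now += duration  # run the micro-batch
        # Rule 1: finished early -> wait until next_trigger
        # Rule 2: overran the interval -> now is already past next_trigger, continue immediately
        # Rule 3: no new data -> nothing runs, just wait for next_trigger
        now = max(now, next_trigger)
    return starts


checks = [(True, 1.0), (True, 3.0), (True, 0.5), (False, 0.0), (True, 0.5)]
print(processing_time_trigger_starts(checks, interval=2.0))  # [0.0, 2.0, 5.0, 8.0]
```

The batch started at 2.0 takes 3 seconds, so the next one starts right away at 5.0 (rule 2). The attempt at 6.0 finds no data and runs nothing (rule 3).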
query = df.writeStream.format("console").start()   # get the query object
# In PySpark, id, runId, name, recentProgress and lastProgress are properties, not methods
query.id                   # get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId                # get the unique id of this run of the query, which will be generated at every start/restart
query.name                 # get the auto-generated or user-specified name of the query
query.explain()            # print detailed explanations of the query
query.stop()               # stop the query
query.awaitTermination()   # block until query is terminated, with stop() or with error
query.exception()          # the exception if the query has been terminated with error
query.recentProgress       # a list of the most recent progress updates for this query
query.lastProgress         # the most recent progress update of this streaming query
You can inspect the progress and status of a streaming query:
query = ...  # a StreamingQuery
print(query.lastProgress)
'''
Will print something like the following.
{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''
print(query.status)
'''
Will print something like the following.
{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
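// Scala: register a StreamingQueryListener to be notified of query events
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark: SparkSession = ...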
spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
# Report metrics to the sinks configured in Spark's metrics system
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
These are used to monitor the state of streaming queries.
Original post: https://www.cnblogs.com/moonlight-lin/p/12989407.html