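Tags: storm
My previous post, on the Storm runtime exception "No output fields defined for component:stream XxxBolt:null", found that multithreading was the cause. There may be other causes as well, so today I trace the error through the Storm source.
Error log: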
Caused by: java.lang.IllegalArgumentException: No output fields defined for component:stream XxxBolt:null
at backtype.storm.task.GeneralTopologyContext.getComponentOutputFields(GeneralTopologyContext.java:113) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.tuple.TupleImpl.<init>(TupleImpl.java:53) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:54) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.daemon.executor$mk_task_receiver$fn__4244.invoke(executor.clj:397) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.disruptor$clojure_handler$reify__1668.onEvent(disruptor.clj:59) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:124) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
... 6 common frames omitted

According to the top frame, the exception is thrown by GeneralTopologyContext.getComponentOutputFields:
/**
 * Gets the declared output fields for the specified component/stream.
 */
public Fields getComponentOutputFields(String componentId, String streamId) {
    Fields ret = _componentToStreamToFields.get(componentId).get(streamId);
    if(ret==null) {
        throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId);
    }
    return ret;
}
Further down the stack, the failing call comes from mk_task_receiver in executor.clj (mk-task-receiver in the Clojure source). Here is that function:
(defn mk-task-receiver [executor-data tuple-action-fn]
  (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
        task-ids (:task-ids executor-data)
        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
        ]
    (disruptor/clojure-handler
      (fn [tuple-batch sequence-id end-of-batch?]
        (fast-list-iter [[task-id msg] tuple-batch]
          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
            (when debug? (log-message "Processing received message " tuple))
            (if task-id
              (tuple-action-fn task-id tuple)
              ;; null task ids are broadcast tuples
              (fast-list-iter [task-id task-ids]
                (tuple-action-fn task-id tuple)
                ))
            ))))))
This is where the incoming Tuple is deserialized: a TupleImpl is instantiated, which invokes its constructor:
public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
    this.values = values;
    this.taskId = taskId;
    this.streamId = streamId;
    this.id = id;
    this.context = context;
    String componentId = context.getComponentId(taskId);
    Fields schema = context.getComponentOutputFields(componentId, streamId);
    if(values.size()!=schema.size()) {
        throw new IllegalArgumentException(
            "Tuple created with wrong number of fields. " +
            "Expected " + schema.size() + " fields but got " +
            values.size() + " fields");
    }
}
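To make the failure mode concrete, the lookup that the constructor performs can be modeled with plain Java collections. This is only a sketch, not Storm code: the class OutputFieldsLookupSketch and its declare method are hypothetical names, and unlike the quoted Storm method it also guards against an unknown component so that the example never hits a NullPointerException.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the output-field lookup in GeneralTopologyContext; not Storm code. */
public class OutputFieldsLookupSketch {
    // componentId -> (streamId -> declared field names), modeled on _componentToStreamToFields
    private final Map<String, Map<String, List<String>>> componentToStreamToFields = new HashMap<>();

    /** Records a declared stream (hypothetical helper standing in for the component's declaration step). */
    public void declare(String componentId, String streamId, String... fields) {
        componentToStreamToFields
            .computeIfAbsent(componentId, k -> new HashMap<>())
            .put(streamId, Arrays.asList(fields));
    }

    /** Same check as the quoted method: a (component, stream) pair with no entry is an error. */
    public List<String> getComponentOutputFields(String componentId, String streamId) {
        Map<String, List<String>> streams = componentToStreamToFields.get(componentId);
        // Assumption of this sketch: an unknown component is treated like an unknown stream.
        List<String> ret = (streams == null) ? null : streams.get(streamId);
        if (ret == null) {
            throw new IllegalArgumentException(
                "No output fields defined for component:stream " + componentId + ":" + streamId);
        }
        return ret;
    }

    public static void main(String[] args) {
        OutputFieldsLookupSketch ctx = new OutputFieldsLookupSketch();
        ctx.declare("XxxBolt", "default", "word", "count");
        // A declared stream resolves to its field list.
        System.out.println(ctx.getComponentOutputFields("XxxBolt", "default"));
        try {
            // A null stream id has no entry, so the lookup is rejected.
            ctx.getComponentOutputFields("XxxBolt", null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the message ends in ":null" whenever the stream id handed to the lookup is itself null, which is why the rest of the trace concentrates on where the stream id comes from.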
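So at what point is this streamId passed in?
Like Spark, Storm uses a DAG engine. For the pros and cons of DAG engines, see "Advantages of DAG (directed acyclic graph) as a big-data execution engine".
The DAG is simply a directed graph (with no cycles), and it is already fully built by the time createTopology returns. In detail:
1. We normally build a topology with TopologyBuilder. Every setBolt call is followed by a grouping declaration, and that grouping records the streamId of the upstream component the current bolt receives from: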
private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) {
    _commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping);
    return this;
}

createTopology then packs every bolt and spout, together with its ComponentCommon, into a StormTopology:
public StormTopology createTopology() {
    Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
    Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
    for(String boltId: _bolts.keySet()) {
        IRichBolt bolt = _bolts.get(boltId);
        ComponentCommon common = getComponentCommon(boltId, bolt);
        boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));
    }
    for(String spoutId: _spouts.keySet()) {
        IRichSpout spout = _spouts.get(spoutId);
        ComponentCommon common = getComponentCommon(spoutId, spout);
        spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.serialize(spout)), common));
    }
    return new StormTopology(spoutSpecs,
                             boltSpecs,
                             new HashMap<String, StateSpoutSpec>());
}

On the worker side, each executor is created by mk-executor:
(defn mk-executor [worker executor-id]
  (let [executor-data (mk-executor-data worker executor-id) ;; mk-executor-data
        _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
        task-datas (->> executor-data
                        :task-ids
                        (map (fn [t] [t (task/mk-task executor-data t)]))
                        (into {})
                        (HashMap.))
        _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
        report-error-and-die (:report-error-and-die executor-data)
        component-id (:component-id executor-data)
        ;; starting the batch-transfer->worker ensures that anything publishing to that queue
        ;; doesn't block (because it's a single threaded queue and the caching/consumer started
        ;; trick isn't thread-safe)
        system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
        handlers (with-error-reaction report-error-and-die
                   (mk-threads executor-data task-datas)) ;; this dispatches to mk-threads :spout or mk-threads :bolt to create the threads
        threads (concat handlers system-threads)]
    (setup-ticks! worker executor-data)
    ;; ... (the rest of mk-executor is not shown here)

The binding to note is the first one:
  let [executor-data (mk-executor-data worker executor-id) ;; mk-executor-data
mk-executor-data calls a function that in turn calls mk-grouper. The call is on line 37 of the code below (the :stream->component->grouper entry):
(defn mk-executor-data [worker executor-id]
  (let [worker-context (worker-context worker)
        task-ids (executor-id->tasks executor-id)
        component-id (.getComponentId worker-context (first task-ids))
        storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
        executor-type (executor-type worker-context component-id)
        batch-transfer->worker (disruptor/disruptor-queue
                                 (str "executor" executor-id "-send-queue")
                                 (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                 :claim-strategy :single-threaded
                                 :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
        ]
    (recursive-map
      :worker worker
      :worker-context worker-context
      :executor-id executor-id
      :task-ids task-ids
      :component-id component-id
      :open-or-prepare-was-called? (atom false)
      :storm-conf storm-conf
      :receive-queue ((:executor-receive-queue-map worker) executor-id)
      :storm-id (:storm-id worker)
      :conf (:conf worker)
      :shared-executor-data (HashMap.)
      :storm-active-atom (:storm-active-atom worker)
      :batch-transfer-queue batch-transfer->worker
      :transfer-fn (mk-executor-transfer-fn batch-transfer->worker)
      :suicide-fn (:suicide-fn worker)
      :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
      :type executor-type
      ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
      :stats (mk-executor-stats <> (sampling-rate storm-conf))
      :interval->task->metric-registry (HashMap.)
      :task->component (:task->component worker)
      ;; outbound-components calls outbound-groupings, which in turn calls mk-grouper
      ;; mk-grouper method doc => Returns a function that returns a vector of which task indices to send tuple to, or just a single task index.
      :stream->component->grouper (outbound-components worker-context component-id)
      :report-error (throttled-report-error-fn <>)
      :report-error-and-die (fn [error]
                              ((:report-error <>) error)
                              ((:suicide-fn <>)))
      :deserializer (KryoTupleDeserializer. storm-conf worker-context)
      :sampler (mk-stats-sampler storm-conf)
      ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
      )))
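The shape of the :stream->component->grouper entry (stream id, then consuming component, then a function that picks target task indices) may be easier to see in a small Java model. This is a sketch under stated assumptions, not Storm's implementation: GrouperTableSketch, subscribe and route are hypothetical names, and the hash-modulo rule is a simplified stand-in for a fields-style grouping. The real groupers are built in Clojure by mk-grouper and differ per grouping type.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical model of a stream -> component -> grouper table; not Storm code. */
public class GrouperTableSketch {
    // streamId -> (consumer componentId -> function choosing target task indices)
    private final Map<String, Map<String, Function<List<Object>, List<Integer>>>> table = new HashMap<>();

    /** Registers a consumer of one stream, grouped on a single field (simplified rule, an assumption). */
    public void subscribe(String streamId, String consumerId, int fieldIndex, int numTasks) {
        Function<List<Object>, List<Integer>> grouper = values ->
            Collections.singletonList(
                Math.floorMod(Objects.hashCode(values.get(fieldIndex)), numTasks));
        table.computeIfAbsent(streamId, k -> new HashMap<>()).put(consumerId, grouper);
    }

    /** Returns consumer -> chosen task indices for one emitted tuple; empty if no one subscribed to the stream. */
    public Map<String, List<Integer>> route(String streamId, List<Object> values) {
        Map<String, List<Integer>> out = new HashMap<>();
        Map<String, Function<List<Object>, List<Integer>>> consumers = table.get(streamId);
        if (consumers == null) {
            return out; // nobody listens on this stream id
        }
        for (Map.Entry<String, Function<List<Object>, List<Integer>>> e : consumers.entrySet()) {
            out.put(e.getKey(), e.getValue().apply(values));
        }
        return out;
    }
}
```

The point of the model is that the table is keyed by stream id first, so the id used on the emitting side has to match the one recorded when the downstream bolt declared its grouping.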
Now back to the error stack trace:
java.lang.RuntimeException: java.lang.IllegalArgumentException: No output fields defined for component:stream XxxBolt:null
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:127) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:96) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:81) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.daemon.executor$fn__4321$fn__4333$fn__4380.invoke(executor.clj:747) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:457) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_45]

The outer RuntimeException comes from DisruptorQueue.consumeBatchToCursor:
private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
    for(long curr = _consumer.get() + 1; curr <= cursor; curr++) {
        try {
            MutableObject mo = _buffer.get(curr);
            Object o = mo.o;
            mo.setObject(null);
            if(o==FLUSH_CACHE) {
                Object c = null;
                while(true) {
                    c = _cache.poll();
                    if(c==null) break;
                    else handler.onEvent(c, curr, true);
                }
            } else if(o==INTERRUPT) {
                throw new InterruptedException("Disruptor processing interrupted");
            } else {
                handler.onEvent(o, curr, curr == cursor);
            }
        } catch (Exception e) {
            // The RuntimeException is thrown here; the underlying exception comes from the handler.onEvent() calls above
            throw new RuntimeException(e);
        }
    }
    //TODO: only set this if the consumer cursor has changed?
    _consumer.set(cursor);
}

The handler itself is created in the bolt version of mk-threads:
(defmethod mk-threads :bolt [executor-data task-datas]
(let [execute-sampler (mk-stats-sampler (:storm-conf executor-data))
executor-stats (:stats executor-data)
{:keys [storm-conf component-id worker-context transfer-fn report-error sampler
open-or-prepare-was-called?]} executor-data
rand (Random. (Utils/secureRandomLong))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
;; synchronization needs to be done with a key provided by this bolt, otherwise:
;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
;; buffer other tuples until fully synchronized, then process all of those tuples
;; then go into normal loop
;; spill to disk?
;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
;; or just timeout the sync messages that are coming in until full sync is hit from that task
;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
;; TODO: how to handle incremental updates as well as synchronizations at same time
;; TODO: need to version tuples somehow
;;(log-debug "Received tuple " tuple " at task " task-id)
;; need to do it this way to avoid reflection
(let [stream-id (.getSourceStreamId tuple)]
(condp = stream-id
Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
(let [task-data (get task-datas task-id)
^IBolt bolt-obj (:object task-data)
user-context (:user-context task-data)
sampler? (sampler)
execute-sampler? (execute-sampler)
now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
(when sampler?
(.setProcessSampleStartTime tuple now))
(when execute-sampler?
(.setExecuteSampleStartTime tuple now))
(.execute bolt-obj tuple)
(let [delta (tuple-execute-time-delta! tuple)]
(task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
(when delta
(builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data)
executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta)
(stats/bolt-execute-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta)))))))]
;; TODO: can get any SubscribedState objects out of the context now
[(async-loop
(fn []
;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
(while (not @(:storm-active-atom executor-data))
(Thread/sleep 100))
(log-message "Preparing bolt " component-id ":" (keys task-datas))
(doseq [[task-id task-data] task-datas
:let [^IBolt bolt-obj (:object task-data)
tasks-fn (:tasks-fn task-data)
user-context (:user-context task-data)
bolt-emit (fn [stream anchors values task]
(let [out-tasks (if task
(tasks-fn task stream values)
(tasks-fn stream values))]
(fast-list-iter [t out-tasks]
(let [anchors-to-ids (HashMap.)]
(fast-list-iter [^TupleImpl a anchors]
(let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
(when (pos? (count root-ids))
(let [edge-id (MessageId/generateId rand)]
(.updateAckVal a edge-id)
(fast-list-iter [root-id root-ids]
(put-xor! anchors-to-ids root-id edge-id))
))))
(transfer-fn t
(TupleImpl. worker-context
values
task-id
stream
(MessageId/makeId anchors-to-ids)))))
(or out-tasks [])))]]
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
(if (= component-id Constants/SYSTEM_COMPONENT_ID)
(builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
:receive (:receive-queue executor-data)
:transfer (:transfer-queue (:worker executor-data))}
storm-conf user-context)
(builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
:receive (:receive-queue executor-data)}
storm-conf user-context)
)
(.prepare bolt-obj
storm-conf
user-context
(OutputCollector.
(reify IOutputCollector
(emit [this stream anchors values]
(bolt-emit stream anchors values nil))
(emitDirect [this task stream anchors values]
(bolt-emit stream anchors values task))
(^void ack [this ^Tuple tuple]
(let [^TupleImpl tuple tuple
ack-val (.getAckVal tuple)]
(fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
(task/send-unanchored task-data
ACKER-ACK-STREAM-ID
[root (bit-xor id ack-val)])
))
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when delta
(builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta)
(stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
(^void fail [this ^Tuple tuple]
(fast-list-iter [root (.. tuple getMessageId getAnchors)]
(task/send-unanchored task-data
ACKER-FAIL-STREAM-ID
[root]))
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
(when delta
(builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple))
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
(reportError [this error]
(report-error error)
)))))
(reset! open-or-prepare-was-called? true)
(log-message "Prepared bolt " component-id ":" (keys task-datas))
(setup-metrics! executor-data)
(let [receive-queue (:receive-queue executor-data)
;; the event-handler that the DisruptorQueue will invoke is created here
event-handler (mk-task-receiver executor-data tuple-action-fn)]
(disruptor/consumer-started! receive-queue)
(fn []
(disruptor/consume-batch-when-available receive-queue event-handler) ;; the event-handler starts being used here
0)))
:kill-fn (:report-error-and-die executor-data)
:factory? true
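:thread-name component-id)]))
====================================
To be honest, even after tracing this far I still have not found why this exception is thrown. If anyone knows, please leave a comment or e-mail me (joey.wen@outlook.com); I would appreciate it.
Storm runtime exception "No output fields defined for component:stream XxxBolt:null": tracing an unsolved case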
Original article: http://blog.csdn.net/wzhg0508/article/details/42487545