标签:
学习jstorm过程中,碰到一问题:
ERROR com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent - Failed Sync Process
java.lang.IllegalArgumentException: No output fields defined for component:stream wordNormalizer_componentId:default
at backtype.storm.task.GeneralTopologyContext.getComponentOutputFields(GeneralTopologyContext.java:114)
at backtype.storm.task.TopologyContext.getThisOutputFields(TopologyContext.java:157)
at com.alibaba.jstorm.cluster.Common.outbound_components(Common.java:600)
at com.alibaba.jstorm.task.Task.makeSendTargets(Task.java:133)
at com.alibaba.jstorm.task.Task.echoToSystemBolt(Task.java:162)
at com.alibaba.jstorm.task.Task.execute(Task.java:244)
at com.alibaba.jstorm.task.Task.mk_task(Task.java:289)
at com.alibaba.jstorm.daemon.worker.Worker.createTasks(Worker.java:123)
at com.alibaba.jstorm.daemon.worker.Worker.execute(Worker.java:218)
at com.alibaba.jstorm.daemon.worker.Worker.mk_worker(Worker.java:258)
at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.launchWorker(SyncProcessEvent.java:402)
at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.startNewWorkers(SyncProcessEvent.java:828)
at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.run(SyncProcessEvent.java:157)
at com.alibaba.jstorm.event.EventManagerImpExecute.run(EventManagerImpExecute.java:38)
at java.lang.Thread.run(Thread.java:745)
最后折腾了一下,改变了一下数据流分组就不报这个错误了。
情况如下:
1. spout产生的数据流带了streamId,
即:
collector.emit(Chapter2CommonConstant.wordProducer_streamId, new Values(random)); declarer.declareStream(Chapter2CommonConstant.wordProducer_streamId, new Fields(Chapter2CommonConstant.wordProducer_fields));
2. bolt产生的数据流也像spout一样,通过declareStream定义了带streamId的数据流。
3. 拓扑数据流流向配置:
topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1) .fieldsGrouping(Chapter2CommonConstant.wordProducer_componentId, new Fields(Chapter2CommonConstant.wordProducer_fields)); topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1) .fieldsGrouping(Chapter2CommonConstant.wordNormalizer_componentId, new Fields(Chapter2CommonConstant.wordNormalizer_fields));
这种情况下会报本文给出的异常。
解决办法:
把第3步中的拓扑数据流流向配置改为下面这样(订阅时显式指定streamId),就不会出现异常:
topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1) .shuffleGrouping(Chapter2CommonConstant.wordProducer_componentId, Chapter2CommonConstant.wordProducer_streamId); topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1) .shuffleGrouping(Chapter2CommonConstant.wordNormalizer_componentId, Chapter2CommonConstant.wordNormalizer_streamId);
完整代码:
package com.doctor.ebook.getting_started_with_storm;
/**
 * Component ids, stream ids and field names shared by the spout, the bolts and
 * the topology wiring of the chapter 2 word-count example.
 *
 * @author doctor
 *
 * @time 2015-04-25 22:59:20
 */
public final class Chapter2CommonConstant {
    /** Id under which the word-producing spout is registered in the topology. */
    public static final String wordProducer_componentId = "wordProducer_componentId";
    /** Id of the stream the spout declares and emits on. */
    public static final String wordProducer_streamId = "wordProducer_streamId";
    /** Name of the single field carried by tuples on the spout's stream. */
    public static final String wordProducer_fields = "wordProducer_randomString";

    /** Id under which the normalizer bolt is registered in the topology. */
    public static final String wordNormalizer_componentId = "wordNormalizer_componentId";
    /** Id of the stream the normalizer bolt declares and emits on. */
    public static final String wordNormalizer_streamId = "wordNormalizer_streamId";
    /** Name of the single field carried by tuples on the normalizer's stream. */
    public static final String wordNormalizer_fields = "wordNormalizer_fields";

    /** Id under which the counter bolt is registered in the topology. */
    public static final String wordCounter_componentId = "wordCounter_componentId";

    /** Constants holder only; never meant to be instantiated. */
    private Chapter2CommonConstant() {
    }
}
package com.doctor.ebook.getting_started_with_storm;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.RandomStringUtils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.doctor.common.ContextBaseRichSpout;
/**
 * Spout that, on every {@link #nextTuple()} call, waits one second and then emits a
 * random six-character string on the {@code wordProducer_streamId} stream.
 *
 * NOTE(review): {@code collector} and {@code log} are not declared here; they are
 * presumably provided by {@code ContextBaseRichSpout} - confirm against that class.
 *
 * @author doctor
 *
 * @time 2015-04-25 22:39:21
 */
public class WordProducerSpout extends ContextBaseRichSpout {
    private static final long serialVersionUID = -930888930597360858L;

    // Only used as the pool of characters the random strings are drawn from, so the
    // missing spaces at the concatenation points do not matter.
    private final String content = "A spout emits a list of defined fields. This architecture allows you to have" +
            "different kinds of bolts reading the same spout stream, which can then" +
            "define fields for other bolts to consume and so on";

    /**
     * Hands the configuration, context and collector straight to the superclass.
     *
     * @param conf      the topology configuration
     * @param context   the topology context
     * @param collector the collector used to emit tuples
     */
    @Override
    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
        super.open(conf, context, collector);
    }

    /**
     * Builds a six-character random string from the characters of {@code content},
     * sleeps for one second, then emits the string on the producer stream and logs it.
     * If the sleep is interrupted nothing is emitted for this call.
     */
    @Override
    public void nextTuple() {
        String random = RandomStringUtils.random(6, content);
        try {
            TimeUnit.SECONDS.sleep(1);
            collector.emit(Chapter2CommonConstant.wordProducer_streamId, new Values(random));
            log.info("WordProducerSpout:" + random);
        } catch (InterruptedException e) {
            log.error("TimeUnit.SECONDS.sleep.error", e);
            // Catching InterruptedException clears the interrupt flag; set it again so
            // the thread driving this spout can still see the interruption request.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Declares the single named stream this spout emits on, carrying one field.
     *
     * @param declarer receives the stream declaration
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(Chapter2CommonConstant.wordProducer_streamId, new Fields(Chapter2CommonConstant.wordProducer_fields));
    }
}
package com.doctor.ebook.getting_started_with_storm;
import java.util.Locale;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.doctor.common.ContextBaseRichBolt;
/**
 * Bolt that lower-cases the strings produced by the word producer spout and
 * re-emits them on the {@code wordNormalizer_streamId} stream.
 *
 * NOTE(review): {@code collector} and {@code log} are not declared here; they are
 * presumably provided by {@code ContextBaseRichBolt} - confirm against that class.
 *
 * @author doctor
 *
 * @time 2015-04-25 23:14:27
 */
public class WordNormalizerBolt extends ContextBaseRichBolt {
    private static final long serialVersionUID = -1244951787400604294L;

    /**
     * Hands the configuration, context and collector straight to the superclass.
     *
     * @param stormConf the topology configuration
     * @param context   the topology context
     * @param collector the collector used to emit tuples
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
    }

    /**
     * Normalizes one tuple: reads the producer's string field, logs it, converts it to
     * lower case and emits the result. Tuples that did not come from the producer
     * spout's named stream are ignored.
     *
     * @param input the incoming tuple
     */
    @Override
    public void execute(Tuple input) {
        boolean fromProducerStream =
                Chapter2CommonConstant.wordProducer_componentId.equals(input.getSourceComponent())
                        && Chapter2CommonConstant.wordProducer_streamId.equals(input.getSourceStreamId());
        if (!fromProducerStream) {
            return;
        }

        String field = input.getStringByField(Chapter2CommonConstant.wordProducer_fields);
        log.info("WordNormalizer.execute:" + field);
        // Locale.ROOT keeps the result independent of the JVM's default locale
        // (for example the Turkish dotless-i mapping).
        field = field.toLowerCase(Locale.ROOT);
        collector.emit(Chapter2CommonConstant.wordNormalizer_streamId, new Values(field));
    }

    /**
     * Declares the single named stream this bolt emits on, carrying one field.
     *
     * @param declarer receives the stream declaration
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(Chapter2CommonConstant.wordNormalizer_streamId, new Fields(Chapter2CommonConstant.wordNormalizer_fields));
    }
}
package com.doctor.ebook.getting_started_with_storm;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import com.doctor.common.ContextBaseRichBolt;
/**
 * Bolt that keeps an in-memory occurrence count for every normalized word it
 * receives and logs the running total. It declares no output stream.
 *
 * NOTE(review): {@code log} is not declared here; it is presumably provided by
 * {@code ContextBaseRichBolt} - confirm against that class.
 *
 * @author doctor
 *
 * @time 2015-04-25 23:35:05
 */
public class WordCounterBolt extends ContextBaseRichBolt {
    private static final long serialVersionUID = 8157872805076023917L;

    /** Occurrence count per word; created in {@code prepare}. */
    private Map<String, Integer> counters;

    /**
     * Delegates to the superclass and creates the empty counter map.
     *
     * @param stormConf the topology configuration
     * @param context   the topology context
     * @param collector the collector handed to the superclass
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
        counters = new HashMap<>();
    }

    /**
     * Increments the count of the word carried by the tuple and logs the new total.
     * Tuples that did not come from the normalizer bolt's named stream are ignored.
     *
     * @param input the incoming tuple
     */
    @Override
    public void execute(Tuple input) {
        boolean fromNormalizerStream =
                Chapter2CommonConstant.wordNormalizer_componentId.equals(input.getSourceComponent())
                        && Chapter2CommonConstant.wordNormalizer_streamId.equals(input.getSourceStreamId());
        if (!fromNormalizerStream) {
            return;
        }

        String field = input.getStringByField(Chapter2CommonConstant.wordNormalizer_fields);
        Integer previous = counters.get(field);
        // Compute the new total as an int first: writing ":" + num + 1 directly in the
        // log message is string concatenation and would print e.g. "11" instead of 2.
        int updated = (previous == null) ? 1 : previous + 1;
        counters.put(field, updated);
        log.info("WordCounterBolt.execute:" + field + ":" + updated);
    }

    /**
     * Drops all counts, then delegates to the superclass.
     */
    @Override
    public void cleanup() {
        // counters is only created in prepare(); guard against cleanup without prepare.
        if (counters != null) {
            counters.clear();
        }
        super.cleanup();
    }

    /**
     * Intentionally declares nothing: this bolt never emits tuples.
     *
     * @param declarer unused
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output streams.
    }
}
package com.doctor.ebook.getting_started_with_storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
/**
 * Entry point that wires producer spout -> normalizer bolt -> counter bolt and submits
 * the topology to an in-process {@link LocalCluster}.
 *
 * @author doctor
 *
 * @time 2015-04-25 22:34:14
 */
public class Chapter2TopologyMain {
    /**
     * Builds the topology, starts a local cluster and submits the topology to it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TopologyBuilder wiring = wireComponents();
        LocalCluster cluster = new LocalCluster();
        Config settings = localSettings();
        cluster.submitTopology("Chapter2TopologyMain", settings, wiring.createTopology());
    }

    /**
     * Registers the spout and both bolts, each with a parallelism hint of 1, and
     * subscribes every bolt to the named stream of its upstream component.
     */
    private static TopologyBuilder wireComponents() {
        final String producerId = Chapter2CommonConstant.wordProducer_componentId;
        final String normalizerId = Chapter2CommonConstant.wordNormalizer_componentId;
        final String counterId = Chapter2CommonConstant.wordCounter_componentId;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(producerId, new WordProducerSpout(), 1);

        // Failing variant, kept for reference: it subscribes without naming a stream id
        // and produces the "No output fields defined for component" exception.
        // topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
        // .fieldsGrouping(Chapter2CommonConstant.wordProducer_componentId, new
        // Fields(Chapter2CommonConstant.wordProducer_fields));
        //
        // topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
        // .fieldsGrouping(Chapter2CommonConstant.wordNormalizer_componentId, new
        // Fields(Chapter2CommonConstant.wordNormalizer_fields));

        // Working variant: each subscription names the upstream stream id explicitly.
        builder.setBolt(normalizerId, new WordNormalizerBolt(), 1)
                .shuffleGrouping(producerId, Chapter2CommonConstant.wordProducer_streamId);
        builder.setBolt(counterId, new WordCounterBolt(), 1)
                .shuffleGrouping(normalizerId, Chapter2CommonConstant.wordNormalizer_streamId);
        return builder;
    }

    /** Creates the configuration used for the local run: debug on, one worker. */
    private static Config localSettings() {
        Config settings = new Config();
        settings.setDebug(true);
        settings.setNumWorkers(1);
        return settings;
    }
}
注释掉的数据流配置会出现本文给出的异常。
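由于当时对jstorm源码没有研究,并不清楚具体原因。补充说明:fieldsGrouping(componentId, fields) 这种不带streamId的订阅方式,订阅的是上游组件的默认流(default);而上游组件只通过declareStream声明了带自定义streamId的流,并没有声明default流,所以在查找default流的输出字段时就抛出了上面的异常。订阅时显式指定streamId即可,例如 fieldsGrouping(componentId, streamId, fields)。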
No output fields defined for component:xxx:default
标签:
原文地址:http://my.oschina.net/doctor2014/blog/406431