
No output fields defined for component:xxx::defaul

Posted: 2015-04-26 12:31:31


While learning JStorm I ran into the following problem:


ERROR com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent - Failed Sync Process
java.lang.IllegalArgumentException: No output fields defined for component:stream wordNormalizer_componentId:default
	at backtype.storm.task.GeneralTopologyContext.getComponentOutputFields(GeneralTopologyContext.java:114)
	at backtype.storm.task.TopologyContext.getThisOutputFields(TopologyContext.java:157)
	at com.alibaba.jstorm.cluster.Common.outbound_components(Common.java:600)
	at com.alibaba.jstorm.task.Task.makeSendTargets(Task.java:133)
	at com.alibaba.jstorm.task.Task.echoToSystemBolt(Task.java:162)
	at com.alibaba.jstorm.task.Task.execute(Task.java:244)
	at com.alibaba.jstorm.task.Task.mk_task(Task.java:289)
	at com.alibaba.jstorm.daemon.worker.Worker.createTasks(Worker.java:123)
	at com.alibaba.jstorm.daemon.worker.Worker.execute(Worker.java:218)
	at com.alibaba.jstorm.daemon.worker.Worker.mk_worker(Worker.java:258)
	at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.launchWorker(SyncProcessEvent.java:402)
	at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.startNewWorkers(SyncProcessEvent.java:828)
	at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.run(SyncProcessEvent.java:157)
	at com.alibaba.jstorm.event.EventManagerImpExecute.run(EventManagerImpExecute.java:38)
	at java.lang.Thread.run(Thread.java:745)


After some fiddling I found that changing the stream grouping makes the error go away.

The setup was as follows:

1. The spout emits its tuples on a named stream, i.e.:

collector.emit(Chapter2CommonConstant.wordProducer_streamId, new Values(random));

declarer.declareStream(Chapter2CommonConstant.wordProducer_streamId, new Fields(Chapter2CommonConstant.wordProducer_fields));

2. The bolt's output stream is likewise declared with its own streamId, in the same way as the spout's.

3. The topology stream wiring:

topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
				.fieldsGrouping(Chapter2CommonConstant.wordProducer_componentId, new
						Fields(Chapter2CommonConstant.wordProducer_fields));

		topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
				.fieldsGrouping(Chapter2CommonConstant.wordNormalizer_componentId, new
						Fields(Chapter2CommonConstant.wordNormalizer_fields));


With this wiring, the exception shown above is thrown.
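Why does this fail? In Storm, fieldsGrouping(componentId, fields) subscribes to the component's "default" stream, while the spout above declares only a named stream. A minimal pure-Java sketch of the failing lookup (the map and method names are illustrative, not JStorm's actual internals):

```java
import java.util.List;
import java.util.Map;

public class OutputFieldsSketch {
	// component -> (streamId -> declared fields), as declareStream would record it;
	// note there is no entry for the "default" stream
	static final Map<String, Map<String, List<String>>> DECLARED = Map.of(
			"wordProducer_componentId",
			Map.of("wordProducer_streamId", List.of("wordProducer_randomString")));

	static List<String> getComponentOutputFields(String componentId, String streamId) {
		List<String> fields = DECLARED.getOrDefault(componentId, Map.of()).get(streamId);
		if (fields == null) {
			// mirrors the IllegalArgumentException seen in the stack trace
			throw new IllegalArgumentException(
					"No output fields defined for component:stream " + componentId + ":" + streamId);
		}
		return fields;
	}

	public static void main(String[] args) {
		// subscribing to the named stream resolves the declared fields...
		System.out.println(getComponentOutputFields("wordProducer_componentId", "wordProducer_streamId"));
		// ...but subscribing to "default" (what fieldsGrouping without a streamId does) fails
		try {
			getComponentOutputFields("wordProducer_componentId", "default");
		} catch (IllegalArgumentException e) {
			System.out.println(e.getMessage());
		}
	}
}
```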

Solution:

Change the wiring in step 3 to the following and the exception no longer occurs. (The fieldsGrouping overload that also takes a streamId, fieldsGrouping(componentId, streamId, fields), should work as well, since the underlying problem is subscribing to the undeclared default stream.)

topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordProducer_componentId, Chapter2CommonConstant.wordProducer_streamId);

		topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordNormalizer_componentId, Chapter2CommonConstant.wordNormalizer_streamId);

Complete code:

package com.doctor.ebook.getting_started_with_storm;

/**
 * @author doctor
 *
 * @time 2015-04-25 22:59:20
 */
public final class Chapter2CommonConstant {

	public static final String wordProducer_componentId = "wordProducer_componentId";
	public static final String wordProducer_streamId = "wordProducer_streamId";
	public static final String wordProducer_fields = "wordProducer_randomString";

	public static final String wordNormalizer_componentId = "wordNormalizer_componentId";
	public static final String wordNormalizer_streamId = "wordNormalizer_streamId";
	public static final String wordNormalizer_fields = "wordNormalizer_fields";

	public static final String wordCounter_componentId = "wordCounter_componentId";
}

package com.doctor.ebook.getting_started_with_storm;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.RandomStringUtils;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import com.doctor.common.ContextBaseRichSpout;

/**
 * @author doctor
 *
 * @time 2015-04-25 22:39:21
 */
public class WordProducerSpout extends ContextBaseRichSpout {
	private static final long serialVersionUID = -930888930597360858L;
	private String content = "A spout emits a list of defined fields. This architecture allows you to have " +
			"different kinds of bolts reading the same spout stream, which can then " +
			"define fields for other bolts to consume and so on";

	/**
	 * open is the first method called in any spout.
	 * 
	 * The parameters it receives are the TopologyContext, which contains all our topology data; the conf object, which is created
	 * in the topology definition; and the SpoutOutputCollector, which enables us to emit the data that will be processed by the
	 * bolts.
	 */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		super.open(conf, context, collector);
	}

	/**
	 * From this method we emit the values to be processed by the bolts.
	 */
	@Override
	public void nextTuple() {
		String random = RandomStringUtils.random(6, content);
		try {
			TimeUnit.SECONDS.sleep(1);
			collector.emit(Chapter2CommonConstant.wordProducer_streamId, new Values(random));
			log.info("WordProducerSpout:" + random);
		} catch (InterruptedException e) {
			log.error("TimeUnit.SECONDS.sleep.error", e);
		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream(Chapter2CommonConstant.wordProducer_streamId, new Fields(Chapter2CommonConstant.wordProducer_fields));

	}

}

package com.doctor.ebook.getting_started_with_storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.doctor.common.ContextBaseRichBolt;

/**
 * @author doctor
 *
 * @time 2015-04-25 23:14:27
 */
public class WordNormalizerBolt extends ContextBaseRichBolt {
	private static final long serialVersionUID = -1244951787400604294L;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
		super.prepare(stormConf, context, collector);
	}

	/**
	 * This bolt receives a random string from the spout and normalizes it by
	 * converting it to lower case.
	 */
	@Override
	public void execute(Tuple input) {
		if (Chapter2CommonConstant.wordProducer_componentId.equals(input.getSourceComponent()) &&
				Chapter2CommonConstant.wordProducer_streamId.equals(input.getSourceStreamId())) {
			String field = input.getStringByField(Chapter2CommonConstant.wordProducer_fields);
			log.info("WordNormalizer.execute:" + field);
			field = field.toLowerCase();
			collector.emit(Chapter2CommonConstant.wordNormalizer_streamId, new Values(field));
		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream(Chapter2CommonConstant.wordNormalizer_streamId, new Fields(Chapter2CommonConstant.wordNormalizer_fields));

	}

}

package com.doctor.ebook.getting_started_with_storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import com.doctor.common.ContextBaseRichBolt;

/**
 * @author doctor
 *
 * @time 2015-04-25 23:35:05
 */
public class WordCounterBolt extends ContextBaseRichBolt {
	private static final long serialVersionUID = 8157872805076023917L;
	private Map<String, Integer> counters;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
		super.prepare(stormConf, context, collector);
		counters = new HashMap<>();
	}

	@Override
	public void execute(Tuple input) {
		if (Chapter2CommonConstant.wordNormalizer_componentId.equals(input.getSourceComponent()) &&
				Chapter2CommonConstant.wordNormalizer_streamId.equals(input.getSourceStreamId())) {

			String field = input.getStringByField(Chapter2CommonConstant.wordNormalizer_fields);
			if (counters.containsKey(field)) {
				Integer num = counters.get(field);
				counters.put(field, num + 1);
				log.info("WordCounterBolt.execute:" + field + ":" + (num + 1));
			} else {
				counters.put(field, 1);
				log.info("WordCounterBolt.execute:" + field + ":" + 1);
			}
		}

	}

	@Override
	public void cleanup() {
		counters.clear();
		super.cleanup();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// terminal bolt: it emits nothing, so there are no output fields to declare

	}

}
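As an aside, the counter logging illustrates a classic operator-precedence pitfall: in an expression like field + ":" + num + 1, the + operators evaluate left to right as string concatenation, so the literal 1 is appended as text instead of being added to num; the arithmetic needs its own parentheses. A small self-contained illustration (class and method names are just for the example):

```java
public class ConcatPitfall {
	static String logLine(String field, int num) {
		// wrong: concatenation proceeds left to right, so "1" is appended as text
		return field + ":" + num + 1;
	}

	static String logLineFixed(String field, int num) {
		// right: parentheses force the addition before concatenation
		return field + ":" + (num + 1);
	}

	public static void main(String[] args) {
		System.out.println(logLine("spout", 3));      // spout:31
		System.out.println(logLineFixed("spout", 3)); // spout:4
	}
}
```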

package com.doctor.ebook.getting_started_with_storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

/**
 * @author doctor
 *
 * @time 2015-04-25 22:34:14
 */
public class Chapter2TopologyMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		topologyBuilder.setSpout(Chapter2CommonConstant.wordProducer_componentId, new WordProducerSpout(), 1);

		// topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
		// .fieldsGrouping(Chapter2CommonConstant.wordProducer_componentId, new
		// Fields(Chapter2CommonConstant.wordProducer_fields));
		//
		// topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
		// .fieldsGrouping(Chapter2CommonConstant.wordNormalizer_componentId, new
		// Fields(Chapter2CommonConstant.wordNormalizer_fields));

		topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordProducer_componentId, Chapter2CommonConstant.wordProducer_streamId);

		topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordNormalizer_componentId, Chapter2CommonConstant.wordNormalizer_streamId);

		LocalCluster localCluster = new LocalCluster();

		Config conf = new Config();
		conf.setDebug(true);
		conf.setNumWorkers(1);
		localCluster.submitTopology("Chapter2TopologyMain", conf, topologyBuilder.createTopology());
		// a longer-running demo would typically sleep here and then call localCluster.shutdown()

	}

}

The commented-out wiring above reproduces the exception from this article.

I haven't studied the JStorm source in depth, but the stack trace points at the likely cause: fieldsGrouping(componentId, fields) subscribes to the component's default stream, and since the spout and bolt here declare only named streams via declareStream, the default stream has no output fields.


Original post: http://my.oschina.net/doctor2014/blog/406431
