A Filter decides, via isKeep(), whether each tuple passes downstream. The filter below keeps only tweets whose actor matches a given name:

```java
public static class PerActorTweetsFilter extends BaseFilter {
    String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        // Field 0 is "actor", as selected by each() in the topology below.
        return tuple.getString(0).equals(actor);
    }
}
```

Topology:

```java
topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .each(new Fields("actor", "text"), new Utils.PrintFilter());
```

As the example above shows, each() takes two parameters: the fields to select from each tuple as the operation's input, and the Filter (or Function) instance to apply to them.
A Function is similar, but instead of filtering it emits new values that Trident appends to the tuple:

```java
public static class UppercaseFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // Field 0 is "text" (see the input field selection below);
        // the emitted value is appended to the tuple as a new field.
        collector.emit(new Values(tuple.getString(0).toUpperCase()));
    }
}
```

Topology:

```java
topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
    .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());
```

Here the input to UppercaseFunction is Fields("text", "actor"); its job is to uppercase the "text" field, and the result is appended to the tuple as the new field "uppercased_text".

Projection lets us keep only the fields we care about. Given

```java
mystream.project(new Fields("b", "d"))
```

the output stream contains only the fields ["b", "d"].
A Filter can also find out which partition it is running in by overriding prepare():

```java
public static class PerActorTweetsFilter extends BaseFilter {
    private int partitionIndex;
    private String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        // Remember which partition this Filter instance was assigned to.
        this.partitionIndex = context.getPartitionIndex();
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        boolean filter = tuple.getString(0).equals(actor);
        if (filter) {
            System.err.println("I am partition [" + partitionIndex
                    + "] and I have kept a tweet by: " + actor);
        }
        return filter;
    }
}
```

Topology:

```java
topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .parallelismHint(5)
    .each(new Fields("actor", "text"), new Utils.PrintFilter());
```

If we set the parallelism of the Filter step to 5 like this, what does the final result look like? Here is our test output:

```
I am partition [4] and I have kept a tweet by: dave
I am partition [3] and I have kept a tweet by: dave
I am partition [0] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [1] and I have kept a tweet by: dave
```

We can clearly see that five threads are executing the Filter.
```java
topology.newStream("spout", spout)
    .parallelismHint(2)
    .shuffle()
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .parallelismHint(5)
    .each(new Fields("actor", "text"), new Utils.PrintFilter());
```

Here the spout runs with parallelism 2, and shuffle() redistributes its tuples randomly across the five Filter partitions. If we instead use a field-based repartitioning such as partitionBy(new Fields("actor")) in place of shuffle(), tuples with the same actor are guaranteed to be routed to the same partition, and the test output confirms it:

```
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
```

Exactly as described above, tuples with the same field value were all routed to the same partition.
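All of these examples read from a spout whose code the article does not include. From the output we can infer that it emits fake tweets with actor, text, and location fields, five tuples per batch. A minimal IBatchSpout sketch under those assumptions (the class name and sample data are hypothetical):

```java
import java.util.Map;
import java.util.Random;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class FakeTweetsBatchSpout implements IBatchSpout {
    // "dave" appears in the article; the other names are made up.
    private static final String[] ACTORS = {"dave", "pere", "nathan", "doug"};
    private static final String[] LOCATIONS = {"USA", "Spain", "UK", "France"};
    private final Random random = new Random();

    @Override
    public void open(Map conf, TopologyContext context) { }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        // Five tuples per batch, matching the per-batch sums seen below.
        for (int i = 0; i < 5; i++) {
            String actor = ACTORS[random.nextInt(ACTORS.length)];
            String location = LOCATIONS[random.nextInt(LOCATIONS.length)];
            collector.emit(new Values(actor, "tweet text " + i, location));
        }
    }

    @Override
    public void ack(long batchId) { }

    @Override
    public void close() { }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("actor", "text", "location");
    }
}
```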
Now let's look at aggregation. The following BaseAggregator counts tweets per location within each batch:

```java
public static class LocationAggregator extends BaseAggregator<Map<String, Integer>> {

    @Override
    public Map<String, Integer> init(Object batchId, TridentCollector collector) {
        // Called when a new batch starts: create fresh per-batch state.
        return new HashMap<String, Integer>();
    }

    @Override
    public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
        // Called once per tuple: bump the counter for this tuple's location.
        String location = tuple.getString(0);
        val.put(location, MapUtils.getInteger(val, location, 0) + 1);
    }

    @Override
    public void complete(Map<String, Integer> val, TridentCollector collector) {
        // Called when the batch ends: emit the accumulated counts.
        collector.emit(new Values(val));
    }
}
```

```java
topology.newStream("spout", spout)
    .aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
    .each(new Fields("location_counts"), new Utils.PrintFilter());
```
This aggregator is simple: it counts the locations in each batch. The example also shows the Aggregator lifecycle: init() runs when a new batch starts, aggregate() runs once per tuple in the batch, and complete() runs when the batch finishes. Sample output:

```
[{USA=3, Spain=1, UK=1}]
[{USA=3, Spain=2}]
[{France=1, USA=4}]
[{USA=4, Spain=1}]
[{USA=5}]
```

Notice that the counts on every line sum to 5. That is because our spout emits 5 tuples per batch, so each per-batch aggregation also totals 5. Besides this interface, Trident provides two other aggregator interfaces, CombinerAggregator and ReducerAggregator; see the Trident API documentation for details, and the sketch below for the general shape.
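The article does not show these two interfaces, but for illustration, here is how a simple tuple counter could be written against each of them. The interfaces and their methods are Trident's own; the class names are hypothetical:

```java
import storm.trident.operation.CombinerAggregator;
import storm.trident.operation.ReducerAggregator;
import storm.trident.tuple.TridentTuple;

public class CountAggregators {

    // CombinerAggregator: init() turns one tuple into a partial value,
    // combine() merges two partials (which lets Trident pre-aggregate on
    // each partition before merging), zero() covers empty input.
    public static class CombinerCount implements CombinerAggregator<Long> {
        @Override
        public Long init(TridentTuple tuple) {
            return 1L;
        }

        @Override
        public Long combine(Long val1, Long val2) {
            return val1 + val2;
        }

        @Override
        public Long zero() {
            return 0L;
        }
    }

    // ReducerAggregator: init() produces the starting value, and reduce()
    // folds each tuple of the batch into the running result.
    public static class ReducerCount implements ReducerAggregator<Long> {
        @Override
        public Long init() {
            return 0L;
        }

        @Override
        public Long reduce(Long curr, TridentTuple tuple) {
            return curr + 1;
        }
    }
}
```

Either could stand in for a counting aggregator in aggregate(), e.g. `.aggregate(new Fields("location"), new CountAggregators.CombinerCount(), new Fields("count"))`.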
Now, if we rework the aggregation topology above slightly, can you guess what the result will be?

```java
topology.newStream("spout", spout)
    .partitionBy(new Fields("location"))
    .partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
    .parallelismHint(3)
    .each(new Fields("location_counts"), new Utils.PrintFilter());
```
Let's analyze it together. First, partitionBy() repartitions the tuples by their location field on the way to the next processing step, so tuples with the same location are guaranteed to be handled by the same partition. Second, note that partitionAggregate(), unlike aggregate(), is not a repartitioning operation: it merely aggregates the batches flowing through each existing partition. Since we repartitioned by location, the test data contains four distinct locations, and there are only three partitions, we can predict that one partition will end up handling the batches of two locations. The final test output bears this out:

```
[{France=10, Spain=5}]
[{USA=63}]
[{UK=22}]
```

Again: although partitionAggregate is also an aggregation operation, it is completely different from aggregate() above in that it does not repartition the stream.

Next, groupBy():
```java
topology.newStream("spout", spout)
    .groupBy(new Fields("location"))
    .aggregate(new Fields("location"), new Count(), new Fields("count"))
    .each(new Fields("location", "count"), new Utils.PrintFilter());
```

Let's look at the output first:

```
...
[France, 25]
[UK, 2]
[USA, 25]
[Spain, 44]
[France, 26]
[UK, 3]
...
```

This code produces a count per location, even though we never specified a parallelism for the Count aggregator. That is groupBy() at work: it creates a GroupedStream in which tuples with the same value of the given field are routed together into one group. The aggregate() that follows a groupBy() no longer aggregates an entire batch as before; it aggregates each group separately. You can also think of it this way: groupBy splits the stream into stream groups keyed by the chosen field, and each group is processed as if it were its own batch.

Note, however, that groupBy() by itself is not a repartitioning operation: followed by aggregate() it does trigger a repartition, but followed by partitionAggregate() it does not.
Author: suifeng3051
Original article (Chinese): http://blog.csdn.net/suifeng3051/article/details/41118721