码迷,mamicode.com
首页 > Web开发 > 详细

Apache Camel分解与聚合

时间:2015-07-31 18:24:17      阅读:929      评论:0      收藏:0      [点我收藏+]

标签:split   aggregate   分解   聚合   camel   

   在使用Camel时,你可能会使用到分解与聚合,例如当你向消息队列发送一个很大的文件的时候,你可能出于文件大小限制或效率的考量,需要将一个文件分解为若干文件包分别发送,在接收到接收到所有文件包后再合并为一个完整的文件。

   分解即将一个消息分解为若干份(消息),然后可以对其进行单独处理,如下图:

技术分享

   要实现分解功能,则需要在路由定义中添加SplitDefinition,也就是要调用ProcessorDefinition.split方法,split方法主要是接收一个Expression对象,org.apache.camel.Expression是一个接口,其中只有一个evaluate方法:


package org.apache.camel;

public interface Expression {

    <T> T evaluate(Exchange exchange, Class<T> type);
}

在调用split方法时,evaluate方法需要返回一个的对象类型有一定的规则(要求),具体的规则是什么,看下面的源码则一目录了然:

public static Iterator<Object> createIterator(Object value, String delimiter, final boolean allowEmptyValues) {

	// if its a message than we want to iterate its body
	if (value instanceof Message) {
		value = ((Message) value).getBody();
	}

	if (value == null) {
		return Collections.emptyList().iterator();
	} else if (value instanceof Iterator) {
		return (Iterator<Object>)value;
	} else if (value instanceof Iterable) {
		return ((Iterable<Object>)value).iterator();
	} else if (value.getClass().isArray()) {
		if (isPrimitiveArrayType(value.getClass())) {
			final Object array = value;
			return new Iterator<Object>() {
				int idx = -1;

				public boolean hasNext() {
					return (idx + 1) < Array.getLength(array);
				}

				public Object next() {
					idx++;
					return Array.get(array, idx);
				}

				public void remove() {
					throw new UnsupportedOperationException();
				}

			};
		} else {
			List<Object> list = Arrays.asList((Object[]) value);
			return list.iterator();
		}
	} else if (value instanceof NodeList) {
		// lets iterate through DOM results after performing XPaths
		final NodeList nodeList = (NodeList) value;
		return new Iterator<Object>() {
			int idx = -1;

			public boolean hasNext() {
				return (idx + 1) < nodeList.getLength();
			}

			public Object next() {
				idx++;
				return nodeList.item(idx);
			}

			public void remove() {
				throw new UnsupportedOperationException();
			}
		};
	} else if (value instanceof String) {
		final String s = (String) value;

		// this code is optimized to only use a Scanner if needed, eg there is a delimiter

		if (delimiter != null && s.contains(delimiter)) {
			// use a scanner if it contains the delimiter
			Scanner scanner = new Scanner((String)value);

			if (DEFAULT_DELIMITER.equals(delimiter)) {
				// we use the default delimiter which is a comma, then cater for bean expressions with OGNL
				// which may have balanced parentheses pairs as well.
				// if the value contains parentheses we need to balance those, to avoid iterating
				// in the middle of parentheses pair, so use this regular expression (a bit hard to read)
				// the regexp will split by comma, but honor parentheses pair that may include commas
				// as well, eg if value = "bean=foo?method=killer(a,b),bean=bar?method=great(a,b)"
				// then the regexp will split that into two:
				// -> bean=foo?method=killer(a,b)
				// -> bean=bar?method=great(a,b)
				// http://stackoverflow.com/questions/1516090/splitting-a-title-into-separate-parts
				delimiter = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))";
			}

			scanner.useDelimiter(delimiter);
			return CastUtils.cast(scanner);
		} else {
			// use a plain iterator that returns the value as is as there are only a single value
			return new Iterator<Object>() {
				int idx = -1;

				public boolean hasNext() {
					return idx + 1 == 0 && (allowEmptyValues || ObjectHelper.isNotEmpty(s));
				}

				public Object next() {
					idx++;
					return s;
				}

				public void remove() {
					throw new UnsupportedOperationException();
				}
			};
		}
	} else {
		return Collections.singletonList(value).iterator();
	}
}

   该方法定义在org.apache.camel.util.ObjectHelper类,由上可知允许的类型有很多,但最终于转换为了一个java.uti.Iterator对象,这样Camel就能通过该迭代器遍历出各个元素对象,然后针对每一个元素对象创建一个Exchange对象,再把元素对象设置到Message的body中,这样就把一个消息分解为了多份。



   聚合,刚好是分解的逆过程,即将根据路由定义路由的多个消息合并为一个消息,如下图:

技术分享
   聚合主要要解决的问题是如何确定哪些消息是要进行聚合的,聚合的过程是怎样的。要实现聚合功能,则需要向路由定义中添加AggregateDefinition,调用ProcessorDefinition.aggregate方法,该方法主要是要提供一个Expression与AggregationStrategy对象,前者用于确定哪些消息需要被聚合,后者用于确定具体的聚合过程如何进行。

   下面提供一个分解与聚合具体的例子,实现一个示例功能,将一个文件根据每一行进行分解,被分解后再进行聚合:

package com.xtayfjpk.camel;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.Scanner;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.support.ExpressionAdapter;

public class Test {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception {
		CamelContext camelContext = new DefaultCamelContext();
		camelContext.addRoutes(new RouteBuilder() {
			
			@Override
			public void configure() throws Exception {
				//轮询指定目录
				this.from("file:H:/temp/in?noop=true")
					//添加SplitDefinition,传入一个自定义的Expression对象
					.split(new ExpressionAdapter() {
					
					@Override
					public Object evaluate(Exchange exchange) {
						//返回一个实现了Iterator接口的类,每一个迭代出来的元素对象创建一个Exchange
						File file = exchange.getIn().getBody(File.class);
						System.out.println(file);
						Scanner scanner = null;
						if(file!=null) {
							try {
								scanner = new Scanner(file);
								//根据行进行分解
								scanner.useDelimiter("\n");
							} catch (FileNotFoundException e) {
								e.printStackTrace();
							}
						}
						return scanner;
					}
				}).process(new Processor() {
					private int count = 0;
					
					public void process(Exchange exchange) throws Exception {
						//往消息中设置关联key的值,在聚合的时候将会使用到
						//如果消息的关联key值是相同的则表示需要进行聚合
						exchange.getIn().setHeader("test_correlation_key", (++count)%2);
						System.out.println("body:" + exchange.getIn().getBody());
					}
				}).aggregate(header("test_correlation_key"), new AggregationStrategy() {
					
					public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
						//如果oldExchange为null,则说明是第一个分解包
						if (oldExchange == null) {
				            return newExchange;
				        }
				 
				        String oldBody = oldExchange.getIn().getBody(String.class);
				        System.out.println("old body:" + oldBody);
				        String newBody = newExchange.getIn().getBody(String.class);
				        System.out.println("new body:" + newBody);
						//将新与旧包进行合并,再设置进Message的body中
				        oldExchange.getIn().setBody(oldBody + "\n" + newBody);
				        return oldExchange;
					}
				}).completionTimeout(5000).process(new Processor() {
					
					public void process(Exchange exchange) throws Exception {
						//示例后续处理,进行输出
						System.out.println("body:" + exchange.getIn().getBody());
						
					}
				});
				
			}
		});
		
		camelContext.start();
		
		Object object = new Object();
		synchronized (object) {
			object.wait();
		}
	}
}

在分解与聚合过程中,与此相关的一些数据会做为Exchange的属性进行设置,如分解序号,分解是否完成,聚合序列等,详细内容可参看Camel官方文档:
http://camel.apache.org/splitter.html
http://camel.apache.org/aggregator.html

版权声明:本文为博主原创文章,未经博主允许不得转载。

Apache Camel分解与聚合

标签:split   aggregate   分解   聚合   camel   

原文地址:http://blog.csdn.net/xtayfjpk/article/details/47173089

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!