码迷,mamicode.com
首页 > 其他好文 > 详细

利用mapreduce清洗日志

时间:2014-08-24 12:50:43      阅读:295      评论:0      收藏:0      [点我收藏+]

标签:des   blog   java   os   io   for   ar   art   div   

package com.libc;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Process {

	public static class TokenizerMapper extends
			Mapper<Object, Text, Text, Text> {
		private Text word = new Text();

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {

			// TODO Auto-generated method stub
			String datas = "";
			try {
				datas = new String(value.getBytes(), 0, value.getLength(),
						"GBK");
			} catch (UnsupportedEncodingException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
			// datas = value.toString();
			try {

				String[] split = datas.split(" time=");

				// 处理头中包含空格的字段
				Pattern p = Pattern.compile("phonemodel=\"(.*?)\"");
				String pm = getIndex(split[0], p);
				split[0] = split[0].replaceAll(pm, pm.replace(" ", ""));
				Pattern p1 = Pattern.compile("networktype=\"(.*?)\"");
				String nt = getIndex(split[0], p1);
				split[0] = split[0].replaceAll(nt, nt.replace(" ", ""));
				for (int i = 1; i < split.length; i++) {
					String[] codes = split[i].split(" ", 4);
					int headLen = split[0].split(" ").length;
					if (headLen != 20) {
						// 丢掉错误日志
						continue;
					}
					// 处理旧版本日志判别标准:|
					if (codes[2].equals("code=\"100\"")){
						if(codes[3].indexOf("contact_name")>-1){
							codes[3] = process100(codes[3]);
						}
							codes[3] = codes[3].replace(‘ ‘, ‘#‘);
						
					}else if(codes[2].equals("code=\"101\"") ){
						if(codes[3].indexOf("message_to_")>-1){
							codes[3] = process101(codes[3]);
						}
							codes[3] = codes[3].replace(‘ ‘, ‘#‘);
					}
					else if(codes[2].equals("code=\"102\"")){
						if(codes[3].indexOf("caller_n")>-1||codes[3].indexOf("caller_d")>-1){
							codes[3] = process102(codes[3]);
						}
							codes[3] = codes[3].replace(‘ ‘, ‘#‘);
						
					}else{
						codes[3] = codes[3].replace("  ", " ");
					}
						
					String collect = split[0] + " time=" + codes[0] + " "
							+ codes[1] + " " + codes[2] + " " + codes[3];
					word.set(collect);

					context.write(word, new Text(""));
				}

			} catch (Exception e) {
				// TODO Auto-generated catch block
			}
		}
	}

	public static String process100(String code) throws Exception{
		String[] codes = code.split(" ");
		HashMap<String, Contact> hs = new HashMap<String, Process.Contact>();
		Pattern p0 = Pattern.compile("_(\\d*)=");
		Pattern p1 = Pattern.compile("\"(.*)\"");
		for (int i = 0; i < codes.length; i++) {
			if (codes[i].equals(""))
				continue;
			String index = getIndex(codes[i], p0);
			if (index == null)
				continue;
			String value = getIndex(codes[i], p1);
			Contact contact = null;
			if (hs.containsKey(index)) {
				contact = hs.get(index);
			} else {
				contact = new Contact();
			}
			if (codes[i].startsWith("contact_name_")) {
				contact.contactName = value;
			} else if (codes[i].startsWith("contact_num_")) {
				contact.contactNum = value;
			}
			contact.index = index;
			hs.put(index, contact);
		}

		return printToString(hs);
	}

	public static String process101(String code) throws Exception{
		String[] codes = code.split("\"  ");
		HashMap<String, Message> hs = new HashMap<String, Process.Message>();
		Pattern p = Pattern.compile("_(\\d*)=");
		Pattern p1 = Pattern.compile("\"(.*)");
		for (int i = 0; i < codes.length; i++) {
			String index = getIndex(codes[i], p);
			String value = getIndex(codes[i], p1);
			if (index == null)
				continue;
			Message message = null;
			if (hs.containsKey(index)) {
				message = hs.get(index);
			} else {
				message = new Message();
			}
			if (codes[i].startsWith("message_time_")) {
				message.messageTime = value;
			} else if (codes[i].startsWith("message_to_")) {
				message.messageTo = value;
			}
			message.index = index;
			hs.put(index, message);
		}

		return printToString(hs);
	}

	public static String process102(String code) throws Exception{
		String[] codes = code.split("\"  ");
		HashMap<String, CallLog> hs = new HashMap<String, Process.CallLog>();
		Pattern p = Pattern.compile("_(\\d*)=");
		Pattern p1 = Pattern.compile("\"(.*)");
		for (int i = 0; i < codes.length; i++) {
			String index = getIndex(codes[i], p);
			if (index == null)
				continue;
			String value = getIndex(codes[i], p1);
			CallLog callLog = null;
			if (hs.containsKey(index)) {
				callLog = hs.get(index);
			} else {
				callLog = new CallLog();
			}
			if (codes[i].startsWith("caller_date_")) {
				callLog.callerDate = value;
			} else if (codes[i].startsWith("caller_duration_")) {
				callLog.callerDuration = value;
			} else if (codes[i].startsWith("caller_name_")) {
				callLog.callerName = value;
			} else if (codes[i].startsWith("caller_num_")) {
				callLog.callerNum = value;
			}
			callLog.index = index;
			hs.put(index, callLog);
		}

		return printToString(hs);
	}

	public static String printToString(Map hs) {
		Set set = hs.keySet();
		Iterator<String> it = set.iterator();
		String result = "";
		while (it.hasNext()) {
			result = result + hs.get(it.next()).toString() + "|";
		}
		return result;
	}

	public static String getIndex(String code, Pattern p) {
		String index = null;

		Matcher matcher = p.matcher(code);
		if (matcher.find()) {
			index = matcher.group(1);
		}
		return index;
	}

	public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {

		public void reduce(Text key, Text rr, Context context)
				throws IOException, InterruptedException {
			context.write(key, new Text(""));
		}
	}

	public static class Contact {

		public String index;
		public String contactName;
		public String contactNum;

		@Override
		public String toString() {
			// TODO Auto-generated method stub
			return "contact_" + index + "=" + this.contactName + ";"
					+ this.contactNum;
		}
	}

	public static class Message {
		public String index;
		public String messageTime;
		public String messageTo;

		@Override
		public String toString() {
			// TODO Auto-generated method stub
			return "message_" + this.index + "=" + this.messageTo + ";"
					+ this.messageTime;
		}
	}

	public static class CallLog {
		public String index;
		public String callerDuration;
		public String callerNum;
		public String callerName;
		public String callerDate;

		@Override
		public String toString() {
			// TODO Auto-generated method stub
			return "callLog_" + this.index + "=" + this.callerName + ";"
					+ this.callerNum + ";" + this.callerDate + ";"
					+ this.callerDuration;
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: process <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "process");
		job.setJarByClass(Process.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

  此版本为第一版,运行几天后服务器日志量暴增,导致堆栈溢出错误,

因此修改为第二版后可以对jvm内存自定义配置

利用mapreduce清洗日志

标签:des   blog   java   os   io   for   ar   art   div   

原文地址:http://www.cnblogs.com/charlie-badegg/p/3932626.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!