码迷,mamicode.com
首页 > 其他好文 > 详细

大数据培训<一> Avro

时间:2015-09-22 16:50:12      阅读:245      评论:0      收藏:0      [点我收藏+]

标签:

由于最近在整理公司的培训事情,首先培训的就是Avro,故这里做一个记录

一、介绍,直接看官网来得快

官方网站:http://avro.apache.org/

1.1、Maven项目构建pom示例

所需要的jar包

 <!-- 测试类 -->
             <dependency>
                   <groupId> junit</groupId >
                   <artifactId> junit</artifactId >
                   <version> 4.12</version >
             </dependency>
             <!-- 序列化需要的jar -->
             <dependency>
                   <groupId> org.apache.avro</groupId >
                   <artifactId> avro</artifactId >
                   <version> 1.7.7</version >
             </dependency>
             <!-- rpc 通讯需要的jar -->
             <dependency>
                   <groupId> org.apache.avro</groupId >
                   <artifactId> avro-ipc</artifactId>
                   <version> 1.7.7</version >
             </dependency>

所需要的插件,如果是在外部生成,可以不要

 <plugin >
    <groupId > org.apache.avro</ groupId >
    <artifactId > avro-maven-plugin</ artifactId >
    <version > 1.7.7</ version >
    <executions >
          <execution>
                <phase> generate-sources</phase >
                <goals>
                      <!-- Schema序列化 -->
                      <!-- <goal>schema</goal> -->
                      <!-- RPC协议通讯 -->
                      <goal> protocol</goal >
                </goals>
                <configuration>
                      <!-- 源目录,用于存放 avro的schema文件及protocol文件 -->
                      <sourceDirectory> ${project.basedir}/src/main/resources/</ sourceDirectory >
                      <outputDirectory> ${project.basedir}/src/main/java/</ outputDirectory >
                </configuration>
          </execution>
    </executions >
 </plugin >

二、消息结构

Avro的模式主要由JSON对象来表示,Avro支持8种基本类型(Primitive Type)和6种复杂类型(Complex Type:records、enums、arrays、maps、unions 和fixed),基本类型可以由JSON字符串来表示。

Avro支持两种序列化编码方式:二进制编码和JSON编码,使用二进制编码会高效序列化,并且序列化后得到的结果会比较小。

2.1、基本数据类型

null: 表示没有值 0字节

boolean: 表示一个二进制布尔值 一个字节 0-false,1-true

int: 表示32位有符号整数
long: 表示64位有符号整数
float: 表示32位的单精度浮点数(IEEE 754)4字节
double: 表示64位双精度浮点数(IEEE 754) 8字节
bytes: 表示8位的无符号字节序列
string: Unicode 编码的字符序列

总共就这8种原生数据类型,这些原生数据类型均没有明确的属性。

原生数据类型也可以使用JSON定义类型名称,比如schema "string"和{"type": "string"}是同义且相等的。

2.2、复合数据类型

2.2.1、records使用类型名称“record”,并且支持三个必选属性。
type: 必有属性。

name: 必有属性,是一个JSON string,提供了记录的名字。

namespace,也是一个JSON string,用来限定和修饰name属性,动态方式生成后,为Java的包名

doc: 可选属性,是一个JSON string,为使用这个Schema的用户提供文档。

aliases: 可选属性,是JSON的一个string数组,为这条记录提供别名。

fields: 必选属性,是一个JSON数组,数组中列举了所有的field。每一个field都是一个JSON对象,并且具有如下属性:

     name: 必选属性,field的名字,是一个JSON string,类似一个Java属性。

     doc: 可选属性,为使用此Schema的用户提供了描述此field的文档。

     type: 必选属性,定义Schema的一个JSON对象,或者是命名一条记录定义的JSON string。

     default: 可选属性,即field的默认值,当读到缺少这个field的实例时用到。

例如:

{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
]
}

说明:以上表示建立了一个复合类型User,具有三个字段属性,分别是字符串类型的name,整型的favorite_number,字符串类型的favorite_color,报名:example.avro,类名为:user

2.2.2、“enum”的type并且支持如下的属性:

name: 必有属性,是一个JSON string,提供了enum的名字。
namespace,也是一个JSON string,用来限定和修饰name属性。
aliases: 可选属性,是JSON的一个string数组,为这个enum提供别名。
doc: 可选属性,是一个JSON string,为使用这个Schema的用户提供文档。
symbols: 必有属性,是一个JSON string数组,列举了所有的symbol,在enum中的所有symbol都必须是唯一的,不允许重复。比如下面的例子:
例如

{ "type": "enum",
"name": "Suit",
"symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}

说明:以上表示建立了一个Suit的枚举类,含有symbols里面的属性

2.2.3、Array使用名为"array"的type,并且支持一个属性 items: array中元素的Schema

例如

{
  "type": "record",
  "name": "ArrAvro",
  "fields" : [
    {"name": "arr", "type": ["null",{"type":"array", "items":"string"}]}
  ]
}

2.2.4、Map使用名为"map"的type,并且支持一个属性 values: 用来定义map的值的Schema。Maps的key都是string。

比如一个key为string,value为long的maps定义为:

例如
{
  "type": "record",
  "name": "MapAvro",
  "fields" : [
    {"name": "map", "type": ["null",{"type":"map", "values":"string"}]}
  ]

}

2.2.5、序列化文件样式及RPC通讯协议样式编写

序列化schema文件

Members.avsc

{
       "namespace":"com.ifree.serrpc.builder",
       "type":"record",
       "name":"Members",
       "fields":[
            {
                   "name":"userName",
                   "type":"string"
            },
            {
                   "name":"userPwd",
                   "type":"string"
            },
            {
                   "name":"realName",
                   "type":"string"
            }
            
      ]
      
}

说明:该文件中的namespace命名空间在生成代码的时候会自动生成包路径;type类型是record复合类型;name为Members,在生成的时候,会生成一个Members类,建议首字母大写;fields是一个字段集合,里面的每一个字段类似与Java的实体字段,如userName字段,类型为String


RPC通讯协议protocol文件

Members.avpr

{
       "namespace":"com.ifree.serrpc.builder",
       "protocol":"MemberIFace",
       "types":[
            {
                   "type":"record",
                   "name":"Members",
                   "fields":[
                        {
                               "name":"userName",
                               "type":"string"
                        },
                        {
                               "name":"userPwd",
                               "type":"string"
                        },
                        {
                               "name":"realName",
                               "type":[
                                     "string",
                                     "null"
                              ]
                              
                        }
                        
                  ]
                  
            },
            {
                   "type":"record",
                   "name":"Retmsg",
                   "fields":[
                        {
                               "name":"msg",
                               "type":"string"
                        }
                  ]
            }
            
      ],
       "messages":{
             "login":{
                   "doc":"member login.",
                   "request":[
                        {
                               "name":"m",
                               "type":"Members"
                        }
                        
                  ],
                   "response":"Retmsg"
            }
            
      }
}

说明:该文件中,namespace表示包路径;protocol表示协议,名字是MemberIFace,这里在工具生成的时候会生成一个类,故首字母大写;types是一个类型集合,返回类型和请求类型都可以在这里定义,里面的定义方式可参考序列化化定义文件;messages是表示一个请求返回消息体,login表示一个Java方法,里面有两部分组成,请求和响应,request表示请求,里面是一个members类型的对象,response表示相应,返回值,这里是Retmsg对象。

三、序列化编码

     Avro有两种序列化编码:binary和JSON。

四、序列化

      在使用序列化时,我们可以有两种方式来实现,一种是静态方式,采用schema文件来生成所需要的类,然后直接调用类里面的实现;另一种是动态方式,直接采用代码解析schema文件内容,动态设置内容。

五、RPC

      在使用Avro进行RPC通讯时,我们可以有两种方式来实现,一种是静态方式,采用protocol文件来生成所需要的类,然后直接调用类里面的实现;另一种是动态方式,直接采用代码解析protocol文件内容,动态设置内容。

六、综合示例

序列化Schema和RPC的Protocol都是上面的文件,这里不做列出

Java代码:

服务端代码:含(动态|工具)序列化、RPC通讯代码

package com.ifree.serrpc.avro;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;

import org.apache.avro.Protocol;
import org.apache.avro.Protocol.Message;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.generic.GenericResponder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.Test;

import com.ifree.serrpc.builder.MemberIFace;
import com.ifree.serrpc.builder.Members;
import com.ifree.serrpc.impl.MemberIFaceImpl;

/**
 * 会员信息处理服务端
 * 
 * @author ifree
 *
 */
public class MemberServerProvider {

	/**
	 * 动态序列化:通过动态解析Schema文件进行内容设置,并序列化内容
	 * 
	 * @throws IOException
	 */
	@Test
	public void MemberInfoDynSer() throws IOException {
		// 1.解析schema文件内容
		Parser parser = new Parser();
		Schema mSchema = parser.parse(this.getClass().getResourceAsStream("/Members.avsc"));
		// 2.构建数据写对象
		DatumWriter<GenericRecord> mGr = new SpecificDatumWriter<GenericRecord>(mSchema);
		DataFileWriter<GenericRecord> mDfw = new DataFileWriter<GenericRecord>(mGr);
		// 3.创建序列化文件
		mDfw.create(mSchema, new File("E:/avro/members.avro"));
		// 4.添加序列化数据
		for (int i = 0; i < 20; i++) {
			GenericRecord gr = new GenericData.Record(mSchema);
			int r = i * new Random().nextInt(50);
			gr.put("userName", "xiaoming-" + r);
			gr.put("userPwd", "9999" + r);
			gr.put("realName", "小明" + r + "号");
			mDfw.append(gr);
		}
		// 5.关闭数据文件写对象
		mDfw.close();
		System.out.println("Dyn Builder Ser Start Complete.");
	}

	/**
	 * 通过Java工具生成文件方式进行序列化操作 命令:C:\Users\Administrator>java -jar
	 * E:\avro\avro-tools-1.7.7.jar compile schema E:\avro\Members.avsc E:\avro
	 * 
	 * @throws IOException
	 */
	@Test
	public void MemberInfoToolsSer() throws IOException {
		// 1.为Member生成对象进行设置必要的内容,这里实现三种设置方式的演示
		// 1.1、构造方式
		Members m1 = new Members("xiaoming", "123456", "校名");
		// 1.2、属性设置
		Members m2 = new Members();
		m2.setUserName("xiaoyi");
		m2.setUserPwd("888888");
		m2.setRealName("小艺");
		// 1.3、Builder方式设置
		Members m3 = Members.newBuilder().setUserName("xiaohong").setUserPwd("999999").setRealName("小红").build();
		// 2.构建反序列化写对象
		DatumWriter<Members> mDw = new SpecificDatumWriter<Members>(Members.class);
		DataFileWriter<Members> mDfw = new DataFileWriter<Members>(mDw);
		// 2.1.通过对Members.avsc的解析创建Schema
		Schema schema = new Parser().parse(AvroSerProvider.class.getClass().getResourceAsStream("/Members.avsc"));
		// 2.2.打开一个通道,把schema和输出的序列化文件关联起来
		mDfw.create(schema, new File("E:/avro/members.avro"));
		// 4.把刚刚创建的Users类数据追加到数据文件写入对象中
		mDfw.append(m1);
		mDfw.append(m2);
		mDfw.append(m3);
		// 5.关闭数据文件写入对象
		mDfw.close();
		System.out.println("Tools Builder Ser Start Complete.");
	}

	// ******************************************************ser
	// end*********************************************************
	/**
	 * 服务端支持的网络通讯协议有:NettyServer、SocketServer、HttpServer
	 * 采用HTTPSERVER方式调用
	 * 
	 * @throws IOException
	 * @throws InterruptedException
	 */
	@Test
	public void MemberHttpRPCDynBuilderServer() throws IOException, InterruptedException {
		// 1.进行业务处理
		GenericResponder gr = bussinessDeal();
		// 2.开启一个HTTP服务端,进行等待客户端的连接
		Server server = new HttpServer(gr, 60090);
		server.start();
		System.out.println("Dyn Builder PRC Start Complete.");
		server.join();
	}

	/**
	 * 服务端支持的网络通讯协议有:NettyServer、SocketServer、HttpServer
	 * 采用Netty方式调用
	 * 
	 * @throws IOException
	 * @throws InterruptedException
	 */
	@Test
	public void MemberNettyRPCDynBuilderServer() throws IOException, InterruptedException {
		// 1.进行业务处理
		GenericResponder gr = bussinessDeal();
		// 2.开启一个Netty服务端,进行等待客户端的连接
		Server server = new NettyServer(gr, new InetSocketAddress(60090));
		server.start();
		System.out.println("Dyn Builder PRC Start Complete.");
		server.join();
	}

	/**
	 * 主要进行业务处理 服务端逻辑处理 采用动态生成代码处理方式,客户端和服务端只需要有protocol文件即可,不需要手工生成代码
	 * 
	 * @return
	 * @throws IOException
	 */
	private GenericResponder bussinessDeal() throws IOException {
		// 1.构建协议
		final Protocol protocol = Protocol.parse(this.getClass().getResourceAsStream("/Members.avpr"));
		// 2.构建业务逻辑及响应客户端
		GenericResponder gr = new GenericResponder(protocol) {
			@Override
			public Object respond(Message message, Object request) throws Exception {
				System.err.println("request:" + request);
				// 3.获取请求信息
				GenericRecord record = (GenericRecord) request;
				GenericRecord retGr = null;
				// 4.判断请求的方法
				if (message.getName().equals("login")) {
					// 5.获取到传输的参数
					Object obj = record.get("m");

					GenericRecord mGr = (GenericRecord) obj;
					String userName = mGr.get("userName").toString();
					String userPwd = mGr.get("userPwd").toString();
					// 6.进行相应的业务逻辑处理
					System.out.println("Members:" + ",userName:" + userName + mGr + ",userPwd:" + userPwd);
					String retMsg;
					if (userName.equalsIgnoreCase("rita") && userPwd.equals("123456")) {
						retMsg = "哈哈,恭喜你,成功登录。";
						System.out.println(retMsg);
					} else {
						retMsg = "登录失败。";
						System.out.println(retMsg);
					}
					// 7.获取返回值类型
					retGr = new GenericData.Record(protocol.getMessages().get("login").getResponse());
					// 8.构造回复消息
					retGr.put("msg", retMsg);
				}
				System.err.println("DEAL SUCCESS!");
				return retGr;
			}
		};
		return gr;
	}
	
	
	/**
	 * Java工具生成协议代码方式:java -jar  E:\avro\avro-tools-1.7.7.jar compile protocol E:\avro\Members.avpr E:\avro
	 * 功能和动态调用方式一致
	 * @throws InterruptedException 
	 */
	@Test
	public void MemberNettyRPCToolsBuilderServer() throws InterruptedException{
		//1.构造接口和实现类的映射相应对象,MemberIFaceImpl该类为具体的业务实现类
		SpecificResponder responder=new SpecificResponder(MemberIFace.class, new MemberIFaceImpl());
		//2.Netty启动RPC服务
		Server server=new NettyServer(responder, new InetSocketAddress(60090));
		server.start();
		System.out.println("Tools Builder PRC Start Complete.");
		server.join();
	}

}

客户端代码:含(动态|工具)反序列化、RPC通讯代码

package com.ifree.serrpc.avro;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;

import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.generic.GenericRequestor;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.specific.SpecificDatumReader;
import org.junit.Test;

import com.ifree.serrpc.builder.MemberIFace;
import com.ifree.serrpc.builder.Members;
import com.ifree.serrpc.builder.Retmsg;

/**
 * 服务消费者 该类测试了通过工具和动态序列化及反序列化两种方式,同时测试了通过工具生成代码及动态调用RPC服务两种方式
 * 
 * @author ifree
 *
 */
public class MemberServerConsumer {

	/**
	 * 动态反序列:通过Schema文件进行动态反序列化操作
	 * 
	 * @throws IOException
	 */
	@Test
	public void MemberInfoDynDeser() throws IOException {
		// 1.schema文件解析
		Parser parser = new Parser();
		Schema mSchema = parser.parse(this.getClass().getResourceAsStream("/Members.avsc"));

		// 2.构建数据读对象
		DatumReader<GenericRecord> mGr = new SpecificDatumReader<GenericRecord>(mSchema);
		DataFileReader<GenericRecord> mDfr = new DataFileReader<GenericRecord>(new File("E:/avro/members.avro"), mGr);
		// 3.从序列化文件中进行数据反序列化取出数据
		GenericRecord gr = null;
		while (mDfr.hasNext()) {
			gr = mDfr.next();
			System.err.println("deser data:" + gr.toString());
		}
		mDfr.close();
		System.out.println("Dyn Builder Ser Start Complete.");
	}

	/**
	 * 通过Java工具来生成必要的类,进行反序列化操作
	 * 
	 * @throws IOException
	 */
	@Test
	public void MemberInfoToolsDeser() throws IOException {
		// 1.构建反序列化读取对象
		DatumReader<Members> mDr = new SpecificDatumReader<Members>(Members.class);
		DataFileReader<Members> mDfr = new DataFileReader<Members>(new File("E:/avro/members.avro"), mDr);
		Members m = null;
		// 2.循环读取文件数据
		while (mDfr.hasNext()) {
			m = mDfr.next();
			System.err.println("tools deser data :" + m);
		}
		// 3.关闭读取对象
		mDfr.close();
		System.out.println("Tools Builder Ser Start Complete.");
	}

	/**
	 * 采用HTTP方式建立和服务端的连接
	 * 
	 * @throws IOException
	 */
	@Test
	public void MemberHttpRPCDynBuilderClient() throws IOException {
		// 1.建立和服务端的http通讯
		Transceiver transceiver = new HttpTransceiver(new URL("http://192.168.1.116:60090"));
		bussinessDeal(transceiver);
	}

	/**
	 * 采用Netty方式建立和服务端的连接
	 * 
	 * @throws IOException
	 */
	@Test
	public void MemberNettyRPCDynBuilderClient() throws IOException {
		// 1.建立和服务端的Netty通讯
		Transceiver transceiver = new NettyTransceiver(new InetSocketAddress("192.168.1.116", 60090));
		// 2.进行必要的业务处理
		bussinessDeal(transceiver);
	}

	/**
	 * 进行必要的业务处理
	 * 
	 * @param transceiver
	 * @throws IOException
	 */
	private void bussinessDeal(Transceiver transceiver) throws IOException {
		// 2.获取协议
		Protocol protocol = Protocol.parse(this.getClass().getResourceAsStream("/Members.avpr"));
		// 3.根据协议和通讯构造请求对象
		GenericRequestor requestor = new GenericRequestor(protocol, transceiver);
		// 4.根据schema获取messages主节点内容
		GenericRecord loginGr = new GenericData.Record(protocol.getMessages().get("login").getRequest());
		// 5.在根据协议里面获取request中的schema
		GenericRecord mGr = new GenericData.Record(protocol.getType("Members"));
		// 6.设置request中的请求数据
		mGr.put("userName", "rita");
		mGr.put("userPwd", "123456");
		// 7、把二级内容加入到一级message的主节点中
		loginGr.put("m", mGr);
		// 8.设置完毕后,请求方法,正式发送访问请求信息,并得到响应内容
		Object retObj = requestor.request("login", loginGr);
		// 9.进行解析操作
		GenericRecord upGr = (GenericRecord) retObj;
		System.out.println(upGr.get("msg"));
	}

	/**
	 * Java工具生成协议代码方式:java -jar E:\avro\avro-tools-1.7.7.jar compile protocol
	 * E:\avro\Members.avpr E:\avro 功能和动态调用方式一致
	 * 
	 * @throws InterruptedException
	 * @throws IOException
	 */
	@Test
	public void MemberNettyRPCToolsBuilderClient() throws InterruptedException, IOException {
		// 1.和服务端建立通讯
		Transceiver transceiver = new NettyTransceiver(new InetSocketAddress("192.168.1.116", 60090));
		// 2.获取客户端对象
		MemberIFace memberIFace = SpecificRequestor.getClient(MemberIFace.class, transceiver);
		// 3.进行数据设置
		Members members = new Members();
		members.setUserName("rita");
		members.setUserPwd("123456");
		// 开始调用登录方法
		Retmsg retmsg = memberIFace.login(members);
		System.out.println("Recive Msg:" + retmsg.getMsg());
	}
}

具体业务实现类

package com.ifree.serrpc.impl;

import org.apache.avro.AvroRemoteException;

import com.ifree.serrpc.builder.MemberIFace;
import com.ifree.serrpc.builder.Members;
import com.ifree.serrpc.builder.Retmsg;

/**
* 具体的业务处理类
* @author Administrator
*
*/
public class MemberIFaceImpl implements MemberIFace {

     final String userName="rita";
     final String userPwd="888888";
     /**
     * 登录业务处理
     */
     @Override
     public Retmsg login(Members m) throws AvroRemoteException {
          //验证登录权限
          if(m.getUserName().equals(userName)&&m.getUserPwd().equals(userPwd)){
               return new Retmsg("恭喜你,登录成功,欢迎进入AVRO测试环境。");
          }
          return new Retmsg("对不起,权限不足,不能登录。");
     }

}


demo地址:https://git.oschina.net/ifree613/SerRpcDemo.git


大数据培训<一> Avro

标签:

原文地址:http://my.oschina.net/tearsky/blog/509610

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!