码迷,mamicode.com
首页 > Web开发 > 详细

高性能存储项目笔记-netty1

时间:2017-08-24 00:14:39      阅读:269      评论:0      收藏:0      [点我收藏+]

标签:server   网络框架   add   dia   原因   java   editor   定义   read   

大四毕业准研一的项目,项目主要用于接收udp,tcp,dns等数据,进行分析存盘。存盘后用于数据挖掘试着找出有异常行为的僵尸网络主机。底层网络框架使用netty。

netty的简介:

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。
“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

netty的maven:

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>

netty的性能测试:

项目测试时用700Mb/s(忘记是MB还是Mb了,应该是Mb)的数据跑了一晚上没有掉包。(之前有掉包的情况出现,原因是由于路由器的最大帧长度为1500字节,而传输的测试数据帧长2000+字节,甚至有4000的,即巨帧)。用1GMb+/s(几乎已经达到测试网线的最大速率)的速度跑过几分钟,没有掉包。性能可以说是非常高了。

 

 客户端是由C++编写,代码这里略。(需要注意的是C++传递的结构体时会进行数据对齐,java按字节读取时补齐的字段会读为0)

接受线程服务器端源码:

  1 /*
  2  * To change this license header, choose License Headers in Project Properties.
  3  * To change this template file, choose Tools | Templates
  4  * and open the template in the editor.
  5  */
  6 package packetserver;
  7 
  8 
  9 import commonclasses.RawData;//原始数据
 10 import commonclasses.ServerProperties;//配置文件
 11 import commonclasses.StatisticData;//统计数据
 12 import io.netty.bootstrap.ServerBootstrap;
 13 import io.netty.channel.ChannelInitializer;
 14 import io.netty.channel.ChannelPipeline;
 15 import io.netty.channel.socket.SocketChannel;
 16 
 17 import java.nio.ByteOrder;//java和linux默认的小端大端顺序不一样,需要设置
 18 import io.netty.handler.codec.ByteToMessageDecoder;
 19 
 20 import io.netty.buffer.ByteBuf;
 21 import io.netty.channel.ChannelFuture;
 22 import io.netty.channel.ChannelHandlerContext;
 23 import io.netty.channel.ChannelInboundHandlerAdapter;
 24 import io.netty.channel.ChannelOption;
 25 import io.netty.channel.EventLoopGroup;
 26 import io.netty.channel.nio.NioEventLoopGroup;
 27 import io.netty.channel.socket.nio.NioServerSocketChannel;
 28 import io.netty.handler.codec.MessageToByteEncoder;
 29 import java.net.InetAddress;
 30 import java.net.NetworkInterface;
 31 import java.util.Enumeration;
 32 import java.util.List;
 33 
 34 /**
 35  *
 36  * @author gaoxiang
 37  */
 38 public class RawDataNetAgent implements Runnable {
 39 
 40     static int port = ServerProperties.rawDataServerPort;
 41     static String hostadd = ServerProperties.recevierServerIP;
 42     RawDataServer s1 = new RawDataServer();
 43 
 44     RawDataAgent fileAgent;
 45     private Thread workerThread;
 46 
 47     RawDataNetAgent(RawDataAgent fileAgent) {
 48         this.fileAgent = fileAgent;
 49 
 50         workerThread = new Thread(this);
 51         workerThread.start();
 52     }
 53 
 54     @Override
 55     public void run() {
 56         try {
 57             s1.bind();
 58             System.out.println("NetAgent ended...");
 59         } catch (Exception e) {
 60             // TODO Auto-generated catch block
 61             e.printStackTrace();
 62         }
 63     }
 64 
 65     class RawDataServerInitializer extends ChannelInitializer<SocketChannel> {
 66 
 67         @Override
 68         protected void initChannel(SocketChannel ch) throws Exception {
 69             ChannelPipeline pipeline = ch.pipeline();
 70             pipeline.addLast(new RawDataServerDecoder());
 71             // pipeline.addLast(new ServerEncoder());
 72             pipeline.addLast(new RawDataServerHandler());
 73 
 74         }
 75     }
 76 
 77     class RawDataServerEncoder extends MessageToByteEncoder {
 78 
 79         @Override
 80         protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
 81             byte[] body = (byte[]) (msg);
 82             int dataLength = body.length;
 83             out.writeInt(dataLength);
 84             out.writeBytes(body);
 85         }
 86 
 87     }
 88 
 89     class RawDataServerDecoder extends ByteToMessageDecoder {
 90 
 91         final int socketHeaderLength = 16;
 92         final int rawHeaderLength = 74;
 93 
 94         @Override
 95         protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
 96             if (in.readableBytes() < socketHeaderLength) {
 97                 return;
 98             }
 99             in.markWriterIndex();
100             ByteBuf buf = in.order(ByteOrder.LITTLE_ENDIAN);
101 
102             int ssyn = buf.readInt();
103             if (ssyn != 0xfae9dafc) {
104                 in.readerIndex(in.readerIndex() - 3);
105                 in.discardReadBytes();
106                 System.out.println("ssyn != 0xfae9dafc");
107                 return;
108             }
109 
110             int totallength = buf.readInt();
111             if (totallength > in.readableBytes() + 8) {
112                 in.resetReaderIndex();
113                 return;
114             }
115             out.add(in.readBytes(totallength - socketHeaderLength + 8));
116             in.discardReadBytes();
117 
118         }
119 
120     }
121 
122     class RawDataServerHandler extends ChannelInboundHandlerAdapter {
123 
124         int num = 0;
125 
126         @Override
127         public void channelActive(ChannelHandlerContext ctx) throws Exception {
128             System.out.println("channelActive work");
129         }
130 
131         @Override
132         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
133             cause.printStackTrace();
134             ctx.close();
135         }
136 
137         public int verseInt(int i) {
138             int j;
139             j = (i >> 24) & 0xff | (i >> 8) & 0xff00 | (i << 8) & 0Xff0000 | (i << 24) & 0xff000000;
140             return j;
141         }
142 
143         @Override
144         public void channelRead(ChannelHandlerContext ctx, Object _msg) throws Exception {
145             ByteBuf bytes = ((ByteBuf) _msg).order(ByteOrder.LITTLE_ENDIAN);
146             int type = bytes.readUnsignedShort();
147             int code = bytes.readUnsignedShort();
148             long capip = bytes.readUnsignedInt();
149             bytes.discardReadBytes();
150             if (type == 1000 && code == 1001) {
151                 SoKeyRz msg = new SoKeyRz(bytes);
152                 RawData rf = new RawData();
153                 rf.protocolNumber = (byte) msg.tuple5.protocol;
154                 rf.sPort = (short) msg.tuple5.source;
155                 rf.dPort = (short) msg.tuple5.dest;
156                 rf.sIP = verseInt((int) msg.tuple5.saddr);
157                 rf.dIP = verseInt((int) msg.tuple5.daddr);
158                 rf.timeStamp = msg.t_s * 1000 + msg.t_ms / 1000;// Caution, just for
159                 // test!!!!!!!
160                 rf.payload = msg.data;
161                 /**
162                  * have been changed to fit mapdb
163                  */
164                 Object[] object = new Object[]{
165                     rf.sIP, rf.dIP, rf.sPort, rf.dPort, rf.protocolNumber, rf.timeStamp,
166                 };
167                 AddRawDataToQueue(rf);
168 
169             }
170             bytes.release();
171         }
172 
173         public void AddRawDataToQueue(RawData rf) {
174             fileAgent.offer(rf);
175         }
176     }
177 
178     class RawDataServer {
179 
180         ChannelFuture f;
181 
182         public void closeServer() {
183             f.channel().close();
184         }
185 
186         public void bind() throws Exception {
187             EventLoopGroup bossGroup = new NioEventLoopGroup();
188             EventLoopGroup workerGroup = new NioEventLoopGroup();
189             try {
190                 ServerBootstrap b = new ServerBootstrap();
191                 b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)// .localAddress(inetHost,
192                         // inetPort)
193                         .childHandler(new RawDataServerInitializer()).option(ChannelOption.SO_BACKLOG, 2048)
194                         .childOption(ChannelOption.SO_KEEPALIVE, true);
195 
196                 f = b.bind(RawDataNetAgent.hostadd, RawDataNetAgent.port).sync();
197                 f.channel().closeFuture().sync();
198             } finally {
199                 bossGroup.shutdownGracefully();
200                 workerGroup.shutdownGracefully();
201             }
202         }
203 
204         private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
205 
206             @Override
207             protected void initChannel(SocketChannel arg0) throws Exception {
208                 System.out.println("server initChannel..");
209                 arg0.pipeline().addLast(new RawDataServerHandler());
210             }
211         }
212 
213         public String getAddr(String rcvName) {
214             Enumeration<NetworkInterface> netInterfaces = null;
215             try {
216                 netInterfaces = NetworkInterface.getNetworkInterfaces();
217 
218                 while (netInterfaces.hasMoreElements()) {
219 
220                     NetworkInterface ni = netInterfaces.nextElement();
221 
222                     if (ni.getDisplayName().equals(rcvName)) {
223                         Enumeration<InetAddress> ips = ni.getInetAddresses();
224                         ips.nextElement();
225                         return ips.nextElement().getHostAddress();
226                     }
227                 }
228             } catch (Exception e) {
229                 e.printStackTrace();
230             }
231             return "localhost";
232 
233         }
234 
235     }
236 
237     class SoKeyRz {//包的格式定义,包名公司定的
238 
239         long soid;
240         long cap_ip;
241         int cap_port;
242         long t_s;
243         long t_ms;
244         byte[] srcmac = new byte[6];
245         byte[] dstmac = new byte[6];
246         Tuple5 tuple5 = new Tuple5();
247         char src_dep[] = new char[16];
248         long datalen;
249         char reserve[] = new char[8];
250         byte data[] = null;
251 
252         public SoKeyRz(ByteBuf buf) {
253             soid = buf.readUnsignedInt();
254             cap_ip = buf.readUnsignedInt();
255             cap_port = buf.readUnsignedShort();
256             t_s = buf.readUnsignedInt();
257             t_ms = buf.readUnsignedInt();
258             ///////////// buf.readCharSequence();
259             for (int i = 0; i < 6; i++) {
260                 srcmac[i] = buf.readByte();
261             }
262             // buf.readCharSequence(6, srcmac);
263             for (int i = 0; i < 6; i++) {
264                 dstmac[i] = buf.readByte();
265             }
266             tuple5.protocol = buf.readInt();
267             tuple5.source = buf.readUnsignedShort();
268             tuple5.dest = buf.readUnsignedShort();
269             tuple5.saddr = buf.readUnsignedInt();
270             tuple5.daddr = buf.readUnsignedInt();
271             for (int i = 0; i < 16; i++) {
272                 src_dep[i] = (char) buf.readByte();
273             }
274             datalen = buf.readUnsignedInt();
275             for (int i = 0; i < 8; i++) {
276                 reserve[i] = (char) buf.readByte();
277             }
278             data = new byte[(int) datalen];
279             buf.readBytes(data);
280         }
281     ;
282 
283     }
284 
285 class Tuple5 {//tcp五元组
286 
287         int protocol;
288         int source;
289         int dest;
290         long saddr;
291         long daddr;
292     }
293 }

 



 

高性能存储项目笔记-netty1

标签:server   网络框架   add   dia   原因   java   editor   定义   read   

原文地址:http://www.cnblogs.com/theWeirdCode/p/7420721.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!