
Explaining Hadoop's Secondary Sort and Partitioning Mechanisms through a Mobile Data Traffic Use Case


This post uses a mobile data traffic use case to explain Hadoop's secondary sort and partitioning mechanisms in detail. First, the business scenario: for every user, compute the total upstream traffic, the total downstream traffic, and the overall total traffic.
The data used in this post:
Log format description: (original screenshot not available)
Sample records from the log flowdata.txt: (original screenshot not available)
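The screenshots are gone, but the layout can be inferred from the mapper code below: each tab-separated line carries the phone number at index 1 and the upstream and downstream byte counts at indices 8 and 9. The following is a hypothetical, reconstructed sample (F2 through F7 are placeholders for fields the job ignores), not the actual log schema:

public class FlowLineDemo
{
      public static void main(String[] args)
      {
            //hypothetical line; only the field positions used by MyMapper are meaningful
            String line = "1363157985066\t13726230503\tF2\tF3\tF4\tF5\tF6\tF7\t2481\t24681\t200";
            String[] fields = line.split("\t");
            System.out.println("msisdn = " + fields[1]);                  //13726230503
            System.out.println("up     = " + Long.parseLong(fields[8]));  //2481
            System.out.println("down   = " + Long.parseLong(fields[9]));  //24681
      }
}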
First we implement the business logic above with a MapReduce program.
Implementation:

package FlowSum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;



//This program computes each user's total upstream traffic, total downstream traffic, and overall total.
//The key concept used here is a custom Hadoop data type (the serialization/deserialization mechanism).
public class MsisdnFlowSum
{
      public static String path1 = "file:///C:\\flowdata.txt";
      public static String path2 = "file:///C:\\flowdir\\";
      public static void main(String[] args) throws Exception
      {

            Configuration conf = new Configuration();
            FileSystem fileSystem = FileSystem.get(conf);

            if(fileSystem.exists(new Path(path2)))
            {
                fileSystem.delete(new Path(path2), true);
            }

            Job job = Job.getInstance(conf);
            job.setJarByClass(MsisdnFlowSum.class);

            FileInputFormat.setInputPaths(job, new Path(path1));
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowType.class);

            job.setNumReduceTasks(1);
            job.setPartitionerClass(HashPartitioner.class);


            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowType.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(path2));
            job.waitForCompletion(true);    
            //check the run results:
            FSDataInputStream fr = fileSystem.open(new Path("file:///C:\\flowdir\\part-r-00000"));
            IOUtils.copyBytes(fr, System.out, 1024, true);
      }
      public static class MyMapper extends Mapper<LongWritable, Text, Text, FlowType>
      {
            protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
            {
                  //grab one line of the log
                  String line = v1.toString();
                  //split the line into its fields
                  String[] splited = line.split("\t");
                  //extract the fields we need: phone number, upstream traffic, downstream traffic
                  String msisdn = splited[1]; 
                  long upPayLoad = Long.parseLong(splited[8]);
                  long downPayLoad = Long.parseLong(splited[9]);
                  //emit <phone number, FlowType>
                  context.write(new Text(msisdn), new FlowType(upPayLoad,downPayLoad));
            }
      }   
      public static class MyReducer extends Reducer<Text, FlowType, Text, FlowType>
      {
            protected void reduce(Text k2, Iterable<FlowType> v2s,Context context)throws IOException, InterruptedException
            {
                   long payLoadSum = 0L;      //running total of this user's upstream traffic
                   long downLoadSum = 0L;     //running total of this user's downstream traffic
                   //values arrive grouped by key: <phone number, {FlowType1, FlowType2, FlowType3, ...}>
                   for (FlowType v2 : v2s)
                  {
                      payLoadSum += v2.upPayLoad;
                      downLoadSum += v2.downPayLoad;              
                  }
                  context.write(k2, new FlowType(payLoadSum,downLoadSum));  //FlowType must override toString() for this output to be readable
            }   
      }
}
class FlowType  implements  Writable
{
         public long upPayLoad;    //upstream traffic
         public long downPayLoad;  //downstream traffic
         public long loadSum;      //total traffic

         public FlowType(){}
         public FlowType(long upPayLoad,long downPayLoad)
         {
              this.upPayLoad = upPayLoad;
              this.downPayLoad = downPayLoad;

              this.loadSum = upPayLoad + downPayLoad;    //constructor trick: the total is computed automatically whenever a FlowType is built from up and down
         }
        //whenever data travels across the network it must be serialized and deserialized
        //serialization: write the object's fields to the byte output stream
        public void write(DataOutput fw) throws IOException
        {
            fw.writeLong(upPayLoad);
            fw.writeLong(downPayLoad);  
        }
        //deserialization: read the fields back from the byte input stream; the field order must match write()
        public void readFields(DataInput  fr) throws IOException
        {
           this.upPayLoad =  fr.readLong();     //deserialize the upstream traffic
           this.downPayLoad =  fr.readLong();   //deserialize the downstream traffic
        }
        public String toString()
        {
            return "" + this.upPayLoad+"\t"+this.downPayLoad+"\t"+this.loadSum;
        }
}
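
Since the point of FlowType is Hadoop's Writable serialization/deserialization mechanism, here is a minimal round-trip sketch, assuming it sits in the same FlowSum package as the classes above and using plain java.io streams instead of Hadoop's shuffle, to confirm that write() and readFields() agree on field order:

package FlowSum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowTypeRoundTrip
{
      public static void main(String[] args) throws IOException
      {
            FlowType original = new FlowType(120, 240);

            //serialize: write the fields to a byte stream, as Hadoop does during the shuffle
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));

            //deserialize: read the fields back in the same order
            FlowType restored = new FlowType();
            restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

            //prints 120, 240 and 0 separated by tabs: loadSum comes back as 0 because
            //write()/readFields() do not carry it; only the two-argument constructor computes it
            System.out.println(restored);
      }
}

This is also why the reducer above builds a new FlowType(payLoadSum, downLoadSum) instead of relying on a deserialized loadSum.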

Run results:

13480253104 180 180 360
13502468823 7335    110349  117684
13560439658 2034    5892    7926
13600217502 1080    186852  187932
13602846565 1938    2910    4848
13660577991 6960    690 7650
13719199419 240 0   240
13726230503 2481    24681   27162
13760778710 120 120 240
13823070001 360 180 540
13826544101 264 0   264
13922314466 3008    3720    6728
13925057413 11058   48243   59301
13926251106 240 0   240
13926435656 132 1512    1644
15013685858 3659    3538    7197
15920133257 3156    2936    6092
15989002119 1938    180 2118
18211575961 1527    2106    3633
18320173382 9531    2412    11943
84138413    4116    1432    5548

Next we apply a secondary sort to the results above; in real business scenarios, a secondary sort is always performed on aggregated output like this. The sort criteria are as follows (a minimal comparator sketch and the full implementation follow the list):
1. First compare each user's total traffic, in ascending order.
2. If the totals are equal, compare the downstream traffic, in descending order.
3. If the downstream traffic is also equal, compare the upstream traffic, in ascending order.
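
Expressed on their own, the three rules might look like the following sketch (a hypothetical helper method, not part of the program, using Long.compare to avoid the overflow risk of casting a long difference to int); the SortType key class at the end of the program implements the same ordering in its compareTo() method:

    //minimal sketch of the three ordering rules for two SortType keys a and b
    static int compareFlow(SortType a, SortType b)
    {
        if (a.loadSum != b.loadSum)
            return Long.compare(a.loadSum, b.loadSum);          //rule 1: total traffic, ascending
        if (a.downPayLoad != b.downPayLoad)
            return Long.compare(b.downPayLoad, a.downPayLoad);  //rule 2: downstream traffic, descending
        return Long.compare(a.upPayLoad, b.upPayLoad);          //rule 3: upstream traffic, ascending
    }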

package FlowSum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

//This program performs a secondary sort on the traffic-summary results
/*
Secondary sort criteria:
1. First compare each user's total traffic, in ascending order.
2. If the totals are equal, compare the downstream traffic, in descending order.
3. If the downstream traffic is also equal, compare the upstream traffic, in ascending order.
*/
public class FlowSort
{
    public static String path1 = "file:///C:\\result1.txt";
    public static String path2 = "file:///C:\\sortdir";
    public static void main(String[] args) throws Exception
    {
          Configuration conf = new Configuration();
          FileSystem fileSystem = FileSystem.get(conf);

          if(fileSystem.exists(new Path(path2)))
          {
              fileSystem.delete(new Path(path2), true);
          }

          Job job = Job.getInstance(conf);
          job.setJarByClass(FlowSort.class);

          FileInputFormat.setInputPaths(job, new Path(path1));
          job.setInputFormatClass(TextInputFormat.class);
          job.setMapperClass(MyMapper.class);
          job.setMapOutputKeyClass(SortType.class);
          job.setMapOutputValueClass(NullWritable.class);

          job.setNumReduceTasks(1);
          job.setPartitionerClass(HashPartitioner.class);


          job.setReducerClass(MyReducer.class);
          job.setOutputKeyClass(SortType.class);
          job.setOutputValueClass(NullWritable.class);
          job.setOutputFormatClass(TextOutputFormat.class);
          FileOutputFormat.setOutputPath(job, new Path(path2));
          job.waitForCompletion(true);    
          //check the sorted results:
          FSDataInputStream fr = fileSystem.open(new Path("file:///C:\\sortdir\\part-r-00000"));
          IOUtils.copyBytes(fr, System.out, 1024, true);
    }
    public static class MyMapper extends Mapper<LongWritable, Text, SortType, NullWritable>
    {
        protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
        {
            //grab one line of input
            String line = v1.toString();
            //split it into fields
            String[] splited = line.split("\t");
            //extract the fields we need: phone number, upstream traffic, downstream traffic, total traffic
            String msisdn = splited[0]; 
            long upPayLoad = Long.parseLong(splited[1]);
            long downPayLoad = Long.parseLong(splited[2]);
            long loadSum = Long.parseLong(splited[3]);
            context.write(new SortType(msisdn,upPayLoad,downPayLoad,loadSum), NullWritable.get());
        }
    }
    public static class MyReducer extends Reducer<SortType, NullWritable, SortType, NullWritable>
    {
        protected void reduce(SortType k2, Iterable<NullWritable> v2s,Context context)throws IOException, InterruptedException
        {
                context.write(k2, NullWritable.get());
        }
    }
}
@SuppressWarnings("rawtypes")
class SortType  implements  WritableComparable
{
    public String msisdn;      //user's phone number
    public long upPayLoad;     //upstream traffic
    public long downPayLoad;   //downstream traffic
    public long loadSum;       //total traffic


    public SortType(){}
    public SortType(String msisdn, long upPayLoad, long downPayLoad,long loadSum)
    {
        this.msisdn = msisdn;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
        this.loadSum = loadSum;
    }

    public void write(DataOutput fw) throws IOException
    {
        fw.writeUTF(msisdn);
        fw.writeLong(upPayLoad);
        fw.writeLong(downPayLoad); 
        fw.writeLong(loadSum);

    }
    public void readFields(DataInput fr) throws IOException
    {
        this.msisdn = fr.readUTF();
        this.upPayLoad =  fr.readLong();
        this.downPayLoad =  fr.readLong();
        this.loadSum = fr.readLong();   
    }

    public int compareTo(Object obj)  //override compareTo to define the sort order
    {
            SortType cc = (SortType) obj;
            if (this.loadSum != cc.loadSum)
                return Long.compare(this.loadSum, cc.loadSum);          //loadSum ascending
            else if (this.downPayLoad != cc.downPayLoad)
                return Long.compare(cc.downPayLoad, this.downPayLoad);  //downPayLoad descending
            else
                return Long.compare(this.upPayLoad, cc.upPayLoad);      //upPayLoad ascending
    }
    public String toString()
    {
        return "" + this.msisdn+"\t"+this.upPayLoad+"\t"+this.downPayLoad+"\t"+this.loadSum;
    }
}

Run log and results:

Map-Reduce Framework
        Map input records=21
        Map output records=21
        Map output bytes=774
        Map output materialized bytes=822
        Input split bytes=85
        Combine input records=0
        Combine output records=0
        Reduce input groups=20
        Reduce shuffle bytes=822
        Reduce input records=21
        Reduce output records=20
        Spilled Records=42
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=535822336

13760778710 120 120 240
13926251106 240 0   240
13826544101 264 0   264
13480253104 180 180 360
13823070001 360 180 540
13926435656 132 1512    1644
15989002119 1938    180 2118
18211575961 1527    2106    3633
13602846565 1938    2910    4848
84138413    4116    1432    5548
15920133257 3156    2936    6092
13922314466 3008    3720    6728
15013685858 3659    3538    7197
13660577991 6960    690 7650
13560439658 2034    5892    7926
18320173382 9531    2412    11943
13726230503 2481    24681   27162
13925057413 11058   48243   59301
13502468823 7335    110349  117684
13600217502 1080    186852  187932

The run log above shows that MapReduce read 21 records but wrote only 20, so one record has gone missing (13719199419 no longer appears in the output). The counters explain why: Reduce input records=21 but Reduce input groups=20. The records for 13926251106 and 13719199419 have identical upstream, downstream and total values, so their SortType keys compare as equal and land in the same reduce group; since the reducer writes the key only once per group, one of the two is dropped. The fix is as follows:

Change this:
protected void reduce(SortType k2, Iterable<NullWritable> v2s,Context context)throws IOException, InterruptedException
        {
                context.write(k2, NullWritable.get());
        }
to this:
    protected void reduce(SortType k2, Iterable<NullWritable> v2s,Context context)throws IOException, InterruptedException
        {
                for (NullWritable v2 : v2s)
                {
                    context.write(k2, v2);
                }
        }

After making this change, run the program again; the corresponding log and results are:

Map-Reduce Framework
        Map input records=21
        Map output records=21
        Map output bytes=774
        Map output materialized bytes=822
        Input split bytes=85
        Combine input records=0
        Combine output records=0
        Reduce input groups=20
        Reduce shuffle bytes=822
        Reduce input records=21
        Reduce output records=21
        Spilled Records=42
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=11
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=467664896

13760778710 120 120 240
13926251106 240 0   240
13719199419 240 0   240
13826544101 264 0   264
13480253104 180 180 360
13823070001 360 180 540
13926435656 132 1512    1644
15989002119 1938    180 2118
18211575961 1527    2106    3633
13602846565 1938    2910    4848
84138413    4116    1432    5548
15920133257 3156    2936    6092
13922314466 3008    3720    6728
15013685858 3659    3538    7197
13660577991 6960    690 7650
13560439658 2034    5892    7926
18320173382 9531    2412    11943
13726230503 2481    24681   27162
13925057413 11058   48243   59301
13502468823 7335    110349  117684
13600217502 1080    186852  187932

The results show that the secondary sort now meets our expectations.
Next we add partitioning: the traffic summaries for mobile phone numbers are written to one output file, and those for non-mobile numbers (ordinary telephone numbers) to another. In the code below, MyPartitioner routes 11-digit keys to partition 0 and everything else to partition 1, and job.setNumReduceTasks(2) is set to match the two partitions.
Implementation:

package FlowSum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

//This program performs a secondary sort and partitioning on the traffic-summary results
/*
Secondary sort criteria:
1. First compare each user's total traffic, in ascending order.
2. If the totals are equal, compare the downstream traffic, in descending order.
3. If the downstream traffic is also equal, compare the upstream traffic, in ascending order.
*/
public class FlowSort
{
    public static String path1 = "file:///C:\\result1.txt";
    public static String path2 = "file:///C:\\sortdir";
    public static void main(String[] args) throws Exception
    {
          Configuration conf = new Configuration();
          FileSystem fileSystem = FileSystem.get(conf);

          if(fileSystem.exists(new Path(path2)))
          {
              fileSystem.delete(new Path(path2), true);
          }

          Job job = Job.getInstance(conf);
          job.setJarByClass(FlowSort.class);

          FileInputFormat.setInputPaths(job, new Path(path1));
          job.setInputFormatClass(TextInputFormat.class);
          job.setMapperClass(MyMapper.class);
          job.setMapOutputKeyClass(SortType.class);
          job.setMapOutputValueClass(NullWritable.class);

          job.setNumReduceTasks(2);
          job.setPartitionerClass(MyPartitioner.class);


          job.setReducerClass(MyReducer.class);
          job.setOutputKeyClass(SortType.class);
          job.setOutputValueClass(NullWritable.class);
          job.setOutputFormatClass(TextOutputFormat.class);
          FileOutputFormat.setOutputPath(job, new Path(path2));
          job.waitForCompletion(true);    
    }
    public static class MyMapper extends Mapper<LongWritable, Text, SortType, NullWritable>
    {
        protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
        {
            //grab one line of input
            String line = v1.toString();
            //split it into fields
            String[] splited = line.split("\t");
            //extract the fields we need: phone number, upstream traffic, downstream traffic, total traffic
            String msisdn = splited[0]; 
            long upPayLoad = Long.parseLong(splited[1]);
            long downPayLoad = Long.parseLong(splited[2]);
            long loadSum = Long.parseLong(splited[3]);
            context.write(new SortType(msisdn,upPayLoad,downPayLoad,loadSum), NullWritable.get());
        }
    }
    public static class MyPartitioner extends Partitioner<SortType, NullWritable>
    {
        public int getPartition(SortType k2, NullWritable v2, int num)
        {
              String tele = k2.msisdn;    //get the phone number from the key
              if(tele.length()==11)
                  return 0;   //11-digit mobile numbers go to partition 0
              else
                  return 1;   //everything else (non-mobile numbers) goes to partition 1
        }
    }

    public static class MyReducer extends Reducer<SortType, NullWritable, SortType, NullWritable>
    {
        protected void reduce(SortType k2, Iterable<NullWritable> v2s,Context context)throws IOException, InterruptedException
        {
                for (NullWritable v2 : v2s)
                {
                    context.write(k2, v2);
                }
        }
    }
}
@SuppressWarnings("rawtypes")
class SortType  implements  WritableComparable
{
    public String msisdn;      //user's phone number
    public long upPayLoad;     //upstream traffic
    public long downPayLoad;   //downstream traffic
    public long loadSum;       //total traffic


    public SortType(){}
    public SortType(String msisdn, long upPayLoad, long downPayLoad,long loadSum)
    {
        this.msisdn = msisdn;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
        this.loadSum = loadSum;
    }

    public void write(DataOutput fw) throws IOException
    {
        fw.writeUTF(msisdn);
        fw.writeLong(upPayLoad);
        fw.writeLong(downPayLoad); 
        fw.writeLong(loadSum);

    }
    public void readFields(DataInput fr) throws IOException
    {
        this.msisdn = fr.readUTF();
        this.upPayLoad =  fr.readLong();
        this.downPayLoad =  fr.readLong();
        this.loadSum = fr.readLong();   
    }

    public int compareTo(Object obj)  //override compareTo to define the sort order
    {
            SortType cc = (SortType) obj;
            if (this.loadSum != cc.loadSum)
                return Long.compare(this.loadSum, cc.loadSum);          //loadSum ascending
            else if (this.downPayLoad != cc.downPayLoad)
                return Long.compare(cc.downPayLoad, this.downPayLoad);  //downPayLoad descending
            else
                return Long.compare(this.upPayLoad, cc.upPayLoad);      //upPayLoad ascending
    }
    public String toString()
    {
        return "" + this.msisdn+"\t"+this.upPayLoad+"\t"+this.downPayLoad+"\t"+this.loadSum;
    }
}

Checking the run results:

part-r-00000 (partition 0, the mobile numbers) contains:
13760778710 120 120 240
13926251106 240 0   240
13719199419 240 0   240
13826544101 264 0   264
13480253104 180 180 360
13823070001 360 180 540
13926435656 132 1512    1644
15989002119 1938    180 2118
18211575961 1527    2106    3633
13602846565 1938    2910    4848
15920133257 3156    2936    6092
13922314466 3008    3720    6728
15013685858 3659    3538    7197
13660577991 6960    690 7650
13560439658 2034    5892    7926
18320173382 9531    2412    11943
13726230503 2481    24681   27162
13925057413 11058   48243   59301
13502468823 7335    110349  117684
13600217502 1080    186852  187932

part-r-00001 (partition 1, the non-mobile numbers) contains:
84138413    4116    1432    5548

The results above come from debugging in Hadoop's local mode; next we test in cluster mode.
The modified part of the code (the input and output paths now come from the command-line arguments):

    public static String path1 = "";
    public static String path2 = "";
    public static void main(String[] args) throws Exception
    {
          path1 = args[0];
          path2 = args[1];
          Configuration conf = new Configuration();
          FileSystem fileSystem = FileSystem.get(conf);

          if(fileSystem.exists(new Path(path2)))
          {
              fileSystem.delete(new Path(path2), true);
          }

          Job job = Job.getInstance(conf);
          job.setJarByClass(FlowSort.class);

          FileInputFormat.setInputPaths(job, new Path(path1));
          job.setInputFormatClass(TextInputFormat.class);
          job.setMapperClass(MyMapper.class);
          job.setMapOutputKeyClass(SortType.class);
          job.setMapOutputValueClass(NullWritable.class);

          job.setNumReduceTasks(2);
          job.setPartitionerClass(MyPartitioner.class);


          job.setReducerClass(MyReducer.class);
          job.setOutputKeyClass(SortType.class);
          job.setOutputValueClass(NullWritable.class);
          job.setOutputFormatClass(TextOutputFormat.class);
          FileOutputFormat.setOutputPath(job, new Path(path2));
          job.waitForCompletion(true);    
    }

Run it as a jar:

[root@hadoop11 local]# hadoop jar Flow.jar  /result1.txt  /resdir

Check the run results:

[root@hadoop11 local]# hadoop fs -lsr   /resdir/
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r--   1 root supergroup          0 2016-07-08 10:19 /resdir/_SUCCESS
-rw-r--r--   1 root supergroup        527 2016-07-08 10:19 /resdir/part-r-00000
-rw-r--r--   1 root supergroup         24 2016-07-08 10:19 /resdir/part-r-00001
[root@hadoop11 local]# hadoop fs -cat /resdir/part-r-00000
13760778710     120     120     240
13926251106     240     0       240
13719199419     240     0       240
13826544101     264     0       264
13480253104     180     180     360
13823070001     360     180     540
13926435656     132     1512    1644
15989002119     1938    180     2118
18211575961     1527    2106    3633
13602846565     1938    2910    4848
15920133257     3156    2936    6092
13922314466     3008    3720    6728
15013685858     3659    3538    7197
13660577991     6960    690     7650
13560439658     2034    5892    7926
18320173382     9531    2412    11943
13726230503     2481    24681   27162
13925057413     11058   48243   59301
13502468823     7335    110349  117684
13600217502     1080    186852  187932
[root@hadoop11 local]# hadoop fs -cat /resdir/part-r-00001
84138413        4116    1432    5548

That is the result of running on the cluster, and it matches what we expected!
If you have any questions, feel free to leave a comment!


Original post: http://blog.csdn.net/a2011480169/article/details/51858756
