
Grouping Sharded Data with MapReduce in C#

Posted: 2017-08-16 20:13:48


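Background: our MongoDB data is now sharded, and some commands no longer work on the sharded collection, for example `db.eval()` and the legacy `group` command. Trying one of them returns an error like this: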

Error: {
    "ok" : 0,
    "errmsg" : "Error: Error: can't use sharded collection from db.eval @:2:9\n",
    "code" : 2,
    "codeName" : "BadValue"
} :

Cause: a sharded cluster does not support methods that only work against a single server instance.

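After some research: on a sharded cluster, this kind of querying and processing has to be done with MapReduce or Aggregate (the aggregation pipeline). Between them, these two advanced MongoDB features can do almost everything the blocked commands could.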

Requirement: query today's records in a collection that match a given condition, and compute hourly pv, uv and ui statistics from them.

The collection stores browser visit records.

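Start by writing the job as a mongo shell command; this is the key step. If you are not familiar with mapReduce, work through the official documentation, as it is too much to cover here.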

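db.runCommand({
    mapreduce: 'visits',
    // site 100650, ids created from 2017-08-15T16:00:00Z on (midnight of 2017-08-16 at UTC+8)
    query: { 'sh.si': 100650, _id: { $gte: ObjectId('59931a800000000000000000') } },
    // key: the hour in which each _id was created
    map: function () {
        emit(
            { hours: this._id.getTimestamp().getHours() },
            { pv: 1, ip: this.ip, ui: this.ui, hours: this._id.getTimestamp().getHours() }
        );
    },
    // add up page views and collect each hour's ip and ui values
    reduce: function (k, values) {
        var data = {
            hours: 0,
            pv: 0,
            ip: [],
            ui: []
        };
        values.forEach(function (v) {
            data.hours = v.hours;
            data.pv += v.pv;
            if (v.ui)
                data.ui.push(v.ui);
            if (v.ip)
                data.ip.push(v.ip);
        });
        return data;
    },
    // turn the collected values into counts; ArrayGroup is a custom helper stored on
    // the author's server (not shown in the post), apparently reporting distinct values in `uni`
    finalize: function (key, v) {
        v.hours = NumberInt(v.hours);
        v.pv = NumberInt(v.pv);
        if (v.ui)
            v.ui = NumberInt(ArrayGroup(v.ui).uni - 1);
        else
            v.ui = 0;
        if (v.ip)
            v.ip = NumberInt(ArrayGroup(v.ip).uni - 1);
        else
            v.ip = 0;
        return v;
    },
    out: { inline: 1 }
});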
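The lower bound in `query` relies on how ObjectIds are built: the first 4 bytes are the creation time in seconds since the Unix epoch (big-endian), so an id whose remaining bytes are all zero is the smallest possible id for that second. `59931a80` is 1502812800 seconds, i.e. 2017-08-15T16:00:00Z, which is midnight of 2017-08-16 at UTC+8. The Node sketch below produces such a bound for any day; the helper names are hypothetical, and the UTC+8 offset is an assumption inferred from that timestamp.

```javascript
// Build the 24-hex-digit ObjectId string whose embedded timestamp is `date`.
// An ObjectId starts with a 4-byte, big-endian count of seconds since the Unix epoch;
// zero-filling the other 8 bytes gives the smallest possible id for that second.
function objectIdHexForDate(date) {
  var seconds = Math.floor(date.getTime() / 1000);
  return seconds.toString(16).padStart(8, "0") + "0".repeat(16);
}

// Inverse: read the creation time back out, as ObjectId.getTimestamp() does.
function dateFromObjectIdHex(hex) {
  return new Date(parseInt(hex.slice(0, 8), 16) * 1000);
}

// Hypothetical helper: start of the calendar day containing `date` in a fixed
// UTC offset given in minutes (480 = UTC+8), independent of the machine's zone.
function startOfDayAtOffset(date, offsetMinutes) {
  var shifted = new Date(date.getTime() + offsetMinutes * 60000);
  shifted.setUTCHours(0, 0, 0, 0);
  return new Date(shifted.getTime() - offsetMinutes * 60000);
}
```

For example, `objectIdHexForDate(startOfDayAtOffset(new Date("2017-08-16T12:13:48Z"), 480))` returns `"59931a800000000000000000"`, the value used in the query.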

Running this command returns a result like this:

{
    "results" : [ 
        {
            "_id" : {
                "hours" : 1.0
            },
            "value" : {
                "pv" : 1,
                "ip" : 0,
                "ui" : 0.0,
                "hours" : 1
            }
        }, 
        {
            "_id" : {
                "hours" : 4.0
            },
            "value" : {
                "pv" : 1,
                "ip" : 0,
                "ui" : 0.0,
                "hours" : 4
            }
        }, 
        {
            "_id" : {
                "hours" : 5.0
            },
            "value" : {
                "pv" : 1,
                "ip" : 0,
                "ui" : 0.0,
                "hours" : 5
            }
        }, 
        {
            "_id" : {
                "hours" : 6.0
            },
            "value" : {
                "hours" : 6,
                "pv" : 4,
                "ip" : 0,
                "ui" : 0
            }
        }, 
        {
            "_id" : {
                "hours" : 7.0
            },
            "value" : {
                "pv" : 1,
                "ip" : 0,
                "ui" : 0,
                "hours" : 7
            }
        }, 
        {
            "_id" : {
                "hours" : 8.0
            },
            "value" : {
                "hours" : 8,
                "pv" : 8,
                "ip" : 5,
                "ui" : 4
            }
        }, 
        {
            "_id" : {
                "hours" : 9.0
            },
            "value" : {
                "hours" : 9,
                "pv" : 10,
                "ip" : 3,
                "ui" : 0
            }
        }, 
        {
            "_id" : {
                "hours" : 10.0
            },
            "value" : {
                "hours" : 10,
                "pv" : 9,
                "ip" : 5,
                "ui" : 2
            }
        }, 
        {
            "_id" : {
                "hours" : 11.0
            },
            "value" : {
                "hours" : 11,
                "pv" : 17,
                "ip" : 9,
                "ui" : 1
            }
        }, 
        {
            "_id" : {
                "hours" : 12.0
            },
            "value" : {
                "hours" : 12,
                "pv" : 20,
                "ip" : 8,
                "ui" : 2
            }
        }, 
        {
            "_id" : {
                "hours" : 13.0
            },
            "value" : {
                "hours" : 13,
                "pv" : 23,
                "ip" : 5,
                "ui" : 2
            }
        }, 
        {
            "_id" : {
                "hours" : 14.0
            },
            "value" : {
                "hours" : 14,
                "pv" : 33,
                "ip" : 9,
                "ui" : 2
            }
        }, 
        {
            "_id" : {
                "hours" : 15.0
            },
            "value" : {
                "hours" : 15,
                "pv" : 52,
                "ip" : 14,
                "ui" : 4
            }
        }
    ],
    "counts" : {
        "input" : NumberLong(180),
        "emit" : NumberLong(180),
        "reduce" : NumberLong(26),
        "output" : NumberLong(13)
    },
    "timeMillis" : 250,
    "timing" : {
        "shardProcessing" : 152,
        "postProcessing" : 97
    },
    "shardCounts" : {
        "ins-cluster-01/ins-mongodb-01:27018,ins-mongodb-03:27018" : {
            "input" : 90,
            "emit" : 90,
            "reduce" : 9,
            "output" : 12
        },
        "ins-cluster-02/ins-mongodb-02:27019,ins-mongodb-04:27019" : {
            "input" : 90,
            "emit" : 90,
            "reduce" : 8,
            "output" : 10
        }
    },
    "postProcessCounts" : {
        "ins-cluster-01/ins-mongodb-01:27018,ins-mongodb-03:27018" : {
            "input" : NumberLong(22),
            "reduce" : NumberLong(9),
            "output" : NumberLong(13)
        }
    },
    "ok" : 1.0
}

Only the `results` array matters here.
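The counters also show how the job ran on the cluster: reduce was called 9 and 8 times on the two shards, then 9 more times during post-processing, where the per-shard outputs (12 + 10 = 22 documents) were merged again. MongoDB's mapReduce documentation therefore requires reduce to return the same shape as the emitted value and to give the same result when it is applied to its own output; it also does not call reduce at all for a key with a single value. The reduce above returns arrays for `ip` and `ui` while map emits single values, so a re-reduce pushes arrays into arrays, and one-visit hours reach finalize unreduced. Whether the final counts survive that depends on `ArrayGroup`, which the post does not show. Below is a minimal local simulation (plain Node, no MongoDB) of a shape-stable alternative; `simulateMapReduce`, the two-shard split and the `ts` field are assumptions made for illustration, not the author's code.

```javascript
// Every value, emitted or reduced, looks like { hours, pv, ip: [...], ui: [...] },
// so reduce can safely be applied again to its own output.
var emit; // assigned by simulateMapReduce, the way mongod supplies emit() to map

function map() {
  // Hypothetical field: `this.ts` stands in for this._id.getTimestamp();
  // UTC hours keep the example independent of the local time zone.
  var h = this.ts.getUTCHours();
  emit({ hours: h }, { hours: h, pv: 1, ip: this.ip ? [this.ip] : [], ui: this.ui ? [this.ui] : [] });
}

function reduce(key, values) {
  var data = { hours: key.hours, pv: 0, ip: [], ui: [] };
  values.forEach(function (v) {
    data.pv += v.pv;
    // merge and de-duplicate, so repeated reduce calls never nest arrays
    v.ip.forEach(function (x) { if (data.ip.indexOf(x) < 0) data.ip.push(x); });
    v.ui.forEach(function (x) { if (data.ui.indexOf(x) < 0) data.ui.push(x); });
  });
  return data;
}

function finalize(key, v) {
  // On the server these numbers would be wrapped in NumberInt(...) as in the original.
  return { hours: v.hours, pv: v.pv, ip: v.ip.length, ui: v.ui.length };
}

// Hypothetical driver: map + reduce on each "shard", then re-reduce the
// per-shard partial results, as a sharded mapReduce does in post-processing.
function simulateMapReduce(shards) {
  var merged = new Map(); // hour -> partial values from all shards
  shards.forEach(function (docs) {
    var groups = new Map(); // hour -> values emitted on this shard
    emit = function (k, v) {
      if (!groups.has(k.hours)) groups.set(k.hours, []);
      groups.get(k.hours).push(v);
    };
    docs.forEach(function (doc) { map.call(doc); });
    groups.forEach(function (vals, h) {
      // like MongoDB, skip reduce when a key has a single value
      var partial = vals.length === 1 ? vals[0] : reduce({ hours: h }, vals);
      if (!merged.has(h)) merged.set(h, []);
      merged.get(h).push(partial);
    });
  });
  return Array.from(merged.keys()).sort(function (a, b) { return a - b; }).map(function (h) {
    var vals = merged.get(h);
    var v = vals.length === 1 ? vals[0] : reduce({ hours: h }, vals);
    return { _id: { hours: h }, value: finalize({ hours: h }, v) };
  });
}
```

Because map already emits arrays, the single-value path and the re-reduce path hand finalize the same shape, so `ip.length` and `ui.length` are distinct counts in both cases.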

Next, port the shell command's JavaScript functions to the C# program:

public dynamic Today(int id)
{
    // Before sharding, this called a stored server-side function through db.eval:
    //var rel = db.DataBase.Eval("show_visited_by_hours(" + id + ")").ToBsonDocument().ToDynamic();
    //return rel;

    // Filter: this site's records whose _id was created from today's local midnight on
    var query = Query.And(
                Query.EQ("sh.si", id),
                Query.GTE("_id", new ObjectId(new DateTime(DateTime.Now.Year, DateTime.Now.Month, DateTime.Now.Day), 0, 0, 0))
            );

    // map: key each record by the hour in which it was created
    var map = new BsonJavaScript(@"function () {
        emit(
            { hours: this._id.getTimestamp().getHours() },
            { pv: 1, ip: this.ip, ui: this.ui, hours: this._id.getTimestamp().getHours() }
        );
    }");

    // reduce: add up page views and collect each hour's ip and ui values
    var reduce = new BsonJavaScript(@"function (k, values) {
        var data = {
            hours: 0,
            pv: 0,
            ip: [],
            ui: []
        };
        values.forEach(function (v) {
            data.hours = v.hours;
            data.pv += v.pv;
            data.ui.push(v.ui);
            data.ip.push(v.ip);
        });
        return data;
    }");

    // finalize: shape each hour's output (ArrayGroup is the author's server-side helper, not shown)
    var finalize = new BsonJavaScript(@"function (key, v) {
        v.hours = NumberInt(v.hours);
        v.pv = NumberInt(v.pv);
        if (v.ui)
            v.ui = NumberInt(ArrayGroup(v.ui).uni - 1);
        else
            v.ui = NumberInt(0);
        if (v.ip)
            v.ip = NumberInt(ArrayGroup(v.ip).uni - 1);
        else
            v.ip = NumberInt(0);
        return v;
    }");

    // Extra MapReduce options: filter, finalize function and output mode
    var mrob = new MapReduceOptionsBuilder();
    mrob.SetFinalize(finalize);
    mrob.SetQuery(query);
    mrob.SetOutput(MapReduceOutput.Inline); // return the results directly instead of writing a collection
    var rel = db.DataBase.GetCollection("visits").MapReduce(map, reduce, mrob); // run the job
    var valuearray = rel.InlineResults; // one document per hour: { _id: { hours }, value: { ... } }
    var pvcount = 0;
    var ipcount = 0;
    var uicount = 0;
    var count = new List<BsonDocument>();
    foreach (var vitem in valuearray)
    {
        pvcount += (int)vitem["value"]["pv"];
        ipcount += (int)vitem["value"]["ip"];
        uicount += (int)vitem["value"]["ui"];
        count.Add(vitem["value"].ToBsonDocument());
    }
    // Daily totals plus the per-hour breakdown
    var result = new { pv = pvcount, ip = ipcount, ui = uicount, count = count };
    // ToDynamic() is a project-specific extension method, not part of the MongoDB driver
    return result.ToBsonDocument().ToDynamic();
}
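`Today()` adds the hourly values together to produce the daily totals. That is exact for `pv`, but `ip` and `ui` are distinct counts, and distinct counts are not additive: a visitor active in two different hours is counted once in each. A small worked example in plain JavaScript (the sample records are made up):

```javascript
// Hypothetical sample: each record is [hour, ip] for one page view.
var visits = [[8, "1.1.1.1"], [8, "2.2.2.2"], [9, "1.1.1.1"], [9, "1.1.1.1"]];

// Distinct IPs per hour, which is what each hourly "ip" value reports.
function distinctPerHour(records) {
  var byHour = new Map();
  records.forEach(function (r) {
    if (!byHour.has(r[0])) byHour.set(r[0], new Set());
    byHour.get(r[0]).add(r[1]);
  });
  return byHour;
}

var byHour = distinctPerHour(visits);
// Page views are additive: 4
var pvTotal = visits.length;
// Summing hourly distinct counts (like ipcount in Today()) counts 1.1.1.1 in both hours: 3
var summedIp = Array.from(byHour.values()).reduce(function (acc, s) { return acc + s.size; }, 0);
// Distinct IPs over the whole day: 2
var dailyIp = new Set(visits.map(function (r) { return r[1]; })).size;
```

So the summed `ip` and `ui` figures are upper bounds. If the daily numbers are meant to be distinct visitors, one option would be a separate count over the whole day, for instance a second job that emits every record under one constant key.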

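Getting MapReduce right takes some trial and error. Aggregate is comparatively simple: build every query object and aggregation stage as a BsonDocument in the format the pipeline expects.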
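To illustrate, the pipeline that `func_api_site_count` assembles from `Dictionary` and `BsonDocument` objects can be written directly as shell-style JavaScript objects, which makes the stage structure easier to check. This is a sketch, not the author's code: the function name is hypothetical, and for the `"day"` case it groups on the field path `"$_id.day"` itself, since an object key starting with `$` inside `_id` would be read as an operator. For unimplemented periods such as `"week"` it uses `null`, where the C# code ends up with an empty key; both are meant to put every day in a single group.

```javascript
// Sketch of the log_sites_day pipeline; days are assumed to be yyyymmdd integers.
function buildSiteCountPipeline(sid, st, et, flag) {
  var periodKey;
  switch (flag) {
    case "month": periodKey = { $substr: ["$_id.day", 0, 6] }; break; // yyyymm
    case "year":  periodKey = { $substr: ["$_id.day", 0, 4] }; break; // yyyy
    case "day":   periodKey = "$_id.day"; break;                    // the field path itself
    default:      periodKey = null; // "week" is unimplemented: one group for all days
  }
  return [
    { $match: { "_id.day": { $gte: st, $lte: et }, "_id.sid": { $eq: sid } } },
    // first pass: totals per period
    { $group: {
        _id: periodKey,
        pv: { $sum: "$pv" }, uv: { $sum: "$uv" }, ui: { $sum: "$ui" },
        ip: { $sum: "$ip" }, nuv: { $sum: "$nuv" },
        area: { $push: "$area" }, rf: { $push: "$rfsite" }
    } },
    { $sort: { _id: 1 } },
    // second pass: one document of per-period arrays plus grand totals
    { $group: {
        _id: null,
        day: { $push: "$_id" }, uv: { $push: "$uv" }, pv: { $push: "$pv" },
        ui: { $push: "$ui" }, ip: { $push: "$ip" }, nuv: { $push: "$nuv" },
        area: { $push: "$area" }, rfsite: { $push: "$rf" },
        total: { $sum: 1 }, totalpv: { $sum: "$pv" }, totaluv: { $sum: "$uv" }, totalui: { $sum: "$ui" }
    } },
    { $project: {
        _id: 0, day: 1, pv: 1, uv: 1, ui: 1, nuv: 1, ip: 1, area: 1, rfsite: 1,
        total: { row: "$total", pv: "$totalpv", uv: "$totaluv", ui: "$totalui" }
    } }
  ];
}
```

In the mongo shell it could be run as `db.log_sites_day.aggregate(buildSiteCountPipeline(100650, 20170101, 20171231, "month"))`.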

Here is also an example of calling Aggregate from C#, which is easier to follow:

public string func_api_site_count(int sid = 0, int st = 20130226, int et = 20141231, string flag = "year", string order = "pv desc")
{
    // Note: the `order` parameter is not used below.
    var sitesday = _dbs.DataBase.GetCollection("log_sites_day");

    // $match: this site's daily rows with st <= _id.day <= et (days look like yyyymmdd integers)
    var match = new QueryDocument();
    match.AddRange(new Dictionary<string, object> {
        { "_id.day", new Dictionary<string, object> { { "$gte", st }, { "$lte", et } } },
        { "_id.sid", new Dictionary<string, object> { { "$eq", sid } } }
    });

    // Group key of the first $group: the period each day belongs to
    object groupKey;
    switch (flag)
    {
        case "month":
            groupKey = new Dictionary<string, object> { { "$substr", new List<object> { "$_id.day", 0, 6 } } };
            break;
        case "year":
            groupKey = new Dictionary<string, object> { { "$substr", new List<object> { "$_id.day", 0, 4 } } };
            break;
        case "day":
            // Group on the field path itself; a key such as { "$_id.day": ... }
            // would be parsed as an unknown operator.
            groupKey = "$_id.day";
            break;
        default:
            // "week" (or any other value) is not implemented: an empty key puts every day in one group.
            groupKey = new Dictionary<string, object>();
            break;
    }

    // First $group: totals per period
    var group = new Dictionary<string, object>();
    group.Add("_id", groupKey);
    group.Add("pv", new BsonDocument("$sum", "$pv"));
    group.Add("uv", new BsonDocument("$sum", "$uv"));
    group.Add("ui", new BsonDocument("$sum", "$ui"));
    group.Add("ip", new BsonDocument("$sum", "$ip"));
    group.Add("nuv", new BsonDocument("$sum", "$nuv"));
    group.Add("area", new BsonDocument("$push", "$area"));
    group.Add("rf", new BsonDocument("$push", "$rfsite"));

    // Second $group: collapse all periods into one document of arrays plus grand totals
    var groupby = new Dictionary<string, BsonValue> {
        {"_id", BsonNull.Value},
        {"day", new BsonDocument("$push", "$_id")},
        {"uv", new BsonDocument("$push", "$uv")},
        {"pv", new BsonDocument("$push", "$pv")},
        {"ui", new BsonDocument("$push", "$ui")},
        {"ip", new BsonDocument("$push", "$ip")},
        {"nuv", new BsonDocument("$push", "$nuv")},
        {"area", new BsonDocument("$push", "$area")},
        {"rfsite", new BsonDocument("$push", "$rf")},
        {"total", new BsonDocument("$sum", 1)},
        {"totalpv", new BsonDocument("$sum", "$pv")},
        {"totaluv", new BsonDocument("$sum", "$uv")},
        {"totalui", new BsonDocument("$sum", "$ui")},
    };

    // $project: drop _id and nest the grand totals under "total"
    var project = new Dictionary<string, BsonValue>
    {
        {"day", 1},
        {"pv", 1},
        {"_id", 0},
        {"total", new BsonDocument() {
            new BsonElement("row", "$total"),
            new BsonElement("pv", "$totalpv"),
            new BsonElement("uv", "$totaluv"),
            new BsonElement("ui", "$totalui"),
        }},
        {"area", 1},
        {"uv", 1},
        {"ui", 1},
        {"nuv", 1},
        {"ip", 1},
        {"rfsite", 1},
    };

    var sitedayOptions = new List<BsonDocument>() {
        new BsonDocument("$match", BsonDocument.Create(match)),
        new BsonDocument("$group", BsonDocument.Create(group)),
        new BsonDocument("$sort", new BsonDocument("_id", 1)),
        new BsonDocument("$group", BsonDocument.Create(groupby)),
        new BsonDocument("$project", BsonDocument.Create(project)),
    };

    var rel = sitesday.Aggregate(sitedayOptions);

    // The last $group uses _id: null, so there is at most one result document.
    var result = rel == null ? null : rel.ResultDocuments.FirstOrDefault();
    if (result == null)
    {
        return "{}";
    }

    // Merge the per-day referrer documents ({ site: count }) into one "rf" document.
    // Note: result["rfsite"][0] holds only the first period's list.
    var rfbson = new BsonDocument();
    foreach (var rfitem in result["rfsite"][0].AsBsonArray)
    {
        foreach (var ritem in rfitem.AsBsonDocument)
        {
            if (!rfbson.Contains(ritem.Name))
            {
                rfbson.Add(ritem.Name, ritem.Value.AsInt32);
            }
            else
            {
                rfbson[ritem.Name] = rfbson[ritem.Name].AsInt32 + ritem.Value.AsInt32;
            }
        }
    }
    result["rf"] = rfbson;
    return result.ToString();
}
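The loop at the end of `func_api_site_count` folds a list of per-day referrer documents of the form `{ site: count }` into one document by summing the counts for each site. The same step in plain JavaScript, with made-up site names and a hypothetical `mergeCounts` helper; like the C# code, `mergeCounts(rfsite[0])` only covers the first period, while flattening `rfsite` first would cover all of them:

```javascript
// Sum per-key counts across a list of { key: count } objects (e.g. referrer sites).
function mergeCounts(docs) {
  var totals = Object.create(null); // no prototype, so keys like "constructor" are safe
  docs.forEach(function (doc) {
    Object.keys(doc).forEach(function (name) {
      // the first occurrence initialises the key, later ones add to it
      totals[name] = (totals[name] || 0) + doc[name];
    });
  });
  return totals;
}

// Made-up data shaped like result["rfsite"]: one array per period, one object per day.
var rfsite = [[{ "a.com": 2, "b.com": 1 }, { "a.com": 3 }], [{ "c.com": 4 }]];
var firstPeriod = mergeCounts(rfsite[0]);                  // { "a.com": 5, "b.com": 1 }
var allPeriods = mergeCounts([].concat.apply([], rfsite)); // also includes "c.com": 4
```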

 


Original article: http://www.cnblogs.com/loyung/p/7375036.html
