标签:
.net + sql
#region Build the table of original (cohort) date ranges
// Rows of this table are passed to PR_StatRetainedUsers as the @dt table-valued
// parameter (type dbo.OriginalDatesType). Each row is one earlier period,
// stored as two yyyyMMdd integers: OriginalStartDate and OriginalEndDate.
DataTable originalDates = new DataTable();
DataColumn col = new DataColumn("OriginalStartDate", typeof(Int32));
originalDates.Columns.Add(col);
col = new DataColumn("OriginalEndDate", typeof(Int32));
originalDates.Columns.Add(col);

// Length in days of the current period; not used within this fragment.
int step = endDate.Subtract(startDate).Days + 1;

// Dates are formatted and parsed with the invariant culture so the yyyyMMdd
// integers do not depend on the thread's current culture (a culture with a
// non-Gregorian calendar would otherwise produce different year digits).
System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;

// Daily statistics look back 30 periods; every other period type looks back 6.
int periodsBack = (period == PeriodOptions.Daily) ? 30 : 6;
for (int i = 1; i <= periodsBack; i++)
{
    DataRow newRow = originalDates.NewRow();
    DateTime pStartDate;
    // Presumably returns the end of the period that lies i periods before the
    // current one and outputs its start - confirm against Utility.GetNextStatDate.
    DateTime pEndDate = Utility.GetNextStatDate(period, startDate, endDate, -i, out pStartDate);
    newRow["OriginalStartDate"] = int.Parse(pStartDate.ToString("yyyyMMdd", invariant), invariant);
    newRow["OriginalEndDate"] = int.Parse(pEndDate.ToString("yyyyMMdd", invariant), invariant);
    originalDates.Rows.Add(newRow);
}
#endregion

// Run the aggregation in the database.
SqlParameter[] paramters = new SqlParameter[]
{
    SqlParamHelper.MakeInParam("@dt", SqlDbType.Structured),
    SqlParamHelper.MakeInParam("@StartDate", SqlDbType.Int, 4, startDate.ToString("yyyyMMdd", invariant)),
    SqlParamHelper.MakeInParam("@EndDate", SqlDbType.Int, 4, endDate.ToString("yyyyMMdd", invariant)),
    SqlParamHelper.MakeInParam("@Period", SqlDbType.TinyInt, 1, (int)period)
};
// The structured parameter needs its table type name and the DataTable as value.
paramters[0].TypeName = "dbo.OriginalDatesType";
paramters[0].Value = originalDates;
DataSet ds = SqlHelper.ExecuteDataset(ComputingDB_ConnString, CommandType.StoredProcedure, "PR_StatRetainedUsers", paramters);
-- PR_StatRetainedUsers
-- Counts retained users for the statistic period [@StartDate, @EndDate].
--
-- Parameters:
--   @dt        table of original periods (OriginalStartDate, OriginalEndDate) as yyyyMMdd ints
--   @StartDate first day of the current period, yyyyMMdd
--   @EndDate   last day of the current period, yyyyMMdd
--   @Period    period type; 1 selects the single-day branch
--
-- Result sets:
--   1. totals per SoftID / Platform / OriginalStatDate (ID2 = -1)
--   2. totals per SoftID / Platform / ChannelID / OriginalStatDate (ID2 = ChannelID)
--
-- The login table name is assembled dynamically with the year (date / 10000) as
-- suffix. The "..." / "...." prefixes are placeholders left in the published
-- source; the real table name prefix has to be filled in before this can run.
ALTER PROCEDURE [dbo].[PR_StatRetainedUsers]
(
    @dt OriginalDatesType readonly,
    @StartDate int,
    @EndDate int,
    @Period tinyint
)
AS
begin
    create table #RetainedUsers(SoftID int,Platform tinyint,ChannelID int,OriginalStatDate int,RetainedUserCount int)

    declare @sql nvarchar(max);

    if (@Period = 1)
    begin
        -- Single-day period: one query against the year table of @StartDate.
        set @sql = N'insert into #RetainedUsers(SoftID,Platform,ChannelID,OriginalStatDate,RetainedUserCount)'
                 + N' select A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate,COUNT(distinct A.IMEI) from ...'
                 + CAST((@StartDate / 10000) as nvarchar(10))
                 + N' A with(nolock) inner join @dt B on A.LoginDate=@StartDate'
                 + N' and A.FirstLoginDate between B.OriginalStartDate and B.OriginalEndDate'
                 + N' and (FromCache=0 or FromCache is null)'
                 + N' group by A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate;'

        exec sp_executesql @sql, N'@StartDate int,@dt OriginalDatesType readonly', @StartDate, @dt
    end
    else
    begin
        if (@StartDate/10000 = @EndDate / 10000)
        begin
            -- Period lies inside one year: read a single year table.
            set @sql = N'insert into #RetainedUsers(SoftID,Platform,ChannelID,OriginalStatDate,RetainedUserCount)'
                     + N' select A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate,COUNT(distinct A.IMEI) from ....'
                     + CAST((@StartDate / 10000) as nvarchar(10))
                     + N' A with(nolock) inner join @dt B on A.Part=@part'
                     + N' and A.LoginDate between @StartDate and @EndDate'
                     + N' and A.FirstLoginDate between B.OriginalStartDate and B.OriginalEndDate'
                     + N' and (FromCache=0 or FromCache is null)'
                     + N' group by A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate;'
        end
        else
        begin
            -- Period spans two years: union the year tables of @StartDate and @EndDate.
            -- The second table-name literal ends directly before the year suffix,
            -- like every other branch (the published text had a stray space and
            -- line break there, which would split the table name).
            set @sql = N'insert into #RetainedUsers(SoftID,Platform,ChannelID,OriginalStatDate,RetainedUserCount)'
                     + N' select A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate,COUNT(distinct A.IMEI) from ('
                     + N' select * from ....'
                     + CAST((@StartDate / 10000) as nvarchar(10))
                     + N' with(nolock) where Part=@part and LoginDate between @StartDate and @EndDate'
                     + N' and (FromCache=0 or FromCache is null)'
                     + N' union all select * from ....'
                     + CAST((@EndDate / 10000) as nvarchar(10))
                     + N' with(nolock) where Part=@part and LoginDate between @StartDate and @EndDate'
                     + N' and (FromCache=0 or FromCache is null)) A'
                     + N' inner join @dt B on A.FirstLoginDate between B.OriginalStartDate and B.OriginalEndDate'
                     + N' group by A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate;'
        end

        -- Run the query once per Part value 0..127. The per-part distinct counts
        -- are summed below, which presumably relies on an IMEI always falling
        -- into the same Part - confirm against the table design.
        declare @part tinyint = 0;
        while @part < 128
        begin
            -- Disabled guard kept from the original source:
            --if (@Period <> 12 or @part = 0) begin
            exec sp_executesql @sql, N'@part tinyint,@StartDate int,@EndDate int,@dt OriginalDatesType readonly', @part, @StartDate, @EndDate, @dt;
            --end
            set @part = @part + 1
        end
    end

    -- Result set 1: all channels combined (ID2 = -1).
    select @Period Period,OriginalStatDate,@EndDate StatDate,SoftID,Platform,-1 ID2,-1 ID1,0 OriginalNewUserCount,SUM(RetainedUserCount) RetainedUserCount
    from #RetainedUsers
    group by SoftID,Platform,OriginalStatDate

    -- Result set 2: per channel (ID2 = ChannelID).
    select @Period Period,OriginalStatDate,@EndDate StatDate,SoftID,Platform,ChannelID ID2,-1 ID1,0 OriginalNewUserCount,SUM(RetainedUserCount) RetainedUserCount
    from #RetainedUsers
    group by SoftID,Platform,ChannelID,OriginalStatDate

    drop table #RetainedUsers
end
hadoop
/**
 * Mapper for the retained-users job.
 *
 * <p>{@link #setup} builds a lookup from a first-login date (yyyyMMdd text) to
 * the cohort date it is reported under (yyyyMMdd as an int). {@link #map} then
 * emits, for every tab-separated input record whose first-login date is in that
 * lookup, two key/value pairs: one keyed by the record's first channel and one
 * keyed by channel -1.
 *
 * <p>Job configuration read: {@code key_enddate} (yyyyMMdd), {@code period}
 * and {@code step} (both integers).
 */
@MapConfig
public static class MapTask extends Mapper<LongWritable, Text, Text, Text> {
    private Text mKey = new Text();
    private Text mValue = new Text();
    private StringBuilder sb = new StringBuilder();
    /** First-login date (yyyyMMdd) -> cohort date (yyyyMMdd as int). */
    private Map<String, Integer> cohortByFirstLoginDate = new HashMap<String, Integer>();
    /** Value of {@code key_enddate}, read once in setup instead of once per record. */
    private String statDateText;

    /** Day difference truncated to a whole number, as the original loop conditions did. */
    private static int wholeDaysBetween(DateTime later, DateTime earlier) {
        return Double.valueOf(DateTime.minusDay(later, earlier)).intValue();
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        statDateText = context.getConfiguration().get("key_enddate");
        int period = Integer.parseInt(context.getConfiguration().get("period"));
        int step = Integer.parseInt(context.getConfiguration().get("step"));
        DateTime curstatdate = DateTime.parseToDateTime(statDateText, "yyyyMMdd");

        // Decide which first-login dates count towards retention and which
        // cohort date each of them is reported under.
        if (period == PeriodOptions.GetValueByEnum(PeriodOptions.Daily)) {
            // Each of the 30 days before the stat date is its own cohort.
            for (DateTime day = curstatdate.addDays(-30);
                 wholeDaysBetween(curstatdate, day) > 0;
                 day = day.addDays(1)) {
                String dayText = day.toString("yyyyMMdd");
                cohortByFirstLoginDate.put(dayText, Integer.parseInt(dayText));
            }
        } else if (period == PeriodOptions.GetValueByEnum(PeriodOptions.Weekly)) {
            // Every Sunday in the 48 days before the stat date closes a cohort;
            // the days from (Sunday - step) up to the Sunday map to it.
            // NOTE(review): if step is 7 the window holds 8 days and a Sunday is
            // re-mapped to the following Sunday by the next iteration - confirm
            // the value the driver passes as "step".
            for (DateTime day = curstatdate.addDays(-48);
                 DateTime.minusDay(curstatdate, day) > 0;
                 day = day.addDays(1)) {
                if (WeekOptions.GetEnumByValue(day.getDayOfWeek()) == WeekOptions.SUNDAY) {
                    Integer cohortDate = Integer.parseInt(day.toString("yyyyMMdd"));
                    for (DateTime member = day.addDays(-step);
                         wholeDaysBetween(day, member) >= 0;
                         member = member.addDays(1)) {
                        cohortByFirstLoginDate.put(member.toString("yyyyMMdd"), cohortDate);
                    }
                }
            }
        } else if (period == PeriodOptions.GetValueByEnum(PeriodOptions.NaturalMonth)) {
            // A day whose successor is the 1st is a month end and closes a cohort.
            // NOTE(review): the inner condition is "> 0", so the month-end day
            // itself is not mapped here, and the window start depends on how
            // addMonths behaves for months of different length - confirm both.
            for (DateTime day = curstatdate.addMonths(-7).addDays(1);
                 DateTime.minusDay(curstatdate, day) > 0;
                 day = day.addDays(1)) {
                if (day.addDays(1).day() == 1) {
                    Integer cohortDate = Integer.parseInt(day.toString("yyyyMMdd"));
                    for (DateTime member = day.addMonths(-1).addDays(1);
                         wholeDaysBetween(day, member) > 0;
                         member = member.addDays(1)) {
                        cohortByFirstLoginDate.put(member.toString("yyyyMMdd"), cohortDate);
                    }
                }
            }
        }
    }

    /**
     * Emits two records for one input line, or nothing when the line is filtered out.
     *
     * <p>Input fields used (0-based, tab separated): 0 softid, 2 platform,
     * 3 imei, 15 an integer flag field, 22 first channel id, 23 first login date.
     * A line with fewer fields, or a non-numeric field 15, raises an exception
     * exactly as before.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] params = StringUtils.splitByWholeSeparatorPreserveAllTokens(value.toString(), "\t");

        // Skip records outside the retention windows, and records whose field 15
        // has its lowest bit set (presumably the FromCache marker - confirm).
        Integer cohortDate = cohortByFirstLoginDate.get(params[23]);
        if (cohortDate == null || (Integer.parseInt(params[15]) & 1) != 0) {
            return;
        }

        // value: cohort date, stat date, imei - identical for both keys.
        mValue.set(cohortDate + "\t" + statDateText + "\t" + params[3]);

        // key: softid, platform, first channel id
        sb.setLength(0);
        sb.append(params[0]).append("\t")
          .append(params[2]).append("\t")
          .append(params[22]);
        mKey.set(sb.toString());
        context.write(mKey, mValue);

        // key: softid, platform, -1 (all channels)
        sb.setLength(0);
        sb.append(params[0]).append("\t")
          .append(params[2]).append("\t")
          .append(-1);
        mKey.set(sb.toString());
        context.write(mKey, mValue);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        cohortByFirstLoginDate.clear();
    }
}
/**
 * Combiner that removes duplicate values for a key before they reach the reducer.
 *
 * <p>Key: softid, platform, channel id (tab separated).
 * Value: the text written by the mapper, passed through unchanged; each distinct
 * value is written once per call.
 */
@CombineConfig
public static class CombineTask extends Reducer<Text, Text, Text, Text> {
    Text mvalue = new Text();
    /** Values seen for the key currently being combined. */
    private Multiset<String> seenValues = HashMultiset.create();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        collectValues(values);
        writeDistinct(key, context);
    }

    /** Replaces the contents of the working collection with the values of this key. */
    private void collectValues(Iterable<Text> values) {
        seenValues.clear();
        for (Text incoming : values) {
            seenValues.add(incoming.toString());
        }
    }

    /** Writes every distinct collected value once under the given key. */
    private void writeDistinct(Text key, Context context)
            throws IOException, InterruptedException {
        for (String distinctValue : seenValues.elementSet()) {
            mvalue.set(distinctValue);
            context.write(key, mvalue);
        }
    }
}
/**
 * Reducer that counts distinct retained users per cohort.
 *
 * <p>Key: softid, platform, channel id (tab separated).
 * Value fields (tab separated): 0 original (cohort) date, 1 stat date, 2 imei.
 *
 * <p>For each key it writes one line per (original date, stat date) pair:
 * period, original date, stat date, 0, number of distinct imei values.
 * The period is read from the {@code period} job configuration entry.
 */
@ReduceConfig
public static class ReduceTask extends Reducer<Text, Text, Text, Text> {
    private Text mValue = new Text();
    /** "original date \t stat date" -> user ids seen for that pair under the current key. */
    private Map<String, Multiset<Object>> MRetained = new HashMap<>();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Start fresh for every key; dropping the map entries is enough.
        MRetained.clear();

        for (Text item : values) {
            String[] params = StringUtils.splitByWholeSeparatorPreserveAllTokens(item.toString(), "\t");
            // A record whose original date equals the stat date is not counted.
            if (params[0].equals(params[1])) {
                continue;
            }
            String mapkey = params[0] + "\t" + params[1];
            Multiset<Object> users = MRetained.get(mapkey);
            if (users == null) {
                users = HashMultiset.create();
                MRetained.put(mapkey, users);
            }
            users.add(params[2]);
        }

        String period = context.getConfiguration().get("period");
        for (Map.Entry<String, Multiset<Object>> entry : MRetained.entrySet()) {
            // Output columns after the key (softid, platform, channelid):
            // period, originaldate, statdate, OriginalNewUserCount, RetainedUserCount
            mValue.set(period
                    + "\t" + entry.getKey()
                    + "\t" + 0
                    + "\t" + entry.getValue().elementSet().size());
            context.write(key, mValue);
        }
    }
}
标签:
原文地址:http://my.oschina.net/osenlin/blog/523099