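The HDFS sink has a property called hdfs.rollInterval (in seconds). If you set hdfs.rollInterval=86400, the file does roll once every 24 hours, but only 24 hours after it was opened. What we usually need instead is for the file to roll at midnight (0:00), because offline jobs are normally scheduled to run overnight.
If Flume was started at 9 a.m., the HDFS file will not be closed until 9 a.m. the next day. Making the nightly job wait until after 9 a.m. is clearly unacceptable, so I modified the source code to make the sink roll files at midnight.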
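To make the timing concrete, here is a small illustrative sketch using java.time (Java 8+); the class name, method names and the example date are made up for this purpose. It compares when a file opened at 09:00 is closed by hdfs.rollInterval=86400 with the first midnight after it was opened:

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Illustrative only: compares interval-based rolling with rolling at midnight.
class RollTimeDemo {
    // hdfs.rollInterval semantics: close the file rollIntervalSeconds after it was opened
    static LocalDateTime intervalRoll(LocalDateTime openedAt, long rollIntervalSeconds) {
        return openedAt.plusSeconds(rollIntervalSeconds);
    }

    // Desired behaviour: close the file at the first midnight after it was opened
    static LocalDateTime nextMidnight(LocalDateTime openedAt) {
        return openedAt.toLocalDate().plusDays(1).atStartOfDay();
    }

    public static void main(String[] args) {
        LocalDateTime openedAt = LocalDateTime.of(2015, 4, 29, 9, 0); // Flume started at 09:00
        LocalDateTime byInterval = intervalRoll(openedAt, 86400);
        LocalDateTime atMidnight = nextMidnight(openedAt);
        System.out.println("hdfs.rollInterval=86400 closes the file at " + byInterval);
        System.out.println("rolling at midnight closes the file at    " + atMidnight);
        System.out.println("extra hours the nightly job would wait: "
                + Duration.between(atMidnight, byInterval).toHours());
    }
}
```

Running main shows the file closing at 09:00 the next day with the interval setting versus 00:00 with a midnight roll, i.e. 9 extra hours before the nightly job can safely read it.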
First, add a new property, hdfs.timeroller.flag, which can be set to day, hour or min:
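// in HDFSEventSink: new field
private String timeRollerFlag;

// in HDFSEventSink.configure(Context context); defaults to "day"
timeRollerFlag = context.getString("hdfs.timeroller.flag", Constants.defaultTimeRollerFlagDay);

The allowed values live in a small constants class:

public class Constants {
    public static final String defaultTimeRollerFlagDay = "day";
    public static final String timeRollerFlagHour = "hour";
    public static final String timeRollerFlagMin = "min";
}

When the sink creates a BucketWriter, pass the flag as an extra constructor argument (the BucketWriter constructor gets a matching String timeRollerFlag parameter, which it assigns to the field of the same name):

BucketWriter bucketWriter = new BucketWriter(rollInterval,
    rollSize, rollCount,
    batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
    suffix, codeC, compType, hdfsWriter, timedRollerPool,
    proxyTicket, sinkCounter, idleTimeout, closeCallback,
    lookupPath, callTimeout, callTimeoutPool, retryInterval,
    tryCount, timeRollerFlag);

Next, the changes to org.apache.flume.sink.hdfs.BucketWriter: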
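import java.util.Calendar;
import java.util.Date;

private final String timeRollerFlag;                 // set from the new constructor parameter
private Calendar calendar = Calendar.getInstance();  // uses the JVM default time zone
private int lastDayOfYear;
private int lastYear;
private int lastHour;
private int lastMin;
private int nowDayOfYear;
private int nowYear;
private int nowHour;
private int nowMin;
// open time of this writer's current file; must not be static, otherwise
// every bucket (every HDFS path) would share and overwrite the same value
private Date fileOpenTime = null;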
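In open(), record when the file was opened, right before the existing catch block:

        // the file has just been opened in HDFS (with inUseSuffix): remember when
        fileOpenTime = new Date();
    } catch (Exception ex) {
        // ... existing error handling unchanged ...

In append(), add an else branch to the existing if (!isOpen) check:

    if (!isOpen) {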
        if (closed) {
            throw new BucketClosedException("This bucket writer was closed and "
                + "this handle is thus no longer valid");
        }
        open();
    } else {
        // The file is already open: check whether a day/hour/minute boundary has
        // been crossed since it was opened. This only runs when an event arrives,
        // so the old file stays open until the first event after the boundary.
        LOG.debug("file is already open, checking the time-based roll");
        calendar.setTime(fileOpenTime);
        lastDayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
        lastYear = calendar.get(Calendar.YEAR);
        lastHour = calendar.get(Calendar.HOUR_OF_DAY);
        lastMin = calendar.get(Calendar.MINUTE);
        Date now = new Date();
        calendar.setTime(now);
        nowDayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
        nowYear = calendar.get(Calendar.YEAR);
        nowHour = calendar.get(Calendar.HOUR_OF_DAY);
        nowMin = calendar.get(Calendar.MINUTE);
        LOG.debug("fileOpenTime = {}, nowTime = {}", fileOpenTime, now);
        // day changed: different year or different day of the year. Testing for
        // inequality (not "day + 1") also rolls after a gap of several days
        boolean condition1 = nowYear != lastYear || nowDayOfYear != lastDayOfYear;
        // hour changed: the day changed, or same day with a different hour
        boolean condition2 = condition1 || nowHour != lastHour;
        // minute changed: the hour changed, or same hour with a different minute
        boolean condition3 = condition2 || nowMin != lastMin;
        // pick the condition that matches the configured roll flag
        if (timeRollerFlag.equals(Constants.defaultTimeRollerFlagDay)) {
            if (condition1) {
                LOG.debug("rollflag = {}, rolling", Constants.defaultTimeRollerFlagDay);
                close();
                open();
            }
        } else if (timeRollerFlag.equals(Constants.timeRollerFlagHour)) {
            if (condition2) {
                LOG.debug("rollflag = {}, rolling", Constants.timeRollerFlagHour);
                close();
                open();
            }
        } else if (timeRollerFlag.equals(Constants.timeRollerFlagMin)) {
            if (condition3) {
                LOG.debug("rollflag = {}, rolling", Constants.timeRollerFlagMin);
                close();
                open();
            }
        }
    }
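The roll decision can also be expressed without individual Calendar fields. The following is a minimal sketch, not the author's patch: a hypothetical helper TimeRollCheck.shouldRoll built on java.time (Java 8+), assuming the open time and the current time are both LocalDateTime values in the same (JVM default) time zone. It truncates both times to the configured unit and rolls as soon as the current time falls in a later day, hour or minute, which also covers gaps of several days between events:

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of Flume: decides whether a file opened at
// openedAt should be rolled when an event arrives at now.
final class TimeRollCheck {
    private TimeRollCheck() {}

    // flag is "day", "hour" or "min", i.e. the value of hdfs.timeroller.flag
    static boolean shouldRoll(String flag, LocalDateTime openedAt, LocalDateTime now) {
        ChronoUnit unit;
        switch (flag) {
            case "day":  unit = ChronoUnit.DAYS;    break;
            case "hour": unit = ChronoUnit.HOURS;   break;
            case "min":  unit = ChronoUnit.MINUTES; break;
            default: return false; // unknown flag: never roll, like the if/else chain
        }
        // Roll when 'now' lies in a later day/hour/minute than the open time,
        // no matter how long the gap between events was.
        return now.truncatedTo(unit).isAfter(openedAt.truncatedTo(unit));
    }

    public static void main(String[] args) {
        LocalDateTime opened = LocalDateTime.of(2015, 4, 29, 9, 0);
        System.out.println(shouldRoll("day", opened, LocalDateTime.of(2015, 4, 29, 23, 59)));
        System.out.println(shouldRoll("day", opened, LocalDateTime.of(2015, 4, 30, 0, 0, 5)));
        System.out.println(shouldRoll("day", opened, LocalDateTime.of(2015, 5, 2, 10, 0)));
    }
}
```

main prints false, true, true: no roll before midnight, a roll for the first event after midnight, and still a roll after a three-day gap. To call it with the Date field, convert with LocalDateTime.ofInstant(fileOpenTime.toInstant(), ZoneId.systemDefault()).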
Any pointers or corrections are very welcome!
[Flume] Rolling HDFS sink files daily at midnight in Flume NG by modifying the source code
Original article: http://blog.csdn.net/simonchi/article/details/45365377