码迷,mamicode.com
首页 > 其他好文 > 详细

Zookeeper源码阅读(三)数据存储-日志

时间:2018-09-17 22:59:26      阅读:472      评论:0      收藏:0      [点我收藏+]

标签:前言   could   情况   else   getch   view   tar   理解   RoCE   

前言

接着上篇的内容,上一篇主要说了下Zookeeper内部的数据存储基本单位datanode和内存中维护的Datatree,还有和配额有关的三个实体类。具体在zk初始化时是怎么生成datatree等等后面会说。这一篇主要说下事务日志,后面也有用到。

事务日志

关于zookeeper事务日志的格式,可以参考下 zk事务日志,其实简单说也就是header+body两个部分,header有版本等信息,是定长的。而body不是,body里放的就是真正要存的那些操作的信息和校验值等。

org.apache.zookeeper.server.persistence.TxnLog这个接口里定义了事务日志的一些方法,具体含义在它的实现类中解释。

下面是TxnLog的实现类FileTxnLog中关于事务日志格式的解释,很简单。

/**
 * This class implements the TxnLog interface. It provides api's
 * to access the txnlogs and add entries to it.
 * <p>
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 * 
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes//暂时没用
 *   }
 * 
 * TxnList:
 *     Txn || Txn TxnList
 *     
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 * 
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 * 
 * Txnlen:
 *     len 4bytes
 * 
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *     
 * Record:
 *     See Jute definition file for details on the various record types
 *      
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote> 
 */

下面可以通过FileTxnLog的几个主要方法来看下生成事务日志的主要流程:

日志的写入:
public synchronized boolean append(TxnHeader hdr, Record txn)
    throws IOException
{
    //TxnHeader不可以为空
    if (hdr == null) {
        return false;
    }

    //根据事务id来判断目前最大的zxid,为了判断是否是和上一个可写的事务日志有关联。
    if (hdr.getZxid() <= lastZxidSeen) {
        LOG.warn("Current zxid " + hdr.getZxid()
                + " is <= " + lastZxidSeen + " for "
                + hdr.getType());
    } else {
        lastZxidSeen = hdr.getZxid();
    }

    //logStream即日志流为空,用来存序列化数据
    if (logStream==null) {
        if(LOG.isInfoEnabled()){
            LOG.info("Creating new log file: log." +
                    Long.toHexString(hdr.getZxid()));
        }

        //根据zxid创建新的文件,这里貌似有更新,根据一些博客和书,以前只有在没有关联以前的事务日志时会创建新文件,现在看代码感觉只会在log中有warn,还是会创建新文件
        logFileWrite = new File(logDir, ("log." +
                Long.toHexString(hdr.getZxid())));
        //两个流
        fos = new FileOutputStream(logFileWrite);
        logStream=new BufferedOutputStream(fos);
        oa = BinaryOutputArchive.getArchive(logStream);
        FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
        fhdr.serialize(oa, "fileheader");
        // Make sure that the magic number is written before padding.
        //提取文件流
        logStream.flush();
        currentSize = fos.getChannel().position();
        streamsToFlush.add(fos);
    }
    //这一步判断剩余空间不足4k时填充文件至64M,为了效率,具体的应该是利用操作系统底层的原理。我的理解是一次开一大片区域,这样就不用每次写文件的时候硬盘去寻址什么的了。以后可以研究下。
    padFile(fos);
    //把事务头和事务体序列化
    byte[] buf = Util.marshallTxnEntry(hdr, txn);
    if (buf == null || buf.length == 0) {
        throw new IOException("Faulty serialization for header " +
                "and txn");
    }
    //生成校验值,用了Adler32算法
    Checksum crc = makeChecksumAlgorithm();
    crc.update(buf, 0, buf.length);
    //写入buffer流中
    oa.writeLong(crc.getValue(), "txnEntryCRC");
    Util.writeTxnBytes(oa, buf);

    return true;
}

配合图片(来自 斩秋的专栏)看更好理解:

技术分享图片

在整个写入的方法中总共有5步:

  1. 基本校验即文件名中zxid的生成,文件的生成;
  2. 预分配空间
  3. 事务序列化
  4. 生成校验
  5. 写入buffer流。

在上面代码的注释中应该清楚得能看出。

另外,zookeeper里是在SyncRequestProcessor类中写入日志文件的,大致的策略就是从请求写入的栈中pop出记录,一条一条写入,同时,在写入时会又count去记录,如果count大于snapCount / 2 + randRoll,snapcount时可配的,而randRoll是基于snapcount生成的随机值,此时就会rolllog。开启一个新文件的写入。

但是这时候还没有写入文件!!!只在buffer流中。真正写入文件是在commit方法中。

public synchronized void commit() throws IOException {
    if (logStream != null) {
        logStream.flush();
    }
    for (FileOutputStream log : streamsToFlush) {
        //这里提取文件流
        log.flush();
        //可以通过设置zookeeper.forceSync来控制是否需要主动调用该接口,对应这里的forceSync
        if (forceSync) {
            long startSyncNS = System.nanoTime();
            //写入
            log.getChannel().force(false);

            long syncElapsedMS =
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
            if (syncElapsedMS > fsyncWarningThresholdMS) {
                LOG.warn("fsync-ing the write ahead log in "
                        + Thread.currentThread().getName()
                        + " took " + syncElapsedMS
                        + "ms which will adversely effect operation latency. "
                        + "See the ZooKeeper troubleshooting guide");
            }
        }
    }
    while (streamsToFlush.size() > 1) {
        streamsToFlush.removeFirst().close();
    }
}
获取日志文件:
public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
    List<File> files = Util.sortDataDir(logDirList, "log", true);
    long logZxid = 0;
    // Find the log file that starts before or at the same time as the
    // zxid of the snapshot
    //目的就是为了找到在快照之前的最大的那个file的zxid,最大的肯定是最接近快照id的。
    for (File f : files) {
        //从名字中取zxid
        long fzxid = Util.getZxidFromName(f.getName(), "log");
        if (fzxid > snapshotZxid) {
            continue;
        }
        // the files
        // are sorted with zxid's
        if (fzxid > logZxid) {
            logZxid = fzxid;
        }
    }
    List<File> v=new ArrayList<File>(5);
    //遍历,把所有早于这个id的文件都返回。
    for (File f : files) {
        long fzxid = Util.getZxidFromName(f.getName(), "log");
        if (fzxid < logZxid) {
            continue;
        }
        v.add(f);
    }
    return v.toArray(new File[0]);

}
获取日志中最近的zxid:
public long getLastLoggedZxid() {
    File[] files = getLogFiles(logDir.listFiles(), 0);
    //取文件名的zxid(这个文件里最早的zxid)
    long maxLog=files.length>0?
            Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;

    // if a log file is more recent we must scan it to find
    // the highest zxid
    long zxid = maxLog;
    TxnIterator itr = null;
    try {
        FileTxnLog txn = new FileTxnLog(logDir);
         //所有id>maxlog的文件的都在itr中
        itr = txn.read(maxLog);
        while (true) {
            if(!itr.next())
                break;
            TxnHeader hdr = itr.getHeader();
            zxid = hdr.getZxid();
        }
    } catch (IOException e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        close(itr);
    }
    return zxid;
}
可以看下 filetxnlog解析关于read的描述很棒。

可以结合下面这张解释反序列化的图看,主要是FileTxnIteratorz这个迭代器的init方法里把所有的日志按需排列之后根据zxid去读的逻辑。

技术分享图片

删除日志:
public boolean truncate(long zxid) throws IOException {
    FileTxnIterator itr = null;
    try {
        itr = new FileTxnIterator(this.logDir, zxid);
        PositionInputStream input = itr.inputStream;
        if(input == null) {
            throw new IOException("No log files found to truncate! This could " +
                    "happen if you still have snapshots from an old setup or " +
                    "log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
        }
        //在上面创建FileTxnIterator时位置就已经计算好了,如果大于这个zxid的全删掉,包含这个zxid的会删这个文件内大于这个zxid的部分。
        long pos = input.getPosition();
        // now, truncate at the current position
        RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
        raf.setLength(pos);
        raf.close();
        while (itr.goToNextLog()) {
            if (!itr.logFile.delete()) {
                LOG.warn("Unable to truncate {}", itr.logFile);
            }
        }
    } finally {
        close(itr);
    }
    return true;
}

在zk恢复时,非leader机器上比leader服务器的zxid大的所有事务都要删掉。因为zk的原则时只要集群中有leader,那么所有机器必须与leader同步!一旦有这种情况,leader会给这台机器发送TRUNC命令,然后就会进行这个日志截断的方法。

日志滚动
/**
 * rollover the current log file to a new one.
 * @throws IOException
 */
public synchronized void rollLog() throws IOException {
    if (logStream != null) {
        this.logStream.flush();
        this.logStream = null;
        oa = null;
    }
}

光看这里其实挺难理解这个函数的意义,结合在SyncRequestProcessor的使用

if (zks.getZKDatabase().append(si)) {
    logCount++;
    //上面说append方法的时候说到的产生新文件的条件
    if (logCount > (snapCount / 2 + randRoll)) {
        setRandRoll(r.nextInt(snapCount/2));
        // roll the log
        //结合这里去看就知道在这里把logStream原有的buffer取出然后设置成null,这样下次写的时候就会创建新的日志文件
        zks.getZKDatabase().rollLog();
        // take a snapshot
        if (snapInProcess != null && snapInProcess.isAlive()) {
            LOG.warn("Too busy to snap, skipping");
        } else {
            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                    public void run() {
                        try {
                            zks.takeSnapshot();
                        } catch(Exception e) {
                            LOG.warn("Unexpected exception", e);
                        }
                    }
                };
            snapInProcess.start();
        }
        logCount = 0;
    }
}
可视化

Zk提供了一个负责可视化的工具类LogFormatter,里面有一个main方法可以直接执行去可视化log文件。里面的逻辑大体上说是简易版的read方法。

zk持久化这个博客里做了一个简单的demo,可以看看。之前也跑过这个类,也发现了和博主一样的问题,就是序列化和反序列化的tag不一样,还不太清楚为啥能正常工作。。。

思考

大概知道了怎么去写日志和度日志,但是有几点不是很清楚:

rolllog,append,commit这些里面都会有stream的flush,commit里强制调用了force方法才会强制操作系统写入硬盘,这个还需要再了解了解工作原理;

为什么在写入和可视化的时候用不同的tag能正常解析???

Adler32算法的工作原理

Zookeeper源码阅读(三)数据存储-日志

标签:前言   could   情况   else   getch   view   tar   理解   RoCE   

原文地址:https://www.cnblogs.com/gongcomeon/p/9665285.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!