码迷,mamicode.com
首页 > 其他好文 > 详细

muduo源码解析18-asynclogging类

时间:2020-09-04 17:22:47      阅读:44      评论:0      收藏:0      [点我收藏+]

标签:维护   剩余空间   左值   wap   新建   ogg   include   main   names   

asynclogging类:

作用:

这是一个异步日志类:前端多个线程只管向这个日志类的缓冲区中写入日志,后端利用一个线程把缓冲区中的日志写入文件
因此:日志数据流向过程是 [日志->缓冲区->文件]
这是一个多生产者,单消费者的任务场景,多生产者负责把日志写入缓冲区,单消费者负责把缓冲区中数据写入文件
如果只用一个缓冲区,不光要同步各个生产者,还要同步生产者和消费者,
最重要的是需要保证生产者与消费者的并发,也就是前端不断写日志到缓冲区的同时,后端可以把缓冲区写入文件
muduo利用双缓冲区,前端与后端分别维护一个vector<buffer>,里面存放多个小的buffer
前端不断地向vector<buffer>1中写入数据,后端不断地从vector<buffer>2中写日志数据到文件中
每隔一段时间(3s)或者1个buffer满了就交换vector<buffer>1和vector<buffer>2,
由此来保证前端和后端日志线程的并发执行.


前后端缓冲区结构示例:

前端缓冲区 m_buffers [m_currentBuffer][m_nextBuffer][...]
后端缓冲区 bufferToWrite [newBuffer1][newBuffer2][...]

下面是操作步骤:

多个前端线程:
获得锁,
m_currentBuffer空间可用,写入m_currentBuffer
如果m_currentBuffer空间满了,立马把m_currentBuffer写入到m_buffers中;如果m_nextBuffer没满就让
m_nextBuffer作为m_currentBuffer;也就是m_currentBuffer=std::move(m_nextBuffer);
m_nextBuffer也满了就新建一个buffer作为m_currentBuffer;m_currentBuffer.reset(new Buffer);


单个后端线程:
不断等待条件变量m_cond(唤醒条件:过了3s或者m_currentBuffer满了),
被唤醒后把m_currentBuffer中的数据加入到m_buffers;让m_newBuffer1作为m_currentBuffer;
交换m_buffers和buffersToWrite,如果需要,让m_newBuffer2作为m_nextBuffer;
把buffersToWrite中的数据写入到文件中,写完之后留两块buffer作为newBuffer1和newBuffer2

asynclogging.h:

#ifndef ASYNCLOGGING_H
#define ASYNCLOGGING_H

#include"base/blockingqueue.h"
#include"base/boundedblockingqueue.h"
#include"base/countdownlatch.h"
#include"base/mutex.h"
#include"base/thread.h"
#include"base/logstream.h"

#include<atomic>
#include<vector>

namespace mymuduo{

class asynclogging:noncopyable
{
public:
    asynclogging(const string& basename,off_t rollSize,int flushInterval=3);
    ~asynclogging()
    {
        if(m_running)
            this->stop();
    }

    //前端线程使用,把数据追加到缓冲区中
    void append(const char* logline, int len);

    void start()
    {
        m_running=true;
        //m_thread在构造函数中被定义为thread(std::bind(&asynclogging::threadFunc,this),"Loggins")
        //m_thread要执行的线程函数就是threadFunc()函数
        m_thread.start();
        //必须等到m_latch=0时才能返回,m_latch在threadFunc()执行时变为0,
        //保证初始化工作都做好了,后端日志线程已经启动才可以返回.
        m_latch.wait();
    }

    //停止写日志线程
    void stop()
    {
        m_running=false;
        m_cond.notify();
        m_thread.join();
    }

private:
    //后端日志线程函数,用于把缓冲区日志写到文件中去.
    void threadFunc();
    //kLargeBuffer=4000*1000 也就是FixedBuffer内部字符数组大小为4000*1000,4MB
    typedef detail::FixedBuffer<detail::kLargeBuffer> Buffer;
    typedef std::vector<std::unique_ptr<Buffer>> BufferVector;//vector,里面存放Buffer之智能指针
    typedef BufferVector::value_type BufferPtr; //std::unique_ptr<Buffer>类型

    const int m_flushInterval;      //超时时间,每隔一段时间写日志,3s
    std::atomic<bool> m_running;    //是否正在运行
    const string m_basename;        //日志名字
    const off_t m_rollSize;         //预留的日志大小
    mymuduo::thread m_thread;       //后端线程,用于把日志写入日志文件
    mymuduo::countdownlatch m_latch;//倒计时,用于指示日志记录器何时开始工作
    mymuduo::mutexlock m_mutex;     //互斥锁
    mymuduo::condition m_cond;      //条件变量

    BufferPtr m_currentBuffer;      //当前缓冲区
    BufferPtr m_nextBuffer;         //预留缓冲区
    BufferVector m_buffers;         //缓冲区队列,待写入文件

};

}

#endif // ASYNCLOGGING_H

asynclogging.cpp:

 

#include "asynclogging.h"

#include"base/logfile.h"
#include"base/timestamp.h"

namespace mymuduo {


asynclogging::asynclogging(const string& basename,off_t rollSize,int flushInterval)
    :m_flushInterval(flushInterval),m_running(false),m_basename(basename),
      m_rollSize(rollSize),m_thread(std::bind(&asynclogging::threadFunc,this),"Loggins"),
      m_latch(1),m_mutex(),m_cond(m_mutex),m_currentBuffer(new Buffer),
      m_nextBuffer(new Buffer),m_buffers()
{
    m_currentBuffer->bzero();
    m_nextBuffer->bzero();
    m_buffers.reserve(16);
}

//所有LOG_*宏定义都会调用到这个append函数,用于把日志写到缓冲区中
//2个默认缓冲区,m_currentBuffer和m_nextBuffer
//current写满了换next,next也写满了新建一个
void asynclogging::append(const char* logline,int len)
{
    //加锁操作日志缓冲区
    mutexlockguard mlg(m_mutex);
    //当前缓冲区剩余空间够,直接写到当前缓冲区
    if(m_currentBuffer->avail()>len)
        m_currentBuffer->append(logline,len);
    else        //当前缓冲区被写满日志了,换m_nextBuffer来写
    {

        //std::move把左值强制转化成右值引用,可以通过右值引用使用该值,来用于移动语义,
        //例如用vector::push_back()会开辟内存拷贝元素,若是使用了std::move就不会发生复制
        //在这里意思就是把m_currentBuffer对象的所有权剥夺,转移到m_buffers.back()上面
        //不需要开辟内存拷贝元素,由于m_currentBuffer所有权被剥夺,因此m_currentBuffer变为空
        m_buffers.push_back(std::move(m_currentBuffer));
        //m_nextBuffer可用就用m_nextBuffer,否则就新建一个额外的缓冲区来写
        if(m_nextBuffer)
            m_currentBuffer=std::move(m_nextBuffer);
        else
            m_currentBuffer.reset(new Buffer);
        //把日志写到m_currentBuffer中
        m_currentBuffer->append(logline,len);
        //通知m_buffers中有数据了,可以把m_buffers中数据写到文件中去了
        m_cond.notify();
    }
}

//这个函数是后端线程函数,用于周期性地把日志写入到文件中去
//每隔m_flushInterval(3)秒 或者是 m_currentBuffer满了 , 就把m_currentBuffer加入到m_buffers中
//
void asynclogging::threadFunc()
{
    assert(m_running==true);
    m_latch.countDown();    //latch变为0,start函数可以返回了
    logfile output(m_basename,m_rollSize,false);    //logfile用于写日志到文件中
    //又新建了2块缓冲区,属于后端线程的2块缓冲区
    BufferPtr newBuffer1(new Buffer);
    BufferPtr newBuffer2(new Buffer);
    newBuffer1->bzero();
    newBuffer2->bzero();
    //这块缓冲区vector属于后端线程,用于和前端的m_buffers进行交换
    BufferVector buffersToWrite;
    buffersToWrite.reserve(16);

    while(m_running)
    {
        //保证后端缓冲区newBuffer1,2和后端缓冲区队列buffersToWrite是空的
        assert(newBuffer1 && newBuffer1->length() == 0);
        assert(newBuffer2 && newBuffer2->length() == 0);
        assert(buffersToWrite.empty());

        //这一块主要是用于把m_buffers中的数据交换到buffersToWrite中
        {
            mutexlockguard mlg(m_mutex);
            //每隔3s或是m_currentBuffer满了,就把当前m_currentBuffer加入到m_buffers中
            if(m_buffers.empty())
                m_cond.waitForSeconds(m_flushInterval);
            m_buffers.push_back(std::move(m_currentBuffer));//m_currentBuffer添加到m_buffers
            m_currentBuffer=std::move(newBuffer1);  //m_currentBuffer现在是newBuffer1

            buffersToWrite.swap(m_buffers);  //交换buffersToWrite和m_buffers
            if(!m_nextBuffer)
                m_nextBuffer=std::move(newBuffer2); //m_nextBuffer现在是newBuffer2
        }

        assert(!buffersToWrite.empty());    //保证始终有数据可以写入到文件中去

        //把m_buffers中的数据转移到bufferToWrite中
        //如果要写入文件的buffersToWrite中buffer数目大于25,就删除多余的数据
        //删除的目的是为了解决日志消息堆积的问题:前端日志记录太快,后端来不及写到文件中去
        if (buffersToWrite.size() > 25)
        {
            char buf[256];
            snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
                     timestamp::now().toFormattedString().c_str(),
                     buffersToWrite.size()-2);
            fputs(buf, stderr);
            output.append(buf, static_cast<int>(strlen(buf)));
            //在buffersToWrite中就留两块缓冲区,其他多余的缓冲区的全部删除
            buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
        }
        //把bufferToWrite中的数据全部写入到文件中
        for (const auto& buffer : buffersToWrite)
        {
            // FIXME: use unbuffered stdio FILE ? or use ::writev ?
            output.append(buffer->data(), buffer->length());
        }

        //重新调整bufferToWrite的大小,仅保留2个buffer用于newBuffer1和newBuffer2
        if (buffersToWrite.size() > 2)
        {
            // drop non-bzero-ed buffers, avoid trashing
            buffersToWrite.resize(2);
        }


        if (!newBuffer1)
        {
            assert(!buffersToWrite.empty());
            //从bufferToWrite中弹出一个buffer作为newBuffer1
            newBuffer1 = std::move(buffersToWrite.back());
            buffersToWrite.pop_back();
            newBuffer1->reset();
        }

        if (!newBuffer2)
        {
            assert(!buffersToWrite.empty());
            //在弹出一个buffer作为newBuffer2
            newBuffer2 = std::move(buffersToWrite.back());
            buffersToWrite.pop_back();
            newBuffer2->reset();
        }

        buffersToWrite.clear(); //保证buffersToWrite为空,要不然和前端m_buffers交换会出问题
        output.flush();
    }
    output.flush();

}


}//namespace mymuduo

测试代码:

#include "base/asynclogging.h"
#include "base/logging.h"
#include "base/timestamp.h"
#include"base/threadpool.h"

#include <stdio.h>
#include <sys/resource.h>
#include <unistd.h>
#include<string>

#include<iostream>

off_t kRollSize = 500*1000*1000;

//全局异步日志类指针,只想主线程唯一的异步日志对象
mymuduo::asynclogging* g_asyncLog = NULL;

//日志输出函数,替换掉默认的输出到stdout上,现在利用asynclogging把日志输出到文件中
void asyncOutput(const char* msg, int len)
{
    g_asyncLog->append(msg, len);
}

//线程函数,每隔一段时间写一个日志
void workerThread()
{
    mymuduo::currentthread::sleepUsec(1000*500);
    //LOG_WARN实质是创建一个临时变量logger,析构时会调用output函数,就是上面那个asyncOutPut
    LOG_WARN<<mymuduo::currentthread::tid();
}

int main()
{

    //设置日志参数
    mymuduo::logger::setOutput(asyncOutput);    //输出方式,改成文件
    mymuduo::logger::setLogLevel(mymuduo::logger::WARN);

    //主线程唯一的一步日志类
    mymuduo::asynclogging log("zqc",kRollSize);
    log.start();
    g_asyncLog=&log;

    //typedef void (*outputFunc)(const char* msg,int len);

    //用线程池实现多线程打印日志
    mymuduo::threadpool tp("testlog_threadpool");
    tp.setMaxQueueSize(50);
    tp.start(5);
    for(int i=0;i<100;i++)
        tp.run(workerThread);

    mymuduo::currentthread::sleepUsec(1000*1000*5);

    tp.stop();

}

部分结果(文件:zqc.20200826-162021.master.72464.log)

20年08月27日 星期4 00:20:21.1598458821 72482 WARN 72482 - main.cpp:26
2020年08月27日 星期4 00:20:21.1598458821 72483 WARN 72483 - main.cpp:26
2020年08月27日 星期4 00:20:21.1598458821 72485 WARN 72485 - main.cpp:26
2020年08月27日 星期4 00:20:21.1598458821 72484 WARN 72484 - main.cpp:26
2020年08月27日 星期4 00:20:22.1598458822 72482 WARN 72482 - main.cpp:26
2020年08月27日 星期4 00:20:22.1598458822 72485 WARN 72485 - main.cpp:26
2020年08月27日 星期4 00:20:22.1598458822 72484 WARN 72484 - main.cpp:26
2020年08月27日 星期4 00:20:22.1598458822 72486 WARN 72486 - main.cpp:26
2020年08月27日 星期4 00:20:22.1598458822 72483 WARN 72483 - main.cpp:26

 

muduo源码解析18-asynclogging类

标签:维护   剩余空间   左值   wap   新建   ogg   include   main   names   

原文地址:https://www.cnblogs.com/woodineast/p/13569046.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!