C++ Muduo 库 https://www.bilibili.com/video/BV1UE4m1R72y/?spm_id_from=333.337.search-card.all.click&vd_source=28887ecca1f25a715214067a2b3b58ac
noncopyable 禁止拷贝 我们在看Muduo库代码的时候可以看到,基本上大多数class都会继承noncopyable类,我们看一下noncopyable.h代码
#ifndef NONCOPYABLE_H
#define NONCOPYABLE_H

/**
 * Base class that disables copying for every class deriving from it.
 *
 * The copy constructor and the copy assignment operator are deleted, so the
 * compiler cannot generate those operations for a derived class either.
 * The default constructor and the destructor are protected: derived classes
 * can still be created and destroyed, but noncopyable itself cannot be
 * instantiated directly.
 */
class noncopyable
{
protected:
    noncopyable() = default;
    ~noncopyable() = default;

public:
    noncopyable(const noncopyable &) = delete;
    noncopyable &operator=(const noncopyable &) = delete;
};

#endif // NONCOPYABLE_H
noncopyable 删除了拷贝构造函数和拷贝赋值运算符。子类继承它之后,编译器在为子类生成拷贝构造(或拷贝赋值)时必须先调用父类对应的函数,而父类的这两个函数已经被 delete,因此子类对象也就无法被拷贝或赋值
Timestamp 时间类 主要是用于格式化输出当前的时间,向外提供接口就好,非常的简单
#ifndef TIMESTAMP_H
#define TIMESTAMP_H

#include <stdint.h> // int64_t is used below; do not rely on transitive includes
#include <iostream>
#include <string>

/**
 * Small value type wrapping a point in time, used to format log output.
 */
class Timestamp
{
public:
    /// Constructs a timestamp holding the value 0.
    Timestamp();

    /// Wraps an already obtained time value; explicit so that plain integers
    /// are never silently converted into timestamps.
    explicit Timestamp(int64_t microSecondsSinceEpoch);

    /// Returns a timestamp for the current time.
    static Timestamp now();

    /// Formats the stored time as a human readable string.
    std::string toString() const;

private:
    int64_t microSecondsSinceEpoch_;
};

#endif // TIMESTAMP_H
#include "Timestamp.h"

#include <stdio.h>
#include <time.h>

// Default timestamp: the stored value is 0.
Timestamp::Timestamp() : microSecondsSinceEpoch_(0) {}

// Wraps a caller supplied time value.
Timestamp::Timestamp(int64_t microSecondsSinceEpoch)
    : microSecondsSinceEpoch_(microSecondsSinceEpoch)
{
}

// Returns the current time.
// NOTE(review): time() yields whole seconds, so despite the member's name the
// stored value is in seconds; toString() relies on that.
Timestamp Timestamp::now()
{
    return Timestamp(time(nullptr));
}

// Formats the stored time as "YYYY/MM/DD HH:MM:SS" in the local time zone.
// Returns an empty string if the value cannot be converted.
std::string Timestamp::toString() const
{
    char buf[128] = {0};

    // Copy into a real time_t instead of reinterpreting the int64_t member's
    // address; the two types are not guaranteed to be the same.
    const time_t seconds = static_cast<time_t>(microSecondsSinceEpoch_);

    // localtime_r writes into a caller owned struct, so concurrent loggers on
    // different threads do not share one static buffer.
    tm tm_time;
    if (localtime_r(&seconds, &tm_time) == nullptr)
    {
        return buf;
    }

    snprintf(buf, sizeof buf, "%4d/%02d/%02d %02d:%02d:%02d",
             tm_time.tm_year + 1900,
             tm_time.tm_mon + 1,
             tm_time.tm_mday,
             tm_time.tm_hour,
             tm_time.tm_min,
             tm_time.tm_sec);
    return buf;
}
Logger 日志 向外提供打印的接口,用以输出log信息,一共有四种级别的信息
// Severity levels understood by Logger; the numeric values are spelled out
// because the level is passed around as a plain int.
enum LogLevel
{
    INFO = 0,  // ordinary informational message
    ERROR = 1, // error that does not stop the program
    FATAL = 2, // error after which the program exits (see LOG_FATAL)
    DEBUG = 3, // debugging output
};
Logger class 里面只需要设置最为简单的接口就好,不需要写在txt里面,如果写的话涉及到日志的回滚等操作,很复杂。详细的可以看webserver的日志
那么这里只需要
class Logger : noncopyable {public : static Logger& instance () ; void setLogLevel (int level) ; void log (std::string message) ; private : int logLevel_; Logger (){} };
单例模式,使用static,以及构造函数私有化等操作,其他的还有设置日志级别以及写日志的基础操作就好
Logger& Logger::instance () { static Logger logger; return logger; } void Logger::setLogLevel (int level) { logLevel_ = level; } void Logger::log (std::string message) { switch (logLevel_) { case INFO: std::cout << "[INFO]" ; break ; case ERROR: std::cout << "[ERROR]" ; break ; case FATAL: std::cout << "[FATAL]" ; break ; case DEBUG: std::cout << "[DEBUG]" ; break ; default : break ; } std::cout << Timestamp::now ().toString () << " : " << message << std::endl; }
基本上到这里日志class就已经完成了,但是如果用户使用的话就需要先 instance 获取单例,再 setLogLevel,最后才能 log 打印,会很麻烦,而且logLevel不应该由用户来设置,咱们只需要定义一些宏向外提供接口就好
// Shared implementation of the LOG_* macros: formats the message into a
// 1024 byte buffer (longer messages are truncated by snprintf), then tags and
// prints it through the Logger singleton.
// The locals carry a distinctive prefix/suffix so that a caller argument named
// `buf` or `logger` is not captured by the macro's own variables.
#define MUDUO_LOG_AT_LEVEL(level, logMsgFormat, ...)                                      \
    do                                                                                    \
    {                                                                                     \
        char muduoLogBuf_[1024] = {0};                                                    \
        snprintf(muduoLogBuf_, sizeof muduoLogBuf_, logMsgFormat, ##__VA_ARGS__);         \
        Logger &muduoLogger_ = Logger::instance();                                        \
        muduoLogger_.setLogLevel(level);                                                  \
        muduoLogger_.log(muduoLogBuf_);                                                   \
    } while (0)

// printf-style logging at INFO level.
#define LOG_INFO(logMsgFormat, ...) MUDUO_LOG_AT_LEVEL(INFO, logMsgFormat, ##__VA_ARGS__)

// printf-style logging at ERROR level.
#define LOG_ERROR(logMsgFormat, ...) MUDUO_LOG_AT_LEVEL(ERROR, logMsgFormat, ##__VA_ARGS__)

// printf-style logging at FATAL level; terminates the process afterwards.
#define LOG_FATAL(logMsgFormat, ...)                                \
    do                                                              \
    {                                                               \
        MUDUO_LOG_AT_LEVEL(FATAL, logMsgFormat, ##__VA_ARGS__);     \
        exit(-1);                                                   \
    } while (0)

// printf-style logging at DEBUG level; compiled in only when MUDEBUG is defined.
#ifdef MUDEBUG
#define LOG_DEBUG(logMsgFormat, ...) MUDUO_LOG_AT_LEVEL(DEBUG, logMsgFormat, ##__VA_ARGS__)
#else
// Still a single statement, so `if (x) LOG_DEBUG(...); else ...` stays well formed.
#define LOG_DEBUG(logMsgFormat, ...) \
    do                               \
    {                                \
    } while (0)
#endif
完整的代码
#ifndef LOGGER_H #define LOGGER_H #include <string> #include "noncopyable.h" enum LogLevel { INFO, ERROR, FATAL, DEBUG, }; class Logger : noncopyable {public : static Logger& instance () ; void setLogLevel (int level) ; void log (std::string message) ; private : int logLevel_; Logger (){} }; #define LOG_INFO(logMsgFormat,...)\ do \ { \ Logger& logger = Logger::instance();\ logger.setLogLevel(INFO);\ char buf[1024] = {0};\ snprintf(buf,1024,logMsgFormat,##__VA_ARGS__);\ logger.log(buf);\ }while(0) #define LOG_ERROR(logMsgFormat,...)\ do \ { \ Logger& logger = Logger::instance();\ logger.setLogLevel(ERROR);\ char buf[1024] = {0};\ snprintf(buf,1024,logMsgFormat,##__VA_ARGS__);\ logger.log(buf);\ }while(0) #define LOG_FATAL(logMsgFormat,...)\ do \ { \ Logger& logger = Logger::instance();\ logger.setLogLevel(FATAL);\ char buf[1024] = {0};\ snprintf(buf,1024,logMsgFormat,##__VA_ARGS__);\ logger.log(buf);\ exit(-1);\ }while(0) #ifdef MUDEBUG #define LOG_DEBUG(logMsgFormat,...)\ do \ { \ Logger& logger = Logger::instance();\ logger.setLogLevel(DEBUG);\ char buf[1024] = {0};\ snprintf(buf,1024,logMsgFormat,##__VA_ARGS__);\ logger.log(buf);\ }while(0) #else #define LOG_DEBUG(logMsgFormat,...) #endif #endif
#include "Logger.h"
#include "Timestamp.h"

#include <iostream>

// Returns the single Logger object, created on first use.
Logger &Logger::instance()
{
    static Logger theLogger;
    return theLogger;
}

// Stores the level whose tag the next log() call prints.
void Logger::setLogLevel(int level)
{
    logLevel_ = level;
}

// Writes "<tag><timestamp> : <message>" to standard output and flushes.
// An unknown level prints no tag at all.
void Logger::log(std::string message)
{
    const char *tag = nullptr;
    switch (logLevel_)
    {
    case INFO:
        tag = "[INFO]";
        break;
    case ERROR:
        tag = "[ERROR]";
        break;
    case FATAL:
        tag = "[FATAL]";
        break;
    case DEBUG:
        tag = "[DEBUG]";
        break;
    default:
        break;
    }

    if (tag != nullptr)
    {
        std::cout << tag;
    }
    std::cout << Timestamp::now().toString() << " : " << message << std::endl;
}
InetAddress address封装 主要是将IP和PORT进行封装,提供一些基础的接口 比如 toIp toIpPort toPort等,这里只做了IPV4的支持
#ifndef INETADDRESS_H
#define INETADDRESS_H

#include <arpa/inet.h>
#include <netinet/in.h>
#include <string>

/**
 * Thin wrapper around an IPv4 socket address (sockaddr_in).
 */
class InetAddress
{
public:
    /// Builds an address from a host-order port and a dotted IPv4 string.
    explicit InetAddress(uint16_t port = 0, std::string ip = "127.0.0.1");

    /// Wraps an existing sockaddr_in by copying it.
    explicit InetAddress(const sockaddr_in &addr) : addr_(addr) {}

    /// IP part as text.
    std::string toIp() const;

    /// "ip:port" as text.
    std::string toIpPort() const;

    /// Port in host byte order.
    uint16_t toPort() const;

    /// Read-only access to the wrapped structure, e.g. for bind().
    const sockaddr_in *getSockAddr() const
    {
        return &addr_;
    }

    /// Replaces the wrapped structure with a copy of addr.
    void setSockAddr(const sockaddr_in &addr)
    {
        addr_ = addr;
    }

private:
    sockaddr_in addr_;
};

#endif // INETADDRESS_H
#include "InetAddress.h"

#include <stdio.h>
#include <string.h>

// Builds an IPv4 address from a host-order port and a dotted-decimal string.
// NOTE(review): inet_addr() reports a malformed string as INADDR_NONE, which
// is indistinguishable from 255.255.255.255; callers are presumably expected
// to pass valid addresses -- confirm.
InetAddress::InetAddress(uint16_t port, std::string ip)
{
    memset(&addr_, 0, sizeof(addr_));
    addr_.sin_family = AF_INET;
    addr_.sin_port = htons(port);
    addr_.sin_addr.s_addr = inet_addr(ip.c_str());
}

// Returns the IP part in dotted-decimal notation.
std::string InetAddress::toIp() const
{
    char buf[64] = {0};
    inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof(buf));
    return buf;
}

// Returns "ip:port". Built from the two accessors with std::string instead of
// appending into a fixed buffer with an unbounded sprintf.
std::string InetAddress::toIpPort() const
{
    return toIp() + ":" + std::to_string(toPort());
}

// Returns the port in host byte order.
uint16_t InetAddress::toPort() const
{
    return ntohs(addr_.sin_port);
}
Channel 通道 简单来说,就是将fd封装起来,和poller做配合。核心思想就是自己的事情自己做
fd一般会注册到poller里面,设置一些感兴趣的事件,比如 EPOLLIN EPOLLOUT 等,然后设置一些回调事件,当发生事件的时候就执行这些回调
要注意的是Channel会注册到poller之中,但是他俩都是由EventLoop来控制的,EventLoop里面有一个poller和多个Channel
Channel的核心思想就是自己的事情自己做,因此会有很多针对自身的操作,比如注册事件、删除自身等,这些操作会在class里面提供接口,但是需要通过EventLoop来调用,从而操作poller
#ifndef CHANNEL_H #define CHANNEL_H #include "noncopyable.h" #include "Timestamp.h" #include <functional> #include <memory> class EventLoop ; class Channel : noncopyable {public : using EventCallback = std::function<void ()>; using ReadEventCallback = std::function<void (Timestamp)>; Channel (EventLoop *loop,int fd); ~Channel (); void handleEvent (Timestamp receiveTime) ; void setReadCallback (ReadEventCallback cb) { readCallback_ = std::move (cb); } void setWriteCallback (EventCallback cb) { writeCallback_ = std::move (cb); } void setCloseCallback (EventCallback cb) { closeCallback_ = std::move (cb); } void setErrorCallback (EventCallback cb) { errorCallback_ = std::move (cb); } void tie (const std::shared_ptr<void > &) ; int fd () const { return fd_; } int events () const { return events_; } void set_revents (int revt) { revents_ = revt; } void enableReading () { events_ |= kReadEvent; update (); } void disableReading () { events_ &= ~kReadEvent; update (); } void enableWriting () { events_ |= kWriteEvent; update (); } void disableWriting () { events_ &= ~kWriteEvent; update (); } void disableAll () { events_ = kNoneEvent; update (); } bool isNoneEvent () const { return events_ == kNoneEvent; } bool isWriting () const { return events_ == kWriteEvent; } bool isReading () const { return events_ == kReadEvent; } int index () { return index_; } void set_index (int index) { index_ = index; } EventLoop* ownerLoop () { return loop_; } void remove () ; private : void update () ; void handleEventWithGuard (Timestamp receiveTime) ; static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent; EventLoop *loop_; const int fd_; int events_; int revents_; int index_; std::weak_ptr<void > tie_; bool tied_; ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_; }; #endif
#include "Channel.h"
#include "EventLoop.h"
#include "Logger.h"

#include <sys/epoll.h>

// Interest masks expressed in epoll event bits.
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;
const int Channel::kWriteEvent = EPOLLOUT;

// A fresh channel has no interests, no reported events, index -1 and no tie.
Channel::Channel(EventLoop *loop, int fd)
    : loop_(loop),
      fd_(fd),
      events_(0),
      revents_(0),
      index_(-1),
      tied_(false)
{
}

Channel::~Channel() {}

// Stores a weak reference to obj and marks the channel as tied.
void Channel::tie(const std::shared_ptr<void> &obj)
{
    tie_ = obj;
    tied_ = true;
}

// Forwards the current interest set to the owning loop.
void Channel::update()
{
    loop_->updateChannel(this);
}

// Asks the owning loop to drop this channel.
void Channel::remove()
{
    loop_->removeChannel(this);
}

// Runs the callbacks for the reported events. A tied channel only does so
// while the tied object can still be locked; the lock is held for the call.
void Channel::handleEvent(Timestamp receiveTime)
{
    if (!tied_)
    {
        handleEventWithGuard(receiveTime);
        return;
    }

    std::shared_ptr<void> keepAlive = tie_.lock();
    if (keepAlive)
    {
        handleEventWithGuard(receiveTime);
    }
}

// Dispatches revents_ in the order: close, error, read, write. A callback
// that was never set is skipped.
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    LOG_INFO("channel handleEvent revents:%d\n", revents_);

    // Hang-up without pending input is treated as close.
    if ((revents_ & EPOLLHUP) != 0 && (revents_ & EPOLLIN) == 0 && closeCallback_)
    {
        closeCallback_();
    }

    if ((revents_ & EPOLLERR) != 0 && errorCallback_)
    {
        errorCallback_();
    }

    if ((revents_ & (EPOLLIN | EPOLLPRI)) != 0 && readCallback_)
    {
        readCallback_(receiveTime);
    }

    if ((revents_ & EPOLLOUT) != 0 && writeCallback_)
    {
        writeCallback_();
    }
}
Poller 封装 Muduo提供了poll与Epoll,出于复用的考虑,在使用的时候不可能写死为poll或者Epoll,因此要将两者封装一下,使用一个抽象类Poller,poll与Epoll来继承Poller,这样使用的时候直接用Poller来替代即可
抽象类Poller提供一些通用的接口,由poll与Epoll来实现,比如updateChannel removeChannel 等事件
#ifndef POLLER_H #define POLLER_H #include "noncopyable.h" #include "Timestamp.h" #include <vector> #include <unordered_map> class Channel ;class EventLoop ;class Poller : noncopyable {public : using ChannelList = std::vector<Channel*>; Poller (EventLoop *loop); virtual ~Poller () = default ; virtual Timestamp poll (int timeoutMs,ChannelList *activeChannels) = 0 ; virtual void updateChannel (Channel *channel) = 0 ; virtual void removeChannel (Channel *channel) = 0 ; bool hasChannel (Channel *channel) const ; static Poller* newDefaultPoller (EventLoop *loop) ; protected : using ChannelMap = std::unordered_map<int ,Channel*>; ChannelMap channels_; private : EventLoop *ownerLoop_; }; #endif
#include "Poller.h"
#include "Channel.h"

// Remembers the loop this poller belongs to.
Poller::Poller(EventLoop *loop)
    : ownerLoop_(loop)
{
}

// A channel counts as registered only if the map entry for its fd points at
// this very object.
bool Poller::hasChannel(Channel *channel) const
{
    const auto found = channels_.find(channel->fd());
    if (found == channels_.end())
    {
        return false;
    }
    return found->second == channel;
}
#include "Poller.h"

#include <stdlib.h>

#include "EpollPoller.h"

// Creates the poller an EventLoop uses; the caller takes ownership.
//
// There is no poll(2)-based Poller in this code base, so the epoll
// implementation is always returned. Previously, setting MUDUO_USE_POLL made
// this function return nullptr, which the EventLoop would then dereference on
// its first poll.
// TODO: honour MUDUO_USE_POLL once a poll-based Poller exists.
Poller *Poller::newDefaultPoller(EventLoop *loop)
{
    return new EpollPoller(loop);
}
EpollPoller 就是真正的 IO多路复用的实现了 poll没有实现
EpollPoller要实现Poller的接口和一些基本的操作,主要是将epoll的一些事情封装起来
#ifndef EPOLLPOLLER_H #define EPOLLPOLLER_H #include "Poller.h" #include <vector> #include <sys/epoll.h> class Channel ; class EpollPoller : public Poller {public : EpollPoller (EventLoop* loop); ~EpollPoller () override ; Timestamp poll (int timeoutMs,ChannelList *activeChannels) override ; void updateChannel (Channel *channel) override ; void removeChannel (Channel *channel) override ; private : static const int kInitEventListSize = 16 ; void fillActiveChannels (int numEvents,ChannelList *activeChannels) const ; void update (int operation,Channel *channel) ; using EventList = std::vector<epoll_event>; int epollfd_; EventList events_; }; #endif
#include "EpollPoller.h"
#include "Logger.h"
#include "Channel.h"

#include <errno.h>
#include <unistd.h>
#include <strings.h>

// Values stored in Channel::index() to track a channel's state in this poller.
const int kNew = -1;    // never added to epoll
const int kAdded = 1;   // currently registered with epoll
const int kDeleted = 2; // removed from epoll but still present in channels_

// Creates the epoll instance; failure is fatal.
EpollPoller::EpollPoller(EventLoop *loop)
    : Poller(loop),
      epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
      events_(kInitEventListSize)
{
    if (epollfd_ < 0)
    {
        LOG_FATAL("epoll_create error:%d \n", errno);
    }
}

EpollPoller::~EpollPoller()
{
    ::close(epollfd_);
}

// Waits for events for at most timeoutMs, appends the channels that have
// events to activeChannels and returns the time epoll_wait came back.
Timestamp EpollPoller::poll(int timeoutMs, ChannelList *activeChannels)
{
    // %zu: channels_.size() is a size_t.
    LOG_INFO("func=%s => fd total count:%zu \n", __FUNCTION__, channels_.size());

    int numEvents = ::epoll_wait(epollfd_, events_.data(), static_cast<int>(events_.size()), timeoutMs);
    int saveErrno = errno; // keep errno before any other call can overwrite it
    Timestamp now(Timestamp::now());

    if (numEvents > 0)
    {
        LOG_INFO("%d events happened \n", numEvents);
        fillActiveChannels(numEvents, activeChannels);
        // The result buffer was completely filled: double it for next time.
        if (static_cast<size_t>(numEvents) == events_.size())
        {
            events_.resize(events_.size() * 2);
        }
    }
    else if (numEvents == 0)
    {
        LOG_DEBUG("%s timeout! \n", __FUNCTION__);
    }
    else
    {
        // EINTR is not reported as an error.
        if (saveErrno != EINTR)
        {
            errno = saveErrno;
            LOG_ERROR("EPollPoller::poll() err!");
        }
    }
    return now;
}

// Brings epoll in line with the channel's interest set:
//   kNew / kDeleted -> EPOLL_CTL_ADD (kNew is also entered into channels_)
//   kAdded          -> EPOLL_CTL_DEL if nothing is of interest, else EPOLL_CTL_MOD
void EpollPoller::updateChannel(Channel *channel)
{
    const int index = channel->index();
    LOG_INFO("func=%s => fd=%d events=%d index=%d \n", __FUNCTION__, channel->fd(), channel->events(), index);

    if (index == kNew || index == kDeleted)
    {
        if (index == kNew)
        {
            int fd = channel->fd();
            channels_[fd] = channel;
        }
        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD, channel);
    }
    else
    {
        if (channel->isNoneEvent())
        {
            update(EPOLL_CTL_DEL, channel);
            channel->set_index(kDeleted);
        }
        else
        {
            update(EPOLL_CTL_MOD, channel);
        }
    }
}

// Forgets the channel: erases it from channels_, removes it from epoll if it
// is still registered there, and resets its state to kNew.
void EpollPoller::removeChannel(Channel *channel)
{
    int fd = channel->fd();
    channels_.erase(fd);

    LOG_INFO("func=%s => fd=%d\n", __FUNCTION__, fd);

    int index = channel->index();
    if (index == kAdded)
    {
        update(EPOLL_CTL_DEL, channel);
    }
    channel->set_index(kNew);
}

// Hands the reported events to their channels and collects those channels.
void EpollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const
{
    for (int i = 0; i < numEvents; i++)
    {
        // data.ptr was set to the Channel in update().
        Channel *channel = static_cast<Channel *>(events_[i].data.ptr);
        channel->set_revents(events_[i].events);
        activeChannels->push_back(channel);
    }
}

// Performs one epoll_ctl call for the channel. A failed DEL is only logged;
// a failed ADD/MOD is fatal.
void EpollPoller::update(int operation, Channel *channel)
{
    epoll_event event;
    bzero(&event, sizeof event);

    int fd = channel->fd();
    event.events = channel->events();
    // epoll_event::data is a union; only ptr is stored because that is what
    // fillActiveChannels() reads back. (Writing data.fd first was a dead
    // store that ptr overwrote.)
    event.data.ptr = channel;

    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
    {
        if (operation == EPOLL_CTL_DEL)
        {
            LOG_ERROR("epoll_ctl del error:%d\n", errno);
        }
        else
        {
            LOG_FATAL("epoll_ctl add/mod error:%d\n", errno);
        }
    }
}
其他的倒是挺常规的,要注意的是kNew kAdded kDeleted三个状态,EventList 与 ChannelList的管理,activeChannels的管理等操作
还有的是update函数里面event.data.ptr = channel;这一行代码,与fillActiveChannels里面的操作相联动
前面已经说了,poller是由EventLoop来管理的,因此Timestamp EpollPoller::poll(int timeoutMs,ChannelList *activeChannels)是由EventLoop来调用的
EventLoop传给Timestamp EpollPoller::poll(int timeoutMs,ChannelList *activeChannels) activeChannels,就会得到相应的channel,后面再由EventLoop调用channel的handleEvent来处理事件就好
EventLoop Reactor 上面已经说了EventLoop负责管理一个poller与很多个Channel,这个有点复杂 要注意的是四个函数 void wakeup(); void loop(); void runInLoop(Functor cb); void queueInLoop(Functor cb); 要不然可能会迷糊,其他的就挺常规的
先看代码,先了解一下其他的函数,这里就不多讲了
#ifndef EVENTLOOP_H #define EVENTLOOP_H #include <functional> #include <vector> #include <atomic> #include <memory> #include <mutex> #include "noncopyable.h" #include "Timestamp.h" #include "CurrentThread.h" class Channel ;class Poller ;class EventLoop {public : using Functor = std::function<void ()>; EventLoop (); ~EventLoop (); void loop () ; void quit () ; Timestamp pollReturnTime () const { return pollReturnTime_; } void runInLoop (Functor cb) ; void queueInLoop (Functor cb) ; void wakeup () ; void updateChannel (Channel *channel) ; void removeChannel (Channel *channel) ; bool hasChannel (Channel *channel) ; bool isInLoopThread () const { return threadId_ == CurrentThread::tid (); } private : void handleRead () ; void doPendingFunctors () ; using ChannelList = std::vector<Channel*>; std::atomic_bool looping_; std::atomic_bool quit_; const pid_t threadId_; Timestamp pollReturnTime_; std::unique_ptr<Poller> poller_; int wakeupFd_; std::unique_ptr<Channel> wakeupChannel_; ChannelList activeChannels_; std::atomic_bool callingPendingFunctors_; std::vector<Functor> pendingFunctors_; std::mutex mutex_; }; #endif
#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"

#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <memory>

// The loop living on the current thread, if any; enforces one loop per thread.
__thread EventLoop *t_loopInThisThread = nullptr;

// Timeout passed to Poller::poll, in milliseconds.
const int kPollTimeMs = 10000;

// Creates the non-blocking, close-on-exec eventfd used to wake the loop.
// Failure is fatal.
int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        LOG_FATAL("eventfd error:%d \n", errno);
    }
    return evtfd;
}

// Members are listed in declaration order, which is the order they are
// actually initialized in.
EventLoop::EventLoop()
    : looping_(false),
      quit_(false),
      threadId_(CurrentThread::tid()),
      poller_(Poller::newDefaultPoller(this)),
      wakeupFd_(createEventfd()),
      wakeupChannel_(new Channel(this, wakeupFd_)),
      callingPendingFunctors_(false)
{
    LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
    if (t_loopInThisThread)
    {
        LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_);
    }
    else
    {
        t_loopInThisThread = this;
    }

    // Watch the eventfd so that wakeup() can interrupt a blocked poll.
    wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
    wakeupChannel_->enableReading();
}

EventLoop::~EventLoop()
{
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = nullptr;
}

// Drains the eventfd after a wakeup().
void EventLoop::handleRead()
{
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        // %zd: n is a signed ssize_t and is -1 on failure.
        LOG_ERROR("EventLoop::handleRead() reads %zd bytes instead of 8", n);
    }
}

// Main loop: poll, dispatch the active channels, then run queued functors,
// until quit_ is set.
void EventLoop::loop()
{
    looping_ = true;
    quit_ = false;

    LOG_INFO("EventLoop %p start looping \n", this);

    while (!quit_)
    {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        for (Channel *channel : activeChannels_)
        {
            channel->handleEvent(pollReturnTime_);
        }
        doPendingFunctors();
    }

    LOG_INFO("EventLoop %p stop looping. \n", this);
    looping_ = false;
}

// Requests the loop to stop. From another thread the loop may be blocked in
// poll, so it is woken up to notice the flag.
void EventLoop::quit()
{
    quit_ = true;
    if (!isInLoopThread())
    {
        wakeup();
    }
}

// Runs cb right away on the loop thread, otherwise hands it to queueInLoop().
void EventLoop::runInLoop(Functor cb)
{
    if (isInLoopThread())
    {
        cb();
    }
    else
    {
        queueInLoop(std::move(cb)); // cb is our own copy: move, don't copy again
    }
}

// Queues cb for doPendingFunctors(). The loop is woken when the caller is on
// another thread, or when the loop is currently running pending functors (the
// new one would otherwise wait for the next poll to return).
void EventLoop::queueInLoop(Functor cb)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.emplace_back(std::move(cb));
    }

    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}

// Writes to the eventfd, which makes wakeupChannel_ readable and a blocked
// poll return.
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        // %zd: n is a signed ssize_t and is -1 on failure.
        LOG_ERROR("EventLoop::wakeup() writes %zd bytes instead of 8 \n", n);
    }
}

void EventLoop::updateChannel(Channel *channel)
{
    poller_->updateChannel(channel);
}

void EventLoop::removeChannel(Channel *channel)
{
    poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel *channel)
{
    return poller_->hasChannel(channel);
}

// Runs every queued functor. The queue is swapped into a local vector under
// the lock so the functors execute without holding mutex_.
void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (const Functor &functor : functors)
    {
        functor();
    }

    callingPendingFunctors_ = false;
}
首先是wakeup函数, EventLoop肯定会调用poller_->poll (也就是epoll_wait),这里肯定会阻塞,但是有一些特殊情况可能需要唤醒这个(这个特殊情况一会再说),因此需要wakeup来实现
我们看到EventLoop里面有一个wakeupFd_的属性,系统会提供一个eventfd函数,这个和socketfd用法差不多也可以用来读和写,将eventfd封装成channel再用poller来监听,这样的话要实现wakeup的话就简单了,只需要注册读事件,然后往wakeupFd_里面写一些东西就好
我们可以看到构造函数里面wakeupChannel_通过enableReading注册了读事件,setReadCallback设置的回调是EventLoop::handleRead
我们把wakeup与handleRead拿出来看一下
// Writes an 8 byte counter value to wakeupFd_ so that the channel watching it
// becomes readable and a blocked poll returns.
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        // %zd: n is a signed ssize_t and is -1 on failure; %lu would print it
        // as a huge unsigned number.
        LOG_ERROR("EventLoop::wakeup() writes %zd bytes instead of 8 \n", n);
    }
}
// Read callback of the wakeup channel: consumes the 8 byte value that
// wakeup() wrote to wakeupFd_.
void EventLoop::handleRead()
{
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        // %zd: n is a signed ssize_t and is -1 on failure; %lu would print it
        // as a huge unsigned number.
        LOG_ERROR("EventLoop::handleRead() reads %zd bytes instead of 8", n);
    }
}
然后看void loop();
这个就比较简单了,主要是逻辑,为什么说比较简单,我们看代码就知道
void EventLoop::loop () { looping_ = true ; quit_ = false ; LOG_INFO ("EventLoop %p start looping \n" , this ); while (!quit_){ activeChannels_.clear (); pollReturnTime_ = poller_->poll (kPollTimeMs,&activeChannels_); for (Channel* channel:activeChannels_){ channel->handleEvent (pollReturnTime_); } doPendingFunctors (); } LOG_INFO ("EventLoop %p stop looping. \n" , this ); looping_ = false ; }
基本上就几行,调用poller_->poll(kPollTimeMs,&activeChannels_)获取那些channel发生事件了,然后回调事件channel可以自己做,这里做一下调用就行,其他的就是doPendingFunctors
doPendingFunctors就涉及到void runInLoop(Functor cb); void queueInLoop(Functor cb); 这两个函数了
runInLoop比较好理解,就是当前loop所在的线程执行的函数,这样的话就直接执行就好了
// Executes cb on the loop's thread: immediately when the caller already is
// that thread, otherwise by queueing it for the loop.
void EventLoop::runInLoop(Functor cb)
{
    if (isInLoopThread())
    {
        cb();
    }
    else
    {
        // cb is already our own copy; move it on instead of copying again.
        queueInLoop(std::move(cb));
    }
}
当在非当前loop线程中执行cb的时候就需要调用queueInLoop函数了
// Appends cb to pendingFunctors_ under mutex_. The loop is woken when the
// caller is on another thread, or when the loop is currently running pending
// functors (the new one would otherwise wait for the next poll to return).
void EventLoop::queueInLoop(Functor cb)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // cb is already our own copy; move it into the queue.
        pendingFunctors_.emplace_back(std::move(cb));
    }

    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}
queueInLoop会把cb放置到pendingFunctors_里面,然后唤醒当前loop(原来的loop可能正在阻塞的状态),然后loop就会调用doPendingFunctors
void EventLoop::doPendingFunctors () { std::vector<Functor> functors; callingPendingFunctors_ = true ; { std::unique_lock<std::mutex> lock (mutex_) ; functors.swap (pendingFunctors_); } for (const Functor &functor: functors){ functor (); } callingPendingFunctors_ = false ; }
逐一执行pendingFunctors_里面的函数
Thread EventLoopThread EventLoopThreadPool Thread是对库的封装,其他没有特殊的
#ifndef THREAD_H #define THREAD_H #include "noncopyable.h" #include <functional> #include <thread> #include <memory> #include <unistd.h> #include <string> #include <atomic> class Thread : noncopyable {public : using ThreadFunc = std::function<void ()>; explicit Thread (ThreadFunc,const std::string &name = std::string()) ; ~Thread (); void start () ; void join () ; bool started () const { return started_; } pid_t tid () const { return tid_; } const std::string& name () const { return name_; } static int numCreated () { return numCreated_; } private : void setDefaultName () ; bool started_; bool joined_; std::shared_ptr<std::thread> thread_; pid_t tid_; ThreadFunc func_; std::string name_; static std::atomic_int numCreated_; }; #endif
#include "Thread.h"
#include "CurrentThread.h"

#include <semaphore.h>
#include <stdio.h>

std::atomic_int Thread::numCreated_(0);

// Stores the function to run; the OS thread is only created by start().
Thread::Thread(ThreadFunc func, const std::string &name)
    : started_(false),
      joined_(false),
      tid_(0),
      func_(std::move(func)),
      name_(name)
{
    setDefaultName();
}

// A thread that was started but never joined is detached.
Thread::~Thread()
{
    if (started_ && !joined_)
    {
        thread_->detach();
    }
}

// Counts this object and, if no name was given, names it "Thread<N>".
void Thread::setDefaultName()
{
    int num = ++numCreated_;
    if (name_.empty())
    {
        char buf[32] = {0};
        snprintf(buf, sizeof buf, "Thread%d", num);
        name_ = buf;
    }
}

// Creates the thread and blocks until it has published its tid, so tid() is
// valid as soon as start() returns.
void Thread::start()
{
    started_ = true;
    sem_t sem;
    sem_init(&sem, false, 0);

    // Capturing by reference is safe here: the lambda only touches `sem`
    // before sem_post, and start() does not return before sem_wait succeeds.
    thread_ = std::shared_ptr<std::thread>(new std::thread([&]() {
        tid_ = CurrentThread::tid();
        sem_post(&sem);
        func_();
    }));

    sem_wait(&sem);
    sem_destroy(&sem); // release the semaphore initialised above
}

// Waits for the thread to finish. Does nothing if the thread was never
// started (thread_ would be null) or has already been joined.
void Thread::join()
{
    if (!started_ || joined_)
    {
        return;
    }
    joined_ = true;
    thread_->join();
}
这里可以看到,我们用 std::shared_ptr<std::thread> thread_; 来保存线程对象。正常情况下,std::thread 对象一旦带着线程函数构造出来,线程就会立即开始运行;这里用智能指针把 std::thread 的创建推迟到 void Thread::start() 里面,这样我们就可以自己控制线程启动的时机了
EventLoopThread 将Eventloop和Thread结合起来,简单来说就是创建线程,线程函数里面运行一个loop,这就是muduo所追求的 one loop per thread
#ifndef EVENTLOOPTHREAD_H #define EVENTLOOPTHREAD_H #include "noncopyable.h" #include "Thread.h" #include <functional> #include <mutex> #include <condition_variable> #include <string> class EventLoop ;class EventLoopThread :noncopyable {public : using ThreadInitCallback = std::function<void (EventLoop*)>; EventLoopThread (const ThreadInitCallback &cb = ThreadInitCallback (),const std::string &name = std::string ()); ~EventLoopThread (); EventLoop* startLoop () ; private : void threadFunc () ; EventLoop* loop_; bool exiting_; Thread thread_; std::mutex mutex_; std::condition_variable cond_; ThreadInitCallback callback_; }; #endif
#include "EventLoopThread.h"
#include "EventLoop.h"

// Prepares the thread object; nothing runs until startLoop() is called.
EventLoopThread::EventLoopThread(const ThreadInitCallback &cb, const std::string &name)
    : loop_(nullptr),
      exiting_(false),
      thread_(std::bind(&EventLoopThread::threadFunc, this), name),
      mutex_(),
      cond_(),
      callback_(cb)
{
}

// If a loop is running, asks it to quit and waits for the thread to end.
EventLoopThread::~EventLoopThread()
{
    exiting_ = true;
    if (loop_ != nullptr)
    {
        loop_->quit();
        thread_.join();
    }
}

// Starts the thread and blocks until threadFunc() has published its loop.
EventLoop *EventLoopThread::startLoop()
{
    thread_.start();

    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this]() { return loop_ != nullptr; });
    return loop_;
}

// Thread body: the EventLoop lives on this thread's stack. After the optional
// init callback the loop is published to startLoop(), then run until it quits.
void EventLoopThread::threadFunc()
{
    EventLoop loop;

    if (callback_)
    {
        callback_(&loop);
    }

    {
        std::lock_guard<std::mutex> publish(mutex_);
        loop_ = &loop;
        cond_.notify_one();
    }

    loop.loop();

    // The loop has stopped; clear the pointer before `loop` is destroyed.
    std::lock_guard<std::mutex> reset(mutex_);
    loop_ = nullptr;
}
EventLoopThreadPool 声明多个EventLoop 以供选择,主要是setThreadNum函数以及 EventLoop* getNextLoop();方法
#ifndef EVENTLOOPTHREADPOOL_H #define EVENTLOOPTHREADPOOL_H #include "noncopyable.h" #include <functional> #include <string> #include <vector> #include <memory> class EventLoop ;class EventLoopThread ;class EventLoopThreadPool : noncopyable {public : using ThreadInitCallback = std::function<void (EventLoop*)>; EventLoopThreadPool (EventLoop *baseLoop, const std::string &nameArg); ~EventLoopThreadPool (); void setThreadNum (int numThreads) { numThreads_ = numThreads; } void start (const ThreadInitCallback &cb = ThreadInitCallback()) ; EventLoop* getNextLoop () ; std::vector<EventLoop*> getAllLoops () ; bool started () const { return started_; } const std::string name () const { return name_; } private : EventLoop *baseLoop_; std::string name_; bool started_; int numThreads_; int next_; std::vector<std::unique_ptr<EventLoopThread>> threads_; std::vector<EventLoop*> loops_; }; #endif
#include "EventLoopThreadPool.h"
#include "EventLoopThread.h"

#include <memory>
#include <string>

EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
    : baseLoop_(baseLoop),
      name_(nameArg),
      started_(false),
      numThreads_(0),
      next_(0)
{
}

EventLoopThreadPool::~EventLoopThreadPool() {}

// Creates numThreads_ loop threads named "<pool name><index>" and records the
// loop each one runs. With zero threads, cb (if any) is run on the base loop.
void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
    started_ = true;

    for (int i = 0; i < numThreads_; ++i)
    {
        // std::string instead of a variable-length char array, which is not
        // standard C++.
        const std::string threadName = name_ + std::to_string(i);

        std::unique_ptr<EventLoopThread> thread(new EventLoopThread(cb, threadName));
        threads_.push_back(std::move(thread));
        loops_.push_back(threads_.back()->startLoop());
    }

    if (numThreads_ == 0 && cb)
    {
        cb(baseLoop_);
    }
}

// Round-robin over loops_; falls back to the base loop when there are none.
EventLoop *EventLoopThreadPool::getNextLoop()
{
    EventLoop *loop = baseLoop_;

    if (!loops_.empty())
    {
        loop = loops_[next_];
        ++next_;
        if (static_cast<size_t>(next_) >= loops_.size())
        {
            next_ = 0;
        }
    }

    return loop;
}

// All worker loops, or just the base loop when there are none.
std::vector<EventLoop *> EventLoopThreadPool::getAllLoops()
{
    if (loops_.empty())
    {
        return std::vector<EventLoop *>(1, baseLoop_);
    }
    else
    {
        return loops_;
    }
}
这里我们看start的时候通过numThreads_来初始化EventLoopThread的数量numThreads_是subReactor的数量,这个和原始的muduo不一样,原始的muduo是baseloop + subReactor的数量
最为主要的是通过轮询的方法来选择subReactor(EventLoop)看EventLoop* EventLoopThreadPool::getNextLoop(),如果loops_为空的话就只会return baseLoop_了
Socket Socket的封装 前面我们封装了IP 与 PORT (InetAddress) 这里的Socket是对 socket fd的封装 也就是 初始化 bind listen accept
#ifndef SOCKET_H #define SOCKET_H #include "noncopyable.h" class InetAddress ;class Socket : noncopyable{ public : explicit Socket (int sockfd) : sockfd_(sockfd) { } ~Socket (); int fd () const { return sockfd_; } void bindAddress (const InetAddress &localaddr) ; void listen () ; int accept (InetAddress *peeraddr) ; void shutdownWrite () ; void setTcpNoDelay (bool on) ; void setReuseAddr (bool on) ; void setReusePort (bool on) ; void setKeepAlive (bool on) ; private : const int sockfd_; }; #endif
#include "Socket.h"
#include "Logger.h"
#include "InetAddress.h"

#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <netinet/tcp.h>

namespace
{
// Sets an int-valued boolean socket option and logs a failure instead of
// silently ignoring setsockopt's return value.
void setBoolOption(int sockfd, int level, int option, const char *optionName, bool on)
{
    const int optval = on ? 1 : 0;
    if (::setsockopt(sockfd, level, option, &optval, sizeof optval) < 0)
    {
        LOG_ERROR("setsockopt %s on sockfd:%d error:%d \n", optionName, sockfd, errno);
    }
}
} // namespace

// Closes the wrapped descriptor.
Socket::~Socket()
{
    ::close(sockfd_);
}

// Binds the socket to localaddr; failure is fatal.
void Socket::bindAddress(const InetAddress &localaddr)
{
    const sockaddr *addr = reinterpret_cast<const sockaddr *>(localaddr.getSockAddr());
    if (0 != ::bind(sockfd_, addr, sizeof(sockaddr_in)))
    {
        LOG_FATAL("bind sockfd:%d fail \n", sockfd_);
    }
}

// Starts listening with a backlog of 1024; failure is fatal.
void Socket::listen()
{
    if (0 != ::listen(sockfd_, 1024))
    {
        LOG_FATAL("listen sockfd:%d fail \n", sockfd_);
    }
}

// Accepts one connection as a non-blocking, close-on-exec descriptor.
// On success the peer address is stored in *peeraddr; the return value is the
// new descriptor, or a negative value on failure.
int Socket::accept(InetAddress *peeraddr)
{
    sockaddr_in addr;
    socklen_t len = sizeof addr;
    bzero(&addr, sizeof addr);

    int connfd = ::accept4(sockfd_, reinterpret_cast<sockaddr *>(&addr), &len,
                           SOCK_NONBLOCK | SOCK_CLOEXEC);
    if (connfd >= 0)
    {
        peeraddr->setSockAddr(addr);
    }
    return connfd;
}

// Shuts down the writing half of the connection.
void Socket::shutdownWrite()
{
    if (::shutdown(sockfd_, SHUT_WR) < 0)
    {
        LOG_ERROR("shutdownWrite error");
    }
}

void Socket::setTcpNoDelay(bool on)
{
    setBoolOption(sockfd_, IPPROTO_TCP, TCP_NODELAY, "TCP_NODELAY", on);
}

void Socket::setReuseAddr(bool on)
{
    setBoolOption(sockfd_, SOL_SOCKET, SO_REUSEADDR, "SO_REUSEADDR", on);
}

void Socket::setReusePort(bool on)
{
    setBoolOption(sockfd_, SOL_SOCKET, SO_REUSEPORT, "SO_REUSEPORT", on);
}

void Socket::setKeepAlive(bool on)
{
    setBoolOption(sockfd_, SOL_SOCKET, SO_KEEPALIVE, "SO_KEEPALIVE", on);
}
Acceptor 新连接接收 Acceptor会工作在mainLoop之中,负责接受新的连接
#ifndef ACCEPTOR_H #define ACCEPTOR_H #include "noncopyable.h" #include "Socket.h" #include "Channel.h" #include <functional> class EventLoop ;class InetAddress ;class Acceptor : noncopyable{ public : using NewConnectionCallback = std::function<void (int sockfd, const InetAddress&)>; Acceptor (EventLoop *loop, const InetAddress &listenAddr, bool reuseport); ~Acceptor (); void setNewConnectionCallback (const NewConnectionCallback &cb) { newConnectionCallback_ = cb; } bool listenning () const { return listenning_; } void listen () ; private : void handleRead () ; EventLoop *loop_; Socket acceptSocket_; Channel acceptChannel_; NewConnectionCallback newConnectionCallback_; bool listenning_; }; #endif
我们可以看到acceptSocket_ 就指服务器的 socket(ip 与 port) 客户端连接的时候就是连接这个
还可以看到 Channel acceptChannel_; 这就是用来处理新的连接的,acceptChannel_会监听读事件并且注册到mainLoop之中(有新连接的时候会响应)
绑定的回调事件为handleRead 其实是 NewConnectionCallback
#include "Acceptor.h"
#include "Logger.h"
#include "InetAddress.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <unistd.h>

// Creates the non-blocking, close-on-exec TCP socket used for listening.
// Failure is fatal.
static int createNonblocking()
{
    int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    if (sockfd < 0)
    {
        LOG_FATAL("%s:%s:%d listen socket create err:%d \n", __FILE__, __FUNCTION__, __LINE__, errno);
    }
    return sockfd;
}

// Creates and binds the listening socket and wires its channel to
// handleRead(). Listening itself starts in listen().
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop),
      acceptSocket_(createNonblocking()),
      acceptChannel_(loop, acceptSocket_.fd()),
      listenning_(false)
{
    acceptSocket_.setReuseAddr(true);
    // Honour the caller's choice; this used to be hard-coded to true, which
    // made the reuseport parameter meaningless.
    acceptSocket_.setReusePort(reuseport);
    acceptSocket_.bindAddress(listenAddr);
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

Acceptor::~Acceptor()
{
    acceptChannel_.disableAll();
    acceptChannel_.remove();
}

// Starts listening and registers the channel for read events.
void Acceptor::listen()
{
    listenning_ = true;
    acceptSocket_.listen();
    acceptChannel_.enableReading();
}

// Called when the listening socket is readable: accepts one connection and
// passes it to the callback, or closes it if nobody wants it.
void Acceptor::handleRead()
{
    InetAddress peerAddr;
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr);
        }
        else
        {
            ::close(connfd);
        }
    }
    else
    {
        // Capture errno first: the logging call below may overwrite it before
        // the EMFILE check.
        const int savedErrno = errno;
        LOG_ERROR("%s:%s:%d accept err:%d \n", __FILE__, __FUNCTION__, __LINE__, savedErrno);
        if (savedErrno == EMFILE)
        {
            LOG_ERROR("%s:%s:%d sockfd reached limit! \n", __FILE__, __FUNCTION__, __LINE__);
        }
    }
}
主要是handleRead函数,直接看 通过peerAddr接收连接sockaddr_in,然后connfd接收socket fd,简单的判断一下传递给newConnectionCallback_就行了,这个由调用方自己设置怎么处理,Acceptor不用管,这不是他应该处理的逻辑
Buffer 缓冲区 和webserver的缓冲区一样,准确来说,webserver的缓冲区是仿照这个来写的,这里就不做多解释了webserver里面已经解释的很清楚了
| A:prependable(0 ~ readerIndex_)| B:readable(readerIndex_ ~ writerIndex_)| C:writable(writerIndex_ ~ buffer_.size())|
#ifndef BUFFER_H
#define BUFFER_H

#include <sys/types.h> // ssize_t

#include <vector>
#include <string>
#include <algorithm>

/**
 * Growable byte buffer split into three regions:
 *
 *   [0, readerIndex_)            prependable
 *   [readerIndex_, writerIndex_) readable
 *   [writerIndex_, size())       writable
 */
class Buffer
{
public:
    static const size_t kCheapPrepend = 8;   // bytes reserved in front of the data
    static const size_t kInitialSize = 1024; // default writable size

    explicit Buffer(size_t initialSize = kInitialSize)
        : buffer_(kCheapPrepend + initialSize),
          readerIndex_(kCheapPrepend),
          writerIndex_(kCheapPrepend)
    {
    }

    size_t readableBytes() const { return writerIndex_ - readerIndex_; }
    size_t writableBytes() const { return buffer_.size() - writerIndex_; }
    size_t prependableBytes() const { return readerIndex_; }

    // Start of the readable data.
    const char *peek() const { return begin() + readerIndex_; }

    // Consumes len readable bytes; consuming everything (or more) resets both
    // indices to the start.
    void retrieve(size_t len)
    {
        if (len < readableBytes())
        {
            readerIndex_ += len;
        }
        else
        {
            retrieveAll();
        }
    }

    void retrieveAll()
    {
        readerIndex_ = writerIndex_ = kCheapPrepend;
    }

    // Consumes all readable bytes and returns them.
    std::string retrieveAllAsString()
    {
        return retrieveAsString(readableBytes());
    }

    // Consumes up to len readable bytes and returns them. len is clamped to
    // readableBytes() so the copy never reads past the readable region.
    std::string retrieveAsString(size_t len)
    {
        const size_t count = std::min(len, readableBytes());
        std::string result(peek(), count);
        retrieve(count);
        return result;
    }

    // Makes sure at least len bytes can be written.
    void ensureWriteableBytes(size_t len)
    {
        if (writableBytes() < len)
        {
            makeSpace(len);
        }
    }

    // Appends len bytes starting at data to the readable region.
    void append(const char *data, size_t len)
    {
        ensureWriteableBytes(len);
        std::copy(data, data + len, beginWrite());
        writerIndex_ += len;
    }

    char *beginWrite() { return begin() + writerIndex_; }
    const char *beginWrite() const { return begin() + writerIndex_; }

    // Reads from fd into the buffer / writes the readable bytes to fd.
    ssize_t readFd(int fd, int *saveErrno);
    ssize_t writeFd(int fd, int *saveErrno);

private:
    char *begin() { return &*buffer_.begin(); }
    const char *begin() const { return &*buffer_.begin(); }

    // Provides room for len more bytes: grows the vector when the free space
    // (writable + already consumed front part) is insufficient, otherwise
    // moves the readable data back to the front.
    void makeSpace(size_t len)
    {
        if (writableBytes() + prependableBytes() < len + kCheapPrepend)
        {
            buffer_.resize(writerIndex_ + len);
        }
        else
        {
            size_t readable = readableBytes();
            std::copy(begin() + readerIndex_,
                      begin() + writerIndex_,
                      begin() + kCheapPrepend);
            readerIndex_ = kCheapPrepend;
            writerIndex_ = readerIndex_ + readable;
        }
    }

    std::vector<char> buffer_;
    size_t readerIndex_;
    size_t writerIndex_;
};

#endif // BUFFER_H
#include "Buffer.h"

#include <errno.h>
#include <sys/uio.h>
#include <unistd.h>

/**
 * Read from fd into the buffer with a single readv() call.
 *
 * The writable region of the buffer is the first iovec; a 64 KiB stack
 * buffer is the second, so one system call can take in more data than the
 * buffer currently has room for. Any overflow is appended afterwards.
 *
 * @param fd         descriptor to read from
 * @param saveErrno  receives errno when readv() fails
 * @return the value returned by readv()
 */
ssize_t Buffer::readFd(int fd, int *saveErrno)
{
    // Deliberately not zero-filled: only the bytes readv() wrote are ever
    // consumed, so clearing 64 KiB on every read is wasted work.
    char extrabuf[65536];

    struct iovec vec[2];
    const size_t writable = writableBytes();
    vec[0].iov_base = begin() + writerIndex_;
    vec[0].iov_len = writable;
    vec[1].iov_base = extrabuf;
    vec[1].iov_len = sizeof extrabuf;

    // When the buffer already has at least 64 KiB free, skip the extra buffer.
    const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
    const ssize_t n = ::readv(fd, vec, iovcnt);
    if (n < 0)
    {
        *saveErrno = errno;
    }
    else if (static_cast<size_t>(n) <= writable)
    {
        // Everything fit into the buffer's own writable region.
        writerIndex_ += n;
    }
    else
    {
        // The buffer is full; the remainder landed in extrabuf.
        writerIndex_ = buffer_.size();
        append(extrabuf, n - writable);
    }
    return n;
}

/**
 * Write the readable data to fd with a single write() call.
 *
 * The data is not consumed here; the caller is expected to retrieve() the
 * number of bytes actually written.
 *
 * @param fd         descriptor to write to
 * @param saveErrno  receives errno when write() fails
 * @return the value returned by write()
 */
ssize_t Buffer::writeFd(int fd, int *saveErrno)
{
    const ssize_t n = ::write(fd, peek(), readableBytes());
    if (n < 0)
    {
        *saveErrno = errno;
    }
    return n;
}
TcpConnection 连接管理 主要负责管理TCP的连接,比如新连接的建立(connectionCallback_)这个回调由调用者设置,读数据(handleRead messageCallback_) 写数据(send handleWrite),其他的函数都是一些辅助性的,很好理解
#ifndef TCPCONNECTION_H
#define TCPCONNECTION_H

#include "noncopyable.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Buffer.h"
#include "Timestamp.h"

#include <memory>
#include <string>
#include <atomic>

// Forward declarations: this header only holds pointers / unique_ptrs to these types.
class Channel ;
class EventLoop ;
class Socket ;

/*
 * Represents one TCP connection.
 *
 * Holds the connection's socket and channel, its input/output buffers and
 * the callbacks invoked on connection, message, write-complete,
 * high-water-mark and close events. Inherits enable_shared_from_this so that
 * member functions can hand a shared_ptr to themselves to those callbacks.
 */
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public :
    TcpConnection (EventLoop *loop, const std::string &name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr);
    ~TcpConnection ();

    // Simple accessors.
    EventLoop* getLoop () const { return loop_; }
    const std::string& name () const { return name_; }
    const InetAddress& localAddress () const { return localAddr_; }
    const InetAddress& peerAddress () const { return peerAddr_; }

    // True only while the state is kConnected.
    bool connected () const { return state_ == kConnected; }

    // Send the contents of buf to the peer.
    void send (const std::string &buf) ;
    // Request that the write side of the connection be shut down.
    void shutdown () ;

    // Callback setters; each stores a copy of the given callable.
    void setConnectionCallback (const ConnectionCallback& cb) { connectionCallback_ = cb; }
    void setMessageCallback (const MessageCallback& cb) { messageCallback_ = cb; }
    void setWriteCompleteCallback (const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; }
    // Also stores the threshold that is compared against the output buffer size.
    void setHighWaterMarkCallback (const HighWaterMarkCallback& cb, size_t highWaterMark) { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
    void setCloseCallback (const CloseCallback& cb) { closeCallback_ = cb; }

    // Lifecycle hooks driven by the owner of the connection.
    void connectEstablished () ;
    void connectDestroyed () ;

private :
    // Connection life-cycle states.
    enum StateE {kDisconnected,kConnecting,kConnected,kDisconnecting};
    void setState (StateE state) { state_ = state; }

    // Event handlers registered on the channel.
    void handleRead (Timestamp receiveTime) ;
    void handleWrite () ;
    void handleClose () ;
    void handleError () ;

    // Loop-side halves of send() / shutdown().
    void sendInLoop (const void * data,size_t len) ;
    void shutdownInLoop () ;

    EventLoop *loop_;                   // loop this connection belongs to (not owned)
    const std::string name_;
    std::atomic_int state_;             // holds a StateE value
    bool reading_;

    std::unique_ptr<Socket> socket_;
    std::unique_ptr<Channel> channel_;

    const InetAddress localAddr_;
    const InetAddress peerAddr_;

    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    WriteCompleteCallback writeCompleteCallback_;
    HighWaterMarkCallback highWaterMarkCallback_;
    CloseCallback closeCallback_;
    size_t highWaterMark_;              // threshold used for highWaterMarkCallback_

    Buffer inputBuffer_;                // data received from the peer
    Buffer outputBuffer_;               // data waiting to be sent to the peer
};

#endif
#include "TcpConnection.h"
#include "Logger.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"

#include <functional>
#include <string>

#include <errno.h>
#include <netinet/tcp.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>  // ::write

/// Log a fatal error when loop is null; returns loop unchanged.
static EventLoop* CheckLoopNotNull(EventLoop *loop)
{
    if (loop == nullptr)
    {
        LOG_FATAL("%s:%s:%d TcpConnection Loop is null! \n", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

/**
 * Wrap an already-connected socket.
 *
 * Registers the read/write/close/error handlers on the channel and enables
 * keep-alive on the socket. The connection starts in kConnecting and only
 * becomes kConnected in connectEstablished().
 */
TcpConnection::TcpConnection(EventLoop *loop,
                             const std::string &nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
    : loop_(CheckLoopNotNull(loop))
    , name_(nameArg)
    , state_(kConnecting)
    , reading_(true)
    , socket_(new Socket(sockfd))
    , channel_(new Channel(loop, sockfd))
    , localAddr_(localAddr)
    , peerAddr_(peerAddr)
    , highWaterMark_(64 * 1024 * 1024)  // 64 MiB
{
    channel_->setReadCallback(
        std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));
    channel_->setWriteCallback(
        std::bind(&TcpConnection::handleWrite, this));
    channel_->setCloseCallback(
        std::bind(&TcpConnection::handleClose, this));
    channel_->setErrorCallback(
        std::bind(&TcpConnection::handleError, this));

    LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n", name_.c_str(), sockfd);
    socket_->setKeepAlive(true);
}

TcpConnection::~TcpConnection()
{
    LOG_INFO("TcpConnection::dtor[%s] at fd=%d state=%d \n",
             name_.c_str(), channel_->fd(), (int)state_);
}

/**
 * Send buf to the peer. Silently does nothing unless the connection is
 * in state kConnected.
 *
 * When called from another thread the payload is copied into the queued
 * task: the caller's string may be destroyed before the loop thread runs
 * the task, so a raw pointer into it must not be stored.
 */
void TcpConnection::send(const std::string &buf)
{
    if (state_ != kConnected)
    {
        return;
    }

    if (loop_->isInLoopThread())
    {
        sendInLoop(buf.c_str(), buf.size());
        return;
    }

    // Keep both the connection and the payload alive until the task has run.
    TcpConnectionPtr self(shared_from_this());
    std::string message(buf);
    loop_->runInLoop([self, message]() {
        self->sendInLoop(message.data(), message.size());
    });
}

/**
 * Loop-side half of send().
 *
 * Tries one direct write when nothing is queued; whatever could not be
 * written is appended to outputBuffer_ and write events are enabled so that
 * handleWrite() finishes the job.
 */
void TcpConnection::sendInLoop(const void *data, size_t len)
{
    ssize_t nwrote = 0;
    size_t remaining = len;
    bool faultError = false;

    if (state_ == kDisconnected)
    {
        LOG_ERROR("disconnected, give up writing!");
        return;
    }

    // Nothing queued and not waiting for writability: try to write directly.
    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
    {
        nwrote = ::write(channel_->fd(), data, len);
        if (nwrote >= 0)
        {
            remaining = len - nwrote;
            if (remaining == 0 && writeCompleteCallback_)
            {
                loop_->queueInLoop(
                    std::bind(writeCompleteCallback_, shared_from_this()));
            }
        }
        else
        {
            nwrote = 0;
            if (errno != EWOULDBLOCK)
            {
                LOG_ERROR("TcpConnection::sendInLoop");
                if (errno == EPIPE || errno == ECONNRESET)
                {
                    faultError = true;
                }
            }
        }
    }

    if (!faultError && remaining > 0)
    {
        const size_t oldLen = outputBuffer_.readableBytes();
        // Fire the high-water-mark callback only when this append crosses the threshold.
        if (oldLen + remaining >= highWaterMark_
            && oldLen < highWaterMark_
            && highWaterMarkCallback_)
        {
            loop_->queueInLoop(
                std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
        }
        outputBuffer_.append(static_cast<const char *>(data) + nwrote, remaining);
        if (!channel_->isWriting())
        {
            channel_->enableWriting();
        }
    }
}

/**
 * Request shutdown of the write side. Moves kConnected -> kDisconnecting;
 * the actual shutdown runs in the loop and is deferred by handleWrite()
 * until the output buffer has been flushed.
 */
void TcpConnection::shutdown()
{
    if (state_ == kConnected)
    {
        setState(kDisconnecting);
        // Bind a shared_ptr so the connection outlives the queued task.
        loop_->runInLoop(
            std::bind(&TcpConnection::shutdownInLoop, shared_from_this()));
    }
}

void TcpConnection::shutdownInLoop()
{
    // Still flushing: handleWrite() will call us again once the buffer drains.
    if (!channel_->isWriting())
    {
        socket_->shutdownWrite();
    }
}

/// Mark the connection as connected, start reading and notify the user.
void TcpConnection::connectEstablished()
{
    setState(kConnected);
    channel_->tie(shared_from_this());
    channel_->enableReading();

    // The connection callback is optional; calling an empty std::function throws.
    if (connectionCallback_)
    {
        connectionCallback_(shared_from_this());
    }
}

/// Final teardown: stop all events and remove the channel from its loop.
void TcpConnection::connectDestroyed()
{
    if (state_ == kConnected)
    {
        setState(kDisconnected);
        channel_->disableAll();
        if (connectionCallback_)
        {
            connectionCallback_(shared_from_this());
        }
    }
    channel_->remove();
}

/// Read event: pull data into inputBuffer_ and dispatch on the result.
void TcpConnection::handleRead(Timestamp receiveTime)
{
    int savedErrno = 0;
    const ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
        if (messageCallback_)
        {
            messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
        }
        else
        {
            // Nobody will consume the data; discard it so the buffer cannot grow without bound.
            inputBuffer_.retrieveAll();
        }
    }
    else if (n == 0)
    {
        // Peer closed the connection.
        handleClose();
    }
    else
    {
        errno = savedErrno;
        LOG_ERROR("TcpConnection::handleRead");
        handleError();
    }
}

/// Write event: flush outputBuffer_ and finish a pending shutdown once it is empty.
void TcpConnection::handleWrite()
{
    if (!channel_->isWriting())
    {
        LOG_ERROR("TcpConnection fd=%d is down, no more writing \n", channel_->fd());
        return;
    }

    int savedErrno = 0;
    const ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);
    if (n <= 0)
    {
        LOG_ERROR("TcpConnection::handleWrite");
        return;
    }

    outputBuffer_.retrieve(n);
    if (outputBuffer_.readableBytes() == 0)
    {
        channel_->disableWriting();
        if (writeCompleteCallback_)
        {
            loop_->queueInLoop(
                std::bind(writeCompleteCallback_, shared_from_this()));
        }
        // shutdown() sets kDisconnecting and defers the real shutdown while
        // data is still queued; complete it now that the buffer is empty.
        if (state_ == kDisconnecting)
        {
            shutdownInLoop();
        }
    }
}

/// Close event: stop all events, then notify the user and the owner.
void TcpConnection::handleClose()
{
    LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n", channel_->fd(), (int)state_);
    setState(kDisconnected);
    channel_->disableAll();

    // Hold a reference so the object stays alive while the callbacks run.
    TcpConnectionPtr connPtr(shared_from_this());
    if (connectionCallback_)
    {
        connectionCallback_(connPtr);
    }
    if (closeCallback_)
    {
        closeCallback_(connPtr);
    }
}

/// Error event: fetch SO_ERROR (or errno if getsockopt itself fails) and log it.
void TcpConnection::handleError()
{
    int optval;
    socklen_t optlen = sizeof optval;
    int err = 0;
    if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
    {
        err = errno;
    }
    else
    {
        err = optval;
    }
    LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n", name_.c_str(), err);
}
读数据 handleRead 主要是调用messageCallback_还有一些错误处理 messageCallback_ 也是调用者(TcpServer)设置的(其实也是用户设置的)
/// Read event: pull data into inputBuffer_ and dispatch on the result.
void TcpConnection::handleRead(Timestamp receiveTime)
{
    int savedErrno = 0;
    const ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
        if (messageCallback_)
        {
            // Hand the received data to the user-supplied callback.
            messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
        }
        else
        {
            // Calling an empty std::function throws; with no consumer the
            // data is discarded so the buffer cannot grow without bound.
            inputBuffer_.retrieveAll();
        }
    }
    else if (n == 0)
    {
        // Peer closed the connection.
        handleClose();
    }
    else
    {
        errno = savedErrno;
        LOG_ERROR("TcpConnection::handleRead");
        handleError();
    }
}
写数据 send handleWrite 基本思路:send会先尝试直接发送一次,如果数据比较少,一次可以写完的话就直接写完,不需要经过buffer;但是如果没发送完,就把剩余的数据写进outputBuffer_里面,然后注册写事件,这样当fd可写时poller会发出通知,从而触发channel的handleWrite回调,把剩下的数据发送出去
void TcpConnection::sendInLoop (const void * data, size_t len) { ssize_t nwrote = 0 ; size_t remaining = len; bool faultError = false ; if (state_ == kDisconnected){ LOG_ERROR ("disconnected, give up writing!" ); return ; } if (!channel_->isWriting () && outputBuffer_.readableBytes () == 0 ){ nwrote = ::write (channel_->fd (),data,len); if (nwrote >= 0 ){ remaining = len - nwrote; if (remaining == 0 && writeCompleteCallback_){ loop_->queueInLoop ( std::bind (writeCompleteCallback_,shared_from_this ()) ); } }else { nwrote = 0 ; if (errno != EWOULDBLOCK){ LOG_ERROR ("TcpConnection::sendInLoop" ); if (errno == EPIPE || errno == ECONNRESET) { faultError = true ; } } } } if (!faultError && remaining > 0 ) { size_t oldLen = outputBuffer_.readableBytes (); if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) { loop_->queueInLoop ( std::bind (highWaterMarkCallback_,shared_from_this (),oldLen+remaining) ); } outputBuffer_.append ((char *)data + nwrote,remaining); if (!channel_->isWriting ()){ channel_->enableWriting (); } } } void TcpConnection::handleWrite () { if (channel_->isWriting ()){ int savedErrno = 0 ; ssize_t n = outputBuffer_.writeFd (channel_->fd (),&savedErrno); if (n > 0 ){ outputBuffer_.retrieve (n); if (outputBuffer_.readableBytes () == 0 ){ channel_->disableWriting (); if (writeCompleteCallback_){ loop_->queueInLoop ( std::bind (writeCompleteCallback_, shared_from_this ()) ); } if (state_ == kDisconnected){ shutdownInLoop (); } } }else { LOG_ERROR ("TcpConnection::handleWrite" ); } }else { LOG_ERROR ("TcpConnection fd=%d is down, no more writing \n" , channel_->fd ()); } }
其他的就是关闭连接 错误处理等操作了,需要注意的是里面针对channel等操作
TcpServer 模块连接 所有一切的基础功能,上面的模块已经完成了,在TcpServer里面主要是将这些基础的功能连接在一起
比如:Acceptor如何分配connection给subReactor,newConnection的回调是什么(connectionCallback_ messageCallback_ 交给用户自己来设置了)等
其他的还有一些像EventLoopThreadPool的setThreadNum,还有removeConnection等简单操作了,这些操作一般就是直接调用写好模块的函数再加一些简单的逻辑就好
TcpServer:
有一个baseLoop(loop_) 是mainLoop 负责分发任务,需要用户自己定义,然后在TcpServer初始化的时候传给TcpServer
有一个EventLoopThreadPool,根据初始化的numThreads的数量初始化subLoop的数量,负责处理新连接以外的各种事务
有一个Acceptor,负责新连接,注册到baseLoop里面的poller,监听读事件,设置TcpServer::newConnection的回调,也就是说,当有新连接的时候就会触发读事件从而通知Acceptor里面的channel执行回调(newConnection)
#ifndef TCPSERVER_H
#define TCPSERVER_H

#include "EventLoop.h"
#include "Acceptor.h"
#include "InetAddress.h"
#include "noncopyable.h"
#include "EventLoopThreadPool.h"
#include "Callbacks.h"
#include "TcpConnection.h"
#include "Buffer.h"

#include <functional>
#include <string>
#include <memory>
#include <atomic>
#include <unordered_map>

/*
 * Ties the building blocks together: an Acceptor for new connections, an
 * EventLoopThreadPool that supplies the loops connections are assigned to,
 * and a name -> connection map of the live connections.
 */
class TcpServer : noncopyable
{
public :
    // Signature of the callback passed to the thread pool on start().
    using ThreadInitCallback = std::function<void (EventLoop*)>;

    // Whether the listening socket is created with port reuse enabled.
    enum Option { kNoReusePort, kReusePort, };

    TcpServer (EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg, Option option = kNoReusePort);
    ~TcpServer ();

    // Callback setters; each stores a copy of the given callable.
    void setThreadInitcallback (const ThreadInitCallback &cb) { threadInitCallback_ = cb; }
    void setConnectionCallback (const ConnectionCallback &cb) { connectionCallback_ = cb; }
    void setMessageCallback (const MessageCallback &cb) { messageCallback_ = cb; }
    void setWriteCompleteCallback (const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }

    // Forwarded to the thread pool.
    void setThreadNum (int numThreads) ;
    // Start the thread pool and begin listening.
    void start () ;

private :
    // Invoked by the acceptor for each accepted socket.
    void newConnection (int sockfd, const InetAddress &peerAddr) ;
    // Close path: removeConnection() hands over to removeConnectionInLoop().
    void removeConnection (const TcpConnectionPtr &conn) ;
    void removeConnectionInLoop (const TcpConnectionPtr &conn) ;

    // Connection name -> connection.
    using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;

    EventLoop *loop_;                                   // loop supplied by the user (not owned)
    const std::string ipPort_;
    const std::string name_;
    std::unique_ptr<Acceptor> acceptor_;
    std::shared_ptr<EventLoopThreadPool> threadPool_;

    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    WriteCompleteCallback writeCompleteCallback_;
    ThreadInitCallback threadInitCallback_;

    std::atomic_int started_;                           // start() call counter
    int nextConnId_;                                    // used to build connection names
    ConnectionMap connections_;
};

#endif
#include "TcpServer.h"
#include "Logger.h"
#include "TcpConnection.h"

#include <functional>
#include <string>

#include <netinet/in.h>  // sockaddr_in
#include <stdio.h>       // snprintf
#include <strings.h>     // bzero
#include <sys/socket.h>  // getsockname

/// Log a fatal error when loop is null; returns loop unchanged.
static EventLoop* CheckLoopNotNull(EventLoop *loop)
{
    if (loop == nullptr)
    {
        LOG_FATAL("%s:%s:%d mainLoop is null! \n", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

/**
 * Create the acceptor and the thread pool and route accepted sockets to
 * newConnection(). Nothing is started until start() is called.
 *
 * The mem-initializers are listed in member declaration order, which is the
 * order they are executed in regardless of how they are written.
 */
TcpServer::TcpServer(EventLoop *loop,
                     const InetAddress &listenAddr,
                     const std::string &nameArg,
                     Option option)
    : loop_(CheckLoopNotNull(loop))
    , ipPort_(listenAddr.toIpPort())
    , name_(nameArg)
    , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
    , threadPool_(new EventLoopThreadPool(loop, name_))
    , connectionCallback_()
    , messageCallback_()
    , started_(0)
    , nextConnId_(1)
{
    acceptor_->setNewConnectionCallback(
        std::bind(&TcpServer::newConnection, this,
                  std::placeholders::_1, std::placeholders::_2));
}

/// Release every connection and schedule its teardown on its own loop.
TcpServer::~TcpServer()
{
    for (auto &item : connections_)
    {
        // Keep a local reference so the connection survives the map entry's reset.
        TcpConnectionPtr conn(item.second);
        item.second.reset();
        conn->getLoop()->runInLoop(
            std::bind(&TcpConnection::connectDestroyed, conn));
    }
}

void TcpServer::setThreadNum(int numThreads)
{
    threadPool_->setThreadNum(numThreads);
}

/// Start the thread pool and begin listening; only the first call has any effect.
void TcpServer::start()
{
    if (started_++ == 0)
    {
        threadPool_->start(threadInitCallback_);
        loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
    }
}

/**
 * Called by the acceptor for every accepted socket: picks a loop from the
 * pool, creates the TcpConnection, installs the callbacks and establishes
 * the connection on the chosen loop.
 */
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    EventLoop *ioLoop = threadPool_->getNextLoop();

    // Connection name: "<server name>-<ip:port>#<id>".
    char buf[64] = {0};
    snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
    ++nextConnId_;
    std::string connName = name_ + buf;

    LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
             name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());

    // Look up the local address the socket is bound to.
    sockaddr_in local;
    ::bzero(&local, sizeof local);
    socklen_t addrlen = sizeof local;
    if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
    {
        LOG_ERROR("sockets::getLocalAddr");
    }
    InetAddress localAddr(local);

    TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                            connName,
                                            sockfd,
                                            localAddr,
                                            peerAddr));
    connections_[connName] = conn;

    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    conn->setCloseCallback(
        std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));

    ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

/// Close callback of a connection: continue the removal on the server's loop.
void TcpServer::removeConnection(const TcpConnectionPtr &conn)
{
    loop_->runInLoop(
        std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

/// Drop the connection from the map, then destroy it on its own loop.
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn)
{
    LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\n",
             name_.c_str(), conn->name().c_str());

    connections_.erase(conn->name());
    EventLoop *ioLoop = conn->getLoop();
    // The bound copy of conn keeps the object alive until connectDestroyed() has run.
    ioLoop->queueInLoop(
        std::bind(&TcpConnection::connectDestroyed, conn));
}
我们直接来看newConnection这一关键函数
void TcpServer::newConnection (int sockfd, const InetAddress &peerAddr) { EventLoop *ioLoop = threadPool_->getNextLoop (); char buf[64 ] = {0 }; snprintf (buf, sizeof buf, "-%s#%d" , ipPort_.c_str (), nextConnId_); ++nextConnId_; std::string connName = name_ + buf; LOG_INFO ("TcpServer::newConnection [%s] - new connection [%s] from %s \n" , name_.c_str (), connName.c_str (), peerAddr.toIpPort ().c_str ()); sockaddr_in local; ::bzero (&local,sizeof local); socklen_t addrlen = sizeof local; if (::getsockname (sockfd, (sockaddr*)&local, &addrlen) < 0 ) { LOG_ERROR ("sockets::getLocalAddr" ); } InetAddress localAddr (local) ; TcpConnectionPtr conn (new TcpConnection( ioLoop, connName, sockfd, localAddr, peerAddr)) ; connections_[connName] = conn; conn->setConnectionCallback (connectionCallback_); conn->setMessageCallback (messageCallback_); conn->setWriteCompleteCallback (writeCompleteCallback_); conn->setCloseCallback ( std::bind (&TcpServer::removeConnection, this , std::placeholders::_1) ); ioLoop->runInLoop (std::bind (&TcpConnection::connectEstablished, conn)); }
直接通过EventLoopThreadPool的轮询算法选择一个subReactor(ioLoop),然后初始化这个新连接的一些信息,最后再通过ioLoop调用TcpConnection的connectEstablished函数
/// Mark the connection as connected, start reading and notify the user.
void TcpConnection::connectEstablished()
{
    setState(kConnected);
    channel_->tie(shared_from_this());
    channel_->enableReading();

    // The connection callback may be left unset; calling an empty
    // std::function would throw std::bad_function_call.
    if (connectionCallback_)
    {
        connectionCallback_(shared_from_this());
    }
}
监听新连接的读事件,当有信息发过来,有可读信息的时候,就会触发channel_里面读的回调从而调用回调函数,也就是用户设置的messageCallback
其他的倒没有啥需要特别说明的了,针对启动一切的start函数的话,就直接调用底层模块的启动就好
void TcpServer::start () { if (started_++ == 0 ){ threadPool_->start (threadInitCallback_); loop_->runInLoop (std::bind (&Acceptor::listen,acceptor_.get ())); } }
这样的话我们就理清楚了,留给用户需要做的就是:setConnectionCallback(可选)、setMessageCallback(必须)、setThreadNum(不设置的话就只有mainReactor在工作了,非常不好)
编译成库添加环境变量路径 <!-- CMakeLists.txt -->
# Current CMake releases no longer accept a minimum version below 3.5.
cmake_minimum_required(VERSION 3.5)
project(mymuduo)

# Put the shared library into <source dir>/lib.
set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)

# Debug info, C++11 and position-independent code (required for a shared library).
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -std=c++11 -fPIC")

# Every source file in the current directory is part of the library.
aux_source_directory(. SRC_LIST)

# Build libmymuduo.so.
add_library(mymuduo SHARED ${SRC_LIST})
#!/bin/bash
# Build libmymuduo.so and install the headers and the library system-wide.
# Must be run from the project root; installing needs write access to
# /usr/include and /usr/lib.

set -e

root_dir="$(pwd)"
build_dir="${root_dir}/build"

# Start from an empty build directory (mkdir -p is a no-op if it exists).
mkdir -p "${build_dir}"
rm -rf "${build_dir:?}"/*

cd "${build_dir}" && cmake .. && make

# Back to the project root.
cd ..

# Install the headers into /usr/include/mymuduo.
mkdir -p /usr/include/mymuduo
for header in *.h; do
    # With no matching file the pattern stays literal; skip it.
    [ -e "${header}" ] || continue
    cp "${header}" /usr/include/mymuduo
done

# Install the shared library and refresh the dynamic linker cache.
cp "${root_dir}/lib/libmymuduo.so" /usr/lib
ldconfig
测试代码
#include <mymuduo/TcpServer.h>
#include <mymuduo/Logger.h>

#include <string>
#include <functional>

/*
 * Minimal echo server built on TcpServer: every received message is sent
 * straight back and the connection is then shut down.
 */
class EchoServer
{
public :
    // Registers the connection and message callbacks and asks for 3 threads.
    EchoServer (EventLoop *loop, const InetAddress &addr, const std::string &name)
        : server_ (loop, addr, name)
        , loop_ (loop)
    {
        server_.setConnectionCallback ( std::bind (&EchoServer::onConnection, this , std::placeholders::_1) );
        server_.setMessageCallback ( std::bind (&EchoServer::onMessage, this , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3) );
        server_.setThreadNum (3 );
    }

    void start () { server_.start (); }

private :
    // Logs whether the connection just came up or went down.
    void onConnection (const TcpConnectionPtr &conn)
    {
        if (conn->connected ()){
            LOG_INFO ("Connection UP : %s" , conn->peerAddress ().toIpPort ().c_str ());
        }else {
            LOG_INFO ("Connection DOWN : %s" , conn->peerAddress ().toIpPort ().c_str ());
        }
    }

    // Echoes everything currently in the buffer, then shuts the connection down.
    void onMessage (const TcpConnectionPtr &conn, Buffer *buf, Timestamp time)
    {
        std::string msg = buf->retrieveAllAsString ();
        conn->send (msg);
        conn->shutdown ();
    }

    EventLoop *loop_;
    TcpServer server_;
};

int main ()
{
    EventLoop loop;
    InetAddress addr (8000 ) ;
    EchoServer server (&loop, addr, "EchoServer-01" ) ;
    server.start ();
    // Run the event loop.
    loop.loop ();
    return 0 ;
}
测试: telnet 127.0.0.1 8000