demo源码 epoll异步事件驱动框架部分

在该部分中,主要实现了两个类: 1抽象类EventHandle 作为event类中部分函数调用派生类 httpserver与httpstream(完整http套接字封装)对象提供基类指针。

2,event类 该类中主要实现了如下关键函数: 1,构造函数 主要是封装epoll_create,,对类中ThreadPool对象初始化,并调用eventwait_thread函数。 也因为pthread_create函数参数的需要,将process_task设置为静态函数,这样process_task在ThreadPool类未实例化时就有一个稳定的函数起始地址。 跟L/F线程池类ThreadPool的构造函数描述差不太多。

2,static void *eventwait_thread(void *arg); 一个静态的等待处理线程任务函数,在该函数中封装epoll_wait不停(while(1))获取套接字描述符fd并将其push入任务池(typedef std::map<int, EventType> EventTask_t;)与线程池。(此处note:fd从线程池中的弹出是在线程池类的process_task函数中实现的fd从任务池中弹出是process_task调用的event类的threadhandle()函数中实现的)

3,threadhandle()函数 从任务池弹出fd,调用http的handle_xxx函数对fd进行处理,然后调用epoll_ctl函数del掉事件。

4,register_event函数 封装epoll_ctl在epoll框架中add fd事件。 这个函数时在httpserver的构造函数里实现的。

event.h

#ifndef _EVENT_
#define _EVENT_
#include <sys/epoll.h>
#include <map>
#include "queue.h"
#include "threadpool.h"
#include "http.h"

namespace Event
{
enum EventType
{
	EIN = EPOLLIN,		  // 读事件
	EOUT = EPOLLOUT,	  // 写事件
	ECLOSE = EPOLLRDHUP,  // 对端关闭连接或者写半部
	EPRI = EPOLLPRI,	  // 紧急数据到达
	EERR = EPOLLERR,	  // 错误事件
	EET = EPOLLET, 		  // 边缘触发
	EDEFULT = EIN | ECLOSE | EERR | EET
};

class EventHandle
{
	public:
		int register_event(int fd, EventType type = EDEFULT);
		int register_event(Socket::ISocket &socket, EventType type = EDEFULT);
		int shutdown_event(int fd);
		int shutdown_event(Socket::ISocket &);
	protected:
		virtual void handle_in(int) = 0;
};

class Event: public ThreadPool::ThreadHandle
{
    private:

		enum Limit{
			EventBuffLen = 1024, CommitAgainNum = 2,
		};
		int m_epollfd;//句柄
		struct epoll_event m_eventbuff[EventBuffLen];//事件集合缓冲区

		pthread_t m_detectionthread;

		pthread_mutex_t m_events_mutex;

		ThreadPool::ThreadPool *m_ithreadpool;        	

		typedef std::map<int, ThreadHandle *> EventMap_t;

		typedef std::map<int, EventType> EventTask_t;

		EventTask_t m_events;


	public:
		Event(size_t eventmax);
		~Event();
		int register_event(int, EventHandle *, EventType);
		int shutdown_event(int);
		void threadhandle();
	private:
		static void *eventwait_thread(void *arg);	
		int pushtask(int fd, EventType event);
		int poptask(int &fd, EventType &event);
		int Event::cleartask(int fd);
		ThreadHandle *get_observer(int fd);

};
}
#endif

event.cpp


#include <unistd.h>
#include <sys/socket.h>
#include <string.h>
#include "threadpool.h"
#include "event.h"
#include "socket.h"
#include "http.h"

#define INVALID_FD(fd) (fd < 0)
#define INVALID_POINTER(p) (p == NULL)

namespace Event
{

Event::Event(size_t eventmax):   m_detectionthread(0)
{
	bzero((void *)&m_eventbuff, sizeof(m_eventbuff));//初始化事件集合缓冲区
	
	m_epollfd = epoll_create(eventmax);

	m_ithreadpool = new ThreadPool::ThreadPool(4, 1024);
	
	pthread_t tid = 0;
	if(pthread_create(&tid, NULL, eventwait_thread, (void *)this) == 0)//创建一条线程eventwait_thread,在这里调用epoll_wait等待事件
		m_detectionthread = tid;
}

Event::~Event()
{
	if(m_detectionthread != 0 && pthread_cancel(m_detectionthread) == 0){	//cancel thread
		pthread_join(m_detectionthread, (void **)NULL);//pthread_cancel后,部分资源仍未回收,调用pthread_join等待线程结束,到线程的退出代码后回收其资源
	}
	
	if(!INVALID_FD(m_epollfd))
		close(m_epollfd);
}

int Event::register_event(int fd, EventHandle *handle, EventType type)
{
	struct epoll_event newevent;
	newevent.data.fd = fd;
	newevent.events = type;
	epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &newevent) ;
	return 0;
}


void *Event::eventwait_thread(void *arg)
{
	Event &cevent = *(Event *)(arg);
	for(;;){
		int eventnum = epoll_wait(cevent.m_epollfd, &cevent.m_eventbuff[0], EventBuffLen, -1);//参数cevent.m_eventbuff[0]用来从内核得到事件的集合

		for(int i = 0; i < eventnum; i++){//事件发生后将事件放入队列并向线程池投入任务。

			int fd = cevent.m_eventbuff[i].data.fd;
			EventType events = static_cast<EventType>(cevent.m_eventbuff[i].events);
			
			if(cevent.pushtask(fd, events) == 0){
				cevent.m_ithreadpool->pushtask(&cevent);
			}
		}
	}
	pthread_exit(NULL);
}

int Event::pushtask(int fd, EventType event)
{
	pthread_mutex_lock(&m_events_mutex);
	EventTask_t::iterator itor = m_events.find(fd);
	if(itor == m_events.end()){
		m_events[fd] = event;
		pthread_mutex_unlock(&m_events_mutex);
		return 0;
	}
	// exist, update it
	itor->second = (EventType)(itor->second | event);	
	pthread_mutex_unlock(&m_events_mutex);
	return 0;
}


void Event::threadhandle()
{
	int fd = 0x00;
	EventType events;
	poptask(fd, events) ;


	Socket::Client  ob(fd);
	Socket::Client  *obs=&ob;
	Http::HttpStream  obse(obs);
	Http::HttpStream *observer=&obse;

	if(observer == NULL)
		return;

	if(events & ECLOSE){
		cleartask(fd);		
	
	}else{	
		if(events & EIN){
			observer->handle_in(fd);
		}

	}	
	
	epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, NULL);
	observer->handle_close(fd);	
	
}



int Event::poptask(int &fd, EventType &event)
{
	pthread_mutex_lock(&m_events_mutex);
	EventTask_t::iterator itor = m_events.begin();
	if(itor == m_events.end()){
		pthread_mutex_unlock(&m_events_mutex);
		return -1;
	}
		

	fd = itor->first;
	event = itor->second;

	m_events.erase(itor);
	pthread_mutex_unlock(&m_events_mutex);
	return 0;
}

int Event::cleartask(int fd)
{
	pthread_mutex_lock(&m_events_mutex);
	if(fd == -1){	// clear all
		m_events.clear();
		pthread_mutex_unlock(&m_events_mutex);
		return 0;
		
	}else if(fd >= 0){
		EventTask_t::iterator itor = m_events.find(fd);
		if(itor == m_events.end()){
			pthread_mutex_unlock(&m_events_mutex);
			return -1;
		}
			

		m_events.erase(itor);
		pthread_mutex_unlock(&m_events_mutex);
		return 0;	
	}

	return -1;
}

}