demo源码 LF线程池部分¶
threadpool.h基于非阻塞队列实现的简易L/F线程池。
L/F 领导者跟随者模式 :在LF线程池中,线程可处在3种线程状态之一: leader、follower或processor。处于leader状态的线程负责监听网络端口,当有消息到达时,该线程负责消息分离,并从处于 follower状态中的线程中按照某种机制如FIFO或基于优先级等选出一个来当新的leader,然后将自己设置为processor状态去分配和处 理该事件。处理完毕后线程将自身的状态设置为follower状态去等待重新成为leader。在整个线程池中同一时刻只有一个线程可以处于leader 状态,这保证了同一事件不会被多个线程重复处理。
缺点:实现复杂性和缺乏灵活性;
优点:增强了CPU高速缓存相似性,消除了动态内存分配和线程间的数据交换。
该文件中实现了两个类: 1,抽象类ThreadHandle 作为线程处理的接口,为调用派生类 event(异步事件驱动框架)对象提供基类指针。 2,L/F线程池类ThreadPool 该类中主要实现了如下函数: 1,构造函数 主要是封装pthread_create,并调用process_task函数。 也因为pthread_create函数参数的需要,将process_task设置为静态函数,这样process_task在ThreadPool类未实例化时就有一个稳定的函数起始地址。 2,析构函数 不谈。 3,static void *process_task(void * arg) 一个静态的线程任务处理函数,在该函数中不停(while(1))从线程handle队列中pop句柄并通过调用Event类的任务执行函数threadhandle->threadhandle();(在event类中实现)来处理。 promote_leader和join_follwer作为process_task的工具函数,做到在调用process_task时,始终是leader线程在处理任务。
#ifndef _THREADPOOL_H_
#define _THREADPOO_H_
#include <pthread.h>
#include <vector>
#include <stdio.h>
#include "queue.h"
//线程池类
#define THREADNUM_MAX 64
namespace ThreadPool
{
class ThreadHandle
{
friend class ThreadPool;
public:
virtual ~ThreadHandle(){};
virtual void threadhandle() = 0;
};
class ThreadPool
{
public:
typedef void *(threadproc_t)(void *);
typedef std::vector<pthread_t> vector_tid_t;
typedef Queue::Queue<ThreadHandle *> queue_handle_t;
private:
size_t m_threadnum;//线程数量
vector_tid_t m_thread;//存放线程的顺序表
queue_handle_t m_taskqueue;//线程句柄队列
bool m_hasleader;//是否有leader
pthread_cond_t m_befollower_cond;//
pthread_mutex_t m_identify_mutex;//
public:
ThreadPool(size_t threadnum, size_t tasknum): m_taskqueue(tasknum), m_hasleader(false)
{
m_threadnum = (threadnum > THREADNUM_MAX)? THREADNUM_MAX: threadnum;
pthread_attr_t thread_attr;//线程属性
pthread_attr_init(&thread_attr);//属性初始化
for(size_t i = 0; i < m_threadnum; i++){
pthread_t tid = i;//线程标志符
pthread_create(&tid, &thread_attr, process_task, (void *)this);// (void *)this是process_task需要的参数
m_thread.push_back(tid);
}
pthread_attr_destroy(&thread_attr);//属性去除初始化
}
~ThreadPool()
{
void *retval = NULL;
vector_tid_t::iterator itor = m_thread.begin();
for(; itor != m_thread.end(); itor++){
pthread_cancel(*itor) < 0 || pthread_join(*itor, &retval) ;
m_thread.clear();
}
}
// 往线程池放入任务,非阻塞方式
int pushtask(ThreadHandle *handle)
{
return m_taskqueue.push_nonblock(handle);
}
private:
void promote_leader()
{
pthread_mutex_lock(&m_identify_mutex);
while(m_hasleader){ // 已经有leader,阻塞
pthread_cond_wait(&m_befollower_cond, &m_identify_mutex);
}
m_hasleader = true;
pthread_mutex_unlock(&m_identify_mutex);
}
void join_follwer()
{
pthread_mutex_lock(&m_identify_mutex);
m_hasleader = false;
pthread_cond_signal(&m_befollower_cond);
pthread_mutex_unlock(&m_identify_mutex);
}
static void *process_task(void * arg)
{
ThreadPool &threadpool = *(ThreadPool *)arg;
while(true){
threadpool.promote_leader();
ThreadHandle *threadhandle = NULL;
int ret = threadpool.m_taskqueue.pop(threadhandle);
threadpool.join_follwer();
if(ret == 0 && threadhandle)//返回成功并且有任务需处理
threadhandle->threadhandle();
}
pthread_exit(NULL);
}
};
}
#endif