线程池引言

池分类

  1. 线程池
  2. 数据库连接池
  3. 内存池
  4. 异步请求池

池化优势

缓冲, 重复利用, 大大减少重建, 节约资源, 提高效率, 提高利用率

核心优势在哪里?

  1. 提前创建, 申请, 反复利用, 而不是重新创建, 申请.
  2. 反复利用所以利用率高, 也节约了资源
  3. 提前创建, 而不是临时创建, 省去了创建时间, 提高了效率

用在何处

  1. 频繁需要申请释放处。 反正经常用, 我何不提前创建好, 等待你用, 用完我也不扔掉, 继续等你其他时候用.
  2. 多线程处:均可以考虑抛入线程池, 减少线程频繁创建销毁
  3. 注意:频繁是核心.
  4. 生活例子: 蓄水池, 酒池肉林, 好处何在? 方便吧, 提前放好, 随用随取

线程池组件

线程队列:

提前开启线程,

多线程同步消费任务队列中的任务

多线程同步消费:加锁 + 条件消费, 同步等待, 一个线程需要消费必须同时具备两个条件, 1.获取锁 2. 满足条件 (存在任务)

同步:核心在于条件等待, 等待着一个条件满足,然后同步触发一个事件, 阻塞函数, 同步等待.

同步优势:对于共享资源的有序消费, 多线程之间互相等待, 互相同步, 稳定消费

任务队列

也可叫做阻塞队列: blocking queue 学名

阻塞队列作用:异步解耦合, 任务放入任务队列中, 立刻返回, 在工作线程繁忙的时候,不至于需要等待线程空闲.

线程队列

组织成双向队列的多线程. 工作线程, 消费者线程, 提前开启, 等待处理任务

生活实例:办事窗口

线程池

将上述两大组件组合在一起 + mutex锁 + cond条件变量 实现同步消费

Reactor分解

何为Reactor

反应堆, 事件反应堆, 反射堆: 将对IO的操作封装成对事件的操作

Reactor组件

  1. 多路复用器 收集反应事件 EPOLL
  2. 事件处理器 回调处理机制
  3. 利用回调封装事件循环

网络IO处理分解

  1. io检测封装: epoll活跃io事件收集
  2. io操作封装, 读写io
  3. 对数据的解析操作封装, parser 业务逻辑

Reactor抛入线程池的方式

single reactor thread + worker threadpool

抛线程池方式1: 将parser业务处理抛入线程池

单线程reactor + 工作线程池

做法: 将业务逻辑处理单独抛入一个工作线程池进行处理. 实现网络IO跟业务的解耦合

使用场景: 相比IO, 业务逻辑处理耗时相当严重. 比如说写日志呀, XML 文件的解析、数据库记录的查找、文件资料的读取和传输、计算型工作的处理等,它们会拖慢整个反应堆模式的执行效率。此时我们就可以将其单独抛到另外的Thread pool 中去执行业务需求.

好处:反应堆线程仅仅处理网络IO 而 decode、compute、enode 型工作放置到另外的线程池中, 两者解耦,在业务处理耗时情况下大大提高效率

抛线程池方式2: 将IO操作 + parser都抛入线程池处理

使用场景:IO操作跟parser的处理都相当耗时的情境下, 将其放在事件循环中会拖慢整个事件循环的进程。

好处:事件循环可以最快的响应活跃事件.

缺陷:针对IO操作: 我们可能存在对于fd的一个共享问题. 一个线程在操作fd,另外一个线程给fd关闭了,这个就是一个大的问题. (核心在于可能出现fd的多线程共用的问题)

处理方式: 要么不应该使多个线程共享同一个fd,要么对fd进行简单的加锁操作。

充分利用多核CPU,主从Reactor

单Reactor的时候, reactor 反应堆同时分发Acceptor 上的连接建立事件和已建立连接的 I/O 事件。这样对于客户端接入量不高的情况下是完全OK的.

但是一旦客户端接入量特别大的情况下, reactor既要分发连接建立,又分发已建立连接的 I/O,有点忙不过来,在实战中的表现可能就是客户端连接成功率偏低。

引出 — 主从Reactor模式, 多Reactor模型, 将 acceptor 上的连接建立事件和已建立连接的 I/O 事件分离

核心思想: main Reactor只负责分发连接建立事件, sub Reactor 来负责已经建立连接的事件的分发.

sub Reactor的数量设置: 依据CPU数量而定

优势:服务器稳定性大大提升, 客户端连接成功率大大提高

?相关视频推荐

linux多线程之epoll原理剖析与reactor原理及应用

网络原理tcp/udp,网络编程epoll/reactor,面试中正经“八股文”

学习地址:C/C++Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂

需要C/C++ Linux后台服务器架构师学习资料加qun812855908(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQLRedis,fastdfs,MongoDBZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等)

?面试项目书写小技巧

对比书写, 分版本书写, 不同的版本优势是什么, 加入了什么技术, 带来了什么样的好处, 获益是什么

eg:本文中的reactor.

我在简历项目上书写, 实现了一个多线程reactor网络服务器框架

最开始使用什么样的技术, 实现了单线程reactor的框架, 然后对比单线程reacor跟多线程reactor, 表明加入线程池 + 主从reactor之后的优势所在. 带来了什么样的提升, 服务器稳定性提升呀, 性能抗压性提升等.

如下是我在网上找到的一个大佬写的单Reactor + 线程池的实现, 我觉得写的很棒, 其中的代码很多都是特别值得我们去品味的

reactor: 一个使用c语言实现的“单Reactor多线程”的简易reactor模型。

https://gitee.com/chenwifi/reactor

如下是自己封装的一款单线程 reactor + 方式1 抛入线程池.

threadpool.h

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
 
 
typedef void (*Func)(void*);
 
/*
	线程池组件
	1. 任务队列 (阻塞队列)
	2. 工作队列 (消化任务队列中的任务)
	3. 管理组件, 管理平衡工作 + 任务队列  (线程池)
*/
 
#define LL_ADD(item, list) do {\
	item->pre = NULL;\
	item->next = list;\
	list = item;\
} while (0)
 
 
#define LL_REMOVE(item, list) do {\
	if (item->pre != NULL) item->pre->next = item->next;\
	if (item->next != NULL) item->next->pre = item->pre;\
	if (item == list) list = item->next;\
	item->pre = item->next = NULL;\
} while (0)
 
 
 
 
typedef struct Task {
	Func task_run; //执行task任务, 处理user_data
	void* user_data;
 
	struct Task* pre;
	struct Task* next;
 
} Task;
 
 
//工作线程, 消化执行 task
typedef struct Wocker {
	pthread_t tid;
	int terminate;//终止, 停止工作
 
	struct Mannger* pool; 
 
	struct Wocker* pre;
	struct Wocker* next;
 
} Wocker;
 
//管理组件, 管理平衡上述的Task + Wocker
typedef struct Mannger {
	struct Wocker* wockers;//工作队列
	struct Task* tasks;//任务队列
 
	pthread_mutex_t lock;
	pthread_cond_t cond;
 
} ThreadPool;
 
//线程执行函数, 核心所在
void* thread_routine(void* arg) {
	Wocker* wocker = (Wocker*)arg;
 
	while (1) {
		pthread_mutex_lock(&wocker->pool->lock);
		if (wocker->pool->tasks == NULL) {
			if (wocker->terminate) break;//中断
			pthread_cond_wait(&wocker->pool->cond, &wocker->pool->lock);
		}
		//至此说明获取到锁了
		if (wocker->terminate) {
			pthread_mutex_unlock(&wocker->pool->lock);
			break;
		}
		Task* task = wocker->pool->tasks;
		if (task != NULL) {
			LL_REMOVE(task, wocker->pool->tasks);
		}
 
		pthread_mutex_unlock(&wocker->pool->lock);
		task->task_run(task);
	}
 
	free(wocker);//delete掉
}
 
//创建线程池, 开启消费者线程
ThreadPool* thread_pool_create(int thread_num) {
	ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
	pool->wockers = NULL;
  	pool->tasks = NULL;
	pthread_mutex_init(&pool->lock, NULL);
	pthread_cond_init(&pool->cond, NULL);
	int i = 0;
	for (i = 0; i < thread_num; ++i) {
		Wocker* wocker = (Wocker*)calloc(sizeof(Wocker), 1);
		wocker->pool = pool;
 
		pthread_create(&wocker->tid, NULL, thread_routine, (void*)wocker);
 
		LL_ADD(wocker, pool->wockers);
	}
 
	return pool;
}
 
 
void thread_pool_destroy(ThreadPool* pool) {
	if (pool == NULL) return ;
	Wocker* wocker = NULL;
	for (wocker = pool->wockers; wocker != NULL; wocker = wocker->next) {
		wocker->terminate = 1;//中断运行, 所有的工作线程中断工作
	}
	pthread_mutex_lock(&pool->lock);
 
	pthread_cond_broadcast(&pool->cond);//核心所在
	//广播让所有的工作线程退出工作
	pthread_mutex_unlock(&pool->lock);
 
	pthread_cond_destroy(&pool->cond);
	pthread_mutex_destroy(&pool->lock);
	//释放所有资源
	//free_all(pool);
}
 
void thread_pool_push_task(ThreadPool* pool, Task* task) {
		pthread_mutex_lock(&pool->lock);
		LL_ADD(task, pool->tasks);
		pthread_cond_signal(&pool->cond);//通知有任务可以消费了
		pthread_mutex_unlock(&pool->lock);
}
 
 
#endif

reactor.h

#ifndef _REACTOR_H_
#define _REACTOR_H_
 
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include "threadpool.h"
 
//简易reactor封装, 实现事件循环, 事件驱动
//io检测封装: epoll事件收集
//事件驱动, 事件循环封装, 设置回调
//IO操作封装, 设置收发缓冲区 + 处理返回
 
 
//reactor进一步升级, 更符合业务需求.
//网络 跟 业务需求 隔离解除耦合性
 
 
#define MAX_N 512 
typedef struct sockaddr SA;
 
typedef int (*callback)(int fd, int events, void* arg);
 
#define BUFFSIZE 1024
 
void err_exit(const char* reason) {
	fprintf(stderr, "%s : %d : %s\n", reason, errno, strerror(errno));
	exit(EXIT_FAILURE);
}
 
//封装epoll
typedef struct reactor {
	int epfd;
	struct epoll_event* events;//容器, 收集活跃事件
	int stop;
	ThreadPool* pool;
} reactor;
 
typedef struct sockitem {
	int sockfd;
	//封装回调, 事件驱动
	CallBack callback;
 
	struct reactor* eventloop;
	//每条连接读写缓冲区封装
	char recvbuffer[BUFFSIZE];
	int rlen;
 
	char sendbuffer[BUFFSIZE];
	int slen;
} sockitem;
 
//事件处理器声明
int accept_cb(int fd, int events, void* arg);
int recv_cb(int fd, int events, void* arg);
int send_cb(int fd, int events, void* arg);
 
 
struct reactor* init_reactor(int n) {
	struct reactor* eventloop = (struct reactor*)malloc(sizeof(struct reactor));
 
	eventloop->epfd = epoll_create(1);
	eventloop->events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * n);
	eventloop->stop = 0;
	eventloop->pool = thread_pool_create(10);
	return eventloop; 
}
 
 
void release_reactor(struct reactor* eventloop) {
	if (NULL == eventloop) return;
 
	close(eventloop->epfd); //关闭epfd
	free(eventloop);
	return ;
}
 
struct sockitem* init_sockitem(struct reactor* eventloop, int sockfd, CallBack callback) {
	struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
	si->sockfd = sockfd;
	si->callback = callback;
	si->eventloop = eventloop;
	return si;
}
 
 
void setnoblock(int fd) {
	int flag = fcntl(fd, F_GETFL, 0);
	if (-1 == fcntl(fd, F_SETFL, flag | O_NONBLOCK)) {
		err_exit("fcntl");
	}
}
 
 
int add_event(struct reactor* eventloop, int events, sockitem* si) {
	struct epoll_event ev;
 
	Site is undergoing maintenance = events;
	ev.data.ptr = si;
	if (-1 == epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, si->sockfd, &ev)) {
		err_exit("epoll_ctl add");
	}
	return 0;
}
 
 
int del_event(struct reactor* eventloop, sockitem* si) {
	if (NULL == si) return 1;
 
	if (-1 == epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, si->sockfd, NULL)) {
		err_exit("epoll_ctl del");
	}
 
	free(si);
	return 0;
}
 
 
int mod_event(struct reactor* eventloop, int events, sockitem* si) {
	struct epoll_event ev;
 
	Site is undergoing maintenance = events;
	ev.data.ptr = si;
	if (-1 == epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, si->sockfd, &ev)) {
		err_exit("epoll_ctl mod");
	}
	return 0;
 
}
 
int accept_cb(int fd, int events, void* arg) {
	struct sockitem* si = (struct sockitem*)arg;
	socklen_t cli_addr_len = sizeof(struct sockaddr_in);
	struct sockaddr_in cli_addr;
	char ip_buff[INET_ADDRSTRLEN] = {0};
 
	int cli_fd = accept(fd, (SA*)&cli_addr, &cli_addr_len);
	setnoblock(cli_fd);//非阻塞, 避免因为一条连接的事件阻塞其他连接得不到处理
	if (-1 == cli_fd) {
		err_exit("accept");
	}
 
	printf("recv from ip %s at port %d\n", inet_ntop(AF_INET, &cli_addr.sin_addr, ip_buff, sizeof(ip_buff)),
		ntohs(cli_addr.sin_port));
	//封装si事件
	struct sockitem* newsi = init_sockitem(si->eventloop, cli_fd, recv_cb);
	//将si 放入eventloop事件循环
	add_event(si->eventloop, EPOLLIN | EPOLLET, newsi);
 
	return cli_fd;
}
 
void task_run(void* arg) {
	Task* task = (Task*)arg;
	//拿取任务执行
	struct sockitem* si = (struct sockitem*)task->user_data;
	char* data = si->recvbuffer;
	//处理数据, 进行运算
	int i = 0;
	char op;
	int lhs = 0, rhs = 0;
	int flag = 0;//标记lhs, rhs
	int ans = 0;
	while (data[i]) {
		if (data[i] >= '0' && data[i] <= '9') {
			if (!flag) {
				for (; data[i] >= '0' && data[i] <= '9'; ++i) {
					lhs = lhs * 10 + (data[i] - '0');
				}
				flag = 1;
				i -= 1;
			} else {
				for (; data[i] >= '0' && data[i] <= '9'; ++i) {
					rhs = rhs * 10 + (data[i] - '0');
				}
				break;
			}
		} else if (data[i] == ' ') {
			i += 1;
      		continue;
		} else {
			op = data[i];
		} 
		i += 1;
	}
	switch(op) {
		case '+' : {
			ans = lhs + rhs;
		} break;
		case '-' : {
			ans = lhs - rhs;
		} break;
		case '*' : {
			ans = lhs * rhs;
		} break;
		case '/' : {
			ans = lhs / rhs;
		} break;
	}
  	printf("%d %c %d = %d\n", lhs, op, rhs, ans);
	sprintf(si->sendbuffer, "%d %c %d = %d\n", lhs, op, rhs, ans);
	//数据写入到sendbuffer种去
	si->slen = strlen(si->sendbuffer);
	memset(si->recvbuffer, 0, BUFFSIZE);//清空读缓冲区.
	si->rlen = 0;
	si->callback = send_cb;
	mod_event(si->eventloop, EPOLLOUT | EPOLLET, si);
	free(task);
}
 
int recv_cb(int fd, int events, void* arg) {
	struct sockitem* si = (struct sockitem*)arg;
	struct epoll_event ev;
  	int ret = 0;
	while (1) {
	  ret = recv(fd, si->recvbuffer, BUFFSIZE, 0);
		if (ret < 0) {
			if (errno == EINTR) {//信号打断
				continue;	
			}
			if (errno == EWOULDBLOCK) {//写缓冲区满了
				break;
			}
			del_event(si->eventloop, si);
			close(fd);
			err_exit("recv");//出错了
 
		} else if (ret == 0) {
			//对端断开连接
			printf("fd %d disconnect\n", fd);
			del_event(si->eventloop, si);
			close(fd);
			return 0;
		} else {
			break;
		}
	}
	//接收到数据之后进行处理, 将其抛入到线程池处理
	#if 0
	//打印接收到的数据
	printf("recv: %s, %d Bytes\n", si->recvbuffer, ret);
	//设置sendbuffer
	si->rlen = ret;
	memcpy(si->sendbuffer, si->recvbuffer, si->rlen);
	si->slen = si->rlen;
  	//清空recvbuffer
  	memset(si->recvbuffer, 0,BUFFSIZE);
	si->callback = send_cb;
	mod_event(si->eventloop, EPOLLOUT | EPOLLET, si);
	#elif 1
	//如何抛入到线程池进行处理?
	/*
		假设业务逻辑是: 将字符串转换为算式进行计算
	*/
	printf("recv: %s, %d Bytes\n", si->recvbuffer, ret);
	si->rlen = ret;
	Task* task = (Task*)malloc(sizeof(Task));
	//先创建任务结构体
	task->next = task->pre = NULL;
	task->task_run = task_run;
	task->user_data = (void*)si; 
 
	thread_pool_push_task(si->eventloop->pool, task);
 
	#endif
	return 0;
}
 
int send_cb(int fd, int events, void* arg) {
	struct sockitem* si = (struct sockitem*)arg;
	while (1) {
		int n = send(fd, si->sendbuffer, BUFFSIZE, 0);
		if (-1 == n) {
			if (errno == EINTR) {//信号打断
				continue;	
			}
			if (errno == EWOULDBLOCK) {//写缓冲区满了
				break;
			}
			//出错了
			err_exit("send");
		}
		printf("send %d bytes\n", n);
    //每一次send之后从新将senbuffer置为空
    memset(si->sendbuffer, 0, BUFFSIZE);
		break;//正常写完
	}
 
	si->callback = recv_cb;
 
	mod_event(si->eventloop, EPOLLIN | EPOLLET, si);
 
}
 
//事件循环一次
void eventloop_once(struct reactor* eventloop) {
	int nready = epoll_wait(eventloop->epfd, eventloop->events, MAX_N, -1);
	int i = 0;
	for (i = 0; i < nready; ++i) {
		int mask = 0;
		struct epoll_event* ev= &eventloop->events[i];
		if (ev->events & EPOLLIN) mask |= EPOLLIN;
        if (ev->events & EPOLLOUT) mask |= EPOLLOUT;
        //将EPOLLERR + EPOLLHUP 的处理交付到io函数中进行处理, 放到回调中处理
        if (ev->events & EPOLLERR) mask |= EPOLLIN|EPOLLOUT;
        if (ev->events & EPOLLHUP) mask |= EPOLLIN|EPOLLOUT; 
 
        if (mask & EPOLLIN) {
        	struct sockitem* si = (struct sockitem*)ev->data.ptr;
        	si->callback(si->sockfd, mask, si);
        }
 
        if (mask & EPOLLOUT) {
        	struct sockitem* si = (struct sockitem*)ev->data.ptr;
        	si->callback(si->sockfd, mask, si);
        }
	}
 
}
 
 
//开启事件循环
void start_eventloop(struct reactor* eventloop) {
	while (!eventloop->stop) {
		eventloop_once(eventloop);
	}
}
 
 
//停止事件循环
void stop_eventloop(struct reactor* eventloop) {
	eventloop->stop = 1;
}
 
#endif

reator.c

#include "reactor.h"
 
int init_sock(short port) {
	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
 
	if (-1 == sockfd) {
		err_exit("socket");
	}
 
	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);
	addr.sin_addr.s_addr = INADDR_ANY;
 
	if (-1 == bind(sockfd, (SA*)&addr, sizeof(addr))) {
		err_exit("bind");
	}
 
	if (-1 == listen(sockfd, 5)) {
		err_exit("listen");
	}
 
	return sockfd;
}
 
int main(int argc, char* argv[]) {
	if (2 != argc) {
		fprintf(stderr, "usage: %s <port>", argv[0]);
		exit(EXIT_FAILURE);
	}
 
	short port = atoi(argv[1]);
 
	int sockfd = init_sock(port);
	//至此完成了网络连接了
 
	struct reactor* mainloop = init_reactor(MAX_N);
 
	//封装listen的 
	struct sockitem* si = init_sockitem(mainloop, sockfd, accept_cb);
	add_event(mainloop, EPOLLIN, si);//对于监视套接字一般是水平触发
 
	start_eventloop(mainloop);
 
	//回收资源
 
	release_reactor(mainloop);
	return 0;
}
加客服微信:qiushu0517,开通VIP下载权限!