CGraph源码导读与线程池优化研读
更好的排版请阅读: https://we5lw6jk7r.feishu.cn/wiki/IhxUwMQ0gi8SjbkeIFyctZusnoc?from=from_copylink
CGraph
核心主工程代码
GPipeline: 表示一个任务流的核心数据结构
_class_ GPipeline : _public_ CObject {
_public:_
/**
* 初始化pipeline信息
* _@return_
*/
CSTATUS init() override;
/**
* 执行pipeline信息
* _@return_
*/
CSTATUS run() override;
/**
* 逆初始化pipeline信息
* _@return_
*/
CSTATUS deinit() override;
/**
* 一次性执行完成初始化,执行runTimes次,和逆初始化的过程
* _@param_ _runTimes_
* _@return_
*/
CSTATUS process(_int_ _runTimes_ = 1);
/**
* 根据传入的info信息,创建node节点
* _@tparam_ T
* _@param_ _info_
* _@return_
*/
_template_<_typename_ T>
GElementPtr createGNode(const GNodeInfo &_info_);
/**
* 根据传入的信息,创建Group信息
* _@tparam_ T
* _@param_ _elements_
* _@param_ _dependElements_
* _@param_ _name_
* _@param_ _loop_
* _@return_
*/
_template_<_typename_ T>
GElementPtr createGGroup(const GElementPtrArr &_elements_,
const GElementPtrSet &_dependElements_ = std::initializer_list<GElementPtr>(),
const std::string &_name_ = "",
_int_ _loop_ = 1);
/**
* 在图中注册一个Element信息
* 如果注册的是可执行节点,则内部自动生成;如果注册的是不可执行节点(如region或cluster),则需外部提前生成,然后注册进来
* _@tparam_ T
* _@param_ _elementRef_
* _@param_ _dependElements_
* _@param_ _name_
* _@param_ _loop_
* _@return_
*/
_template_<_typename_ T>
CSTATUS registerGElement(GElementPtr *_elementRef_,
const GElementPtrSet &_dependElements_ = std::initializer_list<GElementPtr>(),
const std::string &_name_ = "",
_int_ _loop_ = 1);
_protected:_
explicit GPipeline();
~GPipeline() override;
/**
* element元素,添加依赖节点信息
* _@param_ _element_
* _@param_ _dependElements_
* _@return_
*/
CSTATUS addDependElements(GElementPtr _element_,
const std::set<GElementPtr> &_dependElements_) const;
_private:_
_bool_ is_init_; // 标志位
GElementManagerPtr element_manager_; // 节点管理类(管理所有注册过的element信息)
GraphThreadPoolPtr thread_pool_; // 线程池类
GElementPtrSet element_repository_; // 标记创建的所有节点,最终释放使用
GParamManagerPtr param_manager_;
friend _class_ GPipelineFactory;
};
工厂类 GPipelineFactory: 基于工厂模式来生成对应的 Pipeline, 并用于保存 pipeline 内容
- 工厂模式的代码其实都可以用 static 去实现,连保存的 Pipeline 列表也要用 static 存储,可以实现全局共享!
_class_ GPipelineFactory : _public_ CObject {
_public:_
/**
* 创建一个pipeline信息
* _@return_
*/
static GPipelinePtr create();
/**
* 销毁一个pipeline信息
* _@return_
*/
static _void_ destroy(GPipelinePtr _pipeline_);
/**
* 清空所有的pipeline信息
*/
static _void_ clear();
_protected:_
CSTATUS run();
GPipelineFactory();
_private:_
static GPipelinePtrList pipeline_list_; // 记录所有的
static std::mutex lock_;
};
函数 registerGElement():用于注册一个新的 Element 节点到 element_manager_中
- 先判断 Element 是否存在于 element_manager_,如果存在就要删掉,然后重新进行生成
- 设置当前 Element 节点的 ParamManager,每个 Pipeline 只有一个 ParamManager
- 传入 name 表示当前节点的名字
- 传入 loop 表示当前节点需要被循环遍历的次数
-
【核心功能】传入依赖的 element 集合,然后在 DAG 图中将这些依赖的 element 进行处理
- 无法依赖空的 element
- 无法依赖自身 element
- 需要将被依赖的 element 的 run_before_(set<GElement*>)数据结构中加入当前的 element
- 需要在当前 element 中加入被依赖的 element
- 最后对当前的 element 设定 left_depend_字段,表示距离这个 element 可以执行还有多少任务的依赖!
CSTATUS GPipeline::addDependElements(GElementPtr element, const std::set
// TODO 这个功能可以下沉到element里去实现
CGRAPH_ASSERT_INIT(false)
CGRAPH_ASSERT_NOT_NULL(_element_)
for (GElementPtr cur : _dependElements_) {
// 如果传入的信息中,有nullptr,则所有的信息均不参与计算
CGRAPH_ASSERT_NOT_NULL(cur);
}
for (GElementPtr cur : _dependElements_) {
if (cur == _element_) {
continue; // 本节点无法依赖本节点
}
cur->run_before_.insert(_element_);
_element_->dependence_.insert(cur);
}
_element_->left_depend_ = _element_->dependence_.size();
CGRAPH_FUNCTION_END
}
- 在element_manager_中加入当前element
- 在element_repository_中加入当前的element
```cpp
_template_<_typename_ T>
CSTATUS GPipeline::registerGElement(GElementPtr *_elementRef_,
const GElementPtrSet &_dependElements_,
const std::string &_name_,
_int_ _loop_) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)
if (element_manager_->hasElement(*_elementRef_)) {
element_manager_->deleteElement(*_elementRef_); // 每次注册,都默认为是新的节点
}
/**
* 如果是GNode类型,则直接创建节点
* 如果不是GNode类型,则需要外部创建好,然后注册进来
* */
if (std::is_base_of<GNode, T>::value) {
(*_elementRef_) = new(std::nothrow) T();
}
CGRAPH_ASSERT_NOT_NULL(*elementRef)
status = (*_elementRef_)->setParamManager(this->param_manager_);
CGRAPH_FUNCTION_CHECK_STATUS
(*_elementRef_)->setName(_name_);
(*_elementRef_)->setLoop(_loop_);
status = addDependElements(*_elementRef_, _dependElements_);
CGRAPH_FUNCTION_CHECK_STATUS
status = element_manager_->addElement(dynamic_cast<GElementPtr>(*_elementRef_));
CGRAPH_FUNCTION_CHECK_STATUS
element_repository_.insert(*_elementRef_);
CGRAPH_FUNCTION_END
}
函数 init():初始化并 mock 执行以分析整张 DAG 图
-
检查核心成员变量是否存在
- thread_pool_
- element_manager_
- param_maneger_
-
初始化 element_manager_:用于管理整张图的 Element
- 第一步,判断 DAG 图的每个节点是否是独立联通的 isLinkable,【每个节点只能有一个依赖,每个节点的依赖节点只有一个后继节点且为当前(不能并行???)】
- 第二步,分析 DAG 图【只会遍历分析 left_depend_小于等于 0 的节点,且节点不是 isLinkable 的】
- 针对每个 element,都会新建一个 GCluster,并将当前遍历到的节点加入到 GCluster 中,然后不断遍历节点的后续节点,一起放到 GCluster 中。【这里似乎是一个 DFS 的遍历逻辑,如果一个节点有多个后继节点,只会选择第一个联通的 element 放入 GCluster 中】
- 总共会创建 element 数量个 GCluster,多个 Cluster 理论上应该是可以并行的?放置在临时 vector 变量 curClusterArr 中,同时 para_cluster_arrs_需要将这个 curClusterArr 加入状态中?
- 拷贝 curClusterArr 为 runnableClusterArr 并清空 curClusterArr,其中每个 GCluster 都要进行一次 mock 执行,不会执行实际的 Run,只是做一些逻辑上的处理,然后每次都构建新的 curClusterArr,不太清楚为什么要这么构造
- 第三步,初始化所有 element
函数 run():异步执行 DAG 图上的任务
-
检查核心字段是否正常
- is_init_
- thread_pool_
- element_manager_
- param_maneger_
- 构建一个 future
的数组 -
遍历可以并行的 cluster 数组,每次都要把 futures 数组清空,然后再遍历 GCluster
- 针对每个 GCluster,需要让 Pipeline 的线程池去 commit GCluster【本质上就是任务添加到队列中,队列的内容就是一个个 lambda 函数,如果线程池中空闲的线程少了,就增加点新的线程】,返回的 future 会被加入到 futures 中
- 遍历所有的 futures,用于获取状态【不知道为什么】
CSTATUS GPipeline::run() {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(true)
CGRAPH_ASSERT_NOT_NULL(thread_pool_)
CGRAPH_ASSERT_NOT_NULL(element_manager_)
CGRAPH_ASSERT_NOT_NULL(param_manager_)
int runElementSize = 0;
std::vector<std::future<CSTATUS>> futures;
for (GClusterArr& clusterArr : element_manager_->para_cluster_arrs_) {
futures.clear();
for (GCluster& cluster : clusterArr) {
futures.push_back(std::move(this->thread_pool_->commit(cluster)));
runElementSize += cluster.getElementNum();
}
for (auto& fut : futures) {
status = fut.get();
CGRAPH_FUNCTION_CHECK_STATUS
}
}
param_manager_->reset();
status = element_manager_->afterRunCheck(runElementSize);
CGRAPH_FUNCTION_END
}
函数 deinit():用于逆初始化,结束计算
- 核心的内容是对 param_manager_的哈希表进行清空,删除 value 指针的对象,然后将整个哈希表 clear
函数 process(): init+run+deinit 的集合
函数 createGGroup(): 用于创建一个节点组合
- 创建一种 GGroup,可以是 GCluster 或者是 GCondition,然后将每一个 element 加入 vector 中
函数 createGNode():用于创建一个自定义的节点
- GNode 是继承 GElement 的一个对象,可以根据 Info 来创建
核心成员变量
- bool is_init_:表示当前的 pipeline 是否被初始化了
- GElementManagerPtr element_manager_: 用于管理所有的 element
- GraphThreadPoolPtr thread_pool_: 用于放置线程池
- GElementPtrSet element_repository_: 用于标记所有被创建的节点,可以在最终释放的时候被使用!
- GParamManagerPtr param_manager_:参数管理器,整个 pipeline 会全局共享所有的参数
GElement: 表示一个 DAG 图中的节点
管理类 GElementManager
函数 preRunCheck():用于在执行之前检查每个 Element 是否为 linkable 的
CSTATUS GElementManager::preRunCheck() {
CGRAPH_FUNCTION_BEGIN
/**
* 认定图可以连通的判定条件:
* 1,当前节点仅有一个依赖
* 2,当前节点依赖的节点,只有一个后继
* 3,当前节点的依赖的后继,仍是当前节点
*/
for (GElement* element : manager_elements_) {
if (1 == element->dependence_.size()
&& 1 == (*element->dependence_.begin())->run_before_.size()
&& (*(element->dependence_.begin()))->run_before_.find(element) != (*(element->dependence_.begin()))->run_before_.end()) {
element->linkable_ = true;
}
}
CGRAPH_FUNCTION_END
}
函数 analyse(): 用于分析一张 DAG 图中如何构建多个支持并行的 Cluster
核心成员变量
- GElementPtrSet manager_elements_; // 保存节点信息的内容
- ParaWorkedClusterArrs para_cluster_arrs_; // 可以并行的 cluster 数组
核心成员变量
- bool done_ { false }; // 判定被执行结束
- bool is_init_ { false }; // 是否初始化了
- bool linkable_ { false }; // 判定是否可以连通计算
- int loop_ { 1 }; // 节点执行次数
- std::string name_; // 节点名称
- std::string session_; // 节点唯一 id 信息
- std::set<GElement *> run_before_; // 被依赖的节点
- std::set<GElement *> dependence_; // 依赖的节点信息
- std::atomic
left_depend_{ 0 }; // 当 left_depend_ 值为 0 的时候,即可以执行该 node 信息 - GParamManagerPtr param_manager_; // 整体流程的参数管理类,所有 pipeline 中的所有节点共享
GGroup: 表示节点组合的一个基类, 所有节点组合的功能都来自于此类
GCluster: 表示 DAG 图中的一个簇(可以线性执行的几个 element)
- 线性同步执行的一个组合
CSTATUS GCluster::run() {
CGRAPH_FUNCTION_BEGIN
for (GElementPtr element : this->cluster_elements_) {
_int_ elementLoop = element->loop_;
while (elementLoop--) {
// element需要被执行loop次
status = element->run();
CGRAPH_FUNCTION_CHECK_STATUS
}
}
CGRAPH_FUNCTION_END
}
GCondition: 表示 DAG 图中的一个条件类, 选择 group 中的其中一个 element 执行
- 核心思想,在 GCondition 进行 run()的时候,根据 choose()函数来选择对应的执行 element
CSTATUS GCondition::run() {
CGRAPH_FUNCTION_BEGIN
_int_ loop = 0;
_int_ index = this->choose();
if (GROUP_LAST_ELEMENT_INDEX == index && this->condition_elements_.size() > 0) {
// 如果返回-1,则直接执行最后一个条件(模仿default功能)
loop = condition_elements_.back()->loop_;
while (loop--) {
status = condition_elements_.back()->run();
CGRAPH_FUNCTION_CHECK_STATUS
}
} else if (0 <= index && index < condition_elements_.size()) {
// 如果返回的内容,在元素范围之内,则直接执行元素的内容
loop = condition_elements_[index]->loop_;
while (loop--) {
status = condition_elements_[index]->run();
CGRAPH_FUNCTION_CHECK_STATUS
}
} else {
// 设定的index不在范围内,返回错误信息
status = STATUS_ERR;
}
CGRAPH_FUNCTION_END
}
- 自定义注册一个 choose()方法
_class_ MyParamCondition : _public_ GCondition {
_public:_
/**
* 在这里主要演示condition中可以通过获取上方参数的形式,
* 来决定执行执行当前的第几个逻辑
* _@return_
*/
_int_ choose () override {
MyParam* myParam = this->getGParam<MyParam>("param1");
if (nullptr == myParam) {
return GROUP_LAST_ELEMENT_INDEX; // 如果没获取到,固定执行最后一个逻辑
}
_int_ cnt = 0;
{
CGRAPH_PARAM_READ_CODE_BLOCK(myParam) // 如果当前算子,跟其他相关依赖算子不存在并行关系,则参数可以直接使用,不需要加锁
cnt = myParam->iCount;
}
return (cnt % getRange());
}
};
GRegion: 表示子图内部还是可以并行的
- 此时在 GRegion 内部需要从外部注入一个 Thread_pool 进去,一般来说是和整个 DAG 共用一个线程池
重写函数 run(): 和 Pipeline 的执行流程类似, 支持子图内并行
CSTATUS GRegion::run() {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(true)
CGRAPH_ASSERT_NOT_NULL(thread_pool_)
CGRAPH_ASSERT_NOT_NULL(manager_)
_int_ runNodeSize = 0;
std::vector<std::future<CSTATUS>> futures;
for (GClusterArr& clusterArr : manager_->para_cluster_arrs_) {
futures.clear();
for (GCluster& cluster : clusterArr) {
futures.push_back(std::move(this->thread_pool_->commit(cluster)));
runNodeSize += cluster.getElementNum();
}
for (_auto_& fut : futures) {
status = fut.get();
CGRAPH_FUNCTION_CHECK_STATUS
}
}
status = manager_->afterRunCheck(runNodeSize);
CGRAPH_FUNCTION_END
}
核心成员变量
- GElementManagerPtr manager_; // 在 GRegion 内部也是需要一个 ElementManager 管理器的,实例化一个 Region 的时候新建
- GraphThreadPoolPtr thread_pool_; // 这里需要使用 GPipeline 类的线程池
GraphThreadPool: 核心的线程池类
核心成员变量
- std::vectorstd::thread pool_; // 线程池
- std::queue
tasks_que_; // 任务队列 - std::mutex mtx_; // 同步
- std::condition_variable task_cond_; // 条件阻塞
- std::atomic
run_{ true }; // 线程池是否执行 - std::atomic
idl_thd_num_{ 0 }; // 空闲线程数量 - std::atomic
max_thd_num_{ MAX_THREAD_NUM }; // 最大线程数
run(): 线程空转函数
- 不断执行,每一次都是加锁后先进先出从队列中取一个 task【一般要用 std::move()实现对象的移动】,然后执行对应函数
- 如果当前队列为空或者整个线程处于 idle 没有被使用的状态,会阻塞在 task_cond_的逻辑中,直到其他线程发起通知才可以继续进行。
_int_ run() {
while (run_) {
TaskFunc curFunc = nullptr;
{
std::unique_lock<std::mutex> lock{ mtx_ };
task_cond_.wait(lock, [this]{
// 值为false的时候,会阻塞当前线程
// 收到其他线程通知后,值为true的时候,会解除阻塞
return (!run_ || !tasks_que_.empty());
});
if (!run_ && tasks_que_.empty()) {
return 0;
}
curFunc = std::move(tasks_que_.front()); // 按先进先出从队列取一个 task
tasks_que_.pop();
}
idl_thd_num_--;
if (curFunc) {
curFunc();
}
idl_thd_num_++;
}
return 0;
}
commit(): 将一个 Cluster 的内容加入到任务队列中
- 利用 std::bind()将 GCluster::process 和具体要执行的 GCluster 实例绑定在一起
- 利用 std::packaged_tack<int()> 制作一个异步执行任务,然后将任务和对应的 future 绑定在一起,可以通过 get_future()来获取对应的句柄
- 利用 std::make_shared<> 来构建 future 的智能指针,基于引用计数的方案可以实现 RAII
- 利用 std::lock_guard 的对锁的管理属于 RAII 风格用法(Resource Acquisition Is Initialization),在构造函数中自动绑定它的互斥体并加锁,在析构函数中解锁,大大减少了死锁的风险。
std::future<_int_> commit(const GCluster& _cluster_) {
_auto_ curTask = std::make_shared<std::packaged_task<_int_()>>(std::bind(&GCluster::process, _cluster_, false));
std::future<_int_> future = curTask->get_future();
{
// 添加任务到队列
std::lock_guard<std::mutex> lock{ mtx_ };
tasks_que_.push([curTask]() {
(*curTask)();
});
if (idl_thd_num_ < 1 && pool_.size() < max_thd_num_) {
// 如果空闲thread小于1,并且不超过max限制
addThread(1);
}
}
task_cond_.notify_one();
return future;
}
GParam: 表示参数的类
-
核心思想:读写锁管理一份共享的内存参数
- std::mutex 互斥锁 => std::lock_guardstd::mutex 实现 RAII
- std::shared_mutex 读写锁(共享级别:多个线程可以共享锁的所有权; 独占级别:仅有一个线程可以获得锁)
- std::shared_lockstd::shared_mutex:实现共享级别的 RAII
- std::unique_lockstd::shared_mutex: 实现互斥级别的 RAII
-
中心化存储所有的参数数据,函数是没有返回值的也不会有参数,所有的函数都要重新自己去注册,写起来似乎也不是很方便。
GAspect: 表示切面, 可以为某个 Element 增加装饰器
- 一般来说需要协商好切面实现的接口
- beginInit
- finishInit
- beginRun
- finishRun
- beginDestroy
- finishDestroy
_class_ MyTraceAspect : _public_ CGraph::GAspect {
_public:_
/**
* 实现几个切面函数逻辑,模拟在对应执行的位置,打印trace信息
*/
CStatus beginInit() override {
CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] init begin ...", this->getName().c_str());
return CStatus();
}
CVoid finishInit(const CStatus& _curStatus_) override {
CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] init finished, error code is [%d] ...", this->getName().c_str(),
_curStatus_.getCode());
}
CStatus beginRun() override {
CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] run begin ...", this->getName().c_str());
return CStatus();
}
CVoid finishRun(const CStatus& _curStatus_) override {
if (!_curStatus_.isOK()) {
CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] run finished, status is ok ...", this->getName().c_str());
} else {
CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] run finished, error code is [%d] ...", this->getName().c_str(),
_curStatus_.getCode());
}
}
CStatus beginDestroy() override {
CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] destroy begin ...", this->getName().c_str());
return CStatus();
}
CVoid finishDestroy(const CStatus& _curStatus_) override {
CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] destroy finished, error code is [%d] ...", this->getName().c_str(),
_curStatus_.getCode());
}
};
- 本质上就是在执行之前,根据反射方法找到当前需要执行的 Aspect 类型,然后调用执行即可
CStatus GElement::fatProcessor(const CFunctionType& type, CSize loop) {
CGRAPH_FUNCTION_BEGIN
while (loop--) {
switch (type) {
case CFunctionType::RUN: {
/** 执行带切面的run方法 */
status = doAspect(GAspectType::BEGIN_RUN);
CGRAPH_FUNCTION_CHECK_STATUS
do {
status = run();
/**
* 如果状态是ok的,并且被条件hold住,则循环执行
* 默认所有element的isHold条件均为false,即不hold,即执行一次
* 可以根据需求,对任意element类型,添加特定的isHold条件
* */
} while (status.isOK() && this->isHold());
doAspect(GAspectType::FINISH_RUN, status);
break;
}
case CFunctionType::INIT: {
status = doAspect(GAspectType::BEGIN_INIT);
CGRAPH_FUNCTION_CHECK_STATUS
status = init();
doAspect(GAspectType::FINISH_INIT, status);
break;
}
case CFunctionType::DESTROY: {
status = doAspect(GAspectType::BEGIN_DESTROY);
CGRAPH_FUNCTION_CHECK_STATUS
status = destroy();
doAspect(GAspectType::FINISH_DESTROY, status);
break;
}
default:
CGRAPH_RETURN_ERROR_STATUS("get function type error")
}
}
CGRAPH_FUNCTION_END
}
GDaemon: 表示每隔几秒之后就要执行一次逻辑
- std::async:C++11 提出的异步逻辑,会返回一个 std::future
- 参数 1:启动策略
- std::launch::async: 保证异步行为,异步函数将会在独立的线程中执行
- std::launch::deferred: 当其他线程调用 get()来访问状态的时候,将调用非异步行为
-
std::launch::async std::launch::deferred: 由系统来决定是否异步执行
- 参数 2:实际的异步函数和回调返回
- 直接使用函数
- 使用函数对象 【比如在类中重载一个 operator() 】
- lambda 表达式
- 参数 1:启动策略
_template_<_typename_ FunctionType>
CVoid start(CMSec _interval_, const FunctionType& _task_) {
if (!is_stop_) {
return; // 如果正在执行中,则无法继续执行
}
is_stop_ = false;
/**
* std::launch::async:在调用async就开始创建线程。
* std::launch::deferred:延迟加载方式创建线程。调用async时不创建线程,直到调用了future的get或者wait时才创建线程。
* 后期会考虑将这个功能融合到线程池中去
*/
future_ = std::async(std::launch::async, [this, _interval_, _task_]() {
while (!is_stop_) {
CGRAPH_UNIQUE_LOCK lk(mutex_);
const _auto_& result = cv_.wait_for(lk, std::chrono::milliseconds(_interval_));
if (std::cv_status::timeout == result) {
_task_();
}
}
});
}
DAG 图调度优化
DAG 图的最大并发量计算
- 第一步:求出全路径
- 第二步:根据全路径,得出所有可达点对
- 第三步,根据上面的点集信息,得到对应的二维矩阵,其中 A 可以到达 BCDEF,B 可以到达 ADEF
- 第四步,求出二维矩阵的最大独立集【独立集是一个顶点的集合,集合中的顶点两两不相邻】
退出、暂停和恢复机制
线程池优化方案
-
C++ 线程池的问题
- C++11 之前线程基本是没有标准库线程可以用的
- threadpool 和 barrier 需要自己去实现
-
任务队列
- 存放的元素都是一个个自定义输入输出的任务,任务入队之前要对 queue 的尾部加锁
- 几个 thread 会竞争获取任务队列的头部任务,任务出队的时候要对 queue 的头部加锁
-
并发优化的基本点
- 增加扇入扇出
- 增加负载
-
目标
- 开箱即用,使用 std 库手工实现
- 简单易用,任务队列可以支持任意格式的任务
- 性能强
local-thread 机制: 本地队列(类似 GMP 中的 MP 模型)
- 修改方案:
- n 个线程都有私有的本地线程队列,线程执行任务的时候就不需要加锁争抢,直接从本地任务队列中获取即可。
- 本线程中产生的 Task 尽可能放在本线程的 Queue 中执行【Golang 的 GMP 也实现了这个】
local-free 机制: 无锁编程
- 无锁编程的实现方案
- 基于 atomic 的原子操作 实现临界区的修改,本质上是在执行原子指令(比如 i++)的时候,变量内部维护了一个自旋锁(确保当前的线程不会被切换)和一个 CAS(一种乐观锁,确保写入的数值是正确的)
- 基于内部封装的 mutex
- 基于 CAS 机制 【Compare-And-Swap】
work-stealing 机制: GMP 中的实现也存在这个方案
- 当某个线程的本地执行队列为空的时候,从另一个线程的本地执行队列中 steal 一些任务
- 一般来说都是按照 thread 的序号顺序,thread5 会按 thread6, thread7, thread0, … 这样的顺序来偷取
-
每个 thread 执行的任务顺序
- 本地队列中的任务
- 线程池 pool 中的任务
- 从其他队列 steal 来的任务
-
缺点和优化方案
- 为了偷取一个任务,需要遍历所有的 thread 的本地队列来实现偷取,效率太低 => 只考虑从相邻的 3 个线程中偷取
自动扩缩容机制: 弹性伸缩 thread(machine)机制
- 核心思想:在负载繁忙的时候,多增加一些 thread;当负载空闲的时候,就对 thread 进行自动回收
-
主线程(PrimaryThread)和辅助线程(SecondaryThread)和监控线程(MonitorThread)
- 主线程恒定不变
- 辅助线程可以根据负载繁忙情况来进行动态伸缩
- 监控线程会每隔 TTL 时间,轮询检测所有的主线程是否都在 running 状态,如果是就是忙碌状态,需要增加一个新的辅助线程;否则则认为 Pool 处于空闲状态,会销毁当前的辅助线程。
- 注意事项
- 线程不是越多越好的,因为线程的切换开销成本是比较大的
- 一般计算密集型任务开辟和 cpu 核心数(n)相同的线程数来执行
- 一般 I/O 密集型任务开辟 2n+1 个线程来执行
- 最佳线程数量 = ((线程等待时间 + 线程 CPU 时间)/线程 CPU 时间 )* CPU 数目
批量处理机制: 一次获取多个 Task 执行
- 主线程和辅助线程在从队列中获取/偷取任务的时候,整批整批地获取,减少争抢锁的次数,用于提高性能
-
注意事项
- 这种机制会打乱 queue 中任务的执行顺序 => 解决方案:DAG 图调度的场景中,放入线程池中执行的任务,均是“互不依赖”的,有依赖的任务节点会等被依赖的节点执行完毕之后才会放入 Pool 中
负载均衡机制
- 核心机制
- 尽可能保证当前主线程产生的子 Task 会被放入到当前的队列中
- 尽可能保证每个主线程队列中所有任务的总耗时基本一致
避免忙等待(避免阻塞等待, try_lock+yield)
-
这个核心思想在 Golang 和 Python 中均有体现
- yield 类似 return,在 Python 中可以基于此来实现一个生成器,调用 next()会从 yield 返回的位置开始执行。这里的 yield 类似 Golang 的 runtime.Gosched()
- try_lock 在 Golang 中也有实现,如果此时业务不能拿到锁,也不要让他阻塞等待着,而是利用 yield 来让出 CPU
-
忙等待的方法主要运用在自旋锁场景中,可以用于 CAS 操作和 Atomic 操作中
- CAS 需要一致比较当前值和期望值是否一致
- Atomic 需要保证当前执行线程不退出
分支预测优化(少见)
减少无用的 Copy(std::move+emplace)
- std::move 用于直接转移对象的内容
-
指针的选型要避免 shared_ptr 和 unique_ptr 不断多次申请和来回赋值的问题
- 禁止内部使用 shared_ptr【本质上就是引用计数管理内存,当指向某个实例的指针引用计数降低到 0,会自动释放动态分配的资源】【shared_ptr 内部存在 CAS 校验机制,占据内存也比较大】【shared_ptr 是线程安全的,引用计数是存在锁机制的!】【拷贝使得对象的引用计数增加 1,赋值使得原对象引用计数减 1,当计数为 0 时,自动释放内存。后来指向的对象引用计数加 1,指向后来的对象】
- 采用 unique_ptr 来管理个别不定期申请和释放的资源【独享所有权的指针,不支持普通的拷贝和赋值操作,只能支持移动语义操作,如果转移一个 unique_ptr,会发生对象的所有权从源指针到目标指针的转移过程】
- weak_ptr【弱引用,避免循环引用的场景】:指向一个由 shared_ptr 管理的对象,只会引用但是不会计数,当一个对象同时被一个 shared_ptr 和一个 weak_ptr 引用,不管 weak_ptr 是否引用这个对象,shared_ptr 析构之后都会导致对象被释放
- auto_ptr【解决的是有异常抛出的时候发生内存泄漏的问题】:当抛出异常的时候,指针 p 指向的空间需要被合理回收。【auto_ptr 的构造函数是 explicit 函数,阻止指针的隐式转换,不能将一般的指针类型赋值给 auto_ptr,必须要用 auto_ptr 的构造函数来创建对象。被 explicit 修饰的构造函数的类,不能发生相应的隐式类型转换】【auto_ptr 析构函数中删除对象是用 delete,而不是 delete[],不能用于管理数组】
执行超时机制
- 需要限制单个线程的执行时间,做一个时间限定,如果超过了限定时间就进行阻塞并返回错误信息。
任务组(UTaskGroup)
- 一批任务完成之后再执行其他的任务
- 给同一批任务设定统一的等待时长
- 在多批任务执行结束的时候,固定执行某个回调逻辑
优先级和阻塞执行
- 用于给任务设置优先级