更好的排版请阅读: https://we5lw6jk7r.feishu.cn/wiki/IhxUwMQ0gi8SjbkeIFyctZusnoc?from=from_copylink

CGraph

核心主工程代码

GPipeline: 表示一个任务流的核心数据结构

_class_ GPipeline : _public_ CObject {

_public:_
    /**
     * 初始化pipeline信息
     * _@return_
     */
    CSTATUS init() override;

    /**
     * 执行pipeline信息
     * _@return_
     */
    CSTATUS run() override;

    /**
     * 逆初始化pipeline信息
     * _@return_
     */
    CSTATUS deinit() override;

    /**
     * 一次性执行完成初始化,执行runTimes次,和逆初始化的过程
     * _@param_ _runTimes_
     * _@return_
     */
    CSTATUS process(_int_ _runTimes_ = 1);

    /**
     * 根据传入的info信息,创建node节点
     * _@tparam_ T
     * _@param_ _info_
     * _@return_
     */
    _template_<_typename_ T>
    GElementPtr createGNode(const GNodeInfo &_info_);

    /**
     * 根据传入的信息,创建Group信息
     * _@tparam_ T
     * _@param_ _elements_
     * _@param_ _dependElements_
     * _@param_ _name_
     * _@param_ _loop_
     * _@return_
     */
    _template_<_typename_ T>
    GElementPtr createGGroup(const GElementPtrArr &_elements_,
                             const GElementPtrSet &_dependElements_ = std::initializer_list<GElementPtr>(),
                             const std::string &_name_ = "",
                             _int_ _loop_ = 1);

    /**
     * 在图中注册一个Element信息
     * 如果注册的是可执行节点,则内部自动生成;如果注册的是不可执行节点(如region或cluster),则需外部提前生成,然后注册进来
     * _@tparam_ T
     * _@param_ _elementRef_
     * _@param_ _dependElements_
     * _@param_ _name_
     * _@param_ _loop_
     * _@return_
     */
    _template_<_typename_ T>
    CSTATUS registerGElement(GElementPtr *_elementRef_,
                             const GElementPtrSet &_dependElements_ = std::initializer_list<GElementPtr>(),
                             const std::string &_name_ = "",
                             _int_ _loop_ = 1);

_protected:_
    explicit GPipeline();

    ~GPipeline() override;

    /**
     * element元素,添加依赖节点信息
     * _@param_ _element_
     * _@param_ _dependElements_
     * _@return_
     */
    CSTATUS addDependElements(GElementPtr _element_,
                              const std::set<GElementPtr> &_dependElements_) const;

_private:_
    _bool_ is_init_;                                       // 标志位
    GElementManagerPtr element_manager_;                 // 节点管理类(管理所有注册过的element信息)
    GraphThreadPoolPtr thread_pool_;                     // 线程池类
    GElementPtrSet element_repository_;                  // 标记创建的所有节点,最终释放使用
    GParamManagerPtr param_manager_;

    friend _class_ GPipelineFactory;
};

工厂类 GPipelineFactory: 基于工厂模式来生成对应的 Pipeline, 并用于保存 pipeline 内容

  • 工厂模式的代码其实都可以用 static 去实现,连保存的 Pipeline 列表也要用 static 存储,可以实现全局共享!
_class_ GPipelineFactory : _public_ CObject {

_public:_
    /**
     * 创建一个pipeline信息
     * _@return_
     */
    static GPipelinePtr create();

    /**
     * 销毁一个pipeline信息
     * _@return_
     */
    static _void_ destroy(GPipelinePtr _pipeline_);

    /**
     * 清空所有的pipeline信息
     */
    static _void_ clear();

_protected:_
    CSTATUS run();
    GPipelineFactory();

_private:_
    static GPipelinePtrList pipeline_list_;    // 记录所有的
    static std::mutex lock_;
};

函数 registerGElement():用于注册一个新的 Element 节点到 element_manager_中

  • 先判断 Element 是否存在于 element_manager_,如果存在就要删掉,然后重新进行生成
  • 设置当前 Element 节点的 ParamManager,每个 Pipeline 只有一个 ParamManager
  • 传入 name 表示当前节点的名字
  • 传入 loop 表示当前节点需要被循环遍历的次数
  • 【核心功能】传入依赖的 element 集合,然后在 DAG 图中将这些依赖的 element 进行处理

    • 无法依赖空的 element
    • 无法依赖自身 element
    • 需要将被依赖的 element 的 run_before_(set<GElement*>)数据结构中加入当前的 element
    • 需要在当前 element 中加入被依赖的 element
    • 最后对当前的 element 设定 left_depend_字段,表示距离这个 element 可以执行还有多少任务的依赖!

CSTATUS GPipeline::addDependElements(GElementPtr element, const std::set& _dependElements_) const { CGRAPH_FUNCTION_BEGIN

// TODO 这个功能可以下沉到element里去实现
CGRAPH_ASSERT_INIT(false)
CGRAPH_ASSERT_NOT_NULL(_element_)

for (GElementPtr cur : _dependElements_) {
    // 如果传入的信息中,有nullptr,则所有的信息均不参与计算
    CGRAPH_ASSERT_NOT_NULL(cur);
}

for (GElementPtr cur : _dependElements_) {
    if (cur == _element_) {
        continue;        // 本节点无法依赖本节点
    }

    cur->run_before_.insert(_element_);
    _element_->dependence_.insert(cur);
}

_element_->left_depend_ = _element_->dependence_.size();

CGRAPH_FUNCTION_END

}


- 在element_manager_中加入当前element

- 在element_repository_中加入当前的element

```cpp
_template_<_typename_ T>
CSTATUS GPipeline::registerGElement(GElementPtr *_elementRef_,
                                    const GElementPtrSet &_dependElements_,
                                    const std::string &_name_,
                                    _int_ _loop_) {
    CGRAPH_FUNCTION_BEGIN
    CGRAPH_ASSERT_INIT(false)

    if (element_manager_->hasElement(*_elementRef_)) {
        element_manager_->deleteElement(*_elementRef_);    // 每次注册,都默认为是新的节点
    }

    /**
     * 如果是GNode类型,则直接创建节点
     * 如果不是GNode类型,则需要外部创建好,然后注册进来
     * */
    if (std::is_base_of<GNode, T>::value) {
        (*_elementRef_) = new(std::nothrow) T();
    }

    CGRAPH_ASSERT_NOT_NULL(*elementRef)
    status = (*_elementRef_)->setParamManager(this->param_manager_);
    CGRAPH_FUNCTION_CHECK_STATUS

    (*_elementRef_)->setName(_name_);
    (*_elementRef_)->setLoop(_loop_);
    status = addDependElements(*_elementRef_, _dependElements_);
    CGRAPH_FUNCTION_CHECK_STATUS

    status = element_manager_->addElement(dynamic_cast<GElementPtr>(*_elementRef_));
    CGRAPH_FUNCTION_CHECK_STATUS
    element_repository_.insert(*_elementRef_);
    CGRAPH_FUNCTION_END
}

函数 init():初始化并 mock 执行以分析整张 DAG 图

  • 检查核心成员变量是否存在

    • thread_pool_
    • element_manager_
    • param_maneger_
  • 初始化 element_manager_:用于管理整张图的 Element

    • 第一步,判断 DAG 图的每个节点是否是独立联通的 isLinkable,【每个节点只能有一个依赖,每个节点的依赖节点只有一个后继节点且为当前(不能并行???)】
    • 第二步,分析 DAG 图【只会遍历分析 left_depend_小于等于 0 的节点,且节点不是 isLinkable 的】
      • 针对每个 element,都会新建一个 GCluster,并将当前遍历到的节点加入到 GCluster 中,然后不断遍历节点的后续节点,一起放到 GCluster 中。【这里似乎是一个 DFS 的遍历逻辑,如果一个节点有多个后继节点,只会选择第一个联通的 element 放入 GCluster 中】
      • 总共会创建 element 数量个 GCluster,多个 Cluster 理论上应该是可以并行的?放置在临时 vector 变量 curClusterArr 中,同时 para_cluster_arrs_需要将这个 curClusterArr 加入状态中?
      • 拷贝 curClusterArr 为 runnableClusterArr 并清空 curClusterArr,其中每个 GCluster 都要进行一次 mock 执行,不会执行实际的 Run,只是做一些逻辑上的处理,然后每次都构建新的 curClusterArr,不太清楚为什么要这么构造
    • 第三步,初始化所有 element

函数 run():异步执行 DAG 图上的任务

  • 检查核心字段是否正常

    • is_init_
    • thread_pool_
    • element_manager_
    • param_maneger_
  • 构建一个 future的数组
  • 遍历可以并行的 cluster 数组,每次都要把 futures 数组清空,然后再遍历 GCluster

    • 针对每个 GCluster,需要让 Pipeline 的线程池去 commit GCluster【本质上就是任务添加到队列中,队列的内容就是一个个 lambda 函数,如果线程池中空闲的线程少了,就增加点新的线程】,返回的 future 会被加入到 futures 中
    • 遍历所有的 futures,用于获取状态【不知道为什么】
CSTATUS GPipeline::run() {
    CGRAPH_FUNCTION_BEGIN

    CGRAPH_ASSERT_INIT(true)
    CGRAPH_ASSERT_NOT_NULL(thread_pool_)
    CGRAPH_ASSERT_NOT_NULL(element_manager_)
    CGRAPH_ASSERT_NOT_NULL(param_manager_)

    int runElementSize = 0;
    std::vector<std::future<CSTATUS>> futures;

    for (GClusterArr& clusterArr : element_manager_->para_cluster_arrs_) {
        futures.clear();

        for (GCluster& cluster : clusterArr) {
            futures.push_back(std::move(this->thread_pool_->commit(cluster)));
            runElementSize += cluster.getElementNum();
        }

        for (auto& fut : futures) {
            status = fut.get();
            CGRAPH_FUNCTION_CHECK_STATUS
        }
    }

    param_manager_->reset();
    status = element_manager_->afterRunCheck(runElementSize);
    CGRAPH_FUNCTION_END
}

函数 deinit():用于逆初始化,结束计算

  • 核心的内容是对 param_manager_的哈希表进行清空,删除 value 指针的对象,然后将整个哈希表 clear

函数 process(): init+run+deinit 的集合

函数 createGGroup(): 用于创建一个节点组合

  • 创建一种 GGroup,可以是 GCluster 或者是 GCondition,然后将每一个 element 加入 vector 中

函数 createGNode():用于创建一个自定义的节点

  • GNode 是继承 GElement 的一个对象,可以根据 Info 来创建

核心成员变量

  • bool is_init_:表示当前的 pipeline 是否被初始化了
  • GElementManagerPtr element_manager_: 用于管理所有的 element
  • GraphThreadPoolPtr thread_pool_: 用于放置线程池
  • GElementPtrSet element_repository_: 用于标记所有被创建的节点,可以在最终释放的时候被使用!
  • GParamManagerPtr param_manager_:参数管理器,整个 pipeline 会全局共享所有的参数

GElement: 表示一个 DAG 图中的节点

管理类 GElementManager

函数 preRunCheck():用于在执行之前检查每个 Element 是否为 linkable 的

CSTATUS GElementManager::preRunCheck() {
    CGRAPH_FUNCTION_BEGIN

    /**
     * 认定图可以连通的判定条件:
     * 1,当前节点仅有一个依赖
     * 2,当前节点依赖的节点,只有一个后继
     * 3,当前节点的依赖的后继,仍是当前节点
     */
    for (GElement* element : manager_elements_) {
        if (1 == element->dependence_.size()
            && 1 == (*element->dependence_.begin())->run_before_.size()
            && (*(element->dependence_.begin()))->run_before_.find(element) != (*(element->dependence_.begin()))->run_before_.end()) {
            element->linkable_ = true;
        }
    }

    CGRAPH_FUNCTION_END
}

函数 analyse(): 用于分析一张 DAG 图中如何构建多个支持并行的 Cluster

核心成员变量

  • GElementPtrSet manager_elements_; // 保存节点信息的内容
  • ParaWorkedClusterArrs para_cluster_arrs_; // 可以并行的 cluster 数组

核心成员变量

  • bool done_ { false }; // 判定被执行结束
  • bool is_init_ { false }; // 是否初始化了
  • bool linkable_ { false }; // 判定是否可以连通计算
  • int loop_ { 1 }; // 节点执行次数
  • std::string name_; // 节点名称
  • std::string session_; // 节点唯一 id 信息
  • std::set<GElement *> run_before_; // 被依赖的节点
  • std::set<GElement *> dependence_; // 依赖的节点信息
  • std::atomic left_depend_{ 0 }; // 当 left_depend_ 值为 0 的时候,即可以执行该 node 信息
  • GParamManagerPtr param_manager_; // 整体流程的参数管理类,所有 pipeline 中的所有节点共享

GGroup: 表示节点组合的一个基类, 所有节点组合的功能都来自于此类

GCluster: 表示 DAG 图中的一个簇(可以线性执行的几个 element)

  • 线性同步执行的一个组合
CSTATUS GCluster::run() {
    CGRAPH_FUNCTION_BEGIN
    for (GElementPtr element : this->cluster_elements_) {
        _int_ elementLoop = element->loop_;
        while (elementLoop--) {
            // element需要被执行loop次
            status = element->run();
            CGRAPH_FUNCTION_CHECK_STATUS
        }
    }

    CGRAPH_FUNCTION_END
}

GCondition: 表示 DAG 图中的一个条件类, 选择 group 中的其中一个 element 执行

  • 核心思想,在 GCondition 进行 run()的时候,根据 choose()函数来选择对应的执行 element
CSTATUS GCondition::run() {
    CGRAPH_FUNCTION_BEGIN

    _int_ loop = 0;
    _int_ index = this->choose();
    if (GROUP_LAST_ELEMENT_INDEX == index && this->condition_elements_.size() > 0) {
        // 如果返回-1,则直接执行最后一个条件(模仿default功能)
        loop = condition_elements_.back()->loop_;
        while (loop--) {
            status = condition_elements_.back()->run();
            CGRAPH_FUNCTION_CHECK_STATUS
        }
    } else if (0 <= index && index < condition_elements_.size()) {
        // 如果返回的内容,在元素范围之内,则直接执行元素的内容
        loop = condition_elements_[index]->loop_;
        while (loop--) {
            status = condition_elements_[index]->run();
            CGRAPH_FUNCTION_CHECK_STATUS
        }
    } else {
        // 设定的index不在范围内,返回错误信息
        status = STATUS_ERR;
    }

    CGRAPH_FUNCTION_END
}
  • 自定义注册一个 choose()方法
_class_ MyParamCondition : _public_ GCondition {

_public:_
    /**
     * 在这里主要演示condition中可以通过获取上方参数的形式,
     * 来决定执行执行当前的第几个逻辑
     * _@return_
     */
    _int_ choose () override {
        MyParam* myParam = this->getGParam<MyParam>("param1");
        if (nullptr == myParam) {
            return GROUP_LAST_ELEMENT_INDEX;    // 如果没获取到,固定执行最后一个逻辑
        }

        _int_ cnt = 0;
        {
            CGRAPH_PARAM_READ_CODE_BLOCK(myParam)    // 如果当前算子,跟其他相关依赖算子不存在并行关系,则参数可以直接使用,不需要加锁
            cnt = myParam->iCount;
        }
        return (cnt % getRange());
    }
};

GRegion: 表示子图内部还是可以并行的

  • 此时在 GRegion 内部需要从外部注入一个 Thread_pool 进去,一般来说是和整个 DAG 共用一个线程池

重写函数 run(): 和 Pipeline 的执行流程类似, 支持子图内并行

CSTATUS GRegion::run() {
    CGRAPH_FUNCTION_BEGIN
    CGRAPH_ASSERT_INIT(true)
    CGRAPH_ASSERT_NOT_NULL(thread_pool_)
    CGRAPH_ASSERT_NOT_NULL(manager_)

    _int_ runNodeSize = 0;
    std::vector<std::future<CSTATUS>> futures;

    for (GClusterArr& clusterArr : manager_->para_cluster_arrs_) {
        futures.clear();

        for (GCluster& cluster : clusterArr) {
            futures.push_back(std::move(this->thread_pool_->commit(cluster)));
            runNodeSize += cluster.getElementNum();
        }

        for (_auto_& fut : futures) {
            status = fut.get();
            CGRAPH_FUNCTION_CHECK_STATUS
        }
    }

    status = manager_->afterRunCheck(runNodeSize);
    CGRAPH_FUNCTION_END
}

核心成员变量

  • GElementManagerPtr manager_; // 在 GRegion 内部也是需要一个 ElementManager 管理器的,实例化一个 Region 的时候新建
  • GraphThreadPoolPtr thread_pool_; // 这里需要使用 GPipeline 类的线程池

GraphThreadPool: 核心的线程池类

核心成员变量

  • std::vectorstd::thread pool_; // 线程池
  • std::queue tasks_que_; // 任务队列
  • std::mutex mtx_; // 同步
  • std::condition_variable task_cond_; // 条件阻塞
  • std::atomic run_{ true }; // 线程池是否执行
  • std::atomic idl_thd_num_{ 0 }; // 空闲线程数量
  • std::atomic max_thd_num_{ MAX_THREAD_NUM }; // 最大线程数

run(): 线程空转函数

  • 不断执行,每一次都是加锁后先进先出从队列中取一个 task【一般要用 std::move()实现对象的移动】,然后执行对应函数
  • 如果当前队列为空或者整个线程处于 idle 没有被使用的状态,会阻塞在 task_cond_的逻辑中,直到其他线程发起通知才可以继续进行。
_int_ run() {
    while (run_) {
        TaskFunc curFunc = nullptr;
        {
            std::unique_lock<std::mutex> lock{ mtx_ };
            task_cond_.wait(lock, [this]{
                // 值为false的时候,会阻塞当前线程
                // 收到其他线程通知后,值为true的时候,会解除阻塞
                return (!run_ || !tasks_que_.empty());
            });

            if (!run_ && tasks_que_.empty()) {
                return 0;
            }

            curFunc = std::move(tasks_que_.front());    // 按先进先出从队列取一个 task
            tasks_que_.pop();
        }

        idl_thd_num_--;
        if (curFunc) {
            curFunc();
        }
        idl_thd_num_++;
    }

    return 0;
}

commit(): 将一个 Cluster 的内容加入到任务队列中

  • 利用 std::bind()将 GCluster::process 和具体要执行的 GCluster 实例绑定在一起
  • 利用 std::packaged_tack<int()> 制作一个异步执行任务,然后将任务和对应的 future 绑定在一起,可以通过 get_future()来获取对应的句柄
  • 利用 std::make_shared<> 来构建 future 的智能指针,基于引用计数的方案可以实现 RAII
  • 利用 std::lock_guard 的对锁的管理属于 RAII 风格用法(Resource Acquisition Is Initialization),在构造函数中自动绑定它的互斥体并加锁,在析构函数中解锁,大大减少了死锁的风险。
std::future<_int_> commit(const GCluster& _cluster_) {
    _auto_ curTask = std::make_shared<std::packaged_task<_int_()>>(std::bind(&GCluster::process, _cluster_, false));
    std::future<_int_> future = curTask->get_future();
    {
        // 添加任务到队列
        std::lock_guard<std::mutex> lock{ mtx_ };
        tasks_que_.push([curTask]() {
            (*curTask)();
        });

        if (idl_thd_num_ < 1 && pool_.size() < max_thd_num_) {
            // 如果空闲thread小于1,并且不超过max限制
            addThread(1);
        }
    }

    task_cond_.notify_one();
    return future;
}

GParam: 表示参数的类

  • 核心思想:读写锁管理一份共享的内存参数

    • std::mutex 互斥锁 => std::lock_guardstd::mutex 实现 RAII
    • std::shared_mutex 读写锁(共享级别:多个线程可以共享锁的所有权; 独占级别:仅有一个线程可以获得锁)
  • 中心化存储所有的参数数据,函数是没有返回值的也不会有参数,所有的函数都要重新自己去注册,写起来似乎也不是很方便。

GAspect: 表示切面, 可以为某个 Element 增加装饰器

  • 一般来说需要协商好切面实现的接口
    • beginInit
    • finishInit
    • beginRun
    • finishRun
    • beginDestroy
    • finishDestroy
_class_ MyTraceAspect : _public_ CGraph::GAspect {
_public:_
    /**
     * 实现几个切面函数逻辑,模拟在对应执行的位置,打印trace信息
     */
    CStatus beginInit() override {
        CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] init begin ...", this->getName().c_str());
        return CStatus();
    }

    CVoid finishInit(const CStatus& _curStatus_) override {
        CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] init finished, error code is [%d] ...", this->getName().c_str(),
                            _curStatus_.getCode());
    }

    CStatus beginRun() override {
        CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] run begin ...", this->getName().c_str());
        return CStatus();
    }

    CVoid finishRun(const CStatus& _curStatus_) override {
        if (!_curStatus_.isOK()) {
            CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] run finished, status is ok ...", this->getName().c_str());
        } else {
            CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] run finished, error code is [%d] ...", this->getName().c_str(),
                                _curStatus_.getCode());
        }
    }

    CStatus beginDestroy() override {
        CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] destroy begin ...", this->getName().c_str());
        return CStatus();
    }

    CVoid finishDestroy(const CStatus& _curStatus_) override {
        CGraph::CGRAPH_ECHO("----> [MyTraceAspect] [%s] destroy finished, error code is [%d] ...", this->getName().c_str(),
                            _curStatus_.getCode());
    }
};
  • 本质上就是在执行之前,根据反射方法找到当前需要执行的 Aspect 类型,然后调用执行即可
CStatus GElement::fatProcessor(const CFunctionType& type, CSize loop) {
    CGRAPH_FUNCTION_BEGIN

    while (loop--) {
        switch (type) {
            case CFunctionType::RUN: {
                /** 执行带切面的run方法 */
                status = doAspect(GAspectType::BEGIN_RUN);
                CGRAPH_FUNCTION_CHECK_STATUS
                do {
                    status = run();
                    /**
                     * 如果状态是ok的,并且被条件hold住,则循环执行
                     * 默认所有element的isHold条件均为false,即不hold,即执行一次
                     * 可以根据需求,对任意element类型,添加特定的isHold条件
                     * */
                } while (status.isOK() && this->isHold());
                doAspect(GAspectType::FINISH_RUN, status);
                break;
            }
            case CFunctionType::INIT: {
                status = doAspect(GAspectType::BEGIN_INIT);
                CGRAPH_FUNCTION_CHECK_STATUS
                status = init();
                doAspect(GAspectType::FINISH_INIT, status);
                break;
            }
            case CFunctionType::DESTROY: {
                status = doAspect(GAspectType::BEGIN_DESTROY);
                CGRAPH_FUNCTION_CHECK_STATUS
                status = destroy();
                doAspect(GAspectType::FINISH_DESTROY, status);
                break;
            }
            default:
                CGRAPH_RETURN_ERROR_STATUS("get function type error")
        }
    }

    CGRAPH_FUNCTION_END
}

GDaemon: 表示每隔几秒之后就要执行一次逻辑

  • std::async:C++11 提出的异步逻辑,会返回一个 std::future
    • 参数 1:启动策略
      • std::launch::async: 保证异步行为,异步函数将会在独立的线程中执行
      • std::launch::deferred: 当其他线程调用 get()来访问状态的时候,将调用非异步行为
      • std::launch::async std::launch::deferred: 由系统来决定是否异步执行
    • 参数 2:实际的异步函数和回调返回
      • 直接使用函数
      • 使用函数对象 【比如在类中重载一个 operator() 】
      • lambda 表达式
_template_<_typename_ FunctionType>
CVoid start(CMSec _interval_, const FunctionType& _task_) {
    if (!is_stop_) {
        return;    // 如果正在执行中,则无法继续执行
    }

    is_stop_ = false;
    /**
     * std::launch::async:在调用async就开始创建线程。
     * std::launch::deferred:延迟加载方式创建线程。调用async时不创建线程,直到调用了future的get或者wait时才创建线程。
     * 后期会考虑将这个功能融合到线程池中去
     */
    future_ = std::async(std::launch::async, [this, _interval_, _task_]() {
         while (!is_stop_) {
             CGRAPH_UNIQUE_LOCK lk(mutex_);
             const _auto_& result = cv_.wait_for(lk, std::chrono::milliseconds(_interval_));
             if (std::cv_status::timeout == result) {
                 _task_();
             }
         }
    });
}

DAG 图调度优化

DAG 图的最大并发量计算

  • 第一步:求出全路径
  • 第二步:根据全路径,得出所有可达点对
  • 第三步,根据上面的点集信息,得到对应的二维矩阵,其中 A 可以到达 BCDEF,B 可以到达 ADEF
  • 第四步,求出二维矩阵的最大独立集【独立集是一个顶点的集合,集合中的顶点两两不相邻】

退出、暂停和恢复机制

线程池优化方案

  • C++ 线程池的问题

    • C++11 之前线程基本是没有标准库线程可以用的
    • threadpool 和 barrier 需要自己去实现
  • 任务队列

    • 存放的元素都是一个个自定义输入输出的任务,任务入队之前要对 queue 的尾部加锁
    • 几个 thread 会竞争获取任务队列的头部任务,任务出队的时候要对 queue 的头部加锁
  • 并发优化的基本点

    • 增加扇入扇出
    • 增加负载
  • 目标

    • 开箱即用,使用 std 库手工实现
    • 简单易用,任务队列可以支持任意格式的任务
    • 性能强

local-thread 机制: 本地队列(类似 GMP 中的 MP 模型)

  • 修改方案:
    • n 个线程都有私有的本地线程队列,线程执行任务的时候就不需要加锁争抢,直接从本地任务队列中获取即可。
    • 本线程中产生的 Task 尽可能放在本线程的 Queue 中执行【Golang 的 GMP 也实现了这个】

local-free 机制: 无锁编程

  • 无锁编程的实现方案
    • 基于 atomic 的原子操作 实现临界区的修改,本质上是在执行原子指令(比如 i++)的时候,变量内部维护了一个自旋锁(确保当前的线程不会被切换)和一个 CAS(一种乐观锁,确保写入的数值是正确的)
    • 基于内部封装的 mutex
    • 基于 CAS 机制 【Compare-And-Swap】

work-stealing 机制: GMP 中的实现也存在这个方案

  • 当某个线程的本地执行队列为空的时候,从另一个线程的本地执行队列中 steal 一些任务
  • 一般来说都是按照 thread 的序号顺序,thread5 会按 thread6, thread7, thread0, … 这样的顺序来偷取
  • 每个 thread 执行的任务顺序

    • 本地队列中的任务
    • 线程池 pool 中的任务
    • 从其他队列 steal 来的任务
  • 缺点和优化方案

    • 为了偷取一个任务,需要遍历所有的 thread 的本地队列来实现偷取,效率太低 => 只考虑从相邻的 3 个线程中偷取

自动扩缩容机制: 弹性伸缩 thread(machine)机制

  • 核心思想:在负载繁忙的时候,多增加一些 thread;当负载空闲的时候,就对 thread 进行自动回收
  • 主线程(PrimaryThread)和辅助线程(SecondaryThread)和监控线程(MonitorThread)

    • 主线程恒定不变
    • 辅助线程可以根据负载繁忙情况来进行动态伸缩
    • 监控线程会每隔 TTL 时间,轮询检测所有的主线程是否都在 running 状态,如果是就是忙碌状态,需要增加一个新的辅助线程;否则则认为 Pool 处于空闲状态,会销毁当前的辅助线程。
  • 注意事项
    • 线程不是越多越好的,因为线程的切换开销成本是比较大的
    • 一般计算密集型任务开辟和 cpu 核心数(n)相同的线程数来执行
    • 一般 I/O 密集型任务开辟 2n+1 个线程来执行
    • 最佳线程数量 = ((线程等待时间 + 线程 CPU 时间)/线程 CPU 时间 )* CPU 数目

批量处理机制: 一次获取多个 Task 执行

  • 主线程和辅助线程在从队列中获取/偷取任务的时候,整批整批地获取,减少争抢锁的次数,用于提高性能
  • 注意事项

    • 这种机制会打乱 queue 中任务的执行顺序 => 解决方案:DAG 图调度的场景中,放入线程池中执行的任务,均是“互不依赖”的,有依赖的任务节点会等被依赖的节点执行完毕之后才会放入 Pool 中

负载均衡机制

  • 核心机制
    • 尽可能保证当前主线程产生的子 Task 会被放入到当前的队列中
    • 尽可能保证每个主线程队列中所有任务的总耗时基本一致

避免忙等待(避免阻塞等待, try_lock+yield)

  • 这个核心思想在 Golang 和 Python 中均有体现

    • yield 类似 return,在 Python 中可以基于此来实现一个生成器,调用 next()会从 yield 返回的位置开始执行。这里的 yield 类似 Golang 的 runtime.Gosched()
    • try_lock 在 Golang 中也有实现,如果此时业务不能拿到锁,也不要让他阻塞等待着,而是利用 yield 来让出 CPU
  • 忙等待的方法主要运用在自旋锁场景中,可以用于 CAS 操作和 Atomic 操作中

    • CAS 需要一致比较当前值和期望值是否一致
    • Atomic 需要保证当前执行线程不退出

分支预测优化(少见)

减少无用的 Copy(std::move+emplace)

  • std::move 用于直接转移对象的内容
  • 指针的选型要避免 shared_ptr 和 unique_ptr 不断多次申请和来回赋值的问题

    • 禁止内部使用 shared_ptr【本质上就是引用计数管理内存,当指向某个实例的指针引用计数降低到 0,会自动释放动态分配的资源】【shared_ptr 内部存在 CAS 校验机制,占据内存也比较大】【shared_ptr 是线程安全的,引用计数是存在锁机制的!】【拷贝使得对象的引用计数增加 1赋值使得原对象引用计数减 1,当计数为 0 时,自动释放内存。后来指向的对象引用计数加 1,指向后来的对象】
    • 采用 unique_ptr 来管理个别不定期申请和释放的资源【独享所有权的指针,不支持普通的拷贝和赋值操作,只能支持移动语义操作,如果转移一个 unique_ptr,会发生对象的所有权从源指针到目标指针的转移过程】
    • weak_ptr【弱引用,避免循环引用的场景】:指向一个由 shared_ptr 管理的对象,只会引用但是不会计数,当一个对象同时被一个 shared_ptr 和一个 weak_ptr 引用,不管 weak_ptr 是否引用这个对象,shared_ptr 析构之后都会导致对象被释放
    • auto_ptr【解决的是有异常抛出的时候发生内存泄漏的问题】:当抛出异常的时候,指针 p 指向的空间需要被合理回收。【auto_ptr 的构造函数是 explicit 函数,阻止指针的隐式转换,不能将一般的指针类型赋值给 auto_ptr,必须要用 auto_ptr 的构造函数来创建对象。被 explicit 修饰的构造函数的类,不能发生相应的隐式类型转换】【auto_ptr 析构函数中删除对象是用 delete,而不是 delete[],不能用于管理数组】

执行超时机制

  • 需要限制单个线程的执行时间,做一个时间限定,如果超过了限定时间就进行阻塞并返回错误信息。

任务组(UTaskGroup)

  • 一批任务完成之后再执行其他的任务
  • 给同一批任务设定统一的等待时长
  • 在多批任务执行结束的时候,固定执行某个回调逻辑

优先级和阻塞执行

  • 用于给任务设置优先级