From 6229d951219ef5cd8e633bc7fa9d9f94a315f626 Mon Sep 17 00:00:00 2001 From: Chunel Date: Sun, 5 Jul 2026 00:24:14 +0800 Subject: [PATCH 1/2] [chron] set UTask as explicit --- src/UtilsCtrl/ThreadPool/Task/UTask.h | 10 ++++++++-- src/UtilsCtrl/ThreadPool/UThreadPool.inl | 16 ++++++++-------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/UtilsCtrl/ThreadPool/Task/UTask.h b/src/UtilsCtrl/ThreadPool/Task/UTask.h index e0e9d6c0..93343317 100644 --- a/src/UtilsCtrl/ThreadPool/Task/UTask.h +++ b/src/UtilsCtrl/ThreadPool/Task/UTask.h @@ -11,6 +11,7 @@ #include #include +#include #include #include "../UThreadObject.h" @@ -33,8 +34,9 @@ class UTask : public CStruct { }; public: - template - UTask(F&& func, const int priority = 0) + template::type, UTask>::value, int>::type = 0> + explicit UTask(F&& func, const int priority = 0) : impl_(new TaskDerided(std::forward(func))) , priority_(priority) {} @@ -49,6 +51,10 @@ class UTask : public CStruct { impl_(std::move(task.impl_)), priority_(task.priority_) {} + UTask(UTask&& task, const int priority) noexcept: + impl_(std::move(task.impl_)), + priority_(priority) {} + UTask &operator=(UTask&& task) noexcept { impl_ = std::move(task.impl_); priority_ = task.priority_; diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.inl b/src/UtilsCtrl/ThreadPool/UThreadPool.inl index 850b6f89..d5124fef 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.inl +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.inl @@ -32,7 +32,7 @@ auto UThreadPool::commitWithTid(const FunctionType& func, CIndex tid, CBool enab std::packaged_task task(std::move(func)); std::future result(task.get_future()); - execute(std::move(task), tid, enable, lockable); + executeWithTid(std::move(task), tid, enable, lockable); return result; } @@ -49,7 +49,7 @@ auto UThreadPool::commitWithPriority(const FunctionType& func, int priority) createSecondaryThread(1); // 如果没有开启辅助线程,则直接开启一个 } - priority_task_queue_.push(std::move(task), priority); + priority_task_queue_.push(UTask(std::move(task)), priority); return result; } @@ -59,14 +59,14 @@ CVoid UThreadPool::execute(FunctionType&& task, const CIndex index) { const CIndex realIndex = dispatch(index); if (likely(realIndex >= 0 && realIndex < config_.default_thread_size_)) { - primary_threads_[realIndex]->pushTask(std::forward(task)); + primary_threads_[realIndex]->pushTask(UTask(std::forward(task))); } else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) { - priority_task_queue_.push(std::forward(task), CGRAPH_LONG_TIME_TASK_STRATEGY); + priority_task_queue_.push(UTask(std::forward(task)), CGRAPH_LONG_TIME_TASK_STRATEGY); } else if (CGRAPH_TRIGGER_ALL_THREAD_STRATEGY == realIndex) { - task_queue_.push(std::forward(task)); + task_queue_.push(UTask(std::forward(task))); (void)wakeupAllThread(); } else { - task_queue_.push(std::forward(task)); + task_queue_.push(UTask(std::forward(task))); } } @@ -74,10 +74,10 @@ CVoid UThreadPool::execute(FunctionType&& task, const CIndex index) { template CVoid UThreadPool::executeWithTid(FunctionType&& task, CIndex tid, CBool enable, CBool lockable) { if (likely(tid >= 0 && tid < config_.default_thread_size_)) { - primary_threads_[tid]->pushTask(std::forward(task), enable, lockable); + primary_threads_[tid]->pushTask(UTask(std::forward(task)), enable, lockable); } else { // 如果超出主线程的范围,则默认写入 pool 通用的任务队列中 - task_queue_.push(std::forward(task)); + task_queue_.push(UTask(std::forward(task))); } } From e395e48f9cd1461d846fd7a3b59cf50eae5ef98b Mon Sep 17 00:00:00 2001 From: Chunel Date: Sun, 5 Jul 2026 00:36:56 +0800 Subject: [PATCH 2/2] [chg] change gsome virtual api, from getWaitTime to getThreshold --- python/tutorial/MyGSome/MySome.py | 2 +- python/wrapper/PywGSome.h | 8 ++++---- src/GraphCtrl/GraphElement/GGroup/GSome/GSome.cpp | 12 ++++++------ src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h | 6 +++--- tutorial/MyGSome/MySome.h | 2 +- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/python/tutorial/MyGSome/MySome.py b/python/tutorial/MyGSome/MySome.py index 3735486e..e65ef22d 100644 --- a/python/tutorial/MyGSome/MySome.py +++ b/python/tutorial/MyGSome/MySome.py @@ -9,5 +9,5 @@ from pycgraph import GSome class MyGSome(GSome): - def getWaitNum(self): + def getThreshold(self): return 1 diff --git a/python/wrapper/PywGSome.h b/python/wrapper/PywGSome.h index b0483b50..d0084996 100644 --- a/python/wrapper/PywGSome.h +++ b/python/wrapper/PywGSome.h @@ -21,15 +21,15 @@ class PywGSome : public CGraph::GSome { } ~PywGSome() override = default; - CSize getWaitNum() override { - PYBIND11_OVERLOAD_PURE(CSize, PywGSome, getWaitNum); + CSize getThreshold() override { + PYBIND11_OVERLOAD_PURE(CSize, PywGSome, getThreshold); } }; PYCGRAPH_DECLARE_GGROUP_INTERFACE_CLASS(PywGSome, - CSize getWaitNum() override { - PYBIND11_OVERLOAD_PURE(CSize, PywGSomeInterface, getWaitNum); + CSize getThreshold() override { + PYBIND11_OVERLOAD_PURE(CSize, PywGSomeInterface, getThreshold); }); #endif //CGRAPH_PYWGSOME_H diff --git a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.cpp b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.cpp index 2145085e..e69ccf61 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.cpp +++ b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.cpp @@ -40,9 +40,9 @@ CStatus GSome::addElementEx(GElementPtr element) { CStatus GSome::run() { CGRAPH_FUNCTION_BEGIN - wait_num_ = getWaitNum(); - CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(wait_num_ > children_.size(), - "num is bigger than elements size."); + threshold_ = getThreshold(); + CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(threshold_ > children_.size(), + "threshold cannot bigger than elements size."); cur_status_.reset(); /** @@ -57,7 +57,7 @@ CStatus GSome::run() { const auto& curStatus = element->fatProcessor(CFunctionType::RUN); CGRAPH_UNIQUE_LOCK lock(lock_); cur_status_ += curStatus; - if (--wait_num_ == 0 || cur_status_.isErr()) { + if (--threshold_ == 0 || cur_status_.isErr()) { cv_.notify_one(); } } @@ -67,7 +67,7 @@ CStatus GSome::run() { thread_pool_->wakeupAllThread(); CGRAPH_UNIQUE_LOCK lock(lock_); cv_.wait(lock, [this] { - return wait_num_ == 0 || cur_status_.isErr(); + return threshold_ == 0 || cur_status_.isErr(); }); for (auto* element : children_) { @@ -108,7 +108,7 @@ CStatus GSome::checkSuitable() { status = GElement::checkSuitable(); CGRAPH_FUNCTION_CHECK_STATUS - CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((CGRAPH_DEFAULT_LOOP_TIMES != loop_), "GSome cannot set loop > 1.") + CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(CGRAPH_DEFAULT_LOOP_TIMES != loop_, "GSome cannot set loop > 1.") CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(std::any_of(children_.begin(), children_.end(), [](GElementPtr ptr) { return !ptr->isAsync(); }), "GSome contains async node only.") diff --git a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h index f7d02ecb..4e132240 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h +++ b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h @@ -20,10 +20,10 @@ class GSome : public GGroup { protected: /** * 设定 wait_num 个数 - * 当前 group 执行完成 wait_num 个后,就可以继续执行 + * 当前 group 执行完成 threshold 个后,就可以继续执行 * @return */ - virtual CSize getWaitNum() = 0; + virtual CSize getThreshold() = 0; protected: explicit GSome(); @@ -41,7 +41,7 @@ class GSome : public GGroup { CStatus addElementEx(GElementPtr element) override; private: - CSize wait_num_ {0}; // 还剩的触发结束的个数 + CSize threshold_ {0}; // 还剩的触发结束的个数 CStatus cur_status_ ; // 记录异步时刻的当前状态信息 std::mutex lock_; diff --git a/tutorial/MyGSome/MySome.h b/tutorial/MyGSome/MySome.h index 66eae2d2..74e27ad9 100644 --- a/tutorial/MyGSome/MySome.h +++ b/tutorial/MyGSome/MySome.h @@ -13,7 +13,7 @@ class MySome : public CGraph::GSome { protected: - CSize getWaitNum() override { + CSize getThreshold() override { /** * 执行完 1个之后,当前的 GSome 就继续往后执行 * 其余的 element,会在 pipeline 结束之前,执行完成。