Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/tutorial/MyGSome/MySome.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@
from pycgraph import GSome

class MyGSome(GSome):
def getWaitNum(self):
def getThreshold(self):
return 1
8 changes: 4 additions & 4 deletions python/wrapper/PywGSome.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ class PywGSome : public CGraph::GSome {
}
~PywGSome() override = default;

CSize getWaitNum() override {
PYBIND11_OVERLOAD_PURE(CSize, PywGSome, getWaitNum);
CSize getThreshold() override {
PYBIND11_OVERLOAD_PURE(CSize, PywGSome, getThreshold);
}
};


PYCGRAPH_DECLARE_GGROUP_INTERFACE_CLASS(PywGSome,
CSize getWaitNum() override {
PYBIND11_OVERLOAD_PURE(CSize, PywGSomeInterface, getWaitNum);
CSize getThreshold() override {
PYBIND11_OVERLOAD_PURE(CSize, PywGSomeInterface, getThreshold);
});

#endif //CGRAPH_PYWGSOME_H
12 changes: 6 additions & 6 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ CStatus GSome::addElementEx(GElementPtr element) {
CStatus GSome::run() {
CGRAPH_FUNCTION_BEGIN

wait_num_ = getWaitNum();
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(wait_num_ > children_.size(),
"num is bigger than elements size.");
threshold_ = getThreshold();
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(threshold_ > children_.size(),
"threshold cannot bigger than elements size.");
cur_status_.reset();

/**
Expand All @@ -57,7 +57,7 @@ CStatus GSome::run() {
const auto& curStatus = element->fatProcessor(CFunctionType::RUN);
CGRAPH_UNIQUE_LOCK lock(lock_);
cur_status_ += curStatus;
if (--wait_num_ == 0 || cur_status_.isErr()) {
if (--threshold_ == 0 || cur_status_.isErr()) {
cv_.notify_one();
}
}
Expand All @@ -67,7 +67,7 @@ CStatus GSome::run() {
thread_pool_->wakeupAllThread();
CGRAPH_UNIQUE_LOCK lock(lock_);
cv_.wait(lock, [this] {
return wait_num_ == 0 || cur_status_.isErr();
return threshold_ == 0 || cur_status_.isErr();
});

for (auto* element : children_) {
Expand Down Expand Up @@ -108,7 +108,7 @@ CStatus GSome::checkSuitable() {
status = GElement::checkSuitable();
CGRAPH_FUNCTION_CHECK_STATUS

CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((CGRAPH_DEFAULT_LOOP_TIMES != loop_), "GSome cannot set loop > 1.")
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(CGRAPH_DEFAULT_LOOP_TIMES != loop_, "GSome cannot set loop > 1.")
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(std::any_of(children_.begin(), children_.end(), [](GElementPtr ptr) {
return !ptr->isAsync();
}), "GSome contains async node only.")
Expand Down
6 changes: 3 additions & 3 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ class GSome : public GGroup {
protected:
/**
* 设定 wait_num 个数
* 当前 group 执行完成 wait_num 个后,就可以继续执行
* 当前 group 执行完成 threshold 个后,就可以继续执行
* @return
*/
virtual CSize getWaitNum() = 0;
virtual CSize getThreshold() = 0;

protected:
explicit GSome();
Expand All @@ -41,7 +41,7 @@ class GSome : public GGroup {
CStatus addElementEx(GElementPtr element) override;

private:
CSize wait_num_ {0}; // 还剩的触发结束的个数
CSize threshold_ {0}; // 还剩的触发结束的个数
CStatus cur_status_ ; // 记录异步时刻的当前状态信息

std::mutex lock_;
Expand Down
10 changes: 8 additions & 2 deletions src/UtilsCtrl/ThreadPool/Task/UTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <vector>
#include <memory>
#include <utility>
#include <type_traits>

#include "../UThreadObject.h"
Expand All @@ -33,8 +34,9 @@ class UTask : public CStruct {
};

public:
template<typename F>
UTask(F&& func, const int priority = 0)
template<typename F,
typename std::enable_if<!std::is_same<typename std::decay<F>::type, UTask>::value, int>::type = 0>
explicit UTask(F&& func, const int priority = 0)
: impl_(new TaskDerided<F>(std::forward<F>(func)))
, priority_(priority) {}

Expand All @@ -49,6 +51,10 @@ class UTask : public CStruct {
impl_(std::move(task.impl_)),
priority_(task.priority_) {}

UTask(UTask&& task, const int priority) noexcept:
impl_(std::move(task.impl_)),
priority_(priority) {}

UTask &operator=(UTask&& task) noexcept {
impl_ = std::move(task.impl_);
priority_ = task.priority_;
Expand Down
16 changes: 8 additions & 8 deletions src/UtilsCtrl/ThreadPool/UThreadPool.inl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ auto UThreadPool::commitWithTid(const FunctionType& func, CIndex tid, CBool enab
std::packaged_task<ResultType()> task(std::move(func));
std::future<ResultType> result(task.get_future());

execute(std::move(task), tid, enable, lockable);
executeWithTid(std::move(task), tid, enable, lockable);
return result;
}

Expand All @@ -49,7 +49,7 @@ auto UThreadPool::commitWithPriority(const FunctionType& func, int priority)
createSecondaryThread(1); // 如果没有开启辅助线程,则直接开启一个
}

priority_task_queue_.push(std::move(task), priority);
priority_task_queue_.push(UTask(std::move(task)), priority);
return result;
}

Expand All @@ -59,25 +59,25 @@ CVoid UThreadPool::execute(FunctionType&& task, const CIndex index) {
const CIndex realIndex = dispatch(index);

if (likely(realIndex >= 0 && realIndex < config_.default_thread_size_)) {
primary_threads_[realIndex]->pushTask(std::forward<FunctionType>(task));
primary_threads_[realIndex]->pushTask(UTask(std::forward<FunctionType>(task)));
} else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) {
priority_task_queue_.push(std::forward<FunctionType>(task), CGRAPH_LONG_TIME_TASK_STRATEGY);
priority_task_queue_.push(UTask(std::forward<FunctionType>(task)), CGRAPH_LONG_TIME_TASK_STRATEGY);
} else if (CGRAPH_TRIGGER_ALL_THREAD_STRATEGY == realIndex) {
task_queue_.push(std::forward<FunctionType>(task));
task_queue_.push(UTask(std::forward<FunctionType>(task)));
(void)wakeupAllThread();
} else {
task_queue_.push(std::forward<FunctionType>(task));
task_queue_.push(UTask(std::forward<FunctionType>(task)));
}
}


template<typename FunctionType>
CVoid UThreadPool::executeWithTid(FunctionType&& task, CIndex tid, CBool enable, CBool lockable) {
if (likely(tid >= 0 && tid < config_.default_thread_size_)) {
primary_threads_[tid]->pushTask(std::forward<FunctionType>(task), enable, lockable);
primary_threads_[tid]->pushTask(UTask(std::forward<FunctionType>(task)), enable, lockable);
} else {
// 如果超出主线程的范围,则默认写入 pool 通用的任务队列中
task_queue_.push(std::forward<FunctionType>(task));
task_queue_.push(UTask(std::forward<FunctionType>(task)));
}
}

Expand Down
2 changes: 1 addition & 1 deletion tutorial/MyGSome/MySome.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

class MySome : public CGraph::GSome {
protected:
CSize getWaitNum() override {
CSize getThreshold() override {
/**
* 执行完 1个之后,当前的 GSome 就继续往后执行
* 其余的 element,会在 pipeline 结束之前,执行完成。
Expand Down
Loading