Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,10 @@ if __name__ == '__main__':
[2026.05.10 - v3.2.4 - Chunel]
* 优化 `pycgraph` 功能

[2026.07.04 - v3.2.5 - Chunel]
* 优化调度性能
* 优化 `pycgraph` 功能

</details>

------------
Expand Down
1 change: 1 addition & 0 deletions python/PyCGraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,5 +334,6 @@ PYBIND11_MODULE(pycgraph, cg) {
PYCGRAPH_DECLARE_GGROUP_PYBIND11_FUNCTIONS(GCondition);
PYCGRAPH_DECLARE_GGROUP_PYBIND11_FUNCTIONS(GSerialMultiCondition);
PYCGRAPH_DECLARE_GGROUP_PYBIND11_FUNCTIONS(GParallelMultiCondition);
PYCGRAPH_DECLARE_GGROUP_PYBIND11_FUNCTIONS(GSome);
PYCGRAPH_DECLARE_GGROUP_PYBIND11_FUNCTIONS(GMutable);
}
13 changes: 13 additions & 0 deletions python/tutorial/MyGSome/MySome.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""
@Author: Chunel
@Contact: chunel@foxmail.com
@File: MySome
@Time: 2026/7/4 21:59
@Desc:
"""

from pycgraph import GSome

class MyGSome(GSome):
def getWaitNum(self):
return 1
7 changes: 7 additions & 0 deletions python/tutorial/MyGSome/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
@Author: Chunel
@Contact: chunel@foxmail.com
@File: __init__.py
@Time: 2026/7/4 21:58
@Desc:
"""
32 changes: 32 additions & 0 deletions python/tutorial/T23-GSome.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
@Author: Chunel
@Contact: chunel@foxmail.com
@File: T23-GSome.py
@Time: 2026/7/4 21:58
@Desc:
"""

from pycgraph import GPipeline

from MyGNode.MyNode1 import MyNode1
from MyGNode.MyNode2 import MyNode2
from MyGSome.MySome import MyGSome


def tutorial_some():
pipeline = GPipeline()
a, c, d = MyNode1(), MyNode1(), MyNode2()
b_some = MyGSome(
[MyNode1("nodeB1"), MyNode2("nodeB2"), MyNode2("nodeB3")]
)

pipeline.registerGElement(a, set(), "nodeA")
pipeline.registerGElement(b_some, {a}, "someB")
pipeline.registerGElement(c, {b_some}, "nodeC")
pipeline.registerGElement(d, {c}, "nodeD")

pipeline.process()


if __name__ == '__main__':
tutorial_some()
1 change: 1 addition & 0 deletions python/wrapper/PyWrapperInclude.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "PywGStage.h"
#include "PywGParallelMultiCondition.h"
#include "PywGSerialMultiCondition.h"
#include "PywGSome.h"
#include "PywGMutable.h"
#include "PywGPassedParam.h"
#include "PywGPipelineDeleter.h"
Expand Down
35 changes: 35 additions & 0 deletions python/wrapper/PywGSome.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: PywGSome.h
@Time: 2026/7/4 15:13
@Desc:
***************************/

#ifndef CGRAPH_PYWGSOME_H
#define CGRAPH_PYWGSOME_H

#include <pybind11/pybind11.h>

#include "CGraph.h"
#include "PyWrapperMacro.h"

class PywGSome : public CGraph::GSome {
public:
explicit PywGSome(const CGraph::GElementPtrArr& elements = CGraph::GElementPtrArr{}) {
__addGElements_4py(elements);
}
~PywGSome() override = default;

CSize getWaitNum() override {
PYBIND11_OVERLOAD_PURE(CSize, PywGSome, getWaitNum);
}
};


PYCGRAPH_DECLARE_GGROUP_INTERFACE_CLASS(PywGSome,
CSize getWaitNum() override {
PYBIND11_OVERLOAD_PURE(CSize, PywGSomeInterface, getWaitNum);
});

#endif //CGRAPH_PYWGSOME_H
6 changes: 3 additions & 3 deletions src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,14 +538,14 @@ CStatus GElement::asyncRun() {

async_result_ = thread_pool_->commit([this] {
return run();
}, CGRAPH_POOL_TASK_STRATEGY);
}, CGRAPH_TRIGGER_ALL_THREAD_STRATEGY);

const auto& futStatus = async_result_.wait_for(std::chrono::milliseconds(timeout_));
if (std::future_status::ready == futStatus) {
status = getAsyncResult();
} else {
doAspect(internal::GAspectType::ENTER_TIMEOUT);
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION( GElementTimeoutStrategy::AS_ERROR == timeout_strategy_, \
(void)doAspect(internal::GAspectType::ENTER_TIMEOUT);
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(GElementTimeoutStrategy::AS_ERROR == timeout_strategy_, \
"[" + name_ + "] running time more than [" + std::to_string(timeout_) + "]ms")
cur_state_.store(GElementState::TIMEOUT, std::memory_order_release);
}
Expand Down
17 changes: 6 additions & 11 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ CStatus GSome::addElementEx(GElementPtr element) {
}


CStatus GSome::run() {
CStatus GSome::run() {
CGRAPH_FUNCTION_BEGIN

wait_num_ = static_cast<CInt>(getWaitNum());
wait_num_ = getWaitNum();
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(wait_num_ > children_.size(),
"num is bigger than elements size.");
cur_status_.reset();
Expand All @@ -57,16 +57,17 @@ CStatus GSome::run() {
const auto& curStatus = element->fatProcessor(CFunctionType::RUN);
CGRAPH_UNIQUE_LOCK lock(lock_);
cur_status_ += curStatus;
if (--wait_num_ <= 0 || cur_status_.isErr()) {
if (--wait_num_ == 0 || cur_status_.isErr()) {
cv_.notify_one();
}
}
}, binding_index_);
});
}

thread_pool_->wakeupAllThread();
CGRAPH_UNIQUE_LOCK lock(lock_);
cv_.wait(lock, [this] {
return wait_num_ <= 0 || cur_status_.isErr();
return wait_num_ == 0 || cur_status_.isErr();
});

for (auto* element : children_) {
Expand Down Expand Up @@ -102,12 +103,6 @@ CVoid GSome::dump(std::ostream& oss) {
}


CBool GSome::isHold() {
// 这里固定是不可以 hold的
return false;
}


CStatus GSome::checkSuitable() {
CGRAPH_FUNCTION_BEGIN
status = GElement::checkSuitable();
Expand Down
4 changes: 1 addition & 3 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,14 @@ class GSome : public GGroup {

CVoid dump(std::ostream& oss) final;

CBool isHold() final;

CStatus checkSuitable() final;

CGRAPH_NO_ALLOWED_COPY(GSome)

CStatus addElementEx(GElementPtr element) override;

private:
CInt wait_num_ {0}; // 还剩的触发结束的个数
CSize wait_num_ {0}; // 还剩的触发结束的个数
CStatus cur_status_ ; // 记录异步时刻的当前状态信息

std::mutex lock_;
Expand Down
Loading