diff --git a/Framework/Core/src/ComputingQuotaEvaluator.cxx b/Framework/Core/src/ComputingQuotaEvaluator.cxx index 5dd4249cab519..cb8bd8c6a1203 100644 --- a/Framework/Core/src/ComputingQuotaEvaluator.cxx +++ b/Framework/Core/src/ComputingQuotaEvaluator.cxx @@ -240,9 +240,18 @@ void ComputingQuotaEvaluator::dispose(int taskId) if (offer.valid == false) { continue; } - if (offer.sharedMemory <= 0) { + // Decrement timeslices after each use and track if a decrement happened. + // - SHM-only offers (ts=0 from start): no decrement, kept alive while shm > 0 + // - Timeslice-only offers (shm=0, ts>0): ts decremented, invalidated when shm <= 0 + // - Combined offers (shm>0, ts>0): ts decremented, invalidated when ts reaches 0 + bool timesliceConsumed = false; + if (offer.timeslices > 0) { + offer.timeslices--; + timesliceConsumed = true; + } + if (offer.sharedMemory <= 0 || (timesliceConsumed && offer.timeslices <= 0)) { O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(oi * 8)); - O2_SIGNPOST_END(quota, oid, "offers", "Offer %d back to not needed.", oi); + O2_SIGNPOST_END(quota, oid, "offers", "Offer %d back to not needed (shm=%lli, ts=%lli).", oi, offer.sharedMemory, offer.timeslices); offer.valid = false; offer.score = OfferScore::Unneeded; } diff --git a/Framework/Core/src/ControlWebSocketHandler.cxx b/Framework/Core/src/ControlWebSocketHandler.cxx index 35528a1d6dfec..8be91c0e22fc3 100644 --- a/Framework/Core/src/ControlWebSocketHandler.cxx +++ b/Framework/Core/src/ControlWebSocketHandler.cxx @@ -14,10 +14,13 @@ #include "StatusWebSocketHandler.h" #include "Framework/DeviceMetricsHelper.h" #include "Framework/ServiceMetricsInfo.h" +#include "Framework/Signpost.h" #include #include "Framework/Logger.h" #include "Framework/DeviceConfigInfo.h" +O2_DECLARE_DYNAMIC_LOG(rate_limiting); + namespace o2::framework { void ControlWebSocketHandler::frame(char const* frame, size_t s) @@ -74,6 +77,10 @@ void ControlWebSocketHandler::endChunk() if (!didProcessMetric) { return; } + O2_SIGNPOST_ID_GENERATE(sid, rate_limiting); + O2_SIGNPOST_START(rate_limiting, sid, "endChunk", + "Processing metrics from device %d (had new metric: %d)", + mIndex, (int)didHaveNewMetric); size_t timestamp = (uv_hrtime() - mContext.driver->startTime) / 1000000 + mContext.driver->startTimeMsFromEpoch; assert(mContext.metrics); assert(mContext.infos); @@ -91,6 +98,8 @@ void ControlWebSocketHandler::endChunk() for (auto& metricsInfo : *mContext.metrics) { std::fill(metricsInfo.changed.begin(), metricsInfo.changed.end(), false); } + O2_SIGNPOST_END(rate_limiting, sid, "endChunk", + "Done processing metrics from device %d", mIndex); } void ControlWebSocketHandler::headers(std::map const& headers) diff --git a/Framework/Core/src/DevicesManager.cxx b/Framework/Core/src/DevicesManager.cxx index e6fa2c2c61ae6..b427e72ca781d 100644 --- a/Framework/Core/src/DevicesManager.cxx +++ b/Framework/Core/src/DevicesManager.cxx @@ -13,12 +13,19 @@ #include "Framework/RuntimeError.h" #include "Framework/Logger.h" #include "Framework/DeviceController.h" +#include "Framework/Signpost.h" + +O2_DECLARE_DYNAMIC_LOG(devices_manager); namespace o2::framework { void DevicesManager::queueMessage(char const* target, char const* message) { + O2_SIGNPOST_ID_GENERATE(sid, devices_manager); + O2_SIGNPOST_EVENT_EMIT(devices_manager, sid, "queue", + "Queuing message for %{public}s: %{public}s", + target, message); for (int di = 0; di < specs.size(); ++di) { if (specs[di].id == target) { messages.push_back({di, message}); @@ -44,6 +51,10 @@ void DevicesManager::flush() LOGP(info, "Controller for {} now available.", specs[handle.ref.index].id); notifiedAvailable = true; } + O2_SIGNPOST_ID_GENERATE(sid, devices_manager); + O2_SIGNPOST_EVENT_EMIT(devices_manager, sid, "flush", + "Flushing message to %{public}s: %{public}s", + specs[handle.ref.index].id.c_str(), handle.message.c_str()); controller->write(handle.message.c_str(), handle.message.size()); }