Allow long-running jobs to be processed as they come#457
Open
antonije wants to merge 9 commits into
Open
Conversation
…progress Previously, jobs were marked as "in progress" in `get_jobs()` immediately after being fetched from the server, before they were actually scheduled by `run_jobs()`. This caused jobs to appear both in the queue and in progress simultaneously, which inflated `current_occupancy()` and blocked new jobs from starting until earlier ones had finished. The fix moves the `job_progress.add(job)` call into `run_jobs()`, right after the job is dequeued and scheduled as a task. This ensures that: - Jobs are only marked "in progress" once they actually start running. - Queue and progress metrics no longer overlap. - Concurrency limits are enforced correctly (e.g. with concurrency=4, two jobs now run in parallel instead of sequentially). As a result, jobs are dispatched immediately when capacity is available, and status logging reflects the real state of the worker.
…ne finished tasks each tick.
- add a timeout of 100 milliseconds that doesn't block the code and accepts new jobs
There was a problem hiding this comment.
Pull request overview
This PR updates the serverless worker’s JobScaler loop to better support long-running jobs by decoupling job acquisition from job execution, avoiding queue backpressure, and aligning job state tracking with when work actually begins.
Changes:
- Replace the bounded
asyncio.Queue(maxsize=concurrency)with an unbounded queue to prevent acquisition from blocking on long-running jobs. - Adjust scaling logic to update
current_concurrencywithout recreating the queue. - Rework
run_jobsto manage a live set of tasks and process jobs as capacity opens up; movejob_progress.add()to dequeue-time to avoid “queued + in-process” double-state.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
81
to
+87
| while self.current_occupancy() > 0: | ||
| # not safe to scale when jobs are in flight | ||
| await asyncio.sleep(1) | ||
| continue | ||
|
|
||
| self.jobs_queue = asyncio.Queue(maxsize=self.current_concurrency) | ||
| log.debug( | ||
| f"JobScaler.set_scale | New concurrency set to: {self.current_concurrency}" | ||
| ) | ||
| self.current_concurrency = new_concurrency | ||
| log.debug(f"JobScaler.set_scale | New concurrency set to: {self.current_concurrency}") |
| Runs the block in an infinite loop while the worker is alive or jobs queue is not empty. | ||
| """ | ||
| tasks = [] # Store the tasks for concurrent job processing | ||
| tasks: set[asyncio.Task] = set() # Store the tasks for concurrent job processing |
Comment on lines
236
to
240
| done, pending = await asyncio.wait( | ||
| tasks, return_when=asyncio.FIRST_COMPLETED | ||
| tasks, | ||
| timeout=0.1, | ||
| return_when=asyncio.FIRST_COMPLETED, | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
These changes fix the following issues:
jobs_queueand setting initialmaxsizeNOTE: I have added longer sleep in the
run_jobsmethod to suit my needs, but feel free to edit those.