# Review summary of this patch (these "#" lines precede the first "diff --git"
# header and belong to no hunk).
#
# src/openhound/scheduler/service.py
#   * imports BrokenProcessPool from concurrent.futures.process.
#   * adds _reset_executor(): shuts the current pool down with wait=False and
#     cancel_futures=True, logs (but does not re-raise) any shutdown error, then
#     assigns a new ProcessPoolExecutor(max_workers=1, max_tasks_per_child=1)
#     to self.executor.
#   * _start_job(): wraps executor.submit(); on BrokenProcessPool it clears
#     self.future and self.job_running, resets the pool and calls
#     client.end_job(JobStatus.FAILED, ...).
#   * _handle_completed_job(): adds a BrokenProcessPool handler that resets the
#     pool and ends the job as FAILED.
#
# tests/test_bhe_job_scheduling.py
#   * adds one test per recovery path; each asserts future/job_running are None,
#     the executor object was replaced, and the exact end_job payload.
#
# NOTE(review): the line breaks and indentation below were reconstructed from a
# whitespace-collapsed copy of the patch. The reconstructed hunks match the
# line counts in every "@@" header, but confirm the context-line indentation
# against the target files before applying.
# NOTE(review): _reset_executor() hard-codes max_workers=1 and
# max_tasks_per_child=1; presumably this mirrors the pool built in __init__
# (not shown in this patch) - TODO confirm.
diff --git a/src/openhound/scheduler/service.py b/src/openhound/scheduler/service.py
index a1a17c3..5ad21bb 100644
--- a/src/openhound/scheduler/service.py
+++ b/src/openhound/scheduler/service.py
@@ -2,6 +2,7 @@
 import signal
 import time
 from concurrent.futures import Future, ProcessPoolExecutor
+from concurrent.futures.process import BrokenProcessPool
 from dataclasses import dataclass
 
 import openhound.core.logging  # noqa: F401
@@ -99,6 +100,14 @@ def _shutdown(self) -> None:
         self.executor.shutdown(wait=True, cancel_futures=True)
         logger.info("Collection service stopped.")
 
+    def _reset_executor(self) -> None:
+        """Tear down and recreate the process pool after it has entered a broken state."""
+        try:
+            self.executor.shutdown(wait=False, cancel_futures=True)
+        except Exception:  # best effort: the pool is still replaced below
+            logger.exception("Error shutting down broken executor.")
+        self.executor = ProcessPoolExecutor(max_workers=1, max_tasks_per_child=1)
+
     def check_jobs(self) -> Job | None:
         """Checks BloodHound enterprise for available jobs.
         These can either be new jobs or jobs currently started and not finished/stopped.
@@ -137,9 +146,21 @@ def _start_job(self, job: Job) -> None:
         logger.info(f"Starting job {job.id} with collector '{self.collector_name}'")
         self.client.start_job(job.id)
         self.job_running = job.id
-        self.future = self.executor.submit(
-            _subprocess_collect, self.collector_name, job.id
-        )
+        try:
+            self.future = self.executor.submit(
+                _subprocess_collect, self.collector_name, job.id
+            )
+        except BrokenProcessPool:  # start_job() already ran, so end the job as FAILED
+            logger.exception(
+                f"Failed to submit job {job.id}: process pool is broken, resetting."
+            )
+            self.future = None
+            self.job_running = None
+            self._reset_executor()
+            self.client.end_job(
+                JobStatus.FAILED,
+                f"Failed to start collector '{self.collector_name}': worker pool was broken",
+            )
 
     def _handle_completed_job(self, future: Future[Result]) -> None:
         """Handles completion of a job and reports success or failure to BloodHound Enterprise
@@ -164,6 +185,16 @@ def _handle_completed_job(self, future: Future[Result]) -> None:
                 f"Collector '{self.collector_name}' not found",
             )
 
+        except BrokenProcessPool:  # handled before the generic handler below
+            logger.exception(
+                "Collection worker was terminated abruptly; resetting process pool."
+            )
+            self._reset_executor()
+            self.client.end_job(
+                JobStatus.FAILED,
+                f"Collection worker for '{self.collector_name}' was terminated abruptly",
+            )
+
         except Exception:
             logger.exception("Collection subprocess failed.")
             self.client.end_job(
diff --git a/tests/test_bhe_job_scheduling.py b/tests/test_bhe_job_scheduling.py
index b7e976f..e4051b0 100644
--- a/tests/test_bhe_job_scheduling.py
+++ b/tests/test_bhe_job_scheduling.py
@@ -1,6 +1,7 @@
 import gzip
 import json
 from concurrent.futures import Future
+from concurrent.futures.process import BrokenProcessPool
 from pathlib import Path
 from urllib.parse import urlsplit
 
@@ -201,6 +202,51 @@ def test_poll_missing_extension(mock_service, mock_bloodhound_api):
     }
 
 
+def test_poll_recovers_from_broken_process_pool(mock_service, mock_bloodhound_api):
+    """A BrokenProcessPool surfaced via future.result() should fail the job, clear state, and rebuild the executor."""
+    mock_bloodhound_api.app.state.job_started = True
+    future = Future()
+    future.set_exception(BrokenProcessPool("worker died"))
+    mock_service.future = future
+    mock_service.job_running = 123
+    original_executor = mock_service.executor
+
+    mock_service._poll()
+
+    assert mock_service.future is None
+    assert mock_service.job_running is None
+    assert mock_service.executor is not original_executor
+    assert mock_bloodhound_api.app.state.job_ended is True
+    assert mock_bloodhound_api.app.state.end_payload == {
+        "status": JobStatus.FAILED.value,
+        "message": "Collection worker for 'openhound-faker' was terminated abruptly",
+    }
+
+
+def test_start_job_recovers_when_submit_raises_broken_pool(
+    mock_service, mock_bloodhound_api, monkeypatch
+):
+    """If executor.submit raises BrokenProcessPool after the BHE job was started, the job should be ended FAILED, state cleared, and the executor rebuilt."""
+
+    def broken_submit(*args, **kwargs):
+        raise BrokenProcessPool("worker died before submit")
+
+    monkeypatch.setattr(mock_service.executor, "submit", broken_submit)
+    original_executor = mock_service.executor
+
+    mock_service._poll()
+
+    assert mock_service.future is None
+    assert mock_service.job_running is None
+    assert mock_service.executor is not original_executor
+    assert mock_bloodhound_api.app.state.job_started is True
+    assert mock_bloodhound_api.app.state.job_ended is True
+    assert mock_bloodhound_api.app.state.end_payload == {
+        "status": JobStatus.FAILED.value,
+        "message": "Failed to start collector 'openhound-faker': worker pool was broken",
+    }
+
+
 def test_scheduler_ingest_opengraph(mock_service, mock_bloodhound_api, monkeypatch):
     """Run the DLT pipeline with the openhound-faker collector + check the amount of ingested nodes + edges"""
     monkeypatch.setenv(