Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions src/openhound/scheduler/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import signal
import time
from concurrent.futures import Future, ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from dataclasses import dataclass

import openhound.core.logging # noqa: F401
Expand Down Expand Up @@ -99,6 +100,14 @@ def _shutdown(self) -> None:
self.executor.shutdown(wait=True, cancel_futures=True)
logger.info("Collection service stopped.")

def _reset_executor(self) -> None:
    """Discard the current process pool and install a fresh one.

    Intended for use after the pool has entered a broken state. The old
    executor is shut down without waiting and with pending futures
    cancelled; any error raised while doing so is logged and swallowed so
    that a replacement pool (one worker, one task per worker process) is
    always created afterwards.
    """
    try:
        broken_pool = self.executor
        broken_pool.shutdown(wait=False, cancel_futures=True)
    except Exception:
        # Best effort only: a failed teardown must not block the rebuild below.
        logger.exception("Error shutting down broken executor.")

    replacement = ProcessPoolExecutor(max_workers=1, max_tasks_per_child=1)
    self.executor = replacement

def check_jobs(self) -> Job | None:
"""Checks BloodHound enterprise for available jobs. These can either be new jobs or jobs currently started and not finished/stopped.

Expand Down Expand Up @@ -137,9 +146,21 @@ def _start_job(self, job: Job) -> None:
logger.info(f"Starting job {job.id} with collector '{self.collector_name}'")
self.client.start_job(job.id)
self.job_running = job.id
self.future = self.executor.submit(
_subprocess_collect, self.collector_name, job.id
)
try:
self.future = self.executor.submit(
_subprocess_collect, self.collector_name, job.id
)
except BrokenProcessPool:
logger.exception(
f"Failed to submit job {job.id}: process pool is broken, resetting."
)
self.future = None
self.job_running = None
self._reset_executor()
self.client.end_job(
JobStatus.FAILED,
f"Failed to start collector '{self.collector_name}': worker pool was broken",
)

def _handle_completed_job(self, future: Future[Result]) -> None:
"""Handles completion of a job and reports success or failure to BloodHound Enterprise
Expand All @@ -164,6 +185,16 @@ def _handle_completed_job(self, future: Future[Result]) -> None:
f"Collector '{self.collector_name}' not found",
)

except BrokenProcessPool:
logger.exception(
"Collection worker was terminated abruptly; resetting process pool."
)
self._reset_executor()
self.client.end_job(
JobStatus.FAILED,
f"Collection worker for '{self.collector_name}' was terminated abruptly",
)

except Exception:
logger.exception("Collection subprocess failed.")
self.client.end_job(
Expand Down
46 changes: 46 additions & 0 deletions tests/test_bhe_job_scheduling.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import gzip
import json
from concurrent.futures import Future
from concurrent.futures.process import BrokenProcessPool
from pathlib import Path
from urllib.parse import urlsplit

Expand Down Expand Up @@ -201,6 +202,51 @@ def test_poll_missing_extension(mock_service, mock_bloodhound_api):
}


def test_poll_recovers_from_broken_process_pool(mock_service, mock_bloodhound_api):
    """Polling a future that holds BrokenProcessPool fails the job and rebuilds the pool.

    The service is primed with a running job whose future already carries a
    BrokenProcessPool. After one poll the job state must be cleared, the
    executor replaced, and the job reported as FAILED to the mock API.
    """
    # Arrange: a started job whose worker future has already failed.
    mock_bloodhound_api.app.state.job_started = True
    crashed_future = Future()
    crashed_future.set_exception(BrokenProcessPool("worker died"))
    mock_service.future = crashed_future
    mock_service.job_running = 123
    executor_before = mock_service.executor

    # Act
    mock_service._poll()

    # Assert: local state is cleared and a new executor has been installed.
    assert mock_service.future is None
    assert mock_service.job_running is None
    assert mock_service.executor is not executor_before

    # Assert: the failure was reported to the API with the expected payload.
    assert mock_bloodhound_api.app.state.job_ended is True
    expected_payload = {
        "status": JobStatus.FAILED.value,
        "message": "Collection worker for 'openhound-faker' was terminated abruptly",
    }
    assert mock_bloodhound_api.app.state.end_payload == expected_payload


def test_start_job_recovers_when_submit_raises_broken_pool(
    mock_service, mock_bloodhound_api, monkeypatch
):
    """A BrokenProcessPool raised by executor.submit ends the started job as FAILED.

    The executor's submit is patched to raise, so the failure happens after
    the job has been marked as started. The service must then report the job
    as FAILED, clear its job state, and swap in a new executor.
    """
    executor_before = mock_service.executor

    def raise_broken_pool(*args, **kwargs):
        raise BrokenProcessPool("worker died before submit")

    # Arrange: every submission to the current executor now fails.
    monkeypatch.setattr(executor_before, "submit", raise_broken_pool)

    # Act
    mock_service._poll()

    # Assert: local state is cleared and a new executor has been installed.
    assert mock_service.future is None
    assert mock_service.job_running is None
    assert mock_service.executor is not executor_before

    # Assert: the job was started and then ended with the expected payload.
    assert mock_bloodhound_api.app.state.job_started is True
    assert mock_bloodhound_api.app.state.job_ended is True
    expected_payload = {
        "status": JobStatus.FAILED.value,
        "message": "Failed to start collector 'openhound-faker': worker pool was broken",
    }
    assert mock_bloodhound_api.app.state.end_payload == expected_payload


def test_scheduler_ingest_opengraph(mock_service, mock_bloodhound_api, monkeypatch):
"""Run the DLT pipeline with the openhound-faker collector + check the amount of ingested nodes + edges"""
monkeypatch.setenv(
Expand Down
Loading