From affedf816b6aa065a41d69a48418ee576edc5d76 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 21 May 2026 16:45:44 +0100 Subject: [PATCH 01/56] Validate tiled service account configuration at startup --- helm/blueapi/config_schema.json | 7 +++++++ helm/blueapi/values.schema.json | 7 +++++++ src/blueapi/config.py | 1 + src/blueapi/service/authorization.py | 29 +++++++++++++++++++++++++++- src/blueapi/service/main.py | 3 ++- tests/unit_tests/test_config.py | 1 + 6 files changed, 46 insertions(+), 2 deletions(-) diff --git a/helm/blueapi/config_schema.json b/helm/blueapi/config_schema.json index 39a1b1aab..dd8a48433 100644 --- a/helm/blueapi/config_schema.json +++ b/helm/blueapi/config_schema.json @@ -345,8 +345,15 @@ "default": "account", "title": "Audience", "type": "string" + }, + "tiled_service_account_check": { + "title": "Tiled Service Account Check", + "type": "string" } }, + "required": [ + "tiled_service_account_check" + ], "title": "OpaConfig", "type": "object", "$id": "OpaConfig" diff --git a/helm/blueapi/values.schema.json b/helm/blueapi/values.schema.json index f78dbfb84..60310135c 100644 --- a/helm/blueapi/values.schema.json +++ b/helm/blueapi/values.schema.json @@ -755,6 +755,9 @@ "$id": "OpaConfig", "title": "OpaConfig", "type": "object", + "required": [ + "tiled_service_account_check" + ], "properties": { "audience": { "title": "Audience", @@ -768,6 +771,10 @@ "format": "uri", "maxLength": 2083, "minLength": 1 + }, + "tiled_service_account_check": { + "title": "Tiled Service Account Check", + "type": "string" } }, "additionalProperties": false diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 538d14158..c56415bfe 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -299,6 +299,7 @@ class Tag(StrEnum): class OpaConfig(BlueapiBaseModel): root: HttpUrl = HttpUrl("http://localhost:8181") audience: str = "account" + tiled_service_account_check: str class ApplicationConfig(BlueapiBaseModel): diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index aabd4a692..a4a7b5c98 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -5,7 +5,8 @@ from aiohttp import ClientSession -from blueapi.config import OpaConfig +from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount +from blueapi.service.authentication import TiledAuth LOGGER = logging.getLogger(__name__) @@ -45,3 +46,29 @@ def for_config( return aclosing(cls(instrument, config)) LOGGER.info("No OPA config provided - not creating OpaClient") return nullcontext() + + async def require_tiled_service_account(self, token: str): + if not await self._call_opa( + self._config.tiled_service_account_check, + {"token": token, "beamline": self._instrument}, + ): + raise ValueError( + f"Tiled service account is not valid for '{self._instrument}'" + ) + + +async def validate_tiled_config( + tiled: ServiceAccount | str | None, oidc: OIDCConfig | None, opa: OpaClient | None +): + if not isinstance(tiled, ServiceAccount): + # can't validate an API key + return + + if not opa or not oidc: + LOGGER.info("Missing OPA or OIDC configuration required to validate tiled auth") + return + + LOGGER.info("Validating tiled configuration") + tiled.token_url = oidc.token_endpoint + auth = TiledAuth(tiled) + await opa.require_tiled_service_account(auth.get_access_token()) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index cc937d307..a84a85b29 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -40,7 +40,7 @@ from blueapi.worker import TrackableTask, WorkerState from blueapi.worker.event import TaskStatusEnum -from .authorization import OpaClient +from .authorization import OpaClient, validate_tiled_config from .model import ( DeviceModel, DeviceResponse, @@ -98,6 +98,7 @@ async def inner(app: FastAPI): setup_runner(config) async with OpaClient.for_config(meta and meta.instrument, config.opa) as opa: app.state.authz = opa + await validate_tiled_config(config.tiled.authentication, config.oidc, opa) yield teardown_runner() diff --git a/tests/unit_tests/test_config.py b/tests/unit_tests/test_config.py index 97a4ce380..5cbb00c1b 100644 --- a/tests/unit_tests/test_config.py +++ b/tests/unit_tests/test_config.py @@ -340,6 +340,7 @@ def test_config_yaml_parsed(temp_yaml_config_file): "opa": { "root": "http://opa.example.com/", "audience": "account", + "tiled_service_account_check": "v1/tiled_service_account", }, }, { From 0482d81fa9f7216fa114ea00c0b95230dc99085a Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 28 May 2026 17:22:29 +0100 Subject: [PATCH 02/56] Add tests for tiled check --- .../unit_tests/service/test_authorization.py | 93 ++++++++++++++++++- 1 file changed, 91 insertions(+), 2 deletions(-) diff --git a/tests/unit_tests/service/test_authorization.py b/tests/unit_tests/service/test_authorization.py index 608269d20..249198580 100644 --- a/tests/unit_tests/service/test_authorization.py +++ b/tests/unit_tests/service/test_authorization.py @@ -1,11 +1,13 @@ -from unittest.mock import AsyncMock, MagicMock, patch +from contextlib import AbstractContextManager, nullcontext +from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest from pydantic import HttpUrl -from blueapi.config import OpaConfig +from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authorization import ( OpaClient, + validate_tiled_config, ) # Reusable client patch decorator @@ -20,9 +22,50 @@ def opa_config() -> OpaConfig: return OpaConfig( root=HttpUrl("http://auth.example.com"), + tiled_service_account_check="/auth/tiled", ) +@patch_client_session +@pytest.mark.parametrize( + "result,context", + [ + (False, pytest.raises(ValueError, match="Tiled service account is not valid ")), + (True, nullcontext()), + ], +) +async def test_tiled_service_account( + session: MagicMock, + opa_config: OpaConfig, + result: bool, + context: AbstractContextManager, +): + session.return_value.post = AsyncMock( + return_value=MagicMock(json=AsyncMock(return_value={"result": result})) + ) + + client = OpaClient(instrument="p99", config=opa_config) + + session.assert_called_once_with(base_url="http://auth.example.com/") + with context: + await client.require_tiled_service_account(token="foo_bar") + session().post.assert_called_once_with( + "/auth/tiled", + json={"input": {"token": "foo_bar", "beamline": "p99", "audience": "account"}}, + ) + + +@patch_client_session +async def test_exception_raised_when_opa_fails( + session: MagicMock, opa_config: OpaConfig +): + session.return_value.post = AsyncMock(side_effect=RuntimeError("Connection failed")) + async with OpaClient.for_config("p45", opa_config) as client: + assert client is not None + with pytest.raises(RuntimeError, match="Connection failed"): + await client.require_tiled_service_account(token="foo_bar") + + @patch_client_session async def test_session_closed(session: MagicMock, opa_config: OpaConfig): async with OpaClient.for_config("p45", opa_config): @@ -60,3 +103,49 @@ async def test_opa_adds_input_fields(session: MagicMock, opa_config: OpaConfig): "foo/bar", json={"input": {"beamline": "p45", "audience": "account", "foo": "bar"}}, ) + + +async def test_validate_tiled_config(): + opa = MagicMock(spec=OpaClient) + tiled = ServiceAccount() + oidc = Mock(spec=OIDCConfig) + oidc.token_endpoint = "token-endpoint" + with patch("blueapi.service.authorization.TiledAuth") as auth: + auth.return_value.get_access_token.return_value = "tiled-token" + await validate_tiled_config(tiled, oidc, opa) + + auth.assert_called_once_with(tiled) + opa.require_tiled_service_account.assert_called_once_with("tiled-token") + + +@pytest.mark.parametrize( + "tiled_auth,oidc,opa_client", + [ + (None, None, MagicMock(spec=OpaClient)), + ( + None, + OIDCConfig(well_known_url="http://example.com", client_id="test-client"), + MagicMock(spec=OpaClient), + ), + ("api_key", None, MagicMock(spec=OpaClient)), + ( + "api_key", + OIDCConfig(well_known_url="http://example.com", client_id="test-client"), + MagicMock(spec=OpaClient), + ), + (ServiceAccount(), None, MagicMock(spec=OpaClient)), + ( + ServiceAccount(), + OIDCConfig(well_known_url="http://example.com", client_id="test-client"), + None, + ), + ], +) +async def test_validate_tiled_config_with_missing_config( + tiled_auth: ServiceAccount | str | None, + oidc: OIDCConfig | None, + opa_client: MagicMock | None, +): + assert await validate_tiled_config(tiled_auth, oidc, opa_client) is None + if opa_client is not None: + opa_client.require_tiled_service_account.assert_not_called() From 170e3d1520a6f194c88fa434c78debda214a2591 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 12:01:54 +0100 Subject: [PATCH 03/56] Add opa dependency function to create OpaUserClient --- src/blueapi/service/authorization.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index a4a7b5c98..c4f72ec45 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -1,12 +1,14 @@ import logging from collections.abc import Mapping from contextlib import AbstractAsyncContextManager, aclosing, nullcontext -from typing import Any, Self +from typing import Any, Self, cast from aiohttp import ClientSession +from fastapi import Depends, HTTPException, Request +from starlette import status from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount -from blueapi.service.authentication import TiledAuth +from blueapi.service.authentication import TiledAuth, unchecked_bearer_token LOGGER = logging.getLogger(__name__) @@ -72,3 +74,14 @@ async def validate_tiled_config( tiled.token_url = oidc.token_endpoint auth = TiledAuth(tiled) await opa.require_tiled_service_account(auth.get_access_token()) + + +async def opa( + request: Request, token: str | None = Depends(unchecked_bearer_token) +) -> OpaUserClient | None: + + if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)): + if not token: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + return opa.for_token(token) + return None From b3254eab8407a9cd03236c8dc8dac2532f4d4703 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 28 May 2026 17:24:54 +0100 Subject: [PATCH 04/56] test opa dependency function --- src/blueapi/service/authorization.py | 2 +- .../unit_tests/service/test_authorization.py | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index c4f72ec45..dbdc1f685 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -83,5 +83,5 @@ async def opa( if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)): if not token: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) - return opa.for_token(token) + return OpaUserClient(opa, token) return None diff --git a/tests/unit_tests/service/test_authorization.py b/tests/unit_tests/service/test_authorization.py index 249198580..ff3949174 100644 --- a/tests/unit_tests/service/test_authorization.py +++ b/tests/unit_tests/service/test_authorization.py @@ -2,11 +2,13 @@ from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest +from fastapi import HTTPException from pydantic import HttpUrl from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authorization import ( OpaClient, + opa, validate_tiled_config, ) @@ -149,3 +151,28 @@ async def test_validate_tiled_config_with_missing_config( assert await validate_tiled_config(tiled_auth, oidc, opa_client) is None if opa_client is not None: opa_client.require_tiled_service_account.assert_not_called() + + +async def test_opa_dependency_method(): + request = MagicMock() + + user_client = await opa(request, "foo_bar") + + assert user_client is not None + assert user_client.client == request.app.state.authz + assert user_client.token == "foo_bar" + + +async def test_opa_dependency_without_token(): + request = MagicMock() + + with pytest.raises(HTTPException, match="401"): + await opa(request, None) + + +@pytest.mark.parametrize("token", ["foo_bar", None]) +async def test_opa_dependency_without_authz(token): + request = MagicMock() + del request.app.state.authz + user_client = await opa(request, token) + assert user_client is None From fcad0afc8ad4b031bfd94289fb9f9ae67d890daf Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 12:17:35 +0100 Subject: [PATCH 05/56] Add can_submit_task auth check method and config --- helm/blueapi/config_schema.json | 7 ++++- helm/blueapi/values.schema.json | 7 ++++- src/blueapi/config.py | 1 + src/blueapi/service/authorization.py | 30 +++++++++++++++++++ .../unit_tests/service/test_authorization.py | 1 + tests/unit_tests/test_config.py | 1 + 6 files changed, 45 insertions(+), 2 deletions(-) diff --git a/helm/blueapi/config_schema.json b/helm/blueapi/config_schema.json index dd8a48433..e176e5c66 100644 --- a/helm/blueapi/config_schema.json +++ b/helm/blueapi/config_schema.json @@ -349,10 +349,15 @@ "tiled_service_account_check": { "title": "Tiled Service Account Check", "type": "string" + }, + "submit_task_check": { + "title": "Submit Task Check", + "type": "string" } }, "required": [ - "tiled_service_account_check" + "tiled_service_account_check", + "submit_task_check" ], "title": "OpaConfig", "type": "object", diff --git a/helm/blueapi/values.schema.json b/helm/blueapi/values.schema.json index 60310135c..5808acc54 100644 --- a/helm/blueapi/values.schema.json +++ b/helm/blueapi/values.schema.json @@ -756,7 +756,8 @@ "title": "OpaConfig", "type": "object", "required": [ - "tiled_service_account_check" + "tiled_service_account_check", + "submit_task_check" ], "properties": { "audience": { @@ -772,6 +773,10 @@ "maxLength": 2083, "minLength": 1 }, + "submit_task_check": { + "title": "Submit Task Check", + "type": "string" + }, "tiled_service_account_check": { "title": "Tiled Service Account Check", "type": "string" diff --git a/src/blueapi/config.py b/src/blueapi/config.py index c56415bfe..a64491090 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -300,6 +300,7 @@ class OpaConfig(BlueapiBaseModel): root: HttpUrl = HttpUrl("http://localhost:8181") audience: str = "account" tiled_service_account_check: str + submit_task_check: str class ApplicationConfig(BlueapiBaseModel): diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index dbdc1f685..9cc4ea7df 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -1,4 +1,5 @@ import logging +import re from collections.abc import Mapping from contextlib import AbstractAsyncContextManager, aclosing, nullcontext from typing import Any, Self, cast @@ -9,8 +10,10 @@ from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authentication import TiledAuth, unchecked_bearer_token +from blueapi.service.model import TaskRequest LOGGER = logging.getLogger(__name__) +INSTRUMENT_SESSION_RE = re.compile(r"^[a-z]{2}(?P\d+)-(?P\d+)$") class OpaClient: @@ -58,6 +61,33 @@ async def require_tiled_service_account(self, token: str): f"Tiled service account is not valid for '{self._instrument}'" ) + async def require_submit_task(self, instrument_session: str, token: str): + if not (match := INSTRUMENT_SESSION_RE.match(instrument_session)): + raise ValueError("Invalid instrument session") + + if not await self._call_opa( + self._conf.submit_task_check, + { + "token": token, + "proposal": int(match["proposal"]), + "visit": int(match["visit"]), + }, + ): + raise HTTPException(status_code=status.HTTP_403_UNORTHORIZED) + + +class OpaUserClient: + client: OpaClient + token: str + + def __init__(self, client: OpaClient, token: str): + self.client = client + self.token = token + + async def can_submit_task(self, task: TaskRequest): + LOGGER.info("Checking permissions to run task") + await self.client.require_submit_task(task.instrument_session, self.token) + async def validate_tiled_config( tiled: ServiceAccount | str | None, oidc: OIDCConfig | None, opa: OpaClient | None diff --git a/tests/unit_tests/service/test_authorization.py b/tests/unit_tests/service/test_authorization.py index ff3949174..37c1d7e3f 100644 --- a/tests/unit_tests/service/test_authorization.py +++ b/tests/unit_tests/service/test_authorization.py @@ -24,6 +24,7 @@ def opa_config() -> OpaConfig: return OpaConfig( root=HttpUrl("http://auth.example.com"), + submit_task_check="/auth/submit", tiled_service_account_check="/auth/tiled", ) diff --git a/tests/unit_tests/test_config.py b/tests/unit_tests/test_config.py index 5cbb00c1b..747e944d5 100644 --- a/tests/unit_tests/test_config.py +++ b/tests/unit_tests/test_config.py @@ -341,6 +341,7 @@ def test_config_yaml_parsed(temp_yaml_config_file): "root": "http://opa.example.com/", "audience": "account", "tiled_service_account_check": "v1/tiled_service_account", + "submit_task_check": "v1/submit_task", }, }, { From 171de826d25a90f6f5572d29c4a9b0c191d38e87 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 15 May 2026 08:27:14 +0000 Subject: [PATCH 06/56] feat: add authz dependency injection --- src/blueapi/service/main.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index a84a85b29..54c2be69e 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -40,7 +40,7 @@ from blueapi.worker import TrackableTask, WorkerState from blueapi.worker.event import TaskStatusEnum -from .authorization import OpaClient, validate_tiled_config +from .authorization import OpaClient, OpaUserClient, opa, validate_tiled_config from .model import ( DeviceModel, DeviceResponse, @@ -258,6 +258,13 @@ def get_device_by_name( ) +async def submission_check( + opa: Annotated[OpaUserClient, Depends(opa)], + task_request: TaskRequest, +): + await opa.can_submit_task(task_request) + + @secure_router_v1.post("/tasks", status_code=status.HTTP_201_CREATED, tags=[Tag.TASK]) @secure_router.post("/tasks", status_code=status.HTTP_201_CREATED, tags=[Tag.TASK]) @start_as_current_span( @@ -271,6 +278,7 @@ def submit_task( request: Request, response: Response, task_request: Annotated[TaskRequest, Body(..., examples=[example_task_request])], + authz_check: Annotated[None, Depends(submission_check)], runner: Annotated[WorkerDispatcher, Depends(_runner)], user: Fedid, ) -> TaskResponse: From ddfeab038e19b3f47c10a0720883e06c67257c55 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 18 May 2026 15:01:34 +0000 Subject: [PATCH 07/56] feat: add auth check dependency injections to task endpoints --- src/blueapi/service/authorization.py | 8 +++++++- src/blueapi/service/main.py | 16 +++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index 9cc4ea7df..9d9c96ddd 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -2,7 +2,7 @@ import re from collections.abc import Mapping from contextlib import AbstractAsyncContextManager, aclosing, nullcontext -from typing import Any, Self, cast +from typing import Annotated, Any, Self, cast from aiohttp import ClientSession from fastapi import Depends, HTTPException, Request @@ -115,3 +115,9 @@ async def opa( raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) return OpaUserClient(opa, token) return None + +async def submit_permission( + opa: Annotated[OpaUserClient, Depends(opa)], + task_request: TaskRequest, +): + await opa.can_submit_task(task_request) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 54c2be69e..ed8014c6c 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -40,7 +40,7 @@ from blueapi.worker import TrackableTask, WorkerState from blueapi.worker.event import TaskStatusEnum -from .authorization import OpaClient, OpaUserClient, opa, validate_tiled_config +from .authorization import OpaClient, submit_permission, validate_tiled_config from .model import ( DeviceModel, DeviceResponse, @@ -258,13 +258,6 @@ def get_device_by_name( ) -async def submission_check( - opa: Annotated[OpaUserClient, Depends(opa)], - task_request: TaskRequest, -): - await opa.can_submit_task(task_request) - - @secure_router_v1.post("/tasks", status_code=status.HTTP_201_CREATED, tags=[Tag.TASK]) @secure_router.post("/tasks", status_code=status.HTTP_201_CREATED, tags=[Tag.TASK]) @start_as_current_span( @@ -278,7 +271,7 @@ def submit_task( request: Request, response: Response, task_request: Annotated[TaskRequest, Body(..., examples=[example_task_request])], - authz_check: Annotated[None, Depends(submission_check)], + _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], user: Fedid, ) -> TaskResponse: @@ -317,6 +310,7 @@ def submit_task( @start_as_current_span(TRACER, "task_id") def delete_submitted_task( task_id: str, + _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> TaskResponse: return TaskResponse(task_id=runner.run(interface.clear_task, task_id)) @@ -335,6 +329,7 @@ def validate_task_status(v: str) -> TaskStatusEnum: @start_as_current_span(TRACER) def get_tasks( runner: Annotated[WorkerDispatcher, Depends(_runner)], + _: Annotated[None, Depends(submit_permission)], task_status: str | SkipJsonSchema[None] = None, ) -> TasksListResponse: """ @@ -371,6 +366,7 @@ def get_tasks( def set_active_task( request: Request, task: WorkerTask, + _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerTask: """Set a task to active status, the worker should begin it as soon as possible. @@ -401,6 +397,7 @@ def get_passthrough_headers(request: Request) -> dict[str, str]: @start_as_current_span(TRACER, "task_id") def get_task( task_id: str, + _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> TrackableTask: """Retrieve a task""" @@ -478,6 +475,7 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt def set_state( state_change_request: StateChangeRequest, response: Response, + _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: """ From 37524ebc1ed5a0ca7e907dc242e3632133cf4280 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 08:13:42 +0000 Subject: [PATCH 08/56] feat: create new access task permission fns and add as dependencies --- src/blueapi/service/main.py | 57 +++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index ed8014c6c..5aaa3d20a 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -146,6 +146,41 @@ def get_app(config: ApplicationConfig): return app +def access_task_permission( + request: Request, + task_id: str, + runner: Annotated[WorkerDispatcher, Depends(_runner)], +): + access_token: dict[str, Any] | None = getattr( + request.state, "decoded_access_token", None + ) + try: + task = runner.run(interface.get_task_by_id, task_id) + except KeyError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) from None + + if ( + access_token + and task + and access_token.get("fedid") != task.task.metadata.get("user") + ): + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + + +# start_task_permission is used when there is WorkerTask +def start_task_permission( + request: Request, + task: WorkerTask, + runner: Annotated[WorkerDispatcher, Depends(_runner)], +): + if not task.task_id: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail="No task id provided", + ) + access_task_permission(request, task.task_id, runner) + + async def on_key_error_404(_: Request, __: Exception): return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, @@ -310,7 +345,7 @@ def submit_task( @start_as_current_span(TRACER, "task_id") def delete_submitted_task( task_id: str, - _: Annotated[None, Depends(submit_permission)], + _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> TaskResponse: return TaskResponse(task_id=runner.run(interface.clear_task, task_id)) @@ -328,8 +363,8 @@ def validate_task_status(v: str) -> TaskStatusEnum: @secure_router.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK]) @start_as_current_span(TRACER) def get_tasks( + request: Request, runner: Annotated[WorkerDispatcher, Depends(_runner)], - _: Annotated[None, Depends(submit_permission)], task_status: str | SkipJsonSchema[None] = None, ) -> TasksListResponse: """ @@ -349,6 +384,15 @@ def get_tasks( tasks = runner.run(interface.get_tasks_by_status, desired_status) else: tasks = runner.run(interface.get_tasks) + + access_token: dict[str, Any] | None = getattr( + request.state, "decoded_access_token", None + ) + user = access_token.get("fedid") if access_token else None + + if user: + tasks = [t for t in tasks if t.task.metadata.get("user") == user] + return TasksListResponse(tasks=tasks) @@ -366,7 +410,7 @@ def get_tasks( def set_active_task( request: Request, task: WorkerTask, - _: Annotated[None, Depends(submit_permission)], + _: Annotated[None, Depends(start_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerTask: """Set a task to active status, the worker should begin it as soon as possible. @@ -397,7 +441,7 @@ def get_passthrough_headers(request: Request) -> dict[str, str]: @start_as_current_span(TRACER, "task_id") def get_task( task_id: str, - _: Annotated[None, Depends(submit_permission)], + _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> TrackableTask: """Retrieve a task""" @@ -475,7 +519,7 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt def set_state( state_change_request: StateChangeRequest, response: Response, - _: Annotated[None, Depends(submit_permission)], + _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: """ @@ -507,6 +551,9 @@ def set_state( elif new_state == WorkerState.RUNNING: runner.run(interface.resume_worker) elif new_state in {WorkerState.ABORTING, WorkerState.STOPPING}: + # active = runner.run(interface.get_active_task) + # if active.task.metadata.get("user"): + try: runner.run( interface.cancel_active_task, From 13093f6e5ce83e803032f88449b552e6155a9770 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 08:30:53 +0000 Subject: [PATCH 09/56] refactor: update rest api version --- src/blueapi/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/blueapi/config.py b/src/blueapi/config.py index a64491090..1f4da7161 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -310,7 +310,7 @@ class ApplicationConfig(BlueapiBaseModel): """ #: API version to publish in OpenAPI schema - REST_API_VERSION: ClassVar[str] = "1.3.0" + REST_API_VERSION: ClassVar[str] = "1.3.1" LICENSE_INFO: ClassVar[dict[str, str]] = { "name": "Apache 2.0", From d7d57ae27d6953bc3470701ffcb0cf427e410597 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 10:13:46 +0000 Subject: [PATCH 10/56] comment out dependency addition in set_state --- src/blueapi/config.py | 2 +- src/blueapi/service/main.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 1f4da7161..a64491090 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -310,7 +310,7 @@ class ApplicationConfig(BlueapiBaseModel): """ #: API version to publish in OpenAPI schema - REST_API_VERSION: ClassVar[str] = "1.3.1" + REST_API_VERSION: ClassVar[str] = "1.3.0" LICENSE_INFO: ClassVar[dict[str, str]] = { "name": "Apache 2.0", diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 5aaa3d20a..289a66da8 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -519,7 +519,7 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt def set_state( state_change_request: StateChangeRequest, response: Response, - _: Annotated[None, Depends(access_task_permission)], + # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: """ From 4ef3365d291a3cc7bed2eb7f6f07200d7acaed4f Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 14:16:24 +0000 Subject: [PATCH 11/56] refactor: add admin check and check to set state function --- src/blueapi/service/authentication.py | 3 +++ src/blueapi/service/main.py | 30 ++++++++++++++--------- tests/unit_tests/service/test_rest_api.py | 2 +- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/blueapi/service/authentication.py b/src/blueapi/service/authentication.py index 6761256de..fd1a8a5f1 100644 --- a/src/blueapi/service/authentication.py +++ b/src/blueapi/service/authentication.py @@ -290,6 +290,9 @@ def unchecked_bearer_token(req: Request) -> str | None: return None return param.strip() + def admin(self): + return False + UncheckedBearerToken = Annotated[str | None, Depends(unchecked_bearer_token)] diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 289a66da8..7008fa21c 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -147,6 +147,7 @@ def get_app(config: ApplicationConfig): def access_task_permission( + opa: Annotated[OPAClient, Depends(get_opa_client)], request: Request, task_id: str, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -154,21 +155,19 @@ def access_task_permission( access_token: dict[str, Any] | None = getattr( request.state, "decoded_access_token", None ) - try: - task = runner.run(interface.get_task_by_id, task_id) - except KeyError: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) from None + task = runner.run(interface.get_task_by_id, task_id) - if ( + if not opa.admin() and ( access_token and task and access_token.get("fedid") != task.task.metadata.get("user") ): - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # start_task_permission is used when there is WorkerTask def start_task_permission( + opa: Annotated[OPAClient, Depends(get_opa_client)], request: Request, task: WorkerTask, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -178,7 +177,7 @@ def start_task_permission( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="No task id provided", ) - access_task_permission(request, task.task_id, runner) + access_task_permission(opa, request, task.task_id, runner) async def on_key_error_404(_: Request, __: Exception): @@ -390,8 +389,7 @@ def get_tasks( ) user = access_token.get("fedid") if access_token else None - if user: - tasks = [t for t in tasks if t.task.metadata.get("user") == user] + tasks = [t for t in tasks if t.task.metadata.get("user") == user] return TasksListResponse(tasks=tasks) @@ -517,8 +515,10 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt ) @start_as_current_span(TRACER, "state_change_request.new_state") def set_state( + request: Request, state_change_request: StateChangeRequest, response: Response, + opa: Annotated[OPAClient, Depends(get_opa_client)], # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: @@ -546,14 +546,20 @@ def set_state( current_state in _ALLOWED_TRANSITIONS and new_state in _ALLOWED_TRANSITIONS[current_state] ): + active = runner.run(interface.get_active_task) + access_token: dict[str, Any] | None = getattr( + request.state, "decoded_access_token", None + ) + user = access_token.get("fedid") if access_token else None + + if not opa.admin() and active and active.task.metadata.get("user") != user: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + if new_state == WorkerState.PAUSED: runner.run(interface.pause_worker, state_change_request.defer) elif new_state == WorkerState.RUNNING: runner.run(interface.resume_worker) elif new_state in {WorkerState.ABORTING, WorkerState.STOPPING}: - # active = runner.run(interface.get_active_task) - # if active.task.metadata.get("user"): - try: runner.run( interface.cancel_active_task, diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 1ddf2c6ca..14439f111 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -251,7 +251,7 @@ def test_create_task(mock_runner: Mock, client: TestClient) -> None: response = client.post("/tasks", json=task.model_dump()) - mock_runner.run.assert_called_with(submit_task, task, {"user": "Unknown"}) + mock_runner.run.assert_called_with(submit_task, task, {"user": None}) assert response.json() == {"task_id": task_id} From 659cf58118962c7c95165a530c439a972d7d9f65 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 12:34:41 +0100 Subject: [PATCH 12/56] Update dependency names --- src/blueapi/service/main.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 7008fa21c..ff3457039 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -40,7 +40,13 @@ from blueapi.worker import TrackableTask, WorkerState from blueapi.worker.event import TaskStatusEnum -from .authorization import OpaClient, submit_permission, validate_tiled_config +from .authorization import ( + OpaClient, + OpaUserClient, + opa, + submit_permission, + validate_tiled_config, +) from .model import ( DeviceModel, DeviceResponse, @@ -147,7 +153,7 @@ def get_app(config: ApplicationConfig): def access_task_permission( - opa: Annotated[OPAClient, Depends(get_opa_client)], + opa: Annotated[OpaUserClient, Depends(opa)], request: Request, task_id: str, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -167,7 +173,7 @@ def access_task_permission( # start_task_permission is used when there is WorkerTask def start_task_permission( - opa: Annotated[OPAClient, Depends(get_opa_client)], + opa: Annotated[OpaUserClient, Depends(opa)], request: Request, task: WorkerTask, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -518,7 +524,7 @@ def set_state( request: Request, state_change_request: StateChangeRequest, response: Response, - opa: Annotated[OPAClient, Depends(get_opa_client)], + opa: Annotated[OpaUserClient, Depends(opa)], # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: From a6699033c349b5840a232f615518f57ce4cf1448 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 12:53:37 +0100 Subject: [PATCH 13/56] Add missing admin check --- helm/blueapi/config_schema.json | 7 ++++++- helm/blueapi/values.schema.json | 7 ++++++- src/blueapi/config.py | 1 + src/blueapi/service/authorization.py | 7 +++++++ tests/unit_tests/test_config.py | 1 + 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/helm/blueapi/config_schema.json b/helm/blueapi/config_schema.json index e176e5c66..4f5d157eb 100644 --- a/helm/blueapi/config_schema.json +++ b/helm/blueapi/config_schema.json @@ -353,11 +353,16 @@ "submit_task_check": { "title": "Submit Task Check", "type": "string" + }, + "admin_check": { + "title": "Admin Check", + "type": "string" } }, "required": [ "tiled_service_account_check", - "submit_task_check" + "submit_task_check", + "admin_check" ], "title": "OpaConfig", "type": "object", diff --git a/helm/blueapi/values.schema.json b/helm/blueapi/values.schema.json index 5808acc54..6083f77e5 100644 --- a/helm/blueapi/values.schema.json +++ b/helm/blueapi/values.schema.json @@ -757,9 +757,14 @@ "type": "object", "required": [ "tiled_service_account_check", - "submit_task_check" + "submit_task_check", + "admin_check" ], "properties": { + "admin_check": { + "title": "Admin Check", + "type": "string" + }, "audience": { "title": "Audience", "default": "account", diff --git a/src/blueapi/config.py b/src/blueapi/config.py index a64491090..a19e30c7b 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -301,6 +301,7 @@ class OpaConfig(BlueapiBaseModel): audience: str = "account" tiled_service_account_check: str submit_task_check: str + admin_check: str class ApplicationConfig(BlueapiBaseModel): diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index 9d9c96ddd..c84a7d1aa 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -75,6 +75,9 @@ async def require_submit_task(self, instrument_session: str, token: str): ): raise HTTPException(status_code=status.HTTP_403_UNORTHORIZED) + async def is_admin(self, token: str) -> bool: + return await self._call_opa(self._conf.admin_check, {"token": token}) + class OpaUserClient: client: OpaClient @@ -88,6 +91,9 @@ async def can_submit_task(self, task: TaskRequest): LOGGER.info("Checking permissions to run task") await self.client.require_submit_task(task.instrument_session, self.token) + async def admin(self) -> bool: + return await self.client.is_admin(self.token) + async def validate_tiled_config( tiled: ServiceAccount | str | None, oidc: OIDCConfig | None, opa: OpaClient | None @@ -116,6 +122,7 @@ async def opa( return OpaUserClient(opa, token) return None + async def submit_permission( opa: Annotated[OpaUserClient, Depends(opa)], task_request: TaskRequest, diff --git a/tests/unit_tests/test_config.py b/tests/unit_tests/test_config.py index 747e944d5..f3d3d37fd 100644 --- a/tests/unit_tests/test_config.py +++ b/tests/unit_tests/test_config.py @@ -342,6 +342,7 @@ def test_config_yaml_parsed(temp_yaml_config_file): "audience": "account", "tiled_service_account_check": "v1/tiled_service_account", "submit_task_check": "v1/submit_task", + "admin_check": "v1/admin_check", }, }, { From d1cb93987ce3831de8ccbe6488adbd9e96d294ab Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 15:45:58 +0100 Subject: [PATCH 14/56] Handle missing opa and fix tests --- src/blueapi/service/authorization.py | 3 +- src/blueapi/service/main.py | 12 ++++++-- tests/unit_tests/service/test_rest_api.py | 34 +++++++++++++++++++---- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index c84a7d1aa..c669b627c 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -127,4 +127,5 @@ async def submit_permission( opa: Annotated[OpaUserClient, Depends(opa)], task_request: TaskRequest, ): - await opa.can_submit_task(task_request) + if opa: + await opa.can_submit_task(task_request) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index ff3457039..3f5ab89ef 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -153,11 +153,14 @@ def get_app(config: ApplicationConfig): def access_task_permission( - opa: Annotated[OpaUserClient, Depends(opa)], + opa: Annotated[OpaUserClient | None, Depends(opa)], request: Request, task_id: str, runner: Annotated[WorkerDispatcher, Depends(_runner)], ): + if not opa: + return + access_token: dict[str, Any] | None = getattr( request.state, "decoded_access_token", None ) @@ -558,7 +561,12 @@ def set_state( ) user = access_token.get("fedid") if access_token else None - if not opa.admin() and active and active.task.metadata.get("user") != user: + if ( + opa + and not opa.admin() + and active + and active.task.metadata.get("user") != user + ): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) if new_state == WorkerState.PAUSED: diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 14439f111..79258fc7b 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -574,7 +574,12 @@ def test_get_state(mock_runner: Mock, client: TestClient): def test_set_state_running_to_paused(mock_runner: Mock, client: TestClient): current_state = WorkerState.RUNNING final_state = WorkerState.PAUSED - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() @@ -588,7 +593,12 @@ def test_set_state_running_to_paused(mock_runner: Mock, client: TestClient): def test_set_state_paused_to_running(mock_runner: Mock, client: TestClient): current_state = WorkerState.PAUSED final_state = WorkerState.RUNNING - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() @@ -602,7 +612,12 @@ def test_set_state_paused_to_running(mock_runner: Mock, client: TestClient): def test_set_state_running_to_aborting(mock_runner: Mock, client: TestClient): current_state = WorkerState.RUNNING final_state = WorkerState.ABORTING - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() @@ -619,7 +634,12 @@ def test_set_state_running_to_stopping_including_reason( current_state = WorkerState.RUNNING final_state = WorkerState.STOPPING reason = "blueapi is being stopped" - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", @@ -635,7 +655,11 @@ def test_set_state_transition_error(mock_runner: Mock, client: TestClient): current_state = WorkerState.RUNNING final_state = WorkerState.STOPPING - mock_runner.run.side_effect = [current_state, TransitionError()] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + TransitionError(), + ] response = client.put( "/worker/state", From b889e20a1696eaf1bce53644428ecc852f60dc71 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 15:46:43 +0100 Subject: [PATCH 15/56] Remove old admin method --- src/blueapi/service/authentication.py | 3 -- src/blueapi/service/main.py | 42 +++++++-------------------- 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/src/blueapi/service/authentication.py b/src/blueapi/service/authentication.py index fd1a8a5f1..6761256de 100644 --- a/src/blueapi/service/authentication.py +++ b/src/blueapi/service/authentication.py @@ -290,9 +290,6 @@ def unchecked_bearer_token(req: Request) -> str | None: return None return param.strip() - def admin(self): - return False - UncheckedBearerToken = Annotated[str | None, Depends(unchecked_bearer_token)] diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 3f5ab89ef..3e97d5c87 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -154,31 +154,21 @@ def get_app(config: ApplicationConfig): def access_task_permission( opa: Annotated[OpaUserClient | None, Depends(opa)], - request: Request, task_id: str, + fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], ): - if not opa: - return - - access_token: dict[str, Any] | None = getattr( - request.state, "decoded_access_token", None - ) task = runner.run(interface.get_task_by_id, task_id) - if not opa.admin() and ( - access_token - and task - and access_token.get("fedid") != task.task.metadata.get("user") - ): + if opa and not opa.admin() and (task and fedid != task.task.metadata.get("user")): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # start_task_permission is used when there is WorkerTask def start_task_permission( - opa: Annotated[OpaUserClient, Depends(opa)], - request: Request, task: WorkerTask, + opa: Annotated[OpaUserClient, Depends(opa)], + fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], ): if not task.task_id: @@ -186,7 +176,7 @@ def start_task_permission( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="No task id provided", ) - access_task_permission(opa, request, task.task_id, runner) + access_task_permission(opa, task.task_id, fedid, runner) async def on_key_error_404(_: Request, __: Exception): @@ -316,12 +306,11 @@ def submit_task( task_request: Annotated[TaskRequest, Body(..., examples=[example_task_request])], _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], - user: Fedid, + fedid: Fedid, ) -> TaskResponse: """Submit a task to the worker.""" try: - user = user or "Unknown" - task_id: str = runner.run(interface.submit_task, task_request, {"user": user}) + task_id: str = runner.run(interface.submit_task, task_request, {"user": fedid}) response.headers["Location"] = f"{request.url}/{task_id}" return TaskResponse(task_id=task_id) except ValidationError as e: @@ -371,7 +360,7 @@ def validate_task_status(v: str) -> TaskStatusEnum: @secure_router.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK]) @start_as_current_span(TRACER) def get_tasks( - request: Request, + fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], task_status: str | SkipJsonSchema[None] = None, ) -> TasksListResponse: @@ -393,12 +382,7 @@ def get_tasks( else: tasks = runner.run(interface.get_tasks) - access_token: dict[str, Any] | None = getattr( - request.state, "decoded_access_token", None - ) - user = access_token.get("fedid") if access_token else None - - tasks = [t for t in tasks if t.task.metadata.get("user") == user] + tasks = [t for t in tasks if t.task.metadata.get("user") == fedid] return TasksListResponse(tasks=tasks) @@ -524,9 +508,9 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt ) @start_as_current_span(TRACER, "state_change_request.new_state") def set_state( - request: Request, state_change_request: StateChangeRequest, response: Response, + fedid: Fedid, opa: Annotated[OpaUserClient, Depends(opa)], # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -556,16 +540,12 @@ def set_state( and new_state in _ALLOWED_TRANSITIONS[current_state] ): active = runner.run(interface.get_active_task) - access_token: dict[str, Any] | None = getattr( - request.state, "decoded_access_token", None - ) - user = access_token.get("fedid") if access_token else None if ( opa and not opa.admin() and active - and active.task.metadata.get("user") != user + and active.task.metadata.get("user") != fedid ): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) From 3eb315a2cb882f7f7caddbbffd71e18a86bf5126 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 28 May 2026 17:17:51 +0100 Subject: [PATCH 16/56] Use starlette statuses directly --- src/blueapi/service/authorization.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index c669b627c..edcec5cbd 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -6,7 +6,7 @@ from aiohttp import ClientSession from fastapi import Depends, HTTPException, Request -from starlette import status +from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authentication import TiledAuth, unchecked_bearer_token @@ -73,7 +73,7 @@ async def require_submit_task(self, instrument_session: str, token: str): "visit": int(match["visit"]), }, ): - raise HTTPException(status_code=status.HTTP_403_UNORTHORIZED) + raise HTTPException(status_code=HTTP_403_FORBIDDEN) async def is_admin(self, token: str) -> bool: return await self._call_opa(self._conf.admin_check, {"token": token}) @@ -118,13 +118,13 @@ async def opa( if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)): if not token: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + raise HTTPException(status_code=HTTP_401_UNAUTHORIZED) return OpaUserClient(opa, token) return None async def submit_permission( - opa: Annotated[OpaUserClient, Depends(opa)], + opa: Annotated[OpaUserClient | None, Depends(opa)], task_request: TaskRequest, ): if opa: From 086953fe70c304b667bc7511f747083da6ababa8 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 28 May 2026 17:27:47 +0100 Subject: [PATCH 17/56] test task submission authz --- .../unit_tests/service/test_authorization.py | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/tests/unit_tests/service/test_authorization.py b/tests/unit_tests/service/test_authorization.py index 37c1d7e3f..65f5c44a6 100644 --- a/tests/unit_tests/service/test_authorization.py +++ b/tests/unit_tests/service/test_authorization.py @@ -8,9 +8,12 @@ from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authorization import ( OpaClient, + OpaUserClient, opa, + submit_permission, validate_tiled_config, ) +from blueapi.service.model import TaskRequest # Reusable client patch decorator patch_client_session = patch( @@ -25,6 +28,7 @@ def opa_config() -> OpaConfig: return OpaConfig( root=HttpUrl("http://auth.example.com"), submit_task_check="/auth/submit", + admin_check="/auth/admin", tiled_service_account_check="/auth/tiled", ) @@ -108,6 +112,105 @@ async def test_opa_adds_input_fields(session: MagicMock, opa_config: OpaConfig): ) +@pytest.mark.parametrize( + "result,context", + [(True, nullcontext()), (False, pytest.raises(HTTPException, match="403"))], +) +@patch_client_session +async def test_require_submit_task( + session: MagicMock, + opa_config: OpaConfig, + result: bool, + context: AbstractContextManager, +): + session.return_value.post = AsyncMock( + return_value=MagicMock(json=AsyncMock(return_value={"result": result})) + ) + + client = OpaClient(instrument="p99", config=opa_config) + + session.assert_called_once_with(base_url="http://auth.example.com/") + with context: + await client.require_submit_task( + instrument_session="cm12345-1", token="foo_bar" + ) + + session().post.assert_called_once_with( + "/auth/submit", + json={ + "input": { + "token": "foo_bar", + "beamline": "p99", + "audience": "account", + "visit": 1, + "proposal": 12345, + } + }, + ) + + +@patch_client_session +async def test_opa_require_submit_task_invalid_session( + session: MagicMock, opa_config: OpaConfig +): + client = OpaClient(instrument="p45", config=opa_config) + + with pytest.raises(ValueError): + await client.require_submit_task( + instrument_session="not a session", token="foo_bar" + ) + + +@pytest.mark.parametrize("result", [True, False]) +@patch_client_session +async def test_opa_is_admin(session: MagicMock, opa_config: OpaConfig, result: bool): + session.return_value.post = AsyncMock( + return_value=MagicMock(json=AsyncMock(return_value={"result": result})) + ) + client = OpaClient(instrument="p45", config=opa_config) + + admin = await client.is_admin("foo_bar") + + assert admin == result + + session().post.assert_called_once_with( + "/auth/admin", + json={"input": {"token": "foo_bar", "beamline": "p45", "audience": "account"}}, + ) + + +@pytest.mark.parametrize( + "result,context", + [ + (None, nullcontext()), + (HTTPException(status_code=403), pytest.raises(HTTPException, match="403")), + ], +) +async def test_user_client_can_submit_task(result, context: AbstractContextManager): + opa = MagicMock(spec=OpaUserClient) + opa.require_submit_task = AsyncMock(side_effect=result) + + user_client = OpaUserClient(opa, "foo_bar") + + with context: + await user_client.can_submit_task( + TaskRequest(name="foo", params={}, instrument_session="cm12345-1") + ) + opa.require_submit_task.assert_called_once_with("cm12345-1", "foo_bar") + + +@pytest.mark.parametrize("result", [True, False]) +async def test_user_client_admin(result: bool): + opa = MagicMock(spec=OpaUserClient) + opa.is_admin = AsyncMock(return_value=result) + + user_client = OpaUserClient(opa, "foo_bar") + + admin = await user_client.admin() + + assert admin == result + + async def test_validate_tiled_config(): opa = MagicMock(spec=OpaClient) tiled = ServiceAccount() @@ -177,3 +280,21 @@ async def test_opa_dependency_without_authz(token): del request.app.state.authz user_client = await opa(request, token) assert user_client is None + + +@pytest.mark.parametrize( + "result,context", + [ + (None, nullcontext()), + (HTTPException(status_code=403), pytest.raises(HTTPException, match="403")), + ], +) +async def test_submit_permission_dependency(result, context: AbstractContextManager): + opa = MagicMock(spec=OpaUserClient) + opa.can_submit_task.side_effect = result + with context: + await submit_permission(opa, Mock()) + + +async def test_submit_permission_dependency_without_opa(): + assert await submit_permission(None, Mock()) is None From 2d3a564182db7fe342458b1bc6d343135466408f Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 15:19:28 +0100 Subject: [PATCH 18/56] Use _config instead of _conf --- src/blueapi/service/authorization.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index edcec5cbd..5f1e2e980 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -66,7 +66,7 @@ async def require_submit_task(self, instrument_session: str, token: str): raise ValueError("Invalid instrument session") if not await self._call_opa( - self._conf.submit_task_check, + self._config.submit_task_check, { "token": token, "proposal": int(match["proposal"]), @@ -76,7 +76,7 @@ async def require_submit_task(self, instrument_session: str, token: str): raise HTTPException(status_code=HTTP_403_FORBIDDEN) async def is_admin(self, token: str) -> bool: - return await self._call_opa(self._conf.admin_check, {"token": token}) + return await self._call_opa(self._config.admin_check, {"token": token}) class OpaUserClient: From 83ae2a6c8462b0e484dcf32459ed312625a49732 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 16:36:19 +0100 Subject: [PATCH 19/56] Re-use instrument session regex --- src/blueapi/service/authorization.py | 3 +-- src/blueapi/utils/__init__.py | 3 +++ src/blueapi/utils/serialization.py | 10 +++------- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index 5f1e2e980..e54d0daa9 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -1,5 +1,4 @@ import logging -import re from collections.abc import Mapping from contextlib import AbstractAsyncContextManager, aclosing, nullcontext from typing import Annotated, Any, Self, cast @@ -11,9 +10,9 @@ from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authentication import TiledAuth, unchecked_bearer_token from blueapi.service.model import TaskRequest +from blueapi.utils import INSTRUMENT_SESSION_RE LOGGER = logging.getLogger(__name__) -INSTRUMENT_SESSION_RE = re.compile(r"^[a-z]{2}(?P\d+)-(?P\d+)$") class OpaClient: diff --git a/src/blueapi/utils/__init__.py b/src/blueapi/utils/__init__.py index 4b2e41f2c..bf96b7009 100644 --- a/src/blueapi/utils/__init__.py +++ b/src/blueapi/utils/__init__.py @@ -1,3 +1,4 @@ +import re from collections.abc import Callable from functools import wraps from typing import ParamSpec, TypeVar @@ -31,6 +32,8 @@ Args = ParamSpec("Args") Return = TypeVar("Return") +INSTRUMENT_SESSION_RE = re.compile(r"^[a-z]{2}(?P\d+)-(?P\d+)$") + def deprecated(alternative): from warnings import warn diff --git a/src/blueapi/utils/serialization.py b/src/blueapi/utils/serialization.py index deee82b1e..8918cf882 100644 --- a/src/blueapi/utils/serialization.py +++ b/src/blueapi/utils/serialization.py @@ -1,9 +1,10 @@ import json -import re from typing import Any from pydantic import BaseModel +from blueapi import utils + def serialize(obj: Any) -> Any: """ @@ -28,13 +29,8 @@ def serialize(obj: Any) -> Any: return obj -_INSTRUMENT_SESSION_AUTHZ_REGEX: re.Pattern = re.compile( - r"^[a-zA-Z]{2}(?P\d+)-(?P\d+)$" -) - - def access_blob(instrument_session: str, beamline: str) -> str: - m = _INSTRUMENT_SESSION_AUTHZ_REGEX.match(instrument_session) + m = utils.INSTRUMENT_SESSION_RE.match(instrument_session) if m is None: raise ValueError( "Unable to extract proposal and visit from " From 029bc89b3f25dec7c826de221198c46f1b5aa845 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 17:04:21 +0100 Subject: [PATCH 20/56] remove task access check --- src/blueapi/service/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 3e97d5c87..e886f617e 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -512,7 +512,6 @@ def set_state( response: Response, fedid: Fedid, opa: Annotated[OpaUserClient, Depends(opa)], - # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: """ From 4e12a23cb45e0ffc5b386f00527a99e4b073e9b0 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 17:06:25 +0100 Subject: [PATCH 21/56] Add match to raises check --- tests/unit_tests/service/test_authorization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit_tests/service/test_authorization.py b/tests/unit_tests/service/test_authorization.py index 65f5c44a6..a2e602f21 100644 --- a/tests/unit_tests/service/test_authorization.py +++ b/tests/unit_tests/service/test_authorization.py @@ -155,7 +155,7 @@ async def test_opa_require_submit_task_invalid_session( ): client = OpaClient(instrument="p45", config=opa_config) - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="Invalid instrument session"): await client.require_submit_task( instrument_session="not a session", token="foo_bar" ) From 850682ba31ecfecb8786000c8243f15d6e287de1 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 17:12:30 +0100 Subject: [PATCH 22/56] Add exception detail --- src/blueapi/service/authorization.py | 8 ++++++-- src/blueapi/service/main.py | 5 ++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index e54d0daa9..f9008138a 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -72,7 +72,9 @@ async def require_submit_task(self, instrument_session: str, token: str): "visit": int(match["visit"]), }, ): - raise HTTPException(status_code=HTTP_403_FORBIDDEN) + raise HTTPException( + status_code=HTTP_403_FORBIDDEN, detail="Not authorized to submit task" + ) async def is_admin(self, token: str) -> bool: return await self._call_opa(self._config.admin_check, {"token": token}) @@ -117,7 +119,9 @@ async def opa( if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)): if not token: - raise HTTPException(status_code=HTTP_401_UNAUTHORIZED) + raise HTTPException( + status_code=HTTP_401_UNAUTHORIZED, detail="Authentication missing" + ) return OpaUserClient(opa, token) return None diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index e886f617e..6d9f89574 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -546,7 +546,10 @@ def set_state( and active and active.task.metadata.get("user") != fedid ): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not authorized to set worker state", + ) if new_state == WorkerState.PAUSED: runner.run(interface.pause_worker, state_change_request.defer) From 2f1d3eb3ab219df80528637a2c894259399999a8 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 17:15:15 +0100 Subject: [PATCH 23/56] Let admin see all tasks --- src/blueapi/service/main.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 6d9f89574..df6e2c301 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -362,6 +362,7 @@ def validate_task_status(v: str) -> TaskStatusEnum: def get_tasks( fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], + opa: Annotated[OpaUserClient, Depends(opa)], task_status: str | SkipJsonSchema[None] = None, ) -> TasksListResponse: """ @@ -382,7 +383,8 @@ def get_tasks( else: tasks = runner.run(interface.get_tasks) - tasks = [t for t in tasks if t.task.metadata.get("user") == fedid] + if opa and not opa.admin(): + tasks = [t for t in tasks if t.task.metadata.get("user") == fedid] return TasksListResponse(tasks=tasks) From e720bc49f5984962f4bbbe5a34acd8c3248ad8e7 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 17:15:55 +0100 Subject: [PATCH 24/56] Start of api authz tests --- tests/unit_tests/service/test_rest_api.py | 56 ++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 79258fc7b..e903e74a5 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -14,9 +14,15 @@ from pydantic_core import InitErrorDetails from super_state_machine.errors import TransitionError -from blueapi.config import ApplicationConfig, CORSConfig, OIDCConfig, RestConfig +from blueapi.config import ( + ApplicationConfig, + CORSConfig, + OIDCConfig, + RestConfig, +) from blueapi.core.bluesky_types import Plan from blueapi.service import main +from blueapi.service.authorization import OpaUserClient, opa from blueapi.service.interface import ( cancel_active_task, get_device, @@ -54,6 +60,11 @@ def mock_runner() -> Mock: return Mock(spec=WorkerDispatcher) +@pytest.fixture +def mock_opa_client() -> Mock: + return Mock(spec=OpaUserClient) + + @pytest.fixture def client(mock_runner: Mock) -> Iterator[TestClient]: with patch("blueapi.service.interface.worker"): @@ -79,6 +90,27 @@ def client_with_auth( main.teardown_runner() +@pytest.fixture +def access_token(valid_token_with_jwt: dict[str, Any]) -> str: + return valid_token_with_jwt["access_token"] + + +@pytest.fixture +def client_with_opa( + mock_runner: Mock, + oidc_config: OIDCConfig, + mock_opa_client: Mock, + mock_authn_server, +): + with patch("blueapi.service.interface.worker"): + main.setup_runner(runner=mock_runner) + app = main.get_app(ApplicationConfig(oidc=oidc_config)) + app.dependency_overrides[opa] = lambda: mock_opa_client + client = TestClient(app) + yield client + main.teardown_runner() + + @pytest.fixture def rest_config_with_cors() -> RestConfig: cors_config = CORSConfig( @@ -416,6 +448,28 @@ def test_get_tasks_by_status_invalid(client: TestClient) -> None: assert response.status_code == status.HTTP_400_BAD_REQUEST +def test_get_tasks_filters_by_user( + mock_runner: Mock, + client_with_opa: TestClient, + access_token: str, + mock_opa_client: Mock, +): + + print("Start of test") + mock_runner.run.return_value = [ + TrackableTask(task_id="foo", task=Task(name="f1", metadata={"user": "jd1"})), + TrackableTask(task_id="bar", task=Task(name="f2", metadata={"user": "jd2"})), + ] + print(f"in test: {mock_opa_client=}") + mock_opa_client.admin.return_value = False + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + tasks = client_with_opa.get("/tasks").json().get("tasks") + print(tasks) + + assert len(tasks) == 1 + assert tasks[0]["task_id"] == "foo" + + def test_delete_submitted_task(mock_runner: Mock, client: TestClient) -> None: task_id = str(uuid.uuid4()) mock_runner.run.return_value = task_id From d73d01ab421db9cb311ca860f05b88b0d4327246 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Mon, 8 Jun 2026 14:30:20 +0100 Subject: [PATCH 25/56] Make get_tasks async to access authz check --- src/blueapi/service/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index df6e2c301..08e66f5b1 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -359,7 +359,7 @@ def validate_task_status(v: str) -> TaskStatusEnum: @secure_router_v1.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK]) @secure_router.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK]) @start_as_current_span(TRACER) -def get_tasks( +async def get_tasks( fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], opa: Annotated[OpaUserClient, Depends(opa)], @@ -383,7 +383,7 @@ def get_tasks( else: tasks = runner.run(interface.get_tasks) - if opa and not opa.admin(): + if opa and not await opa.admin(): tasks = [t for t in tasks if t.task.metadata.get("user") == fedid] return TasksListResponse(tasks=tasks) From d1b7302795de55804d971c963b11942f83c84198 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Mon, 8 Jun 2026 14:45:41 +0100 Subject: [PATCH 26/56] Parametrise filter test to check with and without admin --- tests/unit_tests/service/test_rest_api.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index e903e74a5..548ac3078 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -448,11 +448,14 @@ def test_get_tasks_by_status_invalid(client: TestClient) -> None: assert response.status_code == status.HTTP_400_BAD_REQUEST +@pytest.mark.parametrize("admin,task_ids", [(True, ["foo", "bar"]), (False, ["foo"])]) def test_get_tasks_filters_by_user( mock_runner: Mock, client_with_opa: TestClient, access_token: str, mock_opa_client: Mock, + admin: bool, + task_ids: list[str], ): print("Start of test") @@ -461,13 +464,12 @@ def test_get_tasks_filters_by_user( TrackableTask(task_id="bar", task=Task(name="f2", metadata={"user": "jd2"})), ] print(f"in test: {mock_opa_client=}") - mock_opa_client.admin.return_value = False + mock_opa_client.admin.return_value = admin client_with_opa.headers["Authorization"] = f"Bearer {access_token}" tasks = client_with_opa.get("/tasks").json().get("tasks") print(tasks) - assert len(tasks) == 1 - assert tasks[0]["task_id"] == "foo" + assert [t["task_id"] for t in tasks] == task_ids def test_delete_submitted_task(mock_runner: Mock, client: TestClient) -> None: From 02eba884b12cc5c98f7a834d2a2ee3845b25045a Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Mon, 8 Jun 2026 15:17:11 +0100 Subject: [PATCH 27/56] Add test for deleting tasks --- src/blueapi/service/main.py | 12 ++++++++---- tests/unit_tests/service/test_rest_api.py | 24 ++++++++++++++++++++++- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 08e66f5b1..1e5a6fbcc 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -152,7 +152,7 @@ def get_app(config: ApplicationConfig): return app -def access_task_permission( +async def access_task_permission( opa: Annotated[OpaUserClient | None, Depends(opa)], task_id: str, fedid: Fedid, @@ -160,12 +160,16 @@ def access_task_permission( ): task = runner.run(interface.get_task_by_id, task_id) - if opa and not opa.admin() and (task and fedid != task.task.metadata.get("user")): + if ( + opa + and not await opa.admin() + and (task and fedid != task.task.metadata.get("user")) + ): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # start_task_permission is used when there is WorkerTask -def start_task_permission( +async def start_task_permission( task: WorkerTask, opa: Annotated[OpaUserClient, Depends(opa)], fedid: Fedid, @@ -176,7 +180,7 @@ def start_task_permission( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="No task id provided", ) - access_task_permission(opa, task.task_id, fedid, runner) + await access_task_permission(opa, task.task_id, fedid, runner) async def on_key_error_404(_: Request, __: Exception): diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 548ac3078..bb61d0a57 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -21,7 +21,7 @@ RestConfig, ) from blueapi.core.bluesky_types import Plan -from blueapi.service import main +from blueapi.service import interface, main from blueapi.service.authorization import OpaUserClient, opa from blueapi.service.interface import ( cancel_active_task, @@ -479,6 +479,28 @@ def test_delete_submitted_task(mock_runner: Mock, client: TestClient) -> None: assert response.json() == {"task_id": f"{task_id}"} +def test_cant_delete_other_users_task( + mock_runner: Mock, + client_with_opa: TestClient, + access_token: str, + mock_opa_client: Mock, +): + mock_opa_client.admin.return_value = False + mock_runner.run.side_effect = lambda mth, *args: { + interface.get_task_by_id: TrackableTask( + task_id="bar", task=Task(name="t2", metadata={"user": "jd2"}) + ), + }[mth] + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + + resp = client_with_opa.delete("/tasks/bar") + + # 404 to obfuscate whether task exists when inaccessible + assert resp.status_code == 404 + + mock_runner.run.assert_called_once() + + def test_set_active_task(client: TestClient) -> None: task_id = str(uuid.uuid4()) task = WorkerTask(task_id=task_id) From b34600ad5ad459e140d999702225262981634c22 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Mon, 8 Jun 2026 16:42:20 +0100 Subject: [PATCH 28/56] Add test for submit without permission --- tests/unit_tests/service/test_rest_api.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index bb61d0a57..6495c0c35 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -7,7 +7,7 @@ import jwt import pytest from bluesky.protocols import Stoppable -from fastapi import status +from fastapi import HTTPException, status from fastapi.testclient import TestClient from httpx import Headers from pydantic import BaseModel, ValidationError @@ -287,6 +287,23 @@ def test_create_task(mock_runner: Mock, client: TestClient) -> None: assert response.json() == {"task_id": task_id} +def test_submit_task_requires_permission( + mock_runner: Mock, + client_with_opa: TestClient, + mock_opa_client: Mock, + access_token: str, +): + task = TaskRequest(name="sleep", params={"time": 2}, instrument_session="cm12345-2") + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + mock_opa_client.can_submit_task.side_effect = HTTPException(status_code=403) + mock_runner.run.side_effect = RuntimeError("Task should not be submitted") + + resp = client_with_opa.post("/tasks", json=task.model_dump()) + + assert resp.status_code == 403 + mock_runner.run.assert_not_called() + + def test_create_task_inserts_auth_metadata( mock_runner: Mock, client_with_auth: TestClient, From 6967b1698cf830be2d231306675f7e47d388ca65 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 9 Jun 2026 09:27:58 +0100 Subject: [PATCH 29/56] Test setting other user's task active --- tests/unit_tests/service/test_rest_api.py | 29 +++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 6495c0c35..d63720f78 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -566,6 +566,35 @@ def test_set_active_task_worker_already_running( assert response.json() == {"detail": "Worker already active"} +@pytest.mark.parametrize("admin,status", [(True, 200), (False, 404)]) +def test_set_other_users_task_active( + mock_runner: Mock, + client_with_opa: TestClient, + mock_opa_client: Mock, + access_token: str, + admin: bool, + status: int, +): + + task_id = "foo" + task = WorkerTask(task_id=task_id) + mock_opa_client.admin.return_value = admin + + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + + mock_runner.run.side_effect = lambda mth, *a, **kw: { + interface.get_task_by_id: TrackableTask( + task_id="foo", task=Task(name="bar", metadata={"user": "jd2"}) + ), + interface.get_active_task: None, + interface.begin_task: None, + }[mth] + + resp = client_with_opa.put("/worker/task", json=task.model_dump()) + + assert resp.status_code == status + + def test_get_task(mock_runner: Mock, client: TestClient): task_id = str(uuid.uuid4()) task = TrackableTask( From 49b1b97557bba4b88a5159617296cca95a8b412d Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 9 Jun 2026 09:44:14 +0100 Subject: [PATCH 30/56] Test getting other users task --- tests/unit_tests/service/test_rest_api.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index d63720f78..3c78dc5ab 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -627,6 +627,25 @@ def test_get_task(mock_runner: Mock, client: TestClient): } +@pytest.mark.parametrize("admin,status", [(True, 200), (False, 404)]) +def test_get_other_users_task( + mock_runner: Mock, + client_with_opa: TestClient, + mock_opa_client: Mock, + access_token: str, + admin: bool, + status: int, +): + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + mock_runner.run.return_value = TrackableTask( + task_id="foo", task=Task(name="bar", metadata={"user": "jd2"}) + ) + mock_opa_client.admin.return_value = admin + + resp = client_with_opa.get("/tasks/foo") + assert resp.status_code == status + + def test_get_all_tasks(mock_runner: Mock, client: TestClient): task_id = str(uuid.uuid4()) tasks = [ From 0139c0be39f0be79b8cf2f8e657c75a72a4a9d47 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 9 Jun 2026 11:05:06 +0100 Subject: [PATCH 31/56] Add tests for set state --- src/blueapi/service/main.py | 4 ++-- tests/unit_tests/service/test_rest_api.py | 29 +++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 1e5a6fbcc..3e7cdeeb0 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -513,7 +513,7 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt tags=[Tag.TASK], ) @start_as_current_span(TRACER, "state_change_request.new_state") -def set_state( +async def set_state( state_change_request: StateChangeRequest, response: Response, fedid: Fedid, @@ -548,7 +548,7 @@ def set_state( if ( opa - and not opa.admin() + and not await opa.admin() and active and active.task.metadata.get("user") != fedid ): diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 3c78dc5ab..81ab6a97e 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -833,6 +833,35 @@ def test_set_state_invalid_transition(mock_runner: Mock, client: TestClient): } +@pytest.mark.parametrize("admin,status", [(True, 202), (False, 403)]) +def test_set_state_of_other_users_task( + mock_runner: Mock, + client_with_opa: TestClient, + mock_opa_client: Mock, + access_token: str, + admin: bool, + status: int, +): + + mock_opa_client.admin.return_value = admin + mock_runner.run.side_effect = lambda mth, *a, **kw: { + interface.get_active_task: TrackableTask( + task_id="foo", task=Task(name="bar", metadata={"user": "jd2"}) + ), + interface.get_worker_state: WorkerState.RUNNING, + interface.cancel_active_task: WorkerState.ABORTING, + }[mth] + + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + + resp = client_with_opa.put( + "/worker/state", + json=StateChangeRequest(new_state=WorkerState.ABORTING).model_dump(), + ) + + assert resp.status_code == status + + def test_get_environment_idle(mock_runner: Mock, client: TestClient) -> None: environment_id = uuid.uuid4() mock_runner.state = EnvironmentResponse( From 55c98865b2d05975a39a806dfdb6a1d2b2d95db8 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Wed, 10 Jun 2026 10:27:17 +0100 Subject: [PATCH 32/56] Remove print debugging --- tests/unit_tests/service/test_rest_api.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 81ab6a97e..8ec1b9b65 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -475,16 +475,13 @@ def test_get_tasks_filters_by_user( task_ids: list[str], ): - print("Start of test") mock_runner.run.return_value = [ TrackableTask(task_id="foo", task=Task(name="f1", metadata={"user": "jd1"})), TrackableTask(task_id="bar", task=Task(name="f2", metadata={"user": "jd2"})), ] - print(f"in test: {mock_opa_client=}") mock_opa_client.admin.return_value = admin client_with_opa.headers["Authorization"] = f"Bearer {access_token}" tasks = client_with_opa.get("/tasks").json().get("tasks") - print(tasks) assert [t["task_id"] for t in tasks] == task_ids From c3fa59742b17429174ae1c3edcacd4392d61b403 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 08:30:53 +0000 Subject: [PATCH 33/56] refactor: update rest api version --- src/blueapi/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 5b2b4453d..b9a71db31 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -323,7 +323,7 @@ class ApplicationConfig(BlueapiBaseModel): """ #: API version to publish in OpenAPI schema - REST_API_VERSION: ClassVar[str] = "1.4.0" + REST_API_VERSION: ClassVar[str] = "1.4.1" LICENSE_INFO: ClassVar[dict[str, str]] = { "name": "Apache 2.0", From 8c6c4818322546f77965678a16d0948e9349eb75 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 10:13:46 +0000 Subject: [PATCH 34/56] comment out dependency addition in set_state --- src/blueapi/service/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 7e9c2f017..bc593ef1e 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -521,7 +521,7 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt def set_state( state_change_request: StateChangeRequest, response: Response, - _: Annotated[None, Depends(access_task_permission)], + # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: """ From 9392e1818510dd6870a9dad696c0e5e45daec126 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 20 May 2026 14:16:24 +0000 Subject: [PATCH 35/56] refactor: add admin check and check to set state function --- src/blueapi/service/authentication.py | 3 +++ src/blueapi/service/main.py | 30 ++++++++++++++--------- tests/unit_tests/service/test_rest_api.py | 2 +- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/blueapi/service/authentication.py b/src/blueapi/service/authentication.py index 6761256de..fd1a8a5f1 100644 --- a/src/blueapi/service/authentication.py +++ b/src/blueapi/service/authentication.py @@ -290,6 +290,9 @@ def unchecked_bearer_token(req: Request) -> str | None: return None return param.strip() + def admin(self): + return False + UncheckedBearerToken = Annotated[str | None, Depends(unchecked_bearer_token)] diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index bc593ef1e..68415f031 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -149,6 +149,7 @@ def get_app(config: ApplicationConfig): def access_task_permission( + opa: Annotated[OPAClient, Depends(get_opa_client)], request: Request, task_id: str, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -156,21 +157,19 @@ def access_task_permission( access_token: dict[str, Any] | None = getattr( request.state, "decoded_access_token", None ) - try: - task = runner.run(interface.get_task_by_id, task_id) - except KeyError: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) from None + task = runner.run(interface.get_task_by_id, task_id) - if ( + if not opa.admin() and ( access_token and task and access_token.get("fedid") != task.task.metadata.get("user") ): - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # start_task_permission is used when there is WorkerTask def start_task_permission( + opa: Annotated[OPAClient, Depends(get_opa_client)], request: Request, task: WorkerTask, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -180,7 +179,7 @@ def start_task_permission( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="No task id provided", ) - access_task_permission(request, task.task_id, runner) + access_task_permission(opa, request, task.task_id, runner) async def on_key_error_404(_: Request, __: Exception): @@ -392,8 +391,7 @@ def get_tasks( ) user = access_token.get("fedid") if access_token else None - if user: - tasks = [t for t in tasks if t.task.metadata.get("user") == user] + tasks = [t for t in tasks if t.task.metadata.get("user") == user] return TasksListResponse(tasks=tasks) @@ -519,8 +517,10 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt ) @start_as_current_span(TRACER, "state_change_request.new_state") def set_state( + request: Request, state_change_request: StateChangeRequest, response: Response, + opa: Annotated[OPAClient, Depends(get_opa_client)], # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: @@ -548,14 +548,20 @@ def set_state( current_state in _ALLOWED_TRANSITIONS and new_state in _ALLOWED_TRANSITIONS[current_state] ): + active = runner.run(interface.get_active_task) + access_token: dict[str, Any] | None = getattr( + request.state, "decoded_access_token", None + ) + user = access_token.get("fedid") if access_token else None + + if not opa.admin() and active and active.task.metadata.get("user") != user: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + if new_state == WorkerState.PAUSED: runner.run(interface.pause_worker, state_change_request.defer) elif new_state == WorkerState.RUNNING: runner.run(interface.resume_worker) elif new_state in {WorkerState.ABORTING, WorkerState.STOPPING}: - # active = runner.run(interface.get_active_task) - # if active.task.metadata.get("user"): - try: runner.run( interface.cancel_active_task, diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 1ddf2c6ca..14439f111 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -251,7 +251,7 @@ def test_create_task(mock_runner: Mock, client: TestClient) -> None: response = client.post("/tasks", json=task.model_dump()) - mock_runner.run.assert_called_with(submit_task, task, {"user": "Unknown"}) + mock_runner.run.assert_called_with(submit_task, task, {"user": None}) assert response.json() == {"task_id": task_id} From 25afaad27fbe795c7d73457bacd271cc05eb52b4 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 12:34:41 +0100 Subject: [PATCH 36/56] Update dependency names --- src/blueapi/service/main.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 68415f031..eef5149d5 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -41,7 +41,13 @@ from blueapi.worker import TrackableTask, WorkerState from blueapi.worker.event import TaskStatusEnum -from .authorization import OpaClient, submit_permission, validate_tiled_config +from .authorization import ( + OpaClient, + OpaUserClient, + opa, + submit_permission, + validate_tiled_config, +) from .model import ( DeviceModel, DeviceResponse, @@ -149,7 +155,7 @@ def get_app(config: ApplicationConfig): def access_task_permission( - opa: Annotated[OPAClient, Depends(get_opa_client)], + opa: Annotated[OpaUserClient, Depends(opa)], request: Request, task_id: str, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -169,7 +175,7 @@ def access_task_permission( # start_task_permission is used when there is WorkerTask def start_task_permission( - opa: Annotated[OPAClient, Depends(get_opa_client)], + opa: Annotated[OpaUserClient, Depends(opa)], request: Request, task: WorkerTask, runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -520,7 +526,7 @@ def set_state( request: Request, state_change_request: StateChangeRequest, response: Response, - opa: Annotated[OPAClient, Depends(get_opa_client)], + opa: Annotated[OpaUserClient, Depends(opa)], # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: From 5b8ed0109161a3f7c93ae6963e3d1d4dcd321a1f Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 12:53:37 +0100 Subject: [PATCH 37/56] Add missing admin check --- helm/blueapi/config_schema.json | 7 ++++++- helm/blueapi/values.schema.json | 7 ++++++- src/blueapi/config.py | 1 + src/blueapi/service/authorization.py | 7 +++++++ tests/unit_tests/test_config.py | 1 + 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/helm/blueapi/config_schema.json b/helm/blueapi/config_schema.json index d64f3ce6b..3b9d03138 100644 --- a/helm/blueapi/config_schema.json +++ b/helm/blueapi/config_schema.json @@ -307,11 +307,16 @@ "submit_task_check": { "title": "Submit Task Check", "type": "string" + }, + "admin_check": { + "title": "Admin Check", + "type": "string" } }, "required": [ "tiled_service_account_check", - "submit_task_check" + "submit_task_check", + "admin_check" ], "title": "OpaConfig", "type": "object", diff --git a/helm/blueapi/values.schema.json b/helm/blueapi/values.schema.json index 1cb719910..c16ca03dc 100644 --- a/helm/blueapi/values.schema.json +++ b/helm/blueapi/values.schema.json @@ -712,9 +712,14 @@ "type": "object", "required": [ "tiled_service_account_check", - "submit_task_check" + "submit_task_check", + "admin_check" ], "properties": { + "admin_check": { + "title": "Admin Check", + "type": "string" + }, "audience": { "title": "Audience", "default": "account", diff --git a/src/blueapi/config.py b/src/blueapi/config.py index b9a71db31..9367c9f88 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -314,6 +314,7 @@ class OpaConfig(BlueapiBaseModel): audience: str = "account" tiled_service_account_check: str submit_task_check: str + admin_check: str class ApplicationConfig(BlueapiBaseModel): diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index 9d9c96ddd..c84a7d1aa 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -75,6 +75,9 @@ async def require_submit_task(self, instrument_session: str, token: str): ): raise HTTPException(status_code=status.HTTP_403_UNORTHORIZED) + async def is_admin(self, token: str) -> bool: + return await self._call_opa(self._conf.admin_check, {"token": token}) + class OpaUserClient: client: OpaClient @@ -88,6 +91,9 @@ async def can_submit_task(self, task: TaskRequest): LOGGER.info("Checking permissions to run task") await self.client.require_submit_task(task.instrument_session, self.token) + async def admin(self) -> bool: + return await self.client.is_admin(self.token) + async def validate_tiled_config( tiled: ServiceAccount | str | None, oidc: OIDCConfig | None, opa: OpaClient | None @@ -116,6 +122,7 @@ async def opa( return OpaUserClient(opa, token) return None + async def submit_permission( opa: Annotated[OpaUserClient, Depends(opa)], task_request: TaskRequest, diff --git a/tests/unit_tests/test_config.py b/tests/unit_tests/test_config.py index 999e01d23..ed00587a1 100644 --- a/tests/unit_tests/test_config.py +++ b/tests/unit_tests/test_config.py @@ -345,6 +345,7 @@ def test_config_yaml_parsed(temp_yaml_config_file): "audience": "account", "tiled_service_account_check": "v1/tiled_service_account", "submit_task_check": "v1/submit_task", + "admin_check": "v1/admin_check", }, }, { From d795c5f1e737204940bf8002822f7e86172e77a4 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 15:45:58 +0100 Subject: [PATCH 38/56] Handle missing opa and fix tests --- src/blueapi/service/authorization.py | 3 +- src/blueapi/service/main.py | 12 ++++++-- tests/unit_tests/service/test_rest_api.py | 34 +++++++++++++++++++---- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index c84a7d1aa..c669b627c 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -127,4 +127,5 @@ async def submit_permission( opa: Annotated[OpaUserClient, Depends(opa)], task_request: TaskRequest, ): - await opa.can_submit_task(task_request) + if opa: + await opa.can_submit_task(task_request) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index eef5149d5..0c236f64b 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -155,11 +155,14 @@ def get_app(config: ApplicationConfig): def access_task_permission( - opa: Annotated[OpaUserClient, Depends(opa)], + opa: Annotated[OpaUserClient | None, Depends(opa)], request: Request, task_id: str, runner: Annotated[WorkerDispatcher, Depends(_runner)], ): + if not opa: + return + access_token: dict[str, Any] | None = getattr( request.state, "decoded_access_token", None ) @@ -560,7 +563,12 @@ def set_state( ) user = access_token.get("fedid") if access_token else None - if not opa.admin() and active and active.task.metadata.get("user") != user: + if ( + opa + and not opa.admin() + and active + and active.task.metadata.get("user") != user + ): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) if new_state == WorkerState.PAUSED: diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 14439f111..79258fc7b 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -574,7 +574,12 @@ def test_get_state(mock_runner: Mock, client: TestClient): def test_set_state_running_to_paused(mock_runner: Mock, client: TestClient): current_state = WorkerState.RUNNING final_state = WorkerState.PAUSED - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() @@ -588,7 +593,12 @@ def test_set_state_running_to_paused(mock_runner: Mock, client: TestClient): def test_set_state_paused_to_running(mock_runner: Mock, client: TestClient): current_state = WorkerState.PAUSED final_state = WorkerState.RUNNING - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() @@ -602,7 +612,12 @@ def test_set_state_paused_to_running(mock_runner: Mock, client: TestClient): def test_set_state_running_to_aborting(mock_runner: Mock, client: TestClient): current_state = WorkerState.RUNNING final_state = WorkerState.ABORTING - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() @@ -619,7 +634,12 @@ def test_set_state_running_to_stopping_including_reason( current_state = WorkerState.RUNNING final_state = WorkerState.STOPPING reason = "blueapi is being stopped" - mock_runner.run.side_effect = [current_state, None, final_state] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + None, + final_state, + ] response = client.put( "/worker/state", @@ -635,7 +655,11 @@ def test_set_state_transition_error(mock_runner: Mock, client: TestClient): current_state = WorkerState.RUNNING final_state = WorkerState.STOPPING - mock_runner.run.side_effect = [current_state, TransitionError()] + mock_runner.run.side_effect = [ + current_state, + TrackableTask(task_id="foobar", task=Task(name="foo")), + TransitionError(), + ] response = client.put( "/worker/state", From 5b79bb95c8dc1761e9ccd669b238d4ba7e8c9561 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 26 May 2026 15:46:43 +0100 Subject: [PATCH 39/56] Remove old admin method --- src/blueapi/service/authentication.py | 3 -- src/blueapi/service/main.py | 42 +++++++-------------------- 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/src/blueapi/service/authentication.py b/src/blueapi/service/authentication.py index fd1a8a5f1..6761256de 100644 --- a/src/blueapi/service/authentication.py +++ b/src/blueapi/service/authentication.py @@ -290,9 +290,6 @@ def unchecked_bearer_token(req: Request) -> str | None: return None return param.strip() - def admin(self): - return False - UncheckedBearerToken = Annotated[str | None, Depends(unchecked_bearer_token)] diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 0c236f64b..b5c1189de 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -156,31 +156,21 @@ def get_app(config: ApplicationConfig): def access_task_permission( opa: Annotated[OpaUserClient | None, Depends(opa)], - request: Request, task_id: str, + fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], ): - if not opa: - return - - access_token: dict[str, Any] | None = getattr( - request.state, "decoded_access_token", None - ) task = runner.run(interface.get_task_by_id, task_id) - if not opa.admin() and ( - access_token - and task - and access_token.get("fedid") != task.task.metadata.get("user") - ): + if opa and not opa.admin() and (task and fedid != task.task.metadata.get("user")): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # start_task_permission is used when there is WorkerTask def start_task_permission( - opa: Annotated[OpaUserClient, Depends(opa)], - request: Request, task: WorkerTask, + opa: Annotated[OpaUserClient, Depends(opa)], + fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], ): if not task.task_id: @@ -188,7 +178,7 @@ def start_task_permission( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="No task id provided", ) - access_task_permission(opa, request, task.task_id, runner) + access_task_permission(opa, task.task_id, fedid, runner) async def on_key_error_404(_: Request, __: Exception): @@ -318,12 +308,11 @@ def submit_task( task_request: Annotated[TaskRequest, Body(..., examples=[example_task_request])], _: Annotated[None, Depends(submit_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], - user: Fedid, + fedid: Fedid, ) -> TaskResponse: """Submit a task to the worker.""" try: - user = user or "Unknown" - task_id: str = runner.run(interface.submit_task, task_request, {"user": user}) + task_id: str = runner.run(interface.submit_task, task_request, {"user": fedid}) response.headers["Location"] = f"{request.url}/{task_id}" return TaskResponse(task_id=task_id) except ValidationError as e: @@ -373,7 +362,7 @@ def validate_task_status(v: str) -> TaskStatusEnum: @secure_router.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK]) @start_as_current_span(TRACER) def get_tasks( - request: Request, + fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], task_status: str | SkipJsonSchema[None] = None, ) -> TasksListResponse: @@ -395,12 +384,7 @@ def get_tasks( else: tasks = runner.run(interface.get_tasks) - access_token: dict[str, Any] | None = getattr( - request.state, "decoded_access_token", None - ) - user = access_token.get("fedid") if access_token else None - - tasks = [t for t in tasks if t.task.metadata.get("user") == user] + tasks = [t for t in tasks if t.task.metadata.get("user") == fedid] return TasksListResponse(tasks=tasks) @@ -526,9 +510,9 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt ) @start_as_current_span(TRACER, "state_change_request.new_state") def set_state( - request: Request, state_change_request: StateChangeRequest, response: Response, + fedid: Fedid, opa: Annotated[OpaUserClient, Depends(opa)], # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], @@ -558,16 +542,12 @@ def set_state( and new_state in _ALLOWED_TRANSITIONS[current_state] ): active = runner.run(interface.get_active_task) - access_token: dict[str, Any] | None = getattr( - request.state, "decoded_access_token", None - ) - user = access_token.get("fedid") if access_token else None if ( opa and not opa.admin() and active - and active.task.metadata.get("user") != user + and active.task.metadata.get("user") != fedid ): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) From ba663055b79cc357da5ce47dbf5105a621e89219 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 28 May 2026 17:17:51 +0100 Subject: [PATCH 40/56] Use starlette statuses directly --- src/blueapi/service/authorization.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index c669b627c..edcec5cbd 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -6,7 +6,7 @@ from aiohttp import ClientSession from fastapi import Depends, HTTPException, Request -from starlette import status +from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authentication import TiledAuth, unchecked_bearer_token @@ -73,7 +73,7 @@ async def require_submit_task(self, instrument_session: str, token: str): "visit": int(match["visit"]), }, ): - raise HTTPException(status_code=status.HTTP_403_UNORTHORIZED) + raise HTTPException(status_code=HTTP_403_FORBIDDEN) async def is_admin(self, token: str) -> bool: return await self._call_opa(self._conf.admin_check, {"token": token}) @@ -118,13 +118,13 @@ async def opa( if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)): if not token: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + raise HTTPException(status_code=HTTP_401_UNAUTHORIZED) return OpaUserClient(opa, token) return None async def submit_permission( - opa: Annotated[OpaUserClient, Depends(opa)], + opa: Annotated[OpaUserClient | None, Depends(opa)], task_request: TaskRequest, ): if opa: From d98149db902af50f5a5714de6ee7f131e3f28b56 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 28 May 2026 17:27:47 +0100 Subject: [PATCH 41/56] test task submission authz --- .../unit_tests/service/test_authorization.py | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/tests/unit_tests/service/test_authorization.py b/tests/unit_tests/service/test_authorization.py index 37c1d7e3f..65f5c44a6 100644 --- a/tests/unit_tests/service/test_authorization.py +++ b/tests/unit_tests/service/test_authorization.py @@ -8,9 +8,12 @@ from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authorization import ( OpaClient, + OpaUserClient, opa, + submit_permission, validate_tiled_config, ) +from blueapi.service.model import TaskRequest # Reusable client patch decorator patch_client_session = patch( @@ -25,6 +28,7 @@ def opa_config() -> OpaConfig: return OpaConfig( root=HttpUrl("http://auth.example.com"), submit_task_check="/auth/submit", + admin_check="/auth/admin", tiled_service_account_check="/auth/tiled", ) @@ -108,6 +112,105 @@ async def test_opa_adds_input_fields(session: MagicMock, opa_config: OpaConfig): ) +@pytest.mark.parametrize( + "result,context", + [(True, nullcontext()), (False, pytest.raises(HTTPException, match="403"))], +) +@patch_client_session +async def test_require_submit_task( + session: MagicMock, + opa_config: OpaConfig, + result: bool, + context: AbstractContextManager, +): + session.return_value.post = AsyncMock( + return_value=MagicMock(json=AsyncMock(return_value={"result": result})) + ) + + client = OpaClient(instrument="p99", config=opa_config) + + session.assert_called_once_with(base_url="http://auth.example.com/") + with context: + await client.require_submit_task( + instrument_session="cm12345-1", token="foo_bar" + ) + + session().post.assert_called_once_with( + "/auth/submit", + json={ + "input": { + "token": "foo_bar", + "beamline": "p99", + "audience": "account", + "visit": 1, + "proposal": 12345, + } + }, + ) + + +@patch_client_session +async def test_opa_require_submit_task_invalid_session( + session: MagicMock, opa_config: OpaConfig +): + client = OpaClient(instrument="p45", config=opa_config) + + with pytest.raises(ValueError): + await client.require_submit_task( + instrument_session="not a session", token="foo_bar" + ) + + +@pytest.mark.parametrize("result", [True, False]) +@patch_client_session +async def test_opa_is_admin(session: MagicMock, opa_config: OpaConfig, result: bool): + session.return_value.post = AsyncMock( + return_value=MagicMock(json=AsyncMock(return_value={"result": result})) + ) + client = OpaClient(instrument="p45", config=opa_config) + + admin = await client.is_admin("foo_bar") + + assert admin == result + + session().post.assert_called_once_with( + "/auth/admin", + json={"input": {"token": "foo_bar", "beamline": "p45", "audience": "account"}}, + ) + + +@pytest.mark.parametrize( + "result,context", + [ + (None, nullcontext()), + (HTTPException(status_code=403), pytest.raises(HTTPException, match="403")), + ], +) +async def test_user_client_can_submit_task(result, context: AbstractContextManager): + opa = MagicMock(spec=OpaUserClient) + opa.require_submit_task = AsyncMock(side_effect=result) + + user_client = OpaUserClient(opa, "foo_bar") + + with context: + await user_client.can_submit_task( + TaskRequest(name="foo", params={}, instrument_session="cm12345-1") + ) + opa.require_submit_task.assert_called_once_with("cm12345-1", "foo_bar") + + +@pytest.mark.parametrize("result", [True, False]) +async def test_user_client_admin(result: bool): + opa = MagicMock(spec=OpaUserClient) + opa.is_admin = AsyncMock(return_value=result) + + user_client = OpaUserClient(opa, "foo_bar") + + admin = await user_client.admin() + + assert admin == result + + async def test_validate_tiled_config(): opa = MagicMock(spec=OpaClient) tiled = ServiceAccount() @@ -177,3 +280,21 @@ async def test_opa_dependency_without_authz(token): del request.app.state.authz user_client = await opa(request, token) assert user_client is None + + +@pytest.mark.parametrize( + "result,context", + [ + (None, nullcontext()), + (HTTPException(status_code=403), pytest.raises(HTTPException, match="403")), + ], +) +async def test_submit_permission_dependency(result, context: AbstractContextManager): + opa = MagicMock(spec=OpaUserClient) + opa.can_submit_task.side_effect = result + with context: + await submit_permission(opa, Mock()) + + +async def test_submit_permission_dependency_without_opa(): + assert await submit_permission(None, Mock()) is None From 97ee31a15f52856ba5bf7635ec7386c74b738ea0 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 15:19:28 +0100 Subject: [PATCH 42/56] Use _config instead of _conf --- src/blueapi/service/authorization.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index edcec5cbd..5f1e2e980 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -66,7 +66,7 @@ async def require_submit_task(self, instrument_session: str, token: str): raise ValueError("Invalid instrument session") if not await self._call_opa( - self._conf.submit_task_check, + self._config.submit_task_check, { "token": token, "proposal": int(match["proposal"]), @@ -76,7 +76,7 @@ async def require_submit_task(self, instrument_session: str, token: str): raise HTTPException(status_code=HTTP_403_FORBIDDEN) async def is_admin(self, token: str) -> bool: - return await self._call_opa(self._conf.admin_check, {"token": token}) + return await self._call_opa(self._config.admin_check, {"token": token}) class OpaUserClient: From 9f9335ff2977295e5ad560ac72b58bb0a4c723d4 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 16:36:19 +0100 Subject: [PATCH 43/56] Re-use instrument session regex --- src/blueapi/service/authorization.py | 3 +-- src/blueapi/utils/__init__.py | 3 +++ src/blueapi/utils/serialization.py | 10 +++------- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index 5f1e2e980..e54d0daa9 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -1,5 +1,4 @@ import logging -import re from collections.abc import Mapping from contextlib import AbstractAsyncContextManager, aclosing, nullcontext from typing import Annotated, Any, Self, cast @@ -11,9 +10,9 @@ from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount from blueapi.service.authentication import TiledAuth, unchecked_bearer_token from blueapi.service.model import TaskRequest +from blueapi.utils import INSTRUMENT_SESSION_RE LOGGER = logging.getLogger(__name__) -INSTRUMENT_SESSION_RE = re.compile(r"^[a-z]{2}(?P\d+)-(?P\d+)$") class OpaClient: diff --git a/src/blueapi/utils/__init__.py b/src/blueapi/utils/__init__.py index e5cc85166..f722c5b42 100644 --- a/src/blueapi/utils/__init__.py +++ b/src/blueapi/utils/__init__.py @@ -1,3 +1,4 @@ +import re from collections.abc import Callable, Mapping from functools import wraps from logging import Logger @@ -30,6 +31,8 @@ Args = ParamSpec("Args") Return = TypeVar("Return") +INSTRUMENT_SESSION_RE = re.compile(r"^[a-z]{2}(?P\d+)-(?P\d+)$") + def report_successful_devices( devices: Mapping[str, Any], sim_backend: bool, logger: Logger diff --git a/src/blueapi/utils/serialization.py b/src/blueapi/utils/serialization.py index deee82b1e..8918cf882 100644 --- a/src/blueapi/utils/serialization.py +++ b/src/blueapi/utils/serialization.py @@ -1,9 +1,10 @@ import json -import re from typing import Any from pydantic import BaseModel +from blueapi import utils + def serialize(obj: Any) -> Any: """ @@ -28,13 +29,8 @@ def serialize(obj: Any) -> Any: return obj -_INSTRUMENT_SESSION_AUTHZ_REGEX: re.Pattern = re.compile( - r"^[a-zA-Z]{2}(?P\d+)-(?P\d+)$" -) - - def access_blob(instrument_session: str, beamline: str) -> str: - m = _INSTRUMENT_SESSION_AUTHZ_REGEX.match(instrument_session) + m = utils.INSTRUMENT_SESSION_RE.match(instrument_session) if m is None: raise ValueError( "Unable to extract proposal and visit from " From 27c605c085d09ff7b434a810a752f7b2f322c195 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 17:04:21 +0100 Subject: [PATCH 44/56] remove task access check --- src/blueapi/service/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index b5c1189de..e9a148af5 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -514,7 +514,6 @@ def set_state( response: Response, fedid: Fedid, opa: Annotated[OpaUserClient, Depends(opa)], - # _: Annotated[None, Depends(access_task_permission)], runner: Annotated[WorkerDispatcher, Depends(_runner)], ) -> WorkerState: """ From 8f2ede01c3dd96b4fb8f257a1e3a1573509b464c Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 17:06:25 +0100 Subject: [PATCH 45/56] Add match to raises check --- tests/unit_tests/service/test_authorization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit_tests/service/test_authorization.py b/tests/unit_tests/service/test_authorization.py index 65f5c44a6..a2e602f21 100644 --- a/tests/unit_tests/service/test_authorization.py +++ b/tests/unit_tests/service/test_authorization.py @@ -155,7 +155,7 @@ async def test_opa_require_submit_task_invalid_session( ): client = OpaClient(instrument="p45", config=opa_config) - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="Invalid instrument session"): await client.require_submit_task( instrument_session="not a session", token="foo_bar" ) From ea3d734fbf402a9c71e35688150933deec4312e0 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 17:12:30 +0100 Subject: [PATCH 46/56] Add exception detail --- src/blueapi/service/authorization.py | 8 ++++++-- src/blueapi/service/main.py | 5 ++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/blueapi/service/authorization.py b/src/blueapi/service/authorization.py index e54d0daa9..f9008138a 100644 --- a/src/blueapi/service/authorization.py +++ b/src/blueapi/service/authorization.py @@ -72,7 +72,9 @@ async def require_submit_task(self, instrument_session: str, token: str): "visit": int(match["visit"]), }, ): - raise HTTPException(status_code=HTTP_403_FORBIDDEN) + raise HTTPException( + status_code=HTTP_403_FORBIDDEN, detail="Not authorized to submit task" + ) async def is_admin(self, token: str) -> bool: return await self._call_opa(self._config.admin_check, {"token": token}) @@ -117,7 +119,9 @@ async def opa( if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)): if not token: - raise HTTPException(status_code=HTTP_401_UNAUTHORIZED) + raise HTTPException( + status_code=HTTP_401_UNAUTHORIZED, detail="Authentication missing" + ) return OpaUserClient(opa, token) return None diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index e9a148af5..95117e014 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -548,7 +548,10 @@ def set_state( and active and active.task.metadata.get("user") != fedid ): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not authorized to set worker state", + ) if new_state == WorkerState.PAUSED: runner.run(interface.pause_worker, state_change_request.defer) From 7393985b6f267045c9d2e918fceff6535af29427 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 17:15:15 +0100 Subject: [PATCH 47/56] Let admin see all tasks --- src/blueapi/service/main.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 95117e014..2fc83d8f5 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -364,6 +364,7 @@ def validate_task_status(v: str) -> TaskStatusEnum: def get_tasks( fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], + opa: Annotated[OpaUserClient, Depends(opa)], task_status: str | SkipJsonSchema[None] = None, ) -> TasksListResponse: """ @@ -384,7 +385,8 @@ def get_tasks( else: tasks = runner.run(interface.get_tasks) - tasks = [t for t in tasks if t.task.metadata.get("user") == fedid] + if opa and not opa.admin(): + tasks = [t for t in tasks if t.task.metadata.get("user") == fedid] return TasksListResponse(tasks=tasks) From ecd6fb3b229fda2ee2f7e28c987cc864032f60b9 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 5 Jun 2026 17:15:55 +0100 Subject: [PATCH 48/56] Start of api authz tests --- tests/unit_tests/service/test_rest_api.py | 56 ++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 79258fc7b..e903e74a5 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -14,9 +14,15 @@ from pydantic_core import InitErrorDetails from super_state_machine.errors import TransitionError -from blueapi.config import ApplicationConfig, CORSConfig, OIDCConfig, RestConfig +from blueapi.config import ( + ApplicationConfig, + CORSConfig, + OIDCConfig, + RestConfig, +) from blueapi.core.bluesky_types import Plan from blueapi.service import main +from blueapi.service.authorization import OpaUserClient, opa from blueapi.service.interface import ( cancel_active_task, get_device, @@ -54,6 +60,11 @@ def mock_runner() -> Mock: return Mock(spec=WorkerDispatcher) +@pytest.fixture +def mock_opa_client() -> Mock: + return Mock(spec=OpaUserClient) + + @pytest.fixture def client(mock_runner: Mock) -> Iterator[TestClient]: with patch("blueapi.service.interface.worker"): @@ -79,6 +90,27 @@ def client_with_auth( main.teardown_runner() +@pytest.fixture +def access_token(valid_token_with_jwt: dict[str, Any]) -> str: + return valid_token_with_jwt["access_token"] + + +@pytest.fixture +def client_with_opa( + mock_runner: Mock, + oidc_config: OIDCConfig, + mock_opa_client: Mock, + mock_authn_server, +): + with patch("blueapi.service.interface.worker"): + main.setup_runner(runner=mock_runner) + app = main.get_app(ApplicationConfig(oidc=oidc_config)) + app.dependency_overrides[opa] = lambda: mock_opa_client + client = TestClient(app) + yield client + main.teardown_runner() + + @pytest.fixture def rest_config_with_cors() -> RestConfig: cors_config = CORSConfig( @@ -416,6 +448,28 @@ def test_get_tasks_by_status_invalid(client: TestClient) -> None: assert response.status_code == status.HTTP_400_BAD_REQUEST +def test_get_tasks_filters_by_user( + mock_runner: Mock, + client_with_opa: TestClient, + access_token: str, + mock_opa_client: Mock, +): + + print("Start of test") + mock_runner.run.return_value = [ + TrackableTask(task_id="foo", task=Task(name="f1", metadata={"user": "jd1"})), + TrackableTask(task_id="bar", task=Task(name="f2", metadata={"user": "jd2"})), + ] + print(f"in test: {mock_opa_client=}") + mock_opa_client.admin.return_value = False + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + tasks = client_with_opa.get("/tasks").json().get("tasks") + print(tasks) + + assert len(tasks) == 1 + assert tasks[0]["task_id"] == "foo" + + def test_delete_submitted_task(mock_runner: Mock, client: TestClient) -> None: task_id = str(uuid.uuid4()) mock_runner.run.return_value = task_id From 7fe63039205c8723d29b8d5698db36c2e13ab3fa Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Mon, 8 Jun 2026 14:30:20 +0100 Subject: [PATCH 49/56] Make get_tasks async to access authz check --- src/blueapi/service/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 2fc83d8f5..a429b7d61 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -361,7 +361,7 @@ def validate_task_status(v: str) -> TaskStatusEnum: @secure_router_v1.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK]) @secure_router.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK]) @start_as_current_span(TRACER) -def get_tasks( +async def get_tasks( fedid: Fedid, runner: Annotated[WorkerDispatcher, Depends(_runner)], opa: Annotated[OpaUserClient, Depends(opa)], @@ -385,7 +385,7 @@ def get_tasks( else: tasks = runner.run(interface.get_tasks) - if opa and not opa.admin(): + if opa and not await opa.admin(): tasks = [t for t in tasks if t.task.metadata.get("user") == fedid] return TasksListResponse(tasks=tasks) From 2486063fe3d25c0fcf6e9a7d3d0678a378c03e44 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Mon, 8 Jun 2026 14:45:41 +0100 Subject: [PATCH 50/56] Parametrise filter test to check with and without admin --- tests/unit_tests/service/test_rest_api.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index e903e74a5..548ac3078 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -448,11 +448,14 @@ def test_get_tasks_by_status_invalid(client: TestClient) -> None: assert response.status_code == status.HTTP_400_BAD_REQUEST +@pytest.mark.parametrize("admin,task_ids", [(True, ["foo", "bar"]), (False, ["foo"])]) def test_get_tasks_filters_by_user( mock_runner: Mock, client_with_opa: TestClient, access_token: str, mock_opa_client: Mock, + admin: bool, + task_ids: list[str], ): print("Start of test") @@ -461,13 +464,12 @@ def test_get_tasks_filters_by_user( TrackableTask(task_id="bar", task=Task(name="f2", metadata={"user": "jd2"})), ] print(f"in test: {mock_opa_client=}") - mock_opa_client.admin.return_value = False + mock_opa_client.admin.return_value = admin client_with_opa.headers["Authorization"] = f"Bearer {access_token}" tasks = client_with_opa.get("/tasks").json().get("tasks") print(tasks) - assert len(tasks) == 1 - assert tasks[0]["task_id"] == "foo" + assert [t["task_id"] for t in tasks] == task_ids def test_delete_submitted_task(mock_runner: Mock, client: TestClient) -> None: From af1571691a391fb315007dba72ebf7c84d679ab2 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Mon, 8 Jun 2026 15:17:11 +0100 Subject: [PATCH 51/56] Add test for deleting tasks --- src/blueapi/service/main.py | 12 ++++++++---- tests/unit_tests/service/test_rest_api.py | 24 ++++++++++++++++++++++- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index a429b7d61..21ab4bf56 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -154,7 +154,7 @@ def get_app(config: ApplicationConfig): return app -def access_task_permission( +async def access_task_permission( opa: Annotated[OpaUserClient | None, Depends(opa)], task_id: str, fedid: Fedid, @@ -162,12 +162,16 @@ def access_task_permission( ): task = runner.run(interface.get_task_by_id, task_id) - if opa and not opa.admin() and (task and fedid != task.task.metadata.get("user")): + if ( + opa + and not await opa.admin() + and (task and fedid != task.task.metadata.get("user")) + ): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # start_task_permission is used when there is WorkerTask -def start_task_permission( +async def start_task_permission( task: WorkerTask, opa: Annotated[OpaUserClient, Depends(opa)], fedid: Fedid, @@ -178,7 +182,7 @@ def start_task_permission( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="No task id provided", ) - access_task_permission(opa, task.task_id, fedid, runner) + await access_task_permission(opa, task.task_id, fedid, runner) async def on_key_error_404(_: Request, __: Exception): diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 548ac3078..bb61d0a57 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -21,7 +21,7 @@ RestConfig, ) from blueapi.core.bluesky_types import Plan -from blueapi.service import main +from blueapi.service import interface, main from blueapi.service.authorization import OpaUserClient, opa from blueapi.service.interface import ( cancel_active_task, @@ -479,6 +479,28 @@ def test_delete_submitted_task(mock_runner: Mock, client: TestClient) -> None: assert response.json() == {"task_id": f"{task_id}"} +def test_cant_delete_other_users_task( + mock_runner: Mock, + client_with_opa: TestClient, + access_token: str, + mock_opa_client: Mock, +): + mock_opa_client.admin.return_value = False + mock_runner.run.side_effect = lambda mth, *args: { + interface.get_task_by_id: TrackableTask( + task_id="bar", task=Task(name="t2", metadata={"user": "jd2"}) + ), + }[mth] + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + + resp = client_with_opa.delete("/tasks/bar") + + # 404 to obfuscate whether task exists when inaccessible + assert resp.status_code == 404 + + mock_runner.run.assert_called_once() + + def test_set_active_task(client: TestClient) -> None: task_id = str(uuid.uuid4()) task = WorkerTask(task_id=task_id) From 12709ff45c8e43b573a95cd6128fde488fa036c4 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Mon, 8 Jun 2026 16:42:20 +0100 Subject: [PATCH 52/56] Add test for submit without permission --- tests/unit_tests/service/test_rest_api.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index bb61d0a57..6495c0c35 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -7,7 +7,7 @@ import jwt import pytest from bluesky.protocols import Stoppable -from fastapi import status +from fastapi import HTTPException, status from fastapi.testclient import TestClient from httpx import Headers from pydantic import BaseModel, ValidationError @@ -287,6 +287,23 @@ def test_create_task(mock_runner: Mock, client: TestClient) -> None: assert response.json() == {"task_id": task_id} +def test_submit_task_requires_permission( + mock_runner: Mock, + client_with_opa: TestClient, + mock_opa_client: Mock, + access_token: str, +): + task = TaskRequest(name="sleep", params={"time": 2}, instrument_session="cm12345-2") + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + mock_opa_client.can_submit_task.side_effect = HTTPException(status_code=403) + mock_runner.run.side_effect = RuntimeError("Task should not be submitted") + + resp = client_with_opa.post("/tasks", json=task.model_dump()) + + assert resp.status_code == 403 + mock_runner.run.assert_not_called() + + def test_create_task_inserts_auth_metadata( mock_runner: Mock, client_with_auth: TestClient, From 8585f4db63bca2bc11f548095301d04b096bab5a Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 9 Jun 2026 09:27:58 +0100 Subject: [PATCH 53/56] Test setting other user's task active --- tests/unit_tests/service/test_rest_api.py | 29 +++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 6495c0c35..d63720f78 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -566,6 +566,35 @@ def test_set_active_task_worker_already_running( assert response.json() == {"detail": "Worker already active"} +@pytest.mark.parametrize("admin,status", [(True, 200), (False, 404)]) +def test_set_other_users_task_active( + mock_runner: Mock, + client_with_opa: TestClient, + mock_opa_client: Mock, + access_token: str, + admin: bool, + status: int, +): + + task_id = "foo" + task = WorkerTask(task_id=task_id) + mock_opa_client.admin.return_value = admin + + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + + mock_runner.run.side_effect = lambda mth, *a, **kw: { + interface.get_task_by_id: TrackableTask( + task_id="foo", task=Task(name="bar", metadata={"user": "jd2"}) + ), + interface.get_active_task: None, + interface.begin_task: None, + }[mth] + + resp = client_with_opa.put("/worker/task", json=task.model_dump()) + + assert resp.status_code == status + + def test_get_task(mock_runner: Mock, client: TestClient): task_id = str(uuid.uuid4()) task = TrackableTask( From ce1b652e31f6495e1baf03d2303e834a2de5834a Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 9 Jun 2026 09:44:14 +0100 Subject: [PATCH 54/56] Test getting other users task --- tests/unit_tests/service/test_rest_api.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index d63720f78..3c78dc5ab 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -627,6 +627,25 @@ def test_get_task(mock_runner: Mock, client: TestClient): } +@pytest.mark.parametrize("admin,status", [(True, 200), (False, 404)]) +def test_get_other_users_task( + mock_runner: Mock, + client_with_opa: TestClient, + mock_opa_client: Mock, + access_token: str, + admin: bool, + status: int, +): + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + mock_runner.run.return_value = TrackableTask( + task_id="foo", task=Task(name="bar", metadata={"user": "jd2"}) + ) + mock_opa_client.admin.return_value = admin + + resp = client_with_opa.get("/tasks/foo") + assert resp.status_code == status + + def test_get_all_tasks(mock_runner: Mock, client: TestClient): task_id = str(uuid.uuid4()) tasks = [ From 082be79a1f968fccd9002509f9b25460eaa0a2e7 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 9 Jun 2026 11:05:06 +0100 Subject: [PATCH 55/56] Add tests for set state --- src/blueapi/service/main.py | 4 ++-- tests/unit_tests/service/test_rest_api.py | 29 +++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 21ab4bf56..d35ffa38b 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -515,7 +515,7 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt tags=[Tag.TASK], ) @start_as_current_span(TRACER, "state_change_request.new_state") -def set_state( +async def set_state( state_change_request: StateChangeRequest, response: Response, fedid: Fedid, @@ -550,7 +550,7 @@ def set_state( if ( opa - and not opa.admin() + and not await opa.admin() and active and active.task.metadata.get("user") != fedid ): diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 3c78dc5ab..81ab6a97e 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -833,6 +833,35 @@ def test_set_state_invalid_transition(mock_runner: Mock, client: TestClient): } +@pytest.mark.parametrize("admin,status", [(True, 202), (False, 403)]) +def test_set_state_of_other_users_task( + mock_runner: Mock, + client_with_opa: TestClient, + mock_opa_client: Mock, + access_token: str, + admin: bool, + status: int, +): + + mock_opa_client.admin.return_value = admin + mock_runner.run.side_effect = lambda mth, *a, **kw: { + interface.get_active_task: TrackableTask( + task_id="foo", task=Task(name="bar", metadata={"user": "jd2"}) + ), + interface.get_worker_state: WorkerState.RUNNING, + interface.cancel_active_task: WorkerState.ABORTING, + }[mth] + + client_with_opa.headers["Authorization"] = f"Bearer {access_token}" + + resp = client_with_opa.put( + "/worker/state", + json=StateChangeRequest(new_state=WorkerState.ABORTING).model_dump(), + ) + + assert resp.status_code == status + + def test_get_environment_idle(mock_runner: Mock, client: TestClient) -> None: environment_id = uuid.uuid4() mock_runner.state = EnvironmentResponse( From e2cbd6d09a686400a8d0f9c844ac9ac9333abe61 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Wed, 10 Jun 2026 10:27:17 +0100 Subject: [PATCH 56/56] Remove print debugging --- tests/unit_tests/service/test_rest_api.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 81ab6a97e..8ec1b9b65 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -475,16 +475,13 @@ def test_get_tasks_filters_by_user( task_ids: list[str], ): - print("Start of test") mock_runner.run.return_value = [ TrackableTask(task_id="foo", task=Task(name="f1", metadata={"user": "jd1"})), TrackableTask(task_id="bar", task=Task(name="f2", metadata={"user": "jd2"})), ] - print(f"in test: {mock_opa_client=}") mock_opa_client.admin.return_value = admin client_with_opa.headers["Authorization"] = f"Bearer {access_token}" tasks = client_with_opa.get("/tasks").json().get("tasks") - print(tasks) assert [t["task_id"] for t in tasks] == task_ids