Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
affedf8
Validate tiled service account configuration at startup
tpoliaw May 21, 2026
0482d81
Add tests for tiled check
tpoliaw May 28, 2026
170e3d1
Add opa dependency function to create OpaUserClient
tpoliaw May 26, 2026
b3254ea
test opa dependency function
tpoliaw May 28, 2026
fcad0af
Add can_submit_task auth check method and config
tpoliaw May 26, 2026
171de82
feat: add authz dependency injection
shree-iyengar-dls May 15, 2026
ddfeab0
feat: add auth check dependency injections to task endpoints
shree-iyengar-dls May 18, 2026
37524eb
feat: create new access task permission fns and add as dependencies
shree-iyengar-dls May 20, 2026
13093f6
refactor: update rest api version
shree-iyengar-dls May 20, 2026
d7d57ae
comment out dependency addition in set_state
shree-iyengar-dls May 20, 2026
4ef3365
refactor: add admin check and check to set state function
May 20, 2026
659cf58
Update dependency names
tpoliaw May 26, 2026
a669903
Add missing admin check
tpoliaw May 26, 2026
d1cb939
Handle missing opa and fix tests
tpoliaw May 26, 2026
b889e20
Remove old admin method
tpoliaw May 26, 2026
3eb315a
Use starlette statuses directly
tpoliaw May 28, 2026
086953f
test task submission authz
tpoliaw May 28, 2026
2d3a564
Use _config instead of _conf
tpoliaw Jun 5, 2026
83ae2a6
Re-use instrument session regex
tpoliaw Jun 5, 2026
029bc89
remove task access check
tpoliaw Jun 5, 2026
4e12a23
Add match to raises check
tpoliaw Jun 5, 2026
850682b
Add exception detail
tpoliaw Jun 5, 2026
2f1d3eb
Let admin see all tasks
tpoliaw Jun 5, 2026
e720bc4
Start of api authz tests
tpoliaw Jun 5, 2026
d73d01a
Make get_tasks async to access authz check
tpoliaw Jun 8, 2026
d1b7302
Parametrise filter test to check with and without admin
tpoliaw Jun 8, 2026
02eba88
Add test for deleting tasks
tpoliaw Jun 8, 2026
b34600a
Add test for submit without permission
tpoliaw Jun 8, 2026
6967b16
Test setting other user's task active
tpoliaw Jun 9, 2026
49b1b97
Test getting other users task
tpoliaw Jun 9, 2026
0139c0b
Add tests for set state
tpoliaw Jun 9, 2026
55c9886
Remove print debugging
tpoliaw Jun 10, 2026
c9fd7d2
Merge remote-tracking branch 'origin/main' into add_system_test
ZohebShaikh Jun 26, 2026
c3fa597
refactor: update rest api version
shree-iyengar-dls May 20, 2026
8c6c481
comment out dependency addition in set_state
shree-iyengar-dls May 20, 2026
9392e18
refactor: add admin check and check to set state function
May 20, 2026
25afaad
Update dependency names
tpoliaw May 26, 2026
5b8ed01
Add missing admin check
tpoliaw May 26, 2026
d795c5f
Handle missing opa and fix tests
tpoliaw May 26, 2026
5b79bb9
Remove old admin method
tpoliaw May 26, 2026
ba66305
Use starlette statuses directly
tpoliaw May 28, 2026
d98149d
test task submission authz
tpoliaw May 28, 2026
97ee31a
Use _config instead of _conf
tpoliaw Jun 5, 2026
9f9335f
Re-use instrument session regex
tpoliaw Jun 5, 2026
27c605c
remove task access check
tpoliaw Jun 5, 2026
8f2ede0
Add match to raises check
tpoliaw Jun 5, 2026
ea3d734
Add exception detail
tpoliaw Jun 5, 2026
7393985
Let admin see all tasks
tpoliaw Jun 5, 2026
ecd6fb3
Start of api authz tests
tpoliaw Jun 5, 2026
7fe6303
Make get_tasks async to access authz check
tpoliaw Jun 8, 2026
2486063
Parametrise filter test to check with and without admin
tpoliaw Jun 8, 2026
af15716
Add test for deleting tasks
tpoliaw Jun 8, 2026
12709ff
Add test for submit without permission
tpoliaw Jun 8, 2026
8585f4d
Test setting other user's task active
tpoliaw Jun 9, 2026
ce1b652
Test getting other users task
tpoliaw Jun 9, 2026
082be79
Add tests for set state
tpoliaw Jun 9, 2026
e2cbd6d
Remove print debugging
tpoliaw Jun 10, 2026
2155ea1
Merge branch 'add_depedency_injection' into add_system_test
ZohebShaikh Jun 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion helm/blueapi/config_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,16 @@
"submit_task_check": {
"title": "Submit Task Check",
"type": "string"
},
"admin_check": {
"title": "Admin Check",
"type": "string"
}
},
"required": [
"tiled_service_account_check",
"submit_task_check"
"submit_task_check",
"admin_check"
],
"title": "OpaConfig",
"type": "object",
Expand Down
7 changes: 6 additions & 1 deletion helm/blueapi/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,14 @@
"type": "object",
"required": [
"tiled_service_account_check",
"submit_task_check"
"submit_task_check",
"admin_check"
],
"properties": {
"admin_check": {
"title": "Admin Check",
"type": "string"
},
"audience": {
"title": "Audience",
"default": "account",
Expand Down
3 changes: 2 additions & 1 deletion src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ class OpaConfig(BlueapiBaseModel):
audience: str = "account"
tiled_service_account_check: str
submit_task_check: str
admin_check: str


class ApplicationConfig(BlueapiBaseModel):
Expand All @@ -323,7 +324,7 @@ class ApplicationConfig(BlueapiBaseModel):
"""

#: API version to publish in OpenAPI schema
REST_API_VERSION: ClassVar[str] = "1.4.0"
REST_API_VERSION: ClassVar[str] = "1.4.1"

LICENSE_INFO: ClassVar[dict[str, str]] = {
"name": "Apache 2.0",
Expand Down
27 changes: 19 additions & 8 deletions src/blueapi/service/authorization.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import logging
import re
from collections.abc import Mapping
from contextlib import AbstractAsyncContextManager, aclosing, nullcontext
from typing import Annotated, Any, Self, cast

from aiohttp import ClientSession
from fastapi import Depends, HTTPException, Request
from starlette import status
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN

from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount
from blueapi.service.authentication import TiledAuth, unchecked_bearer_token
from blueapi.service.model import TaskRequest
from blueapi.utils import INSTRUMENT_SESSION_RE

LOGGER = logging.getLogger(__name__)
INSTRUMENT_SESSION_RE = re.compile(r"^[a-z]{2}(?P<proposal>\d+)-(?P<visit>\d+)$")


class OpaClient:
Expand Down Expand Up @@ -66,14 +65,19 @@ async def require_submit_task(self, instrument_session: str, token: str):
raise ValueError("Invalid instrument session")

if not await self._call_opa(
self._conf.submit_task_check,
self._config.submit_task_check,
{
"token": token,
"proposal": int(match["proposal"]),
"visit": int(match["visit"]),
},
):
raise HTTPException(status_code=status.HTTP_403_UNORTHORIZED)
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authorized to submit task"
)

async def is_admin(self, token: str) -> bool:
return await self._call_opa(self._config.admin_check, {"token": token})


class OpaUserClient:
Expand All @@ -88,6 +92,9 @@ async def can_submit_task(self, task: TaskRequest):
LOGGER.info("Checking permissions to run task")
await self.client.require_submit_task(task.instrument_session, self.token)

async def admin(self) -> bool:
return await self.client.is_admin(self.token)


async def validate_tiled_config(
tiled: ServiceAccount | str | None, oidc: OIDCConfig | None, opa: OpaClient | None
Expand All @@ -112,12 +119,16 @@ async def opa(

if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)):
if not token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED, detail="Authentication missing"
)
return OpaUserClient(opa, token)
return None


async def submit_permission(
opa: Annotated[OpaUserClient, Depends(opa)],
opa: Annotated[OpaUserClient | None, Depends(opa)],
task_request: TaskRequest,
):
await opa.can_submit_task(task_request)
if opa:
await opa.can_submit_task(task_request)
76 changes: 42 additions & 34 deletions src/blueapi/service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@
from blueapi.worker import TrackableTask, WorkerState
from blueapi.worker.event import TaskStatusEnum

from .authorization import OpaClient, submit_permission, validate_tiled_config
from .authorization import (
OpaClient,
OpaUserClient,
opa,
submit_permission,
validate_tiled_config,
)
from .model import (
DeviceModel,
DeviceResponse,
Expand Down Expand Up @@ -148,39 +154,35 @@ def get_app(config: ApplicationConfig):
return app


def access_task_permission(
request: Request,
async def access_task_permission(
opa: Annotated[OpaUserClient | None, Depends(opa)],
task_id: str,
fedid: Fedid,
runner: Annotated[WorkerDispatcher, Depends(_runner)],
):
access_token: dict[str, Any] | None = getattr(
request.state, "decoded_access_token", None
)
try:
task = runner.run(interface.get_task_by_id, task_id)
except KeyError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) from None
task = runner.run(interface.get_task_by_id, task_id)

if (
access_token
and task
and access_token.get("fedid") != task.task.metadata.get("user")
opa
and not await opa.admin()
and (task and fedid != task.task.metadata.get("user"))
):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)


# start_task_permission is used when there is WorkerTask
def start_task_permission(
request: Request,
async def start_task_permission(
task: WorkerTask,
opa: Annotated[OpaUserClient, Depends(opa)],
fedid: Fedid,
runner: Annotated[WorkerDispatcher, Depends(_runner)],
):
if not task.task_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="No task id provided",
)
access_task_permission(request, task.task_id, runner)
await access_task_permission(opa, task.task_id, fedid, runner)


async def on_key_error_404(_: Request, __: Exception):
Expand Down Expand Up @@ -310,12 +312,11 @@ def submit_task(
task_request: Annotated[TaskRequest, Body(..., examples=[example_task_request])],
_: Annotated[None, Depends(submit_permission)],
runner: Annotated[WorkerDispatcher, Depends(_runner)],
user: Fedid,
fedid: Fedid,
) -> TaskResponse:
"""Submit a task to the worker."""
try:
user = user or "Unknown"
task_id: str = runner.run(interface.submit_task, task_request, {"user": user})
task_id: str = runner.run(interface.submit_task, task_request, {"user": fedid})
response.headers["Location"] = f"{request.url}/{task_id}"
return TaskResponse(task_id=task_id)
except ValidationError as e:
Expand Down Expand Up @@ -364,9 +365,10 @@ def validate_task_status(v: str) -> TaskStatusEnum:
@secure_router_v1.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK])
@secure_router.get("/tasks", status_code=status.HTTP_200_OK, tags=[Tag.TASK])
@start_as_current_span(TRACER)
def get_tasks(
request: Request,
async def get_tasks(
fedid: Fedid,
runner: Annotated[WorkerDispatcher, Depends(_runner)],
opa: Annotated[OpaUserClient, Depends(opa)],
task_status: str | SkipJsonSchema[None] = None,
) -> TasksListResponse:
"""
Expand All @@ -387,13 +389,8 @@ def get_tasks(
else:
tasks = runner.run(interface.get_tasks)

access_token: dict[str, Any] | None = getattr(
request.state, "decoded_access_token", None
)
user = access_token.get("fedid") if access_token else None

if user:
tasks = [t for t in tasks if t.task.metadata.get("user") == user]
if opa and not await opa.admin():
tasks = [t for t in tasks if t.task.metadata.get("user") == fedid]

return TasksListResponse(tasks=tasks)

Expand Down Expand Up @@ -518,10 +515,11 @@ def get_state(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> WorkerSt
tags=[Tag.TASK],
)
@start_as_current_span(TRACER, "state_change_request.new_state")
def set_state(
async def set_state(
state_change_request: StateChangeRequest,
response: Response,
_: Annotated[None, Depends(access_task_permission)],
fedid: Fedid,
opa: Annotated[OpaUserClient, Depends(opa)],
runner: Annotated[WorkerDispatcher, Depends(_runner)],
) -> WorkerState:
"""
Expand All @@ -548,14 +546,24 @@ def set_state(
current_state in _ALLOWED_TRANSITIONS
and new_state in _ALLOWED_TRANSITIONS[current_state]
):
active = runner.run(interface.get_active_task)

if (
opa
and not await opa.admin()
and active
and active.task.metadata.get("user") != fedid
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to set worker state",
)

if new_state == WorkerState.PAUSED:
runner.run(interface.pause_worker, state_change_request.defer)
elif new_state == WorkerState.RUNNING:
runner.run(interface.resume_worker)
elif new_state in {WorkerState.ABORTING, WorkerState.STOPPING}:
# active = runner.run(interface.get_active_task)
# if active.task.metadata.get("user"):

try:
runner.run(
interface.cancel_active_task,
Expand Down
3 changes: 3 additions & 0 deletions src/blueapi/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from collections.abc import Callable, Mapping
from functools import wraps
from logging import Logger
Expand Down Expand Up @@ -30,6 +31,8 @@
Args = ParamSpec("Args")
Return = TypeVar("Return")

INSTRUMENT_SESSION_RE = re.compile(r"^[a-z]{2}(?P<proposal>\d+)-(?P<visit>\d+)$")


def report_successful_devices(
devices: Mapping[str, Any], sim_backend: bool, logger: Logger
Expand Down
10 changes: 3 additions & 7 deletions src/blueapi/utils/serialization.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
import re
from typing import Any

from pydantic import BaseModel

from blueapi import utils


def serialize(obj: Any) -> Any:
"""
Expand All @@ -28,13 +29,8 @@ def serialize(obj: Any) -> Any:
return obj


_INSTRUMENT_SESSION_AUTHZ_REGEX: re.Pattern = re.compile(
r"^[a-zA-Z]{2}(?P<proposal>\d+)-(?P<visit>\d+)$"
)


def access_blob(instrument_session: str, beamline: str) -> str:
m = _INSTRUMENT_SESSION_AUTHZ_REGEX.match(instrument_session)
m = utils.INSTRUMENT_SESSION_RE.match(instrument_session)
if m is None:
raise ValueError(
"Unable to extract proposal and visit from "
Expand Down
Loading
Loading