Skip to content
68 changes: 62 additions & 6 deletions src/aignostics/application/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,9 @@ def run_describe(
def run_dump_metadata(
run_id: Annotated[str, typer.Argument(help="Id of the run to dump custom metadata for")],
pretty: Annotated[bool, typer.Option(help="Pretty print JSON output with indentation")] = False,
show_checksum: Annotated[
bool, typer.Option("--show-checksum", help="Include custom_metadata_checksum in output for use with --checksum")
] = False,
) -> None:
"""Dump custom metadata of a run as JSON to stdout."""
logger.trace("Dumping custom metadata for run with ID '{}'", run_id)
Expand All @@ -1006,11 +1009,19 @@ def run_dump_metadata(
run = Service().application_run(run_id).details()
custom_metadata = run.custom_metadata if hasattr(run, "custom_metadata") else {}

if show_checksum:
output: object = {
"custom_metadata": custom_metadata,
"custom_metadata_checksum": run.custom_metadata_checksum,
}
else:
output = custom_metadata

# Output JSON to stdout
if pretty:
print(json.dumps(custom_metadata, indent=2))
print(json.dumps(output, indent=2))
else:
print(json.dumps(custom_metadata))
print(json.dumps(output))

logger.debug("Dumped custom metadata for run with ID '{}'", run_id)
except NotFoundException:
Expand All @@ -1028,6 +1039,9 @@ def run_dump_item_metadata(
run_id: Annotated[str, typer.Argument(help="Id of the run containing the item")],
external_id: Annotated[str, typer.Argument(help="External ID of the item to dump custom metadata for")],
pretty: Annotated[bool, typer.Option(help="Pretty print JSON output with indentation")] = False,
show_checksum: Annotated[
bool, typer.Option("--show-checksum", help="Include custom_metadata_checksum in output for use with --checksum")
] = False,
) -> None:
"""Dump custom metadata of an item as JSON to stdout."""
logger.trace("Dumping custom metadata for item '{}' in run with ID '{}'", external_id, run_id)
Expand All @@ -1052,11 +1066,19 @@ def run_dump_item_metadata(

custom_metadata = item.custom_metadata if hasattr(item, "custom_metadata") else {}

if show_checksum:
item_output: object = {
"custom_metadata": custom_metadata,
"custom_metadata_checksum": item.custom_metadata_checksum,
}
else:
item_output = custom_metadata

# Output JSON to stdout
if pretty:
print(json.dumps(custom_metadata, indent=2))
print(json.dumps(item_output, indent=2))
else:
print(json.dumps(custom_metadata))
print(json.dumps(item_output))

logger.debug("Dumped custom metadata for item '{}' in run with ID '{}'", external_id, run_id)
except NotFoundException:
Expand Down Expand Up @@ -1213,10 +1235,22 @@ def run_update_metadata(
metadata_json: Annotated[
str, typer.Argument(..., help='Custom metadata as JSON string (e.g., \'{"key": "value"}\')')
],
checksum: Annotated[
str | None,
typer.Option(
"--checksum",
help=(
"Optional checksum for optimistic concurrency control. "
"The server rejects the update with HTTP 412 if the metadata was modified since this checksum was read."
Comment thread
akunft marked this conversation as resolved.
),
),
] = None,
) -> None:
"""Update custom metadata for a run."""
import json # noqa: PLC0415

from aignostics.platform import ConcurrencyConflictError # noqa: PLC0415

logger.trace("Updating custom metadata for run with ID '{}'", run_id)

try:
Expand All @@ -1230,13 +1264,17 @@ def run_update_metadata(
console.print(f"[error]Error:[/error] Invalid JSON: {e}")
sys.exit(1)

Service().application_run_update_custom_metadata(run_id, custom_metadata)
Service().application_run_update_custom_metadata(run_id, custom_metadata, custom_metadata_checksum=checksum)
logger.debug("Updated custom metadata for run with ID '{}'.", run_id)
console.print(f"Successfully updated custom metadata for run with ID '{run_id}'.")
except NotFoundException:
logger.warning(f"Run with ID '{run_id}' not found.")
console.print(f"[warning]Warning:[/warning] Run with ID '{run_id}' not found.")
sys.exit(2)
except ConcurrencyConflictError as e:
logger.warning(f"Concurrency conflict updating metadata for run '{run_id}': {e}")
console.print(f"[warning]Warning:[/warning] Metadata was modified by another process. Re-read and retry: {e}")
sys.exit(3)
except ValueError as e:
logger.warning(f"Run ID '{run_id}' invalid or metadata invalid: {e}")
console.print(f"[warning]Warning:[/warning] Run ID '{run_id}' invalid or metadata invalid: {e}")
Expand All @@ -1254,10 +1292,22 @@ def run_update_item_metadata(
metadata_json: Annotated[
str, typer.Argument(..., help='Custom metadata as JSON string (e.g., \'{"key": "value"}\')')
],
checksum: Annotated[
str | None,
typer.Option(
"--checksum",
help=(
"Optional checksum for optimistic concurrency control. "
"The server rejects the update with HTTP 412 if the metadata was modified since this checksum was read."
),
Comment thread
akunft marked this conversation as resolved.
),
] = None,
) -> None:
"""Update custom metadata for an item in a run."""
import json # noqa: PLC0415

from aignostics.platform import ConcurrencyConflictError # noqa: PLC0415

Comment thread
akunft marked this conversation as resolved.
logger.trace("Updating custom metadata for item '{}' in run with ID '{}'", external_id, run_id)

try:
Expand All @@ -1271,13 +1321,19 @@ def run_update_item_metadata(
console.print(f"[error]Error:[/error] Invalid JSON: {e}")
sys.exit(1)

Service().application_run_update_item_custom_metadata(run_id, external_id, custom_metadata)
Service().application_run_update_item_custom_metadata(
run_id, external_id, custom_metadata, custom_metadata_checksum=checksum
)
logger.debug("Updated custom metadata for item '{}' in run with ID '{}'.", external_id, run_id)
console.print(f"Successfully updated custom metadata for item '{external_id}' in run with ID '{run_id}'.")
except NotFoundException:
logger.warning(f"Run with ID '{run_id}' or item '{external_id}' not found.")
console.print(f"[warning]Warning:[/warning] Run with ID '{run_id}' or item '{external_id}' not found.")
sys.exit(2)
except ConcurrencyConflictError as e:
logger.warning("Concurrency conflict updating metadata for item '{}' in run '{}': {}", external_id, run_id, e)
console.print(f"[warning]Warning:[/warning] Metadata was modified by another process. Re-read and retry: {e}")
sys.exit(3)
except ValueError as e:
logger.warning(
"Run ID '{}' or item external ID '{}' invalid or metadata invalid: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@
)
from nicegui import run as nicegui_run

from aignostics.platform import ArtifactOutput, ItemOutput, ItemResult, ItemState, Run, RunState
from aignostics.platform import (
ArtifactOutput,
ConcurrencyConflictError,
ItemOutput,
ItemResult,
ItemState,
Run,
RunState,
)
from aignostics.third_party.showinfm.showinfm import show_in_file_manager
from aignostics.utils import GUILocalFilePicker, get_user_data_directory

Expand Down Expand Up @@ -744,9 +752,15 @@ async def handle_metadata_change(e: Any) -> None: # noqa: ANN401
Service.application_run_update_custom_metadata_static,
run_id=run_id,
custom_metadata=new_metadata,
custom_metadata_checksum=run_data.custom_metadata_checksum,
)
ui.notify("Custom metadata updated successfully!", type="positive")
ui.navigate.reload()
except ConcurrencyConflictError:
ui.notify(
"Metadata was modified by another process — reload the page and retry.",
type="warning",
)
except Exception as ex:
ui.notify(f"Failed to update custom metadata: {ex!s}", type="negative")

Expand Down
57 changes: 54 additions & 3 deletions src/aignostics/application/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ApplicationSummary,
ApplicationVersion,
Client,
ConcurrencyConflictError,
ForbiddenException,
InputArtifact,
InputItem,
Expand Down Expand Up @@ -1120,21 +1121,31 @@ def application_run_update_custom_metadata(
self,
run_id: str,
custom_metadata: dict[str, Any],
*,
custom_metadata_checksum: str | None = None,
) -> None:
"""Update custom metadata for an existing application run.

Args:
run_id (str): The ID of the run to update
custom_metadata (dict[str, Any]): The new custom metadata to attach to the run.
custom_metadata_checksum (str | None): Optional checksum for optimistic concurrency
control. When provided, the server returns HTTP 412 (and rejects the update) if
the metadata was modified since the checksum was read. Pass ``None`` to skip
the precondition check.

Raises:
NotFoundException: If the application run with the given ID is not found.
ConcurrencyConflictError: If the checksum precondition failed (HTTP 412).
ValueError: If the run ID is invalid.
RuntimeError: If updating the run metadata fails unexpectedly.
"""
try:
logger.trace("Updating custom metadata for run with ID '{}'", run_id)
self._get_platform_client().run(run_id).update_custom_metadata(custom_metadata)
self._get_platform_client().run(run_id).update_custom_metadata(
custom_metadata,
custom_metadata_checksum=custom_metadata_checksum,
)
logger.trace("Updated custom metadata for run with ID '{}'", run_id)
except ValueError as e:
message = f"Failed to update custom metadata for run with ID '{run_id}': ValueError {e}"
Expand All @@ -1145,6 +1156,13 @@ def application_run_update_custom_metadata(
logger.warning(message)
raise NotFoundException(message) from e
except ApiException as e:
if e.status == HTTPStatus.PRECONDITION_FAILED:
message = (
f"Custom metadata for run '{run_id}' was modified since the checksum was read "
f"(optimistic concurrency conflict): {e!s}."
)
logger.warning(message)
raise ConcurrencyConflictError(message) from e
if e.status == HTTPStatus.UNPROCESSABLE_ENTITY:
message = f"Run ID '{run_id}' invalid: {e!s}."
logger.warning(message)
Expand All @@ -1161,35 +1179,51 @@ def application_run_update_custom_metadata(
def application_run_update_custom_metadata_static(
run_id: str,
custom_metadata: dict[str, Any],
*,
custom_metadata_checksum: str | None = None,
) -> None:
"""Static wrapper for updating custom metadata for an application run.

Args:
run_id (str): The ID of the run to update
custom_metadata (dict[str, Any]): The new custom metadata to attach to the run.
custom_metadata_checksum (str | None): Optional checksum for optimistic concurrency
control. When provided, the server returns HTTP 412 (and rejects the update) if
the metadata was modified since the checksum was read. Pass ``None`` to skip
the precondition check.

Raises:
NotFoundException: If the application run with the given ID is not found.
ConcurrencyConflictError: If the checksum precondition failed (HTTP 412).
ValueError: If the run ID is invalid.
RuntimeError: If updating the run metadata fails unexpectedly.
"""
Service().application_run_update_custom_metadata(run_id, custom_metadata)
Service().application_run_update_custom_metadata(
run_id, custom_metadata, custom_metadata_checksum=custom_metadata_checksum
)

def application_run_update_item_custom_metadata(
self,
run_id: str,
external_id: str,
custom_metadata: dict[str, Any],
*,
custom_metadata_checksum: str | None = None,
) -> None:
"""Update custom metadata for an existing item in an application run.

Args:
run_id (str): The ID of the run containing the item
external_id (str): The external ID of the item to update
custom_metadata (dict[str, Any]): The new custom metadata to attach to the item.
custom_metadata_checksum (str | None): Optional checksum for optimistic concurrency
control. When provided, the server returns HTTP 412 (and rejects the update) if
the metadata was modified since the checksum was read. Pass ``None`` to skip
the precondition check.

Raises:
NotFoundException: If the application run or item with the given IDs is not found.
ConcurrencyConflictError: If the checksum precondition failed (HTTP 412).
ValueError: If the run ID or item external ID is invalid.
RuntimeError: If updating the item metadata fails unexpectedly.
"""
Expand All @@ -1202,6 +1236,7 @@ def application_run_update_item_custom_metadata(
self._get_platform_client().run(run_id).update_item_custom_metadata(
external_id,
custom_metadata,
custom_metadata_checksum=custom_metadata_checksum,
)
logger.trace(
"Updated custom metadata for item '{}' in run with ID '{}'",
Expand All @@ -1219,6 +1254,13 @@ def application_run_update_item_custom_metadata(
logger.warning(message)
raise NotFoundException(message) from e
except ApiException as e:
if e.status == HTTPStatus.PRECONDITION_FAILED:
message = (
f"Custom metadata for item '{external_id}' in run '{run_id}' was modified since "
f"the checksum was read (optimistic concurrency conflict): {e!s}."
)
logger.warning(message)
raise ConcurrencyConflictError(message) from e
if e.status == HTTPStatus.UNPROCESSABLE_ENTITY:
message = f"Run ID '{run_id}' or item external ID '{external_id}' invalid: {e!s}."
logger.warning(message)
Expand All @@ -1236,20 +1278,29 @@ def application_run_update_item_custom_metadata_static(
run_id: str,
external_id: str,
custom_metadata: dict[str, Any],
*,
custom_metadata_checksum: str | None = None,
) -> None:
"""Static wrapper for updating custom metadata for an item in an application run.

Args:
run_id (str): The ID of the run containing the item
external_id (str): The external ID of the item to update
custom_metadata (dict[str, Any]): The new custom metadata to attach to the item.
custom_metadata_checksum (str | None): Optional checksum for optimistic concurrency
control. When provided, the server returns HTTP 412 (and rejects the update) if
the metadata was modified since the checksum was read. Pass ``None`` to skip
the precondition check.

Raises:
NotFoundException: If the application run or item with the given IDs is not found.
ConcurrencyConflictError: If the checksum precondition failed (HTTP 412).
ValueError: If the run ID or item external ID is invalid.
RuntimeError: If updating the item metadata fails unexpectedly.
"""
Service().application_run_update_item_custom_metadata(run_id, external_id, custom_metadata)
Service().application_run_update_item_custom_metadata(
run_id, external_id, custom_metadata, custom_metadata_checksum=custom_metadata_checksum
)

def application_run_cancel(self, run_id: str) -> None:
"""Cancel a run by its ID.
Expand Down
2 changes: 2 additions & 0 deletions src/aignostics/platform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
TOKEN_URL_STAGING,
TOKEN_URL_TEST,
)
from ._exceptions import ConcurrencyConflictError
from ._messages import AUTHENTICATION_FAILED, NOT_YET_IMPLEMENTED, UNKNOWN_ENDPOINT_URL
from ._sdk_metadata import (
PipelineConfig,
Expand Down Expand Up @@ -155,6 +156,7 @@
"Artifact",
"ArtifactOutput",
"Client",
"ConcurrencyConflictError",
"Documents",
"ForbiddenException",
"InputArtifact",
Expand Down
10 changes: 10 additions & 0 deletions src/aignostics/platform/_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Exceptions of platform module."""


class ConcurrencyConflictError(ValueError):
"""Raised when an optimistic concurrency precondition (HTTP 412) fails.

Subclasses ValueError so existing ``except ValueError`` callers still catch it,
while callers that need to distinguish a conflict from a bad-ID error can use
``except ConcurrencyConflictError``.
"""
Loading
Loading