Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyatlan/client/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@
WorkflowStop,
WorkflowUpdate,
WorkflowUpdateOwner,
WorkflowUserPublishPreflight,
)

__all__ = [
Expand Down Expand Up @@ -337,4 +338,5 @@
"WorkflowStop",
"WorkflowUpdate",
"WorkflowUpdateOwner",
"WorkflowUserPublishPreflight",
]
40 changes: 40 additions & 0 deletions pyatlan/client/common/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
SCHEDULE_QUERY_WORKFLOWS_MISSED,
SCHEDULE_QUERY_WORKFLOWS_SEARCH,
STOP_WORKFLOW_RUN,
USER_PUBLISH_PREFLIGHT,
WORKFLOW_ARCHIVE,
WORKFLOW_CHANGE_OWNER,
WORKFLOW_INDEX_RUN_SEARCH,
Expand Down Expand Up @@ -376,6 +377,45 @@ def process_response(raw_json: Dict) -> WorkflowRunResponse:
return WorkflowParseResponse.parse_response(raw_json, WorkflowRunResponse)


class WorkflowUserPublishPreflight:
"""Shared logic for the user publish preflight check (HYP-829).

Verifies the workflow creator's account is enabled and that they may
publish (ENTITY_CREATE/UPDATE/DELETE) to the target connection. Heracles
runs both checks server-side (including the Keycloak impersonation), so
callers only need their own service-account token — the endpoint is
restricted to service-account callers.
"""

@staticmethod
def prepare_request(user_id: str, connection_qualified_name: str = "") -> tuple:
"""
Prepare the request for the user publish preflight check.

:param user_id: Keycloak user GUID of the workflow creator
:param connection_qualified_name: qualifiedName of the target
connection (e.g. ``default/snowflake/1234567890``); may be empty,
in which case only the user-enabled check runs
:returns: tuple of (endpoint, request_obj)
"""
request = {
"user_id": user_id,
"connection_qualified_name": connection_qualified_name,
}
return USER_PUBLISH_PREFLIGHT, request

@staticmethod
def process_response(raw_json: Dict) -> Dict:
"""
Process the preflight response.

:param raw_json: raw response from the API
:returns: dict with ``passed`` (bool), ``failed_checks``
(list of str or None) and ``message`` (str)
"""
return raw_json or {}


class WorkflowDelete:
"""Shared logic for deleting workflows."""

Expand Down
11 changes: 11 additions & 0 deletions pyatlan/client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,17 @@
WORKFLOW_OWNER_RERUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)

# user publish preflight (HYP-829) — verifies the workflow creator's account is
# enabled and that they may publish (ENTITY_CREATE/UPDATE/DELETE) to the target
# connection; restricted to service-account callers on the Heracles side
USER_PUBLISH_PREFLIGHT_API = "workflows/preflight/user-publish-check"
USER_PUBLISH_PREFLIGHT = API(
USER_PUBLISH_PREFLIGHT_API,
HTTPMethod.POST,
HTTPStatus.OK,
endpoint=EndPoint.HERACLES,
)

WORKFLOW_RUN_API = "workflows?submit=true"
WORKFLOW_RUN = API(
WORKFLOW_RUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
Expand Down
29 changes: 29 additions & 0 deletions pyatlan_v9/client/aio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
WorkflowGetScheduledRun,
WorkflowStop,
WorkflowUpdateOwner,
WorkflowUserPublishPreflight,
)
from pyatlan.client.constants import (
PACKAGE_WORKFLOW_RERUN,
Expand Down Expand Up @@ -596,6 +597,34 @@ async def stop(
raw_json = await self._client._call_api(endpoint, request_obj=None)
return msgspec.convert(raw_json, WorkflowRunResponse, strict=False)

async def user_publish_preflight(
self,
user_id: str,
connection_qualified_name: str = "",
) -> dict:
"""
Run the user publish preflight check (HYP-829) for a workflow creator.

Verifies the user's account is enabled and that they may publish
(ENTITY_CREATE/UPDATE/DELETE) to the target connection. All checks run
server-side in Heracles (including the Keycloak impersonation), so the
caller only needs its own service-account token — the endpoint is
restricted to service-account callers.

:param user_id: Keycloak user GUID of the workflow creator
:param connection_qualified_name: qualifiedName of the target
connection (e.g. ``default/snowflake/1234567890``); may be empty,
in which case only the user-enabled check runs
:returns: dict with ``passed`` (bool), ``failed_checks``
(list of str or None) and ``message`` (str)
:raises AtlanError: on any API communication issue
"""
endpoint, request = WorkflowUserPublishPreflight.prepare_request(
user_id, connection_qualified_name
)
raw_json = await self._client._call_api(endpoint, request_obj=request)
return WorkflowUserPublishPreflight.process_response(raw_json)

@validate_arguments
async def delete(
self,
Expand Down
29 changes: 29 additions & 0 deletions pyatlan_v9/client/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
WorkflowGetScheduledRun,
WorkflowStop,
WorkflowUpdateOwner,
WorkflowUserPublishPreflight,
)
from pyatlan.client.constants import (
PACKAGE_WORKFLOW_RERUN,
Expand Down Expand Up @@ -585,6 +586,34 @@ def stop(
raw_json = self._client._call_api(endpoint, request_obj=None)
return msgspec.convert(raw_json, WorkflowRunResponse, strict=False)

def user_publish_preflight(
self,
user_id: str,
connection_qualified_name: str = "",
) -> dict:
"""
Run the user publish preflight check (HYP-829) for a workflow creator.

Verifies the user's account is enabled and that they may publish
(ENTITY_CREATE/UPDATE/DELETE) to the target connection. All checks run
server-side in Heracles (including the Keycloak impersonation), so the
caller only needs its own service-account token — the endpoint is
restricted to service-account callers.

:param user_id: Keycloak user GUID of the workflow creator
:param connection_qualified_name: qualifiedName of the target
connection (e.g. ``default/snowflake/1234567890``); may be empty,
in which case only the user-enabled check runs
:returns: dict with ``passed`` (bool), ``failed_checks``
(list of str or None) and ``message`` (str)
:raises AtlanError: on any API communication issue
"""
endpoint, request = WorkflowUserPublishPreflight.prepare_request(
user_id, connection_qualified_name
)
raw_json = self._client._call_api(endpoint, request_obj=request)
return WorkflowUserPublishPreflight.process_response(raw_json)

@validate_arguments
def delete(
self,
Expand Down
33 changes: 33 additions & 0 deletions tests/unit/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,3 +972,36 @@ def test_remove_schedule_with_role_cache_api_token_user(
mock_api_caller._call_api.assert_called_once()
assert response == workflow_response
mock_api_caller.reset_mock()


def test_user_publish_preflight_prepare_request():
from pyatlan.client.common import WorkflowUserPublishPreflight
from pyatlan.client.constants import USER_PUBLISH_PREFLIGHT

endpoint, request = WorkflowUserPublishPreflight.prepare_request(
"8c1e2d8a-1564-4e95-8b14-ea14522fcf61", "default/snowflake/1234567890"
)
assert endpoint is USER_PUBLISH_PREFLIGHT
assert request == {
"user_id": "8c1e2d8a-1564-4e95-8b14-ea14522fcf61",
"connection_qualified_name": "default/snowflake/1234567890",
}


def test_user_publish_preflight_prepare_request_without_connection():
from pyatlan.client.common import WorkflowUserPublishPreflight

_, request = WorkflowUserPublishPreflight.prepare_request("user-123")
assert request == {"user_id": "user-123", "connection_qualified_name": ""}


def test_user_publish_preflight_process_response():
from pyatlan.client.common import WorkflowUserPublishPreflight

verdict = {
"passed": False,
"failed_checks": ["UserEnabled"],
"message": "User user-123 failed publish preflight checks: UserEnabled",
}
assert WorkflowUserPublishPreflight.process_response(verdict) == verdict
assert WorkflowUserPublishPreflight.process_response(None) == {}
Loading