Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/gooddata-sdk/src/gooddata_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@
CatalogExportTemplate,
CatalogExportTemplateAttributes,
)
from gooddata_sdk.catalog.organization.entity_model.ip_allowlist_policy import (
CatalogIpAllowlistPolicy,
CatalogIpAllowlistPolicyAttributes,
)
from gooddata_sdk.catalog.organization.entity_model.jwk import (
CatalogJwk,
CatalogJwkAttributes,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# (C) 2026 GoodData Corporation
from __future__ import annotations

from typing import Any

import attrs


@attrs.define(kw_only=True)
class CatalogIpAllowlistPolicyAttributes:
"""Attributes of an IP allowlist policy."""

allowed_sources: list[str] = attrs.field(factory=list)

@classmethod
def from_api(cls, entity: dict[str, Any]) -> CatalogIpAllowlistPolicyAttributes:
raw = entity.get("allowedSources")
return cls(
allowed_sources=list(raw) if raw is not None else [],
)

def to_api(self) -> dict[str, Any]:
return {"allowedSources": self.allowed_sources}


@attrs.define(kw_only=True)
class CatalogIpAllowlistPolicy:
"""Represents an IP allowlist policy entity."""

id: str
attributes: CatalogIpAllowlistPolicyAttributes | None = None

@classmethod
def from_api(cls, entity: dict[str, Any]) -> CatalogIpAllowlistPolicy:
"""Parse from a JSON:API entity data dict (the ``data`` key of a response).

Args:
entity: The ``data`` portion of a JSON:API document, e.g.
``{"id": "...", "type": "ipAllowlistPolicy", "attributes": {...}}``.

Returns:
CatalogIpAllowlistPolicy: The parsed entity.
"""
raw_attrs = entity.get("attributes")
return cls(
id=entity["id"],
attributes=CatalogIpAllowlistPolicyAttributes.from_api(raw_attrs) if raw_attrs is not None else None,
)

def to_api(self) -> dict[str, Any]:
"""Serialise to a JSON:API request document.

Returns:
dict: A JSON:API document suitable for POST/PUT request bodies.
"""
attrs_dict: dict[str, Any] = {}
if self.attributes is not None:
attrs_dict = self.attributes.to_api()
return {
"data": {
"id": self.id,
"type": "ipAllowlistPolicy",
"attributes": attrs_dict,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

from gooddata_sdk import CatalogDeclarativeExportTemplate, CatalogExportTemplate
from gooddata_sdk.catalog.catalog_service_base import CatalogServiceBase
from gooddata_sdk.catalog.identifier import CatalogAssigneeIdentifier
from gooddata_sdk.catalog.organization.entity_model.directive import CatalogCspDirective
from gooddata_sdk.catalog.organization.entity_model.identity_provider import CatalogIdentityProvider
from gooddata_sdk.catalog.organization.entity_model.ip_allowlist_policy import CatalogIpAllowlistPolicy
from gooddata_sdk.catalog.organization.entity_model.jwk import CatalogJwk, CatalogJwkDocument
from gooddata_sdk.catalog.organization.entity_model.llm_provider import (
CatalogLlmProvider,
Expand Down Expand Up @@ -628,6 +630,121 @@ def delete_llm_provider(self, id: str) -> None:
"""
self._entities_api.delete_entity_llm_providers(id, _check_return_type=False)

# IP Allowlist Policy APIs

def get_ip_allowlist_policy(self, policy_id: str) -> CatalogIpAllowlistPolicy:
"""Get an IP allowlist policy by ID.

Args:
policy_id (str): IP allowlist policy identifier.

Returns:
CatalogIpAllowlistPolicy: The retrieved policy.
"""
response = self._client._do_json_request(
"GET",
f"api/v1/entities/ipAllowlistPolicies/{policy_id}",
)
response.raise_for_status()
return CatalogIpAllowlistPolicy.from_api(response.json()["data"])

def list_ip_allowlist_policies(self) -> list[CatalogIpAllowlistPolicy]:
"""List all IP allowlist policies in the organization.

Returns:
list[CatalogIpAllowlistPolicy]: List of IP allowlist policies.
"""
response = self._client._do_json_request(
"GET",
"api/v1/entities/ipAllowlistPolicies",
)
response.raise_for_status()
return [CatalogIpAllowlistPolicy.from_api(item) for item in response.json().get("data", [])]

def create_ip_allowlist_policy(self, policy: CatalogIpAllowlistPolicy) -> CatalogIpAllowlistPolicy:
"""Create a new IP allowlist policy.

Args:
policy (CatalogIpAllowlistPolicy): The policy to create.

Returns:
CatalogIpAllowlistPolicy: The created policy as returned by the server.
"""
response = self._client._do_json_request(
"POST",
"api/v1/entities/ipAllowlistPolicies",
json_body=policy.to_api(),
)
response.raise_for_status()
return CatalogIpAllowlistPolicy.from_api(response.json()["data"])

def update_ip_allowlist_policy(self, policy: CatalogIpAllowlistPolicy) -> CatalogIpAllowlistPolicy:
"""Replace an existing IP allowlist policy (full PUT).

Args:
policy (CatalogIpAllowlistPolicy): The policy with updated values.

Returns:
CatalogIpAllowlistPolicy: The updated policy as returned by the server.
"""
response = self._client._do_json_request(
"PUT",
f"api/v1/entities/ipAllowlistPolicies/{policy.id}",
json_body=policy.to_api(),
)
response.raise_for_status()
return CatalogIpAllowlistPolicy.from_api(response.json()["data"])

def delete_ip_allowlist_policy(self, policy_id: str) -> None:
"""Delete an IP allowlist policy.

Args:
policy_id (str): IP allowlist policy identifier.
"""
response = self._client._do_json_request(
"DELETE",
f"api/v1/entities/ipAllowlistPolicies/{policy_id}",
)
response.raise_for_status()

def add_targets_to_ip_allowlist_policy(
self,
policy_id: str,
targets: list[CatalogAssigneeIdentifier],
) -> None:
"""Add user or user-group targets to an IP allowlist policy.

Args:
policy_id (str): IP allowlist policy identifier.
targets (list[CatalogAssigneeIdentifier]): Users or user groups to add.
"""
request_body = {"targets": [{"id": t.id, "type": t.type} for t in targets]}
response = self._client._do_json_request(
"POST",
f"api/v1/actions/ipAllowlistPolicies/{policy_id}/addTargets",
json_body=request_body,
)
response.raise_for_status()

def remove_targets_from_ip_allowlist_policy(
self,
policy_id: str,
targets: list[CatalogAssigneeIdentifier],
) -> None:
"""Remove user or user-group targets from an IP allowlist policy.

Args:
policy_id (str): IP allowlist policy identifier.
targets (list[CatalogAssigneeIdentifier]): Users or user groups to remove.
"""
request_body = {"targets": [{"id": t.id, "type": t.type} for t in targets]}
response = self._client._do_json_request(
"POST",
f"api/v1/actions/ipAllowlistPolicies/{policy_id}/removeTargets",
json_body=request_body,
)
response.raise_for_status()

# Layout APIs

def get_declarative_notification_channels(self) -> list[CatalogDeclarativeNotificationChannel]:
Expand Down
49 changes: 49 additions & 0 deletions packages/gooddata-sdk/src/gooddata_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,55 @@ def __init__(
self._ai_lake_api = apis.AILakeApi(self._api_client)
self._executions_cancellable = executions_cancellable

def _do_json_request(
self,
method: str,
endpoint: str,
json_body: dict | None = None,
) -> requests.Response:
"""Perform a JSON:API HTTP request.

Used as a low-level escape hatch when the generated API client does not
yet have the endpoint models (e.g. immediately after a new endpoint is
added to the OpenAPI spec but before the client is regenerated).

The standard GoodData request headers (Authorization, X-Requested-With,
X-GDC-VALIDATE-RELATIONS) are added automatically. Any custom headers
provided at client construction time are merged in as well.

Args:
method (str): HTTP method string, e.g. ``"GET"``, ``"POST"``,
``"PUT"``, ``"DELETE"``.
endpoint (str): API endpoint path without a leading slash, e.g.
``"api/v1/entities/ipAllowlistPolicies"``.
json_body (dict | None): Optional dict to be serialised as the JSON
request body. When given, ``Content-Type`` is set to
``application/vnd.gooddata.api+json``.

Returns:
requests.Response: The HTTP response. The caller is responsible
for checking the status code (e.g. ``response.raise_for_status()``).
"""
if not self._hostname.endswith("/"):
endpoint = f"/{endpoint}"

headers: dict[str, str] = {
"Authorization": f"Bearer {self._token}",
"Accept": "application/vnd.gooddata.api+json",
"X-Requested-With": "XMLHttpRequest",
"X-GDC-VALIDATE-RELATIONS": "true",
}
if json_body is not None:
headers["Content-Type"] = "application/vnd.gooddata.api+json"
headers.update(self._custom_headers)

return requests.request(
method=method,
url=f"{self._hostname}{endpoint}",
headers=headers,
json=json_body,
)

def _do_post_request(
self,
data: bytes,
Expand Down
Loading
Loading