Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `tilebox-workflows`: Added worker runtime mode for executing registered tasks over the worker RPC service, backed by a
shared task executor facade used by both worker and polling runners.

## [0.52.0]

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from tilebox.datasets.datasets.v1 import core_pb2 as datasets_dot_v1_dot_core__pb2


class CollectionServiceStub(object):
class CollectionServiceStub:
"""CollectionService is the service definition for the Tilebox datasets service, which provides access to datasets
"""

Expand Down Expand Up @@ -38,7 +38,7 @@ def __init__(self, channel):
_registered_method=True)


class CollectionServiceServicer(object):
class CollectionServiceServicer:
"""CollectionService is the service definition for the Tilebox datasets service, which provides access to datasets
"""

Expand Down Expand Up @@ -97,7 +97,7 @@ def add_CollectionServiceServicer_to_server(servicer, server):


# This class is part of an EXPERIMENTAL API.
class CollectionService(object):
class CollectionService:
"""CollectionService is the service definition for the Tilebox datasets service, which provides access to datasets
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from tilebox.datasets.datasets.v1 import data_access_pb2 as datasets_dot_v1_dot_data__access__pb2


class DataAccessServiceStub(object):
class DataAccessServiceStub:
"""DataAccessService provides data access and querying capabilities for Tilebox datasets.
"""

Expand All @@ -28,7 +28,7 @@ def __init__(self, channel):
_registered_method=True)


class DataAccessServiceServicer(object):
class DataAccessServiceServicer:
"""DataAccessService provides data access and querying capabilities for Tilebox datasets.
"""

Expand Down Expand Up @@ -67,7 +67,7 @@ def add_DataAccessServiceServicer_to_server(servicer, server):


# This class is part of an EXPERIMENTAL API.
class DataAccessService(object):
class DataAccessService:
"""DataAccessService provides data access and querying capabilities for Tilebox datasets.
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from tilebox.datasets.datasets.v1 import data_ingestion_pb2 as datasets_dot_v1_dot_data__ingestion__pb2


class DataIngestionServiceStub(object):
class DataIngestionServiceStub:
"""DataIngestionService provides data ingestion and deletion capabilities for Tilebox datasets.
"""

Expand All @@ -27,7 +27,7 @@ def __init__(self, channel):
_registered_method=True)


class DataIngestionServiceServicer(object):
class DataIngestionServiceServicer:
"""DataIngestionService provides data ingestion and deletion capabilities for Tilebox datasets.
"""

Expand Down Expand Up @@ -64,7 +64,7 @@ def add_DataIngestionServiceServicer_to_server(servicer, server):


# This class is part of an EXPERIMENTAL API.
class DataIngestionService(object):
class DataIngestionService:
"""DataIngestionService provides data ingestion and deletion capabilities for Tilebox datasets.
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from tilebox.datasets.datasets.v1 import datasets_pb2 as datasets_dot_v1_dot_datasets__pb2


class DatasetServiceStub(object):
class DatasetServiceStub:
"""DatasetsService is the CRUD service for Tilebox datasets.
"""

Expand Down Expand Up @@ -43,7 +43,7 @@ def __init__(self, channel):
_registered_method=True)


class DatasetServiceServicer(object):
class DatasetServiceServicer:
"""DatasetsService is the CRUD service for Tilebox datasets.
"""

Expand Down Expand Up @@ -113,7 +113,7 @@ def add_DatasetServiceServicer_to_server(servicer, server):


# This class is part of an EXPERIMENTAL API.
class DatasetService(object):
class DatasetService:
"""DatasetsService is the CRUD service for Tilebox datasets.
"""

Expand Down
4 changes: 3 additions & 1 deletion tilebox-datasets/tilebox/datasets/tilebox/v1/query_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions tilebox-datasets/tilebox/datasets/tilebox/v1/query_pb2.pyi
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
from tilebox.datasets.buf.validate import validate_pb2 as _validate_pb2
from google.protobuf import timestamp_pb2 as _timestamp_pb2
from tilebox.datasets.tilebox.v1 import id_pb2 as _id_pb2
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from collections.abc import Mapping as _Mapping
from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union

DESCRIPTOR: _descriptor.FileDescriptor

class SortDirection(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
SORT_DIRECTION_UNSPECIFIED: _ClassVar[SortDirection]
SORT_DIRECTION_ASCENDING: _ClassVar[SortDirection]
SORT_DIRECTION_DESCENDING: _ClassVar[SortDirection]
SORT_DIRECTION_UNSPECIFIED: SortDirection
SORT_DIRECTION_ASCENDING: SortDirection
SORT_DIRECTION_DESCENDING: SortDirection

class TimeInterval(_message.Message):
__slots__ = ("start_time", "end_time", "start_exclusive", "end_inclusive")
START_TIME_FIELD_NUMBER: _ClassVar[int]
Expand Down
4 changes: 3 additions & 1 deletion tilebox-workflows/tests/clusters/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from tilebox.workflows.data import (
Cluster,
)
from tilebox.workflows.workflows.v1.core_pb2 import Cluster as ClusterMessage
from tilebox.workflows.workflows.v1.workflows_pb2 import (
Cluster as ClusterMessage,
)
from tilebox.workflows.workflows.v1.workflows_pb2 import (
CreateClusterRequest,
DeleteClusterRequest,
Expand Down
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
3 changes: 2 additions & 1 deletion tilebox-workflows/tilebox/workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

from tilebox.workflows.client import Client
from tilebox.workflows.data import Job
from tilebox.workflows.runner.runner import Runner
from tilebox.workflows.task import ExecutionContext, Task

__all__ = ["Client", "ExecutionContext", "Job", "Task"]
__all__ = ["Client", "ExecutionContext", "Job", "Runner", "Task"]


def _init_logging(level: str = "INFO") -> None:
Expand Down
51 changes: 27 additions & 24 deletions tilebox-workflows/tilebox/workflows/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import logging
import os
import warnings
from uuid import uuid4
from uuid import UUID, uuid4

from _tilebox.grpc.channel import open_channel, parse_channel_info
from tilebox.datasets.sync.client import Client as DatasetsClient
from tilebox.workflows.automations.client import AutomationClient, AutomationService
from tilebox.workflows.cache import JobCache, NoCache
from tilebox.workflows.clusters.client import ClusterClient, ClusterSlugLike, to_cluster_slug
Expand All @@ -22,13 +21,21 @@
_create_tilebox_logger_provider,
)
from tilebox.workflows.observability.tracing import WorkflowTracer
from tilebox.workflows.runner.executor import LazyStorageLocations
from tilebox.workflows.runner.runner import Runner
from tilebox.workflows.runner.task_runner import TaskRunner, _LeaseRenewer
from tilebox.workflows.runner.task_service import TaskService
from tilebox.workflows.task import Task


class Client:
def __init__(
self, *, url: str = "https://api.tilebox.com", token: str | None = None, name: str | None = None
self,
*,
url: str = "https://api.tilebox.com",
token: str | None = None,
name: str | None = None,
client_id: UUID | None = None,
) -> None:
"""
Create a Tilebox workflows client.
Expand All @@ -39,13 +46,14 @@ def __init__(
name: An optional name of the client, used as service.name for telemetry. If not set, defaults to
the service name provided by `tilebox.workflows.observability.tracing.configure_otel_tracing`,
or "tilebox-python" if no external tracer is configured.
client_id: An optional stable id used to scope internal loggers. Defaults to a random id.
"""
token = _token_from_env(url, token)
self._auth: dict[str, str] = {"token": token, "url": url}
self._channel = open_channel(url, token)

# configure logging and tracing
self._client_id = uuid4() # a random uuid to scope loggers to this client instance
self._client_id = client_id or uuid4() # a random uuid to scope loggers to this client instance
self._logger_provider = _create_tilebox_logger_provider(service=name, url=url, token=token)

# task logger is the logger available for users to emit logs from within a Task.execute method, via
Expand Down Expand Up @@ -107,9 +115,10 @@ def jobs(self) -> JobClient:
def runner(
self,
cluster: ClusterSlugLike | None = None,
tasks: list[type] | None = None,
tasks: list[type[Task]] | None = None,
cache: JobCache | None = None,
context: type[RunnerContext] | None = None,
runner: Runner | None = None,
) -> TaskRunner:
"""Initialize a task runner.

Expand All @@ -118,30 +127,25 @@ def runner(
tasks: A list of tasks the runner is able to execute.
cache: The cache to share between tasks.
context: The type of the runner context to use. Defaults to RunnerContext.
runner: A runner definition containing tasks, cache and context configuration.

Returns:
A task runner.
"""
if runner is not None and (tasks is not None or cache is not None or context is not None):
raise ValueError("Pass either runner or tasks/cache/context, not both.")

runner_definition = runner or Runner(tasks=tasks, cache=cache, context=context)
if cache is None:
cache = NoCache() # a no-op cache that will raise an error if it's used
cache = runner_definition.cache or NoCache() # a no-op cache that will raise an error if it's used

found_cluster = self.clusters().find(to_cluster_slug(cluster or ""))

try:
storage_locations = self.automations().storage_locations()
except: # noqa: E722
# if fetching storage locations fails, we just disable this feature, and don't crash all runners
# let's refactor this to a lazy loading mechanism in the future
storage_locations = []

runner_context_type = context or RunnerContext
runner_context = runner_context_type(
self._tracer,
datasets_client=DatasetsClient(**self._auth), # ty: ignore[invalid-argument-type]
storage_locations=storage_locations,
)
runner_context_type = runner_definition.context or RunnerContext
runner_context = runner_context_type(self._tracer)
runner_context.storage_locations = LazyStorageLocations(self, runner_context)

runner = TaskRunner(
task_runner = TaskRunner(
TaskService(self._channel),
found_cluster.slug,
cache,
Expand All @@ -152,11 +156,10 @@ def runner(
runner_logger=StructuredLogger(self._runner_logger, {}),
)

if tasks is not None:
for task in tasks:
runner.register(task)
for task in runner_definition.tasks_by_identifier.values():
task_runner.register(task)

return runner
return task_runner

def clusters(self) -> ClusterClient:
"""
Expand Down
Loading
Loading