From a3e6e76add35c12a02ac4ddcf219f014d53a8083 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Mon, 22 Jun 2026 17:13:31 +0200 Subject: [PATCH 1/9] Add default schema contract to evolve --- src/openhound/core/app.py | 28 +++++++++++++--------------- src/openhound/core/collect.py | 15 ++++++++++++++- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/src/openhound/core/app.py b/src/openhound/core/app.py index 8b66861..ea7fc2c 100644 --- a/src/openhound/core/app.py +++ b/src/openhound/core/app.py @@ -46,7 +46,6 @@ class Contract(str, Enum): evolve = "evolve" freeze = "freeze" - discard_value = "discard_value" discard_row = "discard_row" @@ -67,9 +66,6 @@ def __init__(self, name: str, source_kind: str, help: str = "OpenGraph collector self.dlt_source: DltSource | None = None self.dlt_resources: list[DltResource] = [] self.dlt_transformers: list[DltResource] = [] - self.table_contract: Contract = Contract.evolve - self.data_type_contract: Contract = Contract.freeze - self.columns_contract: Contract = Contract.evolve # Store the graph definitions for this source self.assets: list[BaseAsset] = [] @@ -96,30 +92,32 @@ def wrapper( progress: Progress = typer.Option( Progress.tqdm, help="Select progress tracker option" ), - tables: Contract = typer.Option( + tables_contract: Contract = typer.Option( Contract.evolve, - help="Contract applied when data contains newly seen resources/tables previously not collected", + help="DLT contract applied when data contains newly seen resources/tables previously not collected", ), - columns: Contract = typer.Option( + columns_contract: Contract = typer.Option( Contract.evolve, - help="Contract applied when data contains values/keys not found in the Pydantic model", + help="DLT contract applied when data contains values/keys not found in the Pydantic model", ), - data_type: Contract = typer.Option( - Contract.freeze, - help="Contract applied when fields do not match the data types defined in the Pydantic model", + data_type_contract: Contract = typer.Option( + Contract.evolve, + help="DLT contract applied when fields do not match the data types defined in the Pydantic model", ), ) -> LoadInfo | None: + schema_contract = { + "tables": tables_contract, + "columns": columns_contract, + "data_type": data_type_contract, + } collector = Collector( name=self.name, output_path=output_path, resources=resources, progress=progress, + schema_contract=schema_contract, ) - # TODO: Implement data/table/column contracts - # self.data_type_contract = data_type - # self.columns_contract = columns - # self.table_contract = tables ctx = CollectContext(pipeline=collector) source_method: DltSource = func(ctx) if source_method: diff --git a/src/openhound/core/collect.py b/src/openhound/core/collect.py index 7d6664d..1f433a7 100644 --- a/src/openhound/core/collect.py +++ b/src/openhound/core/collect.py @@ -14,6 +14,12 @@ logger = logging.getLogger(__name__) +DEFAULT_SCHEMA_CONTRACT = { + "tables": "evolve", + "columns": "evolve", + "data_type": "evolve", +} + class Collector(BasePipeline): def __init__( @@ -22,11 +28,15 @@ def __init__( output_path: Path, resources: list[str] | None = None, progress: Progress = Progress.tqdm, + schema_contract: dict | None = None, ): self.name = name self.output_path = output_path self.resources = resources if resources else [] self.progress = progress + self.schema_contract = ( + schema_contract if schema_contract is not None else DEFAULT_SCHEMA_CONTRACT + ) @property def pipeline(self) -> Pipeline: @@ -56,7 +66,10 @@ def run(self, source_object: DltSource, **kwargs) -> LoadInfo: logger_override.set_handler(self.name) return self._run( - all_resources, write_disposition="replace", loader_file_format="jsonl" + all_resources, + write_disposition="replace", + loader_file_format="jsonl", + schema_contract=self.schema_contract, ) From 46c50081d7f076735598ebe1cb37ab68b1f3293b Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Mon, 22 Jun 2026 20:55:05 +0200 Subject: [PATCH 2/9] Bump collector versions --- pyproject.toml | 8 ++++---- uv.lock | 20 ++++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 33bf355..f4ac285 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,15 +23,15 @@ dependencies = [ [project.optional-dependencies] all = [ - "openhound-jamf==0.2.0", - "openhound-github==0.3.1", + "openhound-jamf==0.2.1", + "openhound-github==0.3.3", "openhound-okta==0.1.4", ] jamf = [ - "openhound-jamf==0.2.0", + "openhound-jamf==0.2.1", ] github = [ - "openhound-github==0.3.1" + "openhound-github==0.3.3" ] okta = [ diff --git a/uv.lock b/uv.lock index b75c538..14360e6 100644 --- a/uv.lock +++ b/uv.lock @@ -1262,10 +1262,10 @@ requires-dist = [ { name = "griffe-fieldz", specifier = ">=0.5.0" }, { name = "jinja2", specifier = ">=3.1.6" }, { name = "mkdocstrings", extras = ["python"], specifier = ">=1.0.0" }, - { name = "openhound-github", marker = "extra == 'all'", specifier = "==0.3.1" }, - { name = "openhound-github", marker = "extra == 'github'", specifier = "==0.3.1" }, - { name = "openhound-jamf", marker = "extra == 'all'", specifier = "==0.2.0" }, - { name = "openhound-jamf", marker = "extra == 'jamf'", specifier = "==0.2.0" }, + { name = "openhound-github", marker = "extra == 'all'", specifier = "==0.3.3" }, + { name = "openhound-github", marker = "extra == 'github'", specifier = "==0.3.3" }, + { name = "openhound-jamf", marker = "extra == 'all'", specifier = "==0.2.1" }, + { name = "openhound-jamf", marker = "extra == 'jamf'", specifier = "==0.2.1" }, { name = "openhound-okta", marker = "extra == 'all'", specifier = "==0.1.4" }, { name = "openhound-okta", marker = "extra == 'okta'", specifier = "==0.1.4" }, { name = "psutil", specifier = ">=7.2.1" }, @@ -1308,24 +1308,24 @@ wheels = [ [[package]] name = "openhound-github" -version = "0.3.1" +version = "0.3.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "joserfc" }, { name = "requests" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/92/ac/0c9d22e3815b4eb1d8cb2c865ac9db168c91ec7e322171b9e4a6b029ade9/openhound_github-0.3.1.tar.gz", hash = "sha256:40ba88c39697078d196994762b6700eb92db1b11c4cdf0ba98c25aa9176c801c", size = 3566642, upload-time = "2026-06-16T16:27:52.483Z" } +sdist = { url = "https://files.pythonhosted.org/packages/32/cf/8cd378f9ffa493c3941d438439b6ac226b64bcad55819443600f2cdb7519/openhound_github-0.3.3.tar.gz", hash = "sha256:3e71608fbf600caca4a1df8ad456e319f431e9187ff0a5791c5b3e00d1de1110", size = 3567129, upload-time = "2026-06-22T18:50:22.734Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/3c/4e/387f8089889b4514c3b1a5bf2ad47493707a5991f61643459fec247f39be/openhound_github-0.3.1-py3-none-any.whl", hash = "sha256:b7e0af3baad551e860ee6f9ea3130c537f278948efa78c515551d5f52221b1dc", size = 119795, upload-time = "2026-06-16T16:27:54.183Z" }, + { url = "https://files.pythonhosted.org/packages/c8/fa/eb1e18bc73df2ca14865e0c5d579d711a4c4ee8a64004fb2320e4a4b1045/openhound_github-0.3.3-py3-none-any.whl", hash = "sha256:dd19765a533a93b3f27f64316d0bf5626fe2dae5d05a4d3fa499b1f3d2ab9bf9", size = 120098, upload-time = "2026-06-22T18:50:21.308Z" }, ] [[package]] name = "openhound-jamf" -version = "0.2.0" +version = "0.2.1" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/9f/2a/e6cd9f1be57672dbe1384994fb06385f9bb090c45229160bc569a6f36a09/openhound_jamf-0.2.0.tar.gz", hash = "sha256:ea06915ec85974ceec8881af8b9d7935322a4c4864c556cc7ecb00ce42b19ad0", size = 3418725, upload-time = "2026-06-08T19:01:15.566Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d8/11/70bba4fa0c2b81cbc4979a966213b6a3ab6a534072d26ca0fdfb7adb6157/openhound_jamf-0.2.1.tar.gz", hash = "sha256:1dab37fa47a7cf1f27ed0f19c5766dcdf3bdfdbc3084a8ebd3c228d801a086c1", size = 3418716, upload-time = "2026-06-22T18:45:13.593Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/2d/6b/f97e30acbe6c0534976a33148c397379353d21b26b6293484d31e3aadeb1/openhound_jamf-0.2.0-py3-none-any.whl", hash = "sha256:668bc846772c773691e9ffe64555e76fe19c95d6a64abc10da1197adf0bdab53", size = 34990, upload-time = "2026-06-08T19:01:17.524Z" }, + { url = "https://files.pythonhosted.org/packages/f8/1f/8576d945cd5be1a66b517b6077bb9952e604af06490163538a16a95f7b0b/openhound_jamf-0.2.1-py3-none-any.whl", hash = "sha256:ccb90891ef4b4d1ad4ba2ce49ea9ddf2a8729c2e0a74a41a9f789f682ed55c59", size = 34954, upload-time = "2026-06-22T18:45:12.659Z" }, ] [[package]] From 8e496c1cab2a488a5854b536a8e31b42c43e79fe Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Mon, 22 Jun 2026 21:52:08 +0200 Subject: [PATCH 3/9] Replace contract behaviour to discard rows instead --- src/openhound/core/app.py | 7 +------ src/openhound/core/collect.py | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/openhound/core/app.py b/src/openhound/core/app.py index ea7fc2c..349ee5c 100644 --- a/src/openhound/core/app.py +++ b/src/openhound/core/app.py @@ -92,21 +92,16 @@ def wrapper( progress: Progress = typer.Option( Progress.tqdm, help="Select progress tracker option" ), - tables_contract: Contract = typer.Option( - Contract.evolve, - help="DLT contract applied when data contains newly seen resources/tables previously not collected", - ), columns_contract: Contract = typer.Option( Contract.evolve, help="DLT contract applied when data contains values/keys not found in the Pydantic model", ), data_type_contract: Contract = typer.Option( - Contract.evolve, + Contract.discard_row, help="DLT contract applied when fields do not match the data types defined in the Pydantic model", ), ) -> LoadInfo | None: schema_contract = { - "tables": tables_contract, "columns": columns_contract, "data_type": data_type_contract, } diff --git a/src/openhound/core/collect.py b/src/openhound/core/collect.py index 1f433a7..8cdc376 100644 --- a/src/openhound/core/collect.py +++ b/src/openhound/core/collect.py @@ -17,7 +17,7 @@ DEFAULT_SCHEMA_CONTRACT = { "tables": "evolve", "columns": "evolve", - "data_type": "evolve", + "data_type": "discard_row", } From bb826109c85e5977534b9e59317d5f30437a8924 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Mon, 22 Jun 2026 21:55:42 +0200 Subject: [PATCH 4/9] Added tables contact again --- src/openhound/core/app.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/openhound/core/app.py b/src/openhound/core/app.py index 349ee5c..31b7d17 100644 --- a/src/openhound/core/app.py +++ b/src/openhound/core/app.py @@ -92,6 +92,10 @@ def wrapper( progress: Progress = typer.Option( Progress.tqdm, help="Select progress tracker option" ), + tables_contract: Contract = typer.Option( + Contract.evolve, + help="DLT contract applied when data contains newly seen resources/tables previously not collected", + ), columns_contract: Contract = typer.Option( Contract.evolve, help="DLT contract applied when data contains values/keys not found in the Pydantic model", @@ -102,6 +106,7 @@ def wrapper( ), ) -> LoadInfo | None: schema_contract = { + "tables": tables_contract, "columns": columns_contract, "data_type": data_type_contract, } From ee01c2b45e2f319223ceca43482eecad6f31ba9d Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Mon, 22 Jun 2026 22:58:42 +0200 Subject: [PATCH 5/9] Use Annotated options for use outside of the typer CLI --- src/openhound/core/app.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/openhound/core/app.py b/src/openhound/core/app.py index 31b7d17..d06e3c7 100644 --- a/src/openhound/core/app.py +++ b/src/openhound/core/app.py @@ -92,18 +92,24 @@ def wrapper( progress: Progress = typer.Option( Progress.tqdm, help="Select progress tracker option" ), - tables_contract: Contract = typer.Option( - Contract.evolve, - help="DLT contract applied when data contains newly seen resources/tables previously not collected", - ), - columns_contract: Contract = typer.Option( - Contract.evolve, - help="DLT contract applied when data contains values/keys not found in the Pydantic model", - ), - data_type_contract: Contract = typer.Option( - Contract.discard_row, - help="DLT contract applied when fields do not match the data types defined in the Pydantic model", - ), + tables_contract: Annotated[ + Contract, + typer.Option( + help="DLT contract applied when data contains newly seen resources/tables previously not collected", + ), + ] = Contract.evolve, + columns_contract: Annotated[ + Contract, + typer.Option( + help="DLT contract applied when data contains values/keys not found in the Pydantic model", + ), + ] = Contract.evolve, + data_type_contract: Annotated[ + Contract, + typer.Option( + help="DLT contract applied when fields do not match the data types defined in the Pydantic model", + ), + ] = Contract.discard_row, ) -> LoadInfo | None: schema_contract = { "tables": tables_contract, From 768b701703585ed58629ca05c1b099bc5d764038 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Tue, 23 Jun 2026 01:20:04 +0200 Subject: [PATCH 6/9] Add (patched) version of DLT's model validator --- src/openhound/core/app.py | 5 ++ src/openhound/core/validate.py | 124 +++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 src/openhound/core/validate.py diff --git a/src/openhound/core/app.py b/src/openhound/core/app.py index d06e3c7..971aecf 100644 --- a/src/openhound/core/app.py +++ b/src/openhound/core/app.py @@ -6,6 +6,7 @@ import dlt import duckdb import typer +from dlt.common.libs import pydantic as dlt_pydantic from dlt.common.pipeline import LoadInfo from dlt.extract.resource import DltResource from dlt.extract.source import DltSource @@ -13,6 +14,7 @@ from openhound.cli.collect import collect from openhound.cli.convert import convert from openhound.cli.preproc import preprocess +from openhound.core import validate from openhound.core.asset import BaseAsset, EdgeDef, NodeDef from openhound.core.collect import CollectContext, Collector from openhound.core.convert import ConvertContext, Converter, Method @@ -51,6 +53,9 @@ class Contract(str, Enum): class OpenHound: def __init__(self, name: str, source_kind: str, help: str = "OpenGraph collector"): + dlt_pydantic.create_list_model = validate.create_list_model + dlt_pydantic._classify_validation_errors = validate._classify_validation_errors + self.name = name self.source_kind = source_kind self.help = help diff --git a/src/openhound/core/validate.py b/src/openhound/core/validate.py new file mode 100644 index 0000000..e225376 --- /dev/null +++ b/src/openhound/core/validate.py @@ -0,0 +1,124 @@ +import logging +from typing import Any, Type + +from dlt.common.schema import DataValidationError +from dlt.common.schema.typing import TSchemaEvolutionMode +from dlt.common.typing import TDataItem +from pydantic import BaseModel, ValidationError, create_model +from pydantic.functional_validators import WrapValidator +from typing_extensions import Annotated + +logger = logging.getLogger("dlt") + + +def create_list_model( + model: type[BaseModel], + column_mode: TSchemaEvolutionMode = "freeze", + data_mode: TSchemaEvolutionMode = "freeze", +) -> type[BaseModel]: + """Creates a model from `model` for validating list of items in batch.""" + if column_mode == "discard_row" or data_mode == "discard_row": + + def _lenient_item_validator(value: Any, handler: Any) -> BaseModel | None: + try: + return handler(value) + except ValidationError as val_err: + for err in val_err.errors(): + if err["type"] == "model_type": + raise + + logger.warning( + "DLT discarded row during listed Pydantic validation", + extra={ + "resource": model.__name__, + "pydantic_errors": val_err.errors( + include_input=False, include_context=False + ), + }, + ) + return None + except Exception: + return None + + item_type = Annotated[model | None, WrapValidator(_lenient_item_validator)] # type: ignore[valid-type] + return create_model( + "LenientList" + model.__name__, + items=(list[item_type], ...), # type: ignore[valid-type] + ) + + return create_model( + "List" + model.__name__, + items=(list[model], ...), # type: ignore[valid-type] + ) + + +def _classify_validation_errors( + table_name: str, + model: Type[BaseModel], + item: TDataItem, + exc: ValidationError, + column_mode: TSchemaEvolutionMode, + data_mode: TSchemaEvolutionMode, +) -> None: + """Classifies validation errors and raises DataValidationError for freeze mode. + + For discard_row mode, returns without raising so the caller can discard the item. + For model_type errors (item is not a mapping), always re-raises. + """ + for err in exc.errors(): + if err["type"] == "model_type": + raise exc + if err["type"] == "extra_forbidden": + if column_mode == "freeze": + raise DataValidationError( + None, + table_name, + str(err["loc"]), + "columns", + "freeze", + model, + {"columns": "freeze"}, + item, + err["msg"], + ) from exc + elif column_mode == "discard_row": + logger.warning( + "DLT discarded row during Pydantic validation", + extra={ + "resource": table_name, + "pydantic_errors": exc.errors( + include_input=False, include_context=False + ), + }, + ) + return + raise NotImplementedError( + f"`{column_mode=:}` not implemented for Pydantic validation" + ) + else: + if data_mode == "freeze": + raise DataValidationError( + None, + table_name, + str(err["loc"]), + "data_type", + "freeze", + model, + {"data_type": "freeze"}, + item, + err["msg"], + ) from exc + elif data_mode == "discard_row": + logger.warning( + "DLT discarded row during Pydantic validation", + extra={ + "resource": table_name, + "pydantic_errors": exc.errors( + include_input=False, include_context=False + ), + }, + ) + return + raise NotImplementedError( + f"`{data_mode=:}` not implemented for Pydantic validation" + ) From 832851db798c8bd2a38c394be6c10d4cc841b9fe Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Tue, 23 Jun 2026 01:21:34 +0200 Subject: [PATCH 7/9] Add description to validator --- src/openhound/core/validate.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/openhound/core/validate.py b/src/openhound/core/validate.py index e225376..b7eec48 100644 --- a/src/openhound/core/validate.py +++ b/src/openhound/core/validate.py @@ -1,3 +1,7 @@ +# This is a patched version of DLT's model validator which adds logging when resources fail +# pydantic validation and the schema contract is set to discard_row +# https://github.com/dlt-hub/dlt/blob/devel/dlt/common/libs/pydantic.py + import logging from typing import Any, Type From 19efd33b26a4a724b1d6dd381ac061f71e233e02 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Tue, 23 Jun 2026 01:38:08 +0200 Subject: [PATCH 8/9] Add pytest to check if the custom overrides log pydantic validation errors when the contract is set to discard_row --- tests/test_dlt_pydantic_override.py | 59 +++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 tests/test_dlt_pydantic_override.py diff --git a/tests/test_dlt_pydantic_override.py b/tests/test_dlt_pydantic_override.py new file mode 100644 index 0000000..8cf32e0 --- /dev/null +++ b/tests/test_dlt_pydantic_override.py @@ -0,0 +1,59 @@ +import logging + +import pytest +from dlt.common.libs import pydantic as dlt_pydantic +from dlt.common.schema.exceptions import DataValidationError +from dlt.extract.validation import PydanticValidator +from pydantic import BaseModel + +from openhound.core import validate + + +class ExampleModel(BaseModel): + id: int + required_field: str + + +@pytest.fixture(autouse=True) +def patch_dlt_validation(monkeypatch): + monkeypatch.setattr(dlt_pydantic, "create_list_model", validate.create_list_model) + monkeypatch.setattr( + dlt_pydantic, + "_classify_validation_errors", + validate._classify_validation_errors, + ) + + +def _validator(column_mode: str = "evolve", data_mode: str = "discard_row"): + validator = PydanticValidator(ExampleModel, column_mode, data_mode) + validator.table_name = "users" + return validator + + +def test_dlt_pydantic_discard_log(caplog): + """Checks if the validator (with discard_row) logs data type errors""" + caplog.set_level(logging.WARNING, logger="dlt") + + result = _validator()(dict(id="not-an-int")) + + assert result is None + assert len(caplog.records) == 1 + record = caplog.records[0] + + assert record.resource == "users" + assert record.pydantic_errors[0]["type"] == "int_parsing" + assert record.pydantic_errors[0]["loc"] == ("id",) + assert record.pydantic_errors[1]["type"] == "missing" + assert record.pydantic_errors[1]["loc"] == ("required_field",) + + +def test_dlt_pydantic_freeze_exception(caplog): + """Check if DLT raises an exception instead of continuing when the data mode is set to freeze""" + caplog.set_level(logging.WARNING, logger="dlt") + + with pytest.raises(DataValidationError) as exc_info: + _validator(data_mode="freeze")(dict(id="not-an-int")) + + assert "id" in exc_info.value.data_item + assert exc_info.value.data_item["id"] == "not-an-int" + assert exc_info.value.contract_mode == "freeze" From 1bd8009112b61531c3afe3669303435a119bee89 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Tue, 23 Jun 2026 01:39:55 +0200 Subject: [PATCH 9/9] Addd test_dlt_pydantic_override test to workflow --- .github/workflows/test.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 043e7f0..3a75f7a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -67,3 +67,7 @@ jobs: - name: Run DuckDB lookup exception handling tests run: | .venv/bin/pytest tests/test_lookup.py -v + + - name: Run DLT Pydantic exception handling (override) test + run: | + .venv/bin/pytest tests/test_dlt_pydantic_override.py -v