-
Notifications
You must be signed in to change notification settings - Fork 3
Fix BED-8727: Add a default schema contract for DLT collection pipeline #32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
a3e6e76
Add default schema contract to evolve
d3vzer0 46c5008
Bump collector versions
d3vzer0 8e496c1
Replace contract behaviour to discard rows instead
d3vzer0 bb82610
Added tables contract again
d3vzer0 bea2767
Merge branch 'main' into fix/schemacontracts
d3vzer0 ee01c2b
Use Annotated options for use outside of the typer CLI
d3vzer0 768b701
Add (patched) version of DLT's model validator
d3vzer0 832851d
Add description to validator
d3vzer0 19efd33
Add pytest to check if the custom overrides log pydantic validation e…
d3vzer0 1bd8009
Add test_dlt_pydantic_override test to workflow
d3vzer0 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,128 @@ | ||
| # This is a patched version of DLT's model validator which adds logging when resources fail | ||
| # pydantic validation and the schema contract is set to discard_row | ||
| # https://github.com/dlt-hub/dlt/blob/devel/dlt/common/libs/pydantic.py | ||
|
|
||
| import logging | ||
| from typing import Any, Type | ||
|
|
||
| from dlt.common.schema import DataValidationError | ||
| from dlt.common.schema.typing import TSchemaEvolutionMode | ||
| from dlt.common.typing import TDataItem | ||
| from pydantic import BaseModel, ValidationError, create_model | ||
| from pydantic.functional_validators import WrapValidator | ||
| from typing_extensions import Annotated | ||
|
|
||
| logger = logging.getLogger("dlt") | ||
|
|
||
|
|
||
def create_list_model(
    model: type[BaseModel],
    column_mode: TSchemaEvolutionMode = "freeze",
    data_mode: TSchemaEvolutionMode = "freeze",
) -> type[BaseModel]:
    """Create a wrapper model from `model` for validating a list of items in batch.

    The returned model has a single required field named `items`.

    Args:
        model: Pydantic model each list element is validated against.
        column_mode: Schema contract mode applied to columns.
        data_mode: Schema contract mode applied to data types.

    Returns:
        A strict `List<ModelName>` model (`items: list[model]`) unless either mode
        is "discard_row". In that case a `LenientList<ModelName>` model is returned
        whose elements that fail validation are logged and replaced by `None`
        instead of failing the whole batch.
    """
    if column_mode != "discard_row" and data_mode != "discard_row":
        return create_model(
            "List" + model.__name__,
            items=(list[model], ...),  # type: ignore[valid-type]
        )

    def _lenient_item_validator(value: Any, handler: Any) -> BaseModel | None:
        """Validate one element; log and return None when the row is discarded."""
        try:
            return handler(value)
        except ValidationError as val_err:
            # A model_type error is never treated as a discardable row: re-raise
            # the original ValidationError untouched.
            if any(err["type"] == "model_type" for err in val_err.errors()):
                raise

            logger.warning(
                "DLT discarded row during listed Pydantic validation",
                extra={
                    "resource": model.__name__,
                    "pydantic_errors": val_err.errors(
                        include_input=False, include_context=False
                    ),
                },
            )
            return None
        except Exception:
            # Still best-effort (the row is discarded, the batch continues), but
            # leave a trace so unexpected failures are not lost silently.
            logger.warning(
                "DLT discarded row during listed Pydantic validation "
                "after an unexpected error",
                extra={"resource": model.__name__},
                exc_info=True,
            )
            return None

    item_type = Annotated[model | None, WrapValidator(_lenient_item_validator)]  # type: ignore[valid-type]
    return create_model(
        "LenientList" + model.__name__,
        items=(list[item_type], ...),  # type: ignore[valid-type]
    )
|
|
||
|
|
||
def _log_discarded_row(table_name: str, exc: ValidationError) -> None:
    """Log a warning describing why a row of `table_name` is being discarded."""
    logger.warning(
        "DLT discarded row during Pydantic validation",
        extra={
            "resource": table_name,
            "pydantic_errors": exc.errors(
                include_input=False, include_context=False
            ),
        },
    )


def _classify_validation_errors(
    table_name: str,
    model: Type[BaseModel],
    item: TDataItem,
    exc: ValidationError,
    column_mode: TSchemaEvolutionMode,
    data_mode: TSchemaEvolutionMode,
) -> None:
    """Classifies validation errors and raises DataValidationError for freeze mode.

    For discard_row mode, logs the validation errors and returns without raising
    so the caller can discard the item.
    For model_type errors (item is not a mapping), always re-raises.

    Only the first error reported by `exc.errors()` decides the outcome: every
    path through the loop body either raises or returns.

    Args:
        table_name: Table the item belongs to; used in errors and log records.
        model: Pydantic model the item was validated against.
        item: The data item that failed validation.
        exc: The pydantic ValidationError raised for `item`.
        column_mode: Contract mode consulted for `extra_forbidden` errors.
        data_mode: Contract mode consulted for every other error type.

    Raises:
        ValidationError: `exc` itself, when the error type is `model_type`.
        DataValidationError: When the applicable mode is "freeze".
        NotImplementedError: When the applicable mode is neither "freeze" nor
            "discard_row".
    """
    for err in exc.errors():
        if err["type"] == "model_type":
            raise exc

        # Unexpected extra fields are governed by the column contract; any other
        # validation failure is governed by the data type contract.
        if err["type"] == "extra_forbidden":
            entity, mode = "columns", column_mode
            unsupported = f"`{column_mode=:}` not implemented for Pydantic validation"
        else:
            entity, mode = "data_type", data_mode
            unsupported = f"`{data_mode=:}` not implemented for Pydantic validation"

        if mode == "freeze":
            raise DataValidationError(
                None,
                table_name,
                str(err["loc"]),
                entity,
                "freeze",
                model,
                {entity: "freeze"},
                item,
                err["msg"],
            ) from exc
        if mode == "discard_row":
            _log_discarded_row(table_name, exc)
            return
        raise NotImplementedError(unsupported)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| import logging | ||
|
|
||
| import pytest | ||
| from dlt.common.libs import pydantic as dlt_pydantic | ||
| from dlt.common.schema.exceptions import DataValidationError | ||
| from dlt.extract.validation import PydanticValidator | ||
| from pydantic import BaseModel | ||
|
|
||
| from openhound.core import validate | ||
|
|
||
|
|
||
class ExampleModel(BaseModel):
    """Two-field model used as the validation target in these tests."""

    # Field order matters: the tests index pydantic's error list by position.
    id: int
    required_field: str
|
|
||
|
|
||
@pytest.fixture(autouse=True)
def patch_dlt_validation(monkeypatch):
    """Replace DLT's pydantic helpers with the patched `validate` versions.

    Runs automatically for every test in this module; monkeypatch restores the
    original attributes afterwards.
    """
    for attr_name in ("create_list_model", "_classify_validation_errors"):
        monkeypatch.setattr(dlt_pydantic, attr_name, getattr(validate, attr_name))
|
|
||
|
|
||
def _validator(column_mode: str = "evolve", data_mode: str = "discard_row"):
    """Build a PydanticValidator for ExampleModel bound to the "users" table."""
    pydantic_validator = PydanticValidator(ExampleModel, column_mode, data_mode)
    pydantic_validator.table_name = "users"
    return pydantic_validator
|
|
||
|
|
||
def test_dlt_pydantic_discard_log(caplog):
    """Checks if the validator (with discard_row) logs data type errors"""
    caplog.set_level(logging.WARNING, logger="dlt")

    outcome = _validator()({"id": "not-an-int"})

    # The invalid item is discarded and exactly one warning is emitted.
    assert outcome is None
    assert len(caplog.records) == 1
    log_record = caplog.records[0]

    assert log_record.resource == "users"
    # Expected (type, loc) of each logged pydantic error, in reported order.
    expected_errors = (
        ("int_parsing", ("id",)),
        ("missing", ("required_field",)),
    )
    for position, (err_type, err_loc) in enumerate(expected_errors):
        assert log_record.pydantic_errors[position]["type"] == err_type
        assert log_record.pydantic_errors[position]["loc"] == err_loc
|
|
||
|
|
||
def test_dlt_pydantic_freeze_exception(caplog):
    """Check if DLT raises an exception instead of continuing when the data mode is set to freeze"""
    caplog.set_level(logging.WARNING, logger="dlt")

    with pytest.raises(DataValidationError) as exc_info:
        _validator(data_mode="freeze")({"id": "not-an-int"})

    # The raised error carries the offending item and the contract mode.
    raised = exc_info.value
    assert "id" in raised.data_item
    assert raised.data_item["id"] == "not-an-int"
    assert raised.contract_mode == "freeze"
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`discard_value` is a supported schema contract mode in general, but it is not supported when validating with Pydantic models, which is what we use for all the collectors.