Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/dve/core_engine/backends/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,33 @@ def __init__(self, *args: object, messages: Messages) -> None:
"""The messages to be returned as part of the error."""


class UnableToParseCSVError(MessageBearingError):
    """An error raised when unable to parse a CSV file.

    The error carries exactly one ``FeedbackMessage`` describing the failure,
    built from the values supplied by the caller.
    """

    def __init__(
        self,
        entity_name: str,
        field_check_error_message: str,
        field_check_error_code: str,
    ) -> None:
        """Build the error and its single feedback message.

        Args:
            entity_name: Used both as the key of the feedback record and as
                the message's ``error_location``.
            field_check_error_message: Text stored as the feedback message's
                ``error_message``.
            field_check_error_code: Code stored as the feedback message's
                ``error_code``.
        """
        super().__init__(
            # Give the exception a readable ``str()`` so logs and tracebacks
            # are not blank; the structured detail stays in ``messages``.
            f"Unable to parse CSV file ({entity_name}): {field_check_error_message}",
            messages=[
                FeedbackMessage(
                    entity="csv_structure",
                    record={
                        entity_name: "Unable to parse file. Please check the structure of the file."
                    },
                    failure_type="submission",
                    is_informational=False,
                    error_type="csv read",
                    error_location=entity_name,
                    error_message=field_check_error_message,
                    error_code=field_check_error_code,
                )
            ],
        )


class BackendErrorMixin(ABC, BackendError):
"""A mixin used to create backend error type."""

Expand Down
49 changes: 41 additions & 8 deletions src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,21 @@

import duckdb as ddb
import polars as pl
from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression, read_csv
from duckdb import (
DuckDBPyConnection,
DuckDBPyRelation,
InvalidInputException,
StarExpression,
read_csv,
)
from pydantic import BaseModel

from dve.core_engine.backends.base.reader import read_function
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
from dve.core_engine.backends.exceptions import (
EmptyFileError,
MessageBearingError,
UnableToParseCSVError,
)
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
duckdb_record_index,
duckdb_write_parquet,
Expand Down Expand Up @@ -107,7 +117,14 @@ def read_to_relation( # pylint: disable=unused-argument

reader_options["columns"] = ddb_schema

rel = self.add_record_index(read_csv(resource, **reader_options, parallel=False))
try:
rel = self.add_record_index(read_csv(resource, **reader_options, parallel=False))
except InvalidInputException as exc:
raise UnableToParseCSVError(
entity_name="csv_structure",
field_check_error_message=self.field_check_error_message,
field_check_error_code=self.field_check_error_code,
) from exc

if self.null_empty_strings:
cleaned_cols = ",".join(
Expand Down Expand Up @@ -156,11 +173,18 @@ def read_to_relation( # pylint: disable=unused-argument

# there is a raise_if_empty arg for 0.18+. Future reference when upgrading. Makes L85
# redundant
df = self.add_record_index( # pylint: disable=W0612
pl.scan_csv(resource, **reader_options).select( # type: ignore
list(polars_types.keys())
try:
df = self.add_record_index( # pylint: disable=W0612
pl.scan_csv(resource, **reader_options).select( # type: ignore
list(polars_types.keys())
)
)
)
except pl.exceptions.PolarsError as exc:
raise UnableToParseCSVError(
entity_name="csv_structure",
field_check_error_message=self.field_check_error_message,
field_check_error_code=self.field_check_error_code,
) from exc

if self.null_empty_strings:
pl_exprs = [
Expand All @@ -170,7 +194,16 @@ def read_to_relation( # pylint: disable=unused-argument
] + [pl.col(RECORD_INDEX_COLUMN_NAME)]
df = df.select(pl_exprs)

return self._connection.sql("SELECT * FROM df")
entity = self._connection.sql("SELECT * FROM df")

if entity.pl().shape[0] == 0:
raise UnableToParseCSVError(
entity_name="csv_structure",
field_check_error_message=self.field_check_error_message,
field_check_error_code=self.field_check_error_code,
)

return entity


class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
import csv
from pathlib import Path
from typing import Dict, Iterator, Optional
from uuid import uuid4

import pandas as pd
import pytest
from pydantic import BaseModel

from dve.core_engine.backends.exceptions import EmptyFileError, FieldCountMismatch, MessageBearingError
from dve.core_engine.backends.exceptions import (
EmptyFileError,
FieldCountMismatch,
MessageBearingError,
UnableToParseCSVError,
)
from dve.core_engine.backends.readers import CSVFileReader
from dve.core_engine.backends.readers.utilities import get_all_model_fields
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
from datetime import date, datetime
from pathlib import Path
from tempfile import TemporaryDirectory
from uuid import uuid4

import duckdb
import polars as pl
import pytest
from duckdb import DuckDBPyRelation
from pydantic import BaseModel

from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
from dve.core_engine.backends.exceptions import (
EmptyFileError,
MessageBearingError,
UnableToParseCSVError,
)
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
get_duckdb_type_from_annotation,
)
Expand All @@ -21,7 +26,7 @@
from dve.core_engine.backends.utilities import stringify_model
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME

# pylint: disable=C0115,C0116,W0621
# pylint: disable=C0103,C0115,C0116,W0621


class SimpleModel(BaseModel):
Expand Down Expand Up @@ -164,6 +169,25 @@ def test_DuckDBCSVReader_with_null_empty_strings(self, temp_dir):
assert entity.shape[0] == 3
assert entity.filter("test_col IS NULL").shape[0] == 2

def test_DuckDBCSVReader_with_malformed_header(self, temp_dir):
    """Reading a CSV whose header line is one quoted string raises UnableToParseCSVError."""
    # The whole header line is wrapped in a single pair of quotes, while the
    # data row below it holds four comma-separated values.
    test_data_headers = '"varchar_field,bigint_field,date_field,timestamp_field"'
    row_data = "hello,1,2023-04-01,2023-04-01T12:30:00"
    fqp = Path(temp_dir, f"{uuid4().hex}.csv")
    fqp.write_text(f"{test_data_headers}\n{row_data}", encoding="utf-8")

    reader = DuckDBCSVReader(
        header=True,
        delim=",",
        connection=duckdb.connect(),
    )

    with pytest.raises(UnableToParseCSVError) as err:
        reader.read_to_relation(fqp.as_posix(), "test", SimpleModel)

    # Check the exception only once the ``raises`` block has exited: code
    # placed after the raising call inside the block never runs, and the
    # raised exception is exposed on ``err.value``, not on ``err`` itself.
    assert len(err.value.messages) == 1


class TestPolarsToDuckDBCSVReader:
"""Test PolarsToDuckDBCSVReader"""
Expand Down Expand Up @@ -199,6 +223,26 @@ def test_PolarsToDuckDBCSVReader_with_null_empty_strings(self, temp_dir):
assert entity.shape[0] == 3
assert entity.filter("test_col IS NULL").shape[0] == 2

def test_PolarsToDuckDBCSVReader_with_malformed_header(self, temp_dir):
    """Reading a CSV whose header line is one quoted string raises UnableToParseCSVError."""
    # The whole header line is wrapped in a single pair of quotes, while the
    # data row below it holds four comma-separated values.
    test_data_headers = '"varchar_field,bigint_field,date_field,timestamp_field"'
    row_data = "hello,1,2023-04-01,2023-04-01T12:30:00"
    fqp = Path(temp_dir, f"{uuid4().hex}.csv")
    fqp.write_text(f"{test_data_headers}\n{row_data}", encoding="utf-8")

    reader = PolarsToDuckDBCSVReader(
        header=True,
        delim=",",
        connection=duckdb.connect(),
    )

    with pytest.raises(UnableToParseCSVError) as err:
        reader.read_to_relation(fqp.as_posix(), "test", SimpleModel)

    # Check the exception only once the ``raises`` block has exited: code
    # placed after the raising call inside the block never runs, and the
    # raised exception is exposed on ``err.value``, not on ``err`` itself.
    assert len(err.value.messages) == 1


class TestDuckDBCSVRepeatingHeaderReader:
"""Test DuckDBCSVRepeatingHeaderReader"""

Expand Down
Loading