Coverage for src/fluree_py/query/select/pydantic/builder.py: 90%
136 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-02 03:03 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-02 03:03 +0000
1from dataclasses import dataclass, field
2from types import UnionType
3from typing import (
4 Any,
5 Protocol,
6 Type,
7 TypeAlias,
8 TypeVar,
9 Union,
10 get_args,
11 get_origin,
12 get_type_hints,
13 runtime_checkable,
14)
16from pydantic import BaseModel, ConfigDict
18from fluree_py.query.select.pydantic.type_checker import TypeChecker
19from fluree_py.query.select.pydantic.warning_manager import WarningManager
20from fluree_py.query.select.pydantic.warning import (
21 ListOrderWarning,
22 PossibleEmptyModelWarning,
23)
24from fluree_py.query.select.pydantic.error import (
25 DeeplyNestedStructureError,
26 FlureeSelectError,
27 InvalidFieldTypeError,
28 ModelConfigError,
29 MissingIdFieldError,
30 TypeProcessingError,
31)
34T = TypeVar("T", bound=BaseModel)
35FieldType: TypeAlias = Union[
36 str,
37 int,
38 float,
39 bool,
40 list[Any],
41 dict[str, Any],
42 type[BaseModel],
43 None,
44] # More specific type for field types in Pydantic models
47@runtime_checkable
48class HasModelConfig(Protocol):
49 """Protocol for objects that have a model_config attribute."""
51 model_config: ConfigDict
54@dataclass
55class FlureeSelectBuilder:
56 """Builds Fluree select queries from Pydantic models.
58 Example:
59 >>> class User(BaseModel):
60 ... id: str
61 ... name: str
62 >>> builder = FlureeSelectBuilder()
63 >>> query = builder.build(User)
64 >>> assert query == ["*"]
65 """
67 warning_manager: WarningManager = field(default_factory=WarningManager)
68 select: list[Any] = field(default_factory=lambda: ["*"])
69 _processed_models: set[Type[BaseModel]] = field(default_factory=set)
71 def _validate_model_config(self, model: Type[BaseModel]) -> None:
72 """Validate the model configuration.
74 Raises:
75 ModelConfigError: If the model configuration is invalid
76 """
77 if not TypeChecker.has_model_config(model):
78 return
80 try:
81 extra = model.model_config.get("extra", "ignore")
82 if extra not in ("allow", "ignore", "forbid"):
83 raise ModelConfigError(
84 f"Invalid 'extra' configuration value: {extra}. "
85 "Must be one of: 'allow', 'ignore', 'forbid'"
86 )
87 except Exception as e:
88 if not isinstance(e, FlureeSelectError):
89 raise ModelConfigError(
90 f"Error validating model configuration for {model.__name__}: {str(e)}"
91 ) from e
92 raise
94 def _process_nested_model(
95 self,
96 field_name: str,
97 field_type: Type[BaseModel],
98 ) -> dict[str, Any]:
99 """Process a nested model and return its select structure.
101 Raises:
102 MissingIdFieldError: If the nested model requires an id field but doesn't have one
103 ModelConfigError: If there's an issue with the model configuration
104 """
105 # Prevent infinite recursion
106 if field_type in self._processed_models:
107 return {field_name: ["*"]}
108 self._processed_models.add(field_type)
110 # Validate model configuration
111 self._validate_model_config(field_type)
113 if TypeChecker.check_model_requires_id(
114 field_type
115 ) and not TypeChecker.check_model_has_id(field_type):
116 raise MissingIdFieldError(
117 f"Nested model '{field_type.__name__}' must have an 'id' field"
118 )
120 fields = get_type_hints(field_type, include_extras=True)
121 select = ["*"]
123 for nested_field_name, nested_field_type in fields.items():
124 if TypeChecker.is_id_field(nested_field_name):
125 continue
127 real_type = TypeChecker.get_real_type(nested_field_type)
128 self._process_field(nested_field_name, real_type, select)
130 return {field_name: select}
132 def _process_field(
133 self, field_name: str, field_type: Any, select: list[Any]
134 ) -> None:
135 """Process a field and add its select structure to the result.
137 Raises:
138 InvalidFieldTypeError: If the field type is not supported
139 TypeProcessingError: If there's an error processing the field type
140 """
141 try:
142 match field_type:
143 case t if TypeChecker.is_list_type(t):
144 self.warning_manager.add_warning(
145 ListOrderWarning,
146 f"Field '{field_name}' is a list type. Order will be non-deterministic.",
147 )
148 args = get_args(t)
149 if args:
150 inner_type = TypeChecker.get_real_type(args[0])
151 if TypeChecker.is_base_model(inner_type):
152 select.append(
153 self._process_nested_model(field_name, inner_type)
154 )
155 elif TypeChecker.is_dict_type(inner_type):
156 select.append({field_name: ["*"]})
157 return
159 case t if TypeChecker.is_base_model(t):
160 select.append(self._process_nested_model(field_name, t))
161 return
163 case t if TypeChecker.is_dict_type(t):
164 select.append({field_name: ["*"]})
165 return
167 case t if TypeChecker.is_primitive_type(t):
168 # Primitive types are included in "*" so we don't need to add them explicitly
169 return
171 case _:
172 raise InvalidFieldTypeError(
173 f"Unsupported field type for '{field_name}': {field_type}"
174 )
175 except Exception as e:
176 if not isinstance(e, FlureeSelectError):
177 raise TypeProcessingError(
178 f"Error processing field '{field_name}' of type {field_type}: {str(e)}"
179 ) from e
180 raise
182 def _handle_union_type(self, field_type: Any) -> Any:
183 """Handle Union types by extracting the first non-None type."""
184 origin = get_origin(field_type)
185 if origin is UnionType:
186 types = [t for t in get_args(field_type) if t is not type(None)] # noqa: E721
187 if types:
188 return types[0]
189 return field_type
191 def _check_deeply_nested_structures(self, fields: dict[str, Any]) -> None:
192 """Check for deeply nested structures that are not supported.
194 Raises:
195 DeeplyNestedStructureError: If a deeply nested structure is found
196 """
197 for field_name, field_type in fields.items():
198 try:
199 match field_type:
200 case t if TypeChecker.is_dict_type(t):
201 if TypeChecker.dict_max_depth(t) > 1:
202 raise DeeplyNestedStructureError(
203 f"Deeply nested dictionaries are not supported in field '{field_name}'"
204 )
206 case t if TypeChecker.is_list_type(t):
207 args = get_args(t)
208 if args:
209 inner_type = args[0]
210 if TypeChecker.is_tuple_type(inner_type):
211 raise DeeplyNestedStructureError(
212 f"Tuples are not supported in field '{field_name}'"
213 )
214 if TypeChecker.dict_max_depth(inner_type) > 1:
215 raise DeeplyNestedStructureError(
216 f"Deeply nested dictionaries are not supported in field '{field_name}'"
217 )
218 case _:
219 pass
220 except Exception as e:
221 if not isinstance(e, FlureeSelectError):
222 raise TypeProcessingError(
223 f"Error checking nested structure for field '{field_name}': {str(e)}"
224 ) from e
225 raise
227 def _check_optional_fields(self, fields: dict[str, Any]) -> None:
228 """Check for optional fields and add appropriate warnings."""
229 non_id_fields = [f for f in fields if not TypeChecker.is_id_field(f)]
230 if not non_id_fields:
231 return
233 all_optional = True
234 for field_name in non_id_fields:
235 field_type = fields[field_name]
236 real_type = TypeChecker.get_real_type(field_type)
238 if TypeChecker.is_list_type(real_type) or TypeChecker.is_base_model(
239 real_type
240 ):
241 all_optional = False
242 break
244 if all_optional:
245 self.warning_manager.add_warning(
246 PossibleEmptyModelWarning,
247 "Model has only optional fields. This may result in an empty model when inserting.",
248 )
249 elif len(non_id_fields) == 1:
250 field_name = non_id_fields[0]
251 field_type = fields[field_name]
252 real_type = TypeChecker.get_real_type(field_type)
253 if not (
254 TypeChecker.is_list_type(real_type)
255 or TypeChecker.is_base_model(real_type)
256 ):
257 self.warning_manager.add_warning(
258 PossibleEmptyModelWarning,
259 f"Model has a single optional field '{field_name}'. This may result in an empty model when inserting.",
260 )
262 def build(self, pydantic_model: Type[BaseModel]) -> list[Any]:
263 """Build a Fluree select query structure from a Pydantic model.
265 Raises:
266 MissingIdFieldError: If the model is missing a required 'id' field
267 DeeplyNestedStructureError: If the model contains unsupported deeply nested structures
268 ModelConfigError: If there's an issue with the model configuration
269 """
270 model_type = (
271 type(pydantic_model)
272 if isinstance(pydantic_model, BaseModel)
273 else pydantic_model
274 )
275 fields = get_type_hints(model_type, include_extras=True)
277 # Early validation checks
278 if "id" not in fields:
279 raise MissingIdFieldError("Model must have an 'id' field")
281 # Validate model configuration
282 self._validate_model_config(model_type)
284 # Check for deeply nested structures
285 self._check_deeply_nested_structures(fields)
287 # Process each field
288 for field_name, field_type in fields.items():
289 if field_name == "id":
290 continue
292 field_type = self._handle_union_type(field_type)
293 self._process_field(field_name, field_type, self.select)
295 # Check for optional fields
296 self._check_optional_fields(fields)
298 # Emit all collected warnings
299 self.warning_manager.emit_warnings()
300 return self.select
303def from_pydantic(model: Type[BaseModel]) -> list[Any]:
304 """Convert a Pydantic model to a Fluree select query structure.
306 Example:
307 >>> class User(BaseModel):
308 ... id: str
309 ... name: str
310 >>> query = from_pydantic(User)
311 >>> assert query == ["*"]
312 """
313 builder = FlureeSelectBuilder()
314 return builder.build(model)