Coverage for src/fluree_py/query/select/pydantic/builder.py: 90%

136 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-02 03:03 +0000

1from dataclasses import dataclass, field 

2from types import UnionType 

3from typing import ( 

4 Any, 

5 Protocol, 

6 Type, 

7 TypeAlias, 

8 TypeVar, 

9 Union, 

10 get_args, 

11 get_origin, 

12 get_type_hints, 

13 runtime_checkable, 

14) 

15 

16from pydantic import BaseModel, ConfigDict 

17 

18from fluree_py.query.select.pydantic.type_checker import TypeChecker 

19from fluree_py.query.select.pydantic.warning_manager import WarningManager 

20from fluree_py.query.select.pydantic.warning import ( 

21 ListOrderWarning, 

22 PossibleEmptyModelWarning, 

23) 

24from fluree_py.query.select.pydantic.error import ( 

25 DeeplyNestedStructureError, 

26 FlureeSelectError, 

27 InvalidFieldTypeError, 

28 ModelConfigError, 

29 MissingIdFieldError, 

30 TypeProcessingError, 

31) 

32 

33 

# Type variable bound to Pydantic model classes.
T = TypeVar("T", bound=BaseModel)

# Narrower alias for the kinds of annotations that appear on Pydantic
# model fields: scalars, lists, string-keyed dicts, model classes and None.
FieldType: TypeAlias = Union[
    str,
    int,
    float,
    bool,
    list[Any],
    dict[str, Any],
    type[BaseModel],
    None,
]

45 

46 

@runtime_checkable
class HasModelConfig(Protocol):
    """Structural type for anything exposing a ``model_config`` attribute.

    Decorated with ``runtime_checkable`` so it can be used with
    ``isinstance`` checks.
    """

    # The Pydantic configuration mapping carried by the object.
    model_config: ConfigDict

52 

53 

@dataclass
class FlureeSelectBuilder:
    """Builds Fluree select queries from Pydantic models.

    The builder starts from ``["*"]`` and appends one ``{field: [...]}``
    entry for every field that is a nested model, a dict, or a list of
    either. Primitive fields add nothing because ``"*"`` already covers them.

    Note:
        ``build`` appends to ``self.select`` and returns that same list, so
        entries accumulate if one builder instance is used for several
        models. Create a fresh builder per model (as ``from_pydantic`` does).

    Example:
        >>> class User(BaseModel):
        ...     id: str
        ...     name: str
        >>> builder = FlureeSelectBuilder()
        >>> query = builder.build(User)
        >>> assert query == ["*"]
    """

    # Collects warnings while building; they are emitted at the end of build().
    warning_manager: WarningManager = field(default_factory=WarningManager)
    # The select structure being assembled; mutated in place by build().
    select: list[Any] = field(default_factory=lambda: ["*"])
    # Nested model types on the current recursion path (cycle guard).
    _processed_models: set[Type[BaseModel]] = field(default_factory=set)

    def _validate_model_config(self, model: Type[BaseModel]) -> None:
        """Validate the ``extra`` setting of the model configuration.

        Models for which ``TypeChecker.has_model_config`` is false are
        accepted without further checks.

        Args:
            model: The model class whose ``model_config`` is inspected.

        Raises:
            ModelConfigError: If ``extra`` is not one of ``'allow'``,
                ``'ignore'`` or ``'forbid'``, or if reading the
                configuration fails unexpectedly.
        """
        if not TypeChecker.has_model_config(model):
            return

        try:
            extra = model.model_config.get("extra", "ignore")
            if extra not in ("allow", "ignore", "forbid"):
                raise ModelConfigError(
                    f"Invalid 'extra' configuration value: {extra}. "
                    "Must be one of: 'allow', 'ignore', 'forbid'"
                )
        except FlureeSelectError:
            # Our own errors already carry a precise message; pass them on.
            raise
        except Exception as e:
            raise ModelConfigError(
                f"Error validating model configuration for {model.__name__}: {e}"
            ) from e

    def _process_nested_model(
        self,
        field_name: str,
        field_type: Type[BaseModel],
    ) -> dict[str, Any]:
        """Process a nested model and return its select structure.

        Args:
            field_name: Name of the field that holds the nested model.
            field_type: The nested model class.

        Returns:
            A single-key dict mapping ``field_name`` to the nested select
            list. If ``field_type`` is already being expanded further up the
            call chain (a reference cycle), the result is
            ``{field_name: ["*"]}``.

        Raises:
            MissingIdFieldError: If the nested model requires an id field but
                doesn't have one.
            ModelConfigError: If there's an issue with the model configuration.
        """
        # Cycle guard: stop expanding when this type is already on the
        # current recursion path.
        if field_type in self._processed_models:
            return {field_name: ["*"]}

        self._processed_models.add(field_type)
        try:
            self._validate_model_config(field_type)

            if TypeChecker.check_model_requires_id(
                field_type
            ) and not TypeChecker.check_model_has_id(field_type):
                raise MissingIdFieldError(
                    f"Nested model '{field_type.__name__}' must have an 'id' field"
                )

            nested_select: list[Any] = ["*"]
            hints = get_type_hints(field_type, include_extras=True)
            for nested_field_name, nested_field_type in hints.items():
                if TypeChecker.is_id_field(nested_field_name):
                    continue
                self._process_field(
                    nested_field_name,
                    TypeChecker.get_real_type(nested_field_type),
                    nested_select,
                )

            return {field_name: nested_select}
        finally:
            # Unwind the path so sibling fields of the same model type (and
            # later build() calls) are expanded in full instead of being
            # mistaken for a cycle.
            self._processed_models.discard(field_type)

    def _process_field(
        self, field_name: str, field_type: Any, select: list[Any]
    ) -> None:
        """Process a field and add its select structure to ``select``.

        Lists always record a ``ListOrderWarning``. A list of models or a
        bare model appends the nested select; a list of dicts or a bare dict
        appends ``{field_name: ["*"]}``. Primitive fields and lists of any
        other element type append nothing.

        Args:
            field_name: Name of the field being processed.
            field_type: The (already unwrapped) annotation of the field.
            select: The select list to append to; mutated in place.

        Raises:
            InvalidFieldTypeError: If the field type is not supported.
            TypeProcessingError: If there's an error processing the field type.
        """
        try:
            if TypeChecker.is_list_type(field_type):
                self.warning_manager.add_warning(
                    ListOrderWarning,
                    f"Field '{field_name}' is a list type. Order will be non-deterministic.",
                )
                args = get_args(field_type)
                if args:
                    inner_type = TypeChecker.get_real_type(args[0])
                    if TypeChecker.is_base_model(inner_type):
                        select.append(
                            self._process_nested_model(field_name, inner_type)
                        )
                    elif TypeChecker.is_dict_type(inner_type):
                        select.append({field_name: ["*"]})
            elif TypeChecker.is_base_model(field_type):
                select.append(self._process_nested_model(field_name, field_type))
            elif TypeChecker.is_dict_type(field_type):
                select.append({field_name: ["*"]})
            elif TypeChecker.is_primitive_type(field_type):
                # Primitive types are included in "*", nothing to add.
                pass
            else:
                raise InvalidFieldTypeError(
                    f"Unsupported field type for '{field_name}': {field_type}"
                )
        except FlureeSelectError:
            raise
        except Exception as e:
            raise TypeProcessingError(
                f"Error processing field '{field_name}' of type {field_type}: {e}"
            ) from e

    def _handle_union_type(self, field_type: Any) -> Any:
        """Unwrap a union annotation to its first non-``None`` member.

        Both spellings are recognised: PEP 604 unions (``X | None``, origin
        ``types.UnionType``) and ``typing`` unions (``Optional[X]`` /
        ``Union[X, None]``, origin ``typing.Union``).

        Args:
            field_type: Any annotation.

        Returns:
            The first union member that is not ``NoneType``, or
            ``field_type`` unchanged when it is not a union (or has no
            non-``None`` member).
        """
        origin = get_origin(field_type)
        if origin is UnionType or origin is Union:
            none_type = type(None)
            for member in get_args(field_type):
                if member is not none_type:
                    return member
        return field_type

    def _check_deeply_nested_structures(self, fields: dict[str, Any]) -> None:
        """Check for deeply nested structures that are not supported.

        Rejects dict fields whose ``TypeChecker.dict_max_depth`` exceeds 1,
        and list fields whose first type argument is a tuple or a dict
        nested deeper than 1.

        Args:
            fields: Mapping of field name to annotation.

        Raises:
            DeeplyNestedStructureError: If a deeply nested structure is found.
            TypeProcessingError: If inspecting a field fails unexpectedly.
        """
        for field_name, field_type in fields.items():
            try:
                if TypeChecker.is_dict_type(field_type):
                    if TypeChecker.dict_max_depth(field_type) > 1:
                        raise DeeplyNestedStructureError(
                            f"Deeply nested dictionaries are not supported in field '{field_name}'"
                        )
                elif TypeChecker.is_list_type(field_type):
                    args = get_args(field_type)
                    if not args:
                        continue
                    inner_type = args[0]
                    if TypeChecker.is_tuple_type(inner_type):
                        raise DeeplyNestedStructureError(
                            f"Tuples are not supported in field '{field_name}'"
                        )
                    if TypeChecker.dict_max_depth(inner_type) > 1:
                        raise DeeplyNestedStructureError(
                            f"Deeply nested dictionaries are not supported in field '{field_name}'"
                        )
            except FlureeSelectError:
                raise
            except Exception as e:
                raise TypeProcessingError(
                    f"Error checking nested structure for field '{field_name}': {e}"
                ) from e

    def _check_optional_fields(self, fields: dict[str, Any]) -> None:
        """Warn when the model may end up empty on insert.

        A ``PossibleEmptyModelWarning`` is recorded when the model has at
        least one non-id field and none of the non-id fields is a list or a
        nested model.

        NOTE(review): the check looks only at list/model-ness of the real
        type; it does not inspect whether a field is actually ``Optional``.

        Args:
            fields: Mapping of field name to annotation.
        """
        non_id_fields = [
            name for name in fields if not TypeChecker.is_id_field(name)
        ]
        if not non_id_fields:
            return

        def _is_list_or_model(name: str) -> bool:
            """Return True if the field's real type is a list or a model."""
            real_type = TypeChecker.get_real_type(fields[name])
            return TypeChecker.is_list_type(real_type) or TypeChecker.is_base_model(
                real_type
            )

        if not any(_is_list_or_model(name) for name in non_id_fields):
            self.warning_manager.add_warning(
                PossibleEmptyModelWarning,
                "Model has only optional fields. This may result in an empty model when inserting.",
            )

    def build(self, pydantic_model: Type[BaseModel]) -> list[Any]:
        """Build a Fluree select query structure from a Pydantic model.

        Args:
            pydantic_model: A model class. A model instance is also
                accepted, in which case its class is used.

        Returns:
            ``self.select`` after one entry has been appended for every
            non-primitive field of the model.

        Raises:
            MissingIdFieldError: If the model is missing a required 'id' field.
            DeeplyNestedStructureError: If the model contains unsupported
                deeply nested structures.
            ModelConfigError: If there's an issue with the model configuration.
            InvalidFieldTypeError: If a field has an unsupported type.
            TypeProcessingError: If processing a field fails unexpectedly.
        """
        if isinstance(pydantic_model, BaseModel):
            model_type = type(pydantic_model)
        else:
            model_type = pydantic_model
        fields = get_type_hints(model_type, include_extras=True)

        # Fail early on structural problems before touching self.select.
        if "id" not in fields:
            raise MissingIdFieldError("Model must have an 'id' field")
        self._validate_model_config(model_type)
        self._check_deeply_nested_structures(fields)

        for field_name, field_type in fields.items():
            if field_name == "id":
                continue
            self._process_field(
                field_name, self._handle_union_type(field_type), self.select
            )

        self._check_optional_fields(fields)

        # Emit all collected warnings in one go.
        self.warning_manager.emit_warnings()
        return self.select

301 

302 

def from_pydantic(model: Type[BaseModel]) -> list[Any]:
    """Return the Fluree select structure for a Pydantic model.

    Convenience wrapper that creates a fresh ``FlureeSelectBuilder`` and
    returns the result of its ``build`` method for ``model``.

    Example:
        >>> class User(BaseModel):
        ...     id: str
        ...     name: str
        >>> query = from_pydantic(User)
        >>> assert query == ["*"]
    """
    return FlureeSelectBuilder().build(model)