Coverage for src/fluree_py/http/endpoint/create.py: 100%

25 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-02 03:03 +0000

1from dataclasses import dataclass 

2from typing import Any 

3 

4from fluree_py.http.mixin import ( 

5 CommitableMixin, 

6 RequestMixin, 

7 WithContextMixin, 

8 WithInsertMixin, 

9) 

10from fluree_py.http.protocol.endpoint.create import CreateBuilder, CreateReadyToCommit 

11from fluree_py.types.common import JsonArray, JsonObject 

12 

13 

14@dataclass(frozen=True, kw_only=True) 

15class CreateReadyToCommitImpl( 

16 RequestMixin, 

17 WithContextMixin["CreateReadyToCommitImpl"], 

18 WithInsertMixin["CreateReadyToCommitImpl"], 

19 CommitableMixin["CreateReadyToCommitImpl"], 

20 CreateReadyToCommit, 

21): 

22 """Implementation of a create operation ready to be committed.""" 

23 

24 endpoint: str 

25 ledger: str 

26 data: JsonObject | JsonArray | None 

27 context: dict[str, Any] | None = None 

28 

29 def get_url(self) -> str: 

30 """Get the endpoint URL for the create operation.""" 

31 return self.endpoint 

32 

33 def build_request_payload(self) -> dict[str, Any]: 

34 """Build the request payload for the create operation.""" 

35 result: dict[str, Any] = {} 

36 if self.context: 

37 result["@context"] = self.context 

38 result |= {"ledger": self.ledger, "insert": self.data} 

39 return result 

40 

41 

42@dataclass(frozen=True, kw_only=True) 

43class CreateBuilderImpl( 

44 WithContextMixin["CreateBuilderImpl"], 

45 WithInsertMixin[CreateReadyToCommitImpl], 

46 CreateBuilder, 

47): 

48 """Implementation of a create operation builder.""" 

49 

50 endpoint: str 

51 ledger: str 

52 data: JsonObject | JsonArray | None = None 

53 context: dict[str, Any] | None = None