Coverage for src/fluree_py/http/endpoint/transact.py: 86%

42 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-02 03:03 +0000

1from dataclasses import dataclass, replace 

2from typing import Any 

3 

4from fluree_py.http.mixin import ( 

5 CommitableMixin, 

6 RequestMixin, 

7 WithContextMixin, 

8 WithInsertMixin, 

9 WithWhereMixin, 

10) 

11from fluree_py.http.protocol.endpoint import ( 

12 TransactionBuilder, 

13 TransactionReadyToCommit, 

14) 

15from fluree_py.types.query.where import WhereClause 

16from fluree_py.types.common import JsonArray, JsonObject 

17 

18 

@dataclass(frozen=True, kw_only=True)
class TransactionBuilderImpl(
    WithContextMixin["TransactionBuilderImpl"],
    WithInsertMixin["TransactionReadyToCommitImpl"],
    WithWhereMixin["TransactionBuilderImpl"],
    TransactionBuilder,
):
    """Immutable builder for a transaction that is not yet ready to commit.

    Calling ``with_delete`` hands the accumulated state over to a
    ``TransactionReadyToCommitImpl``.
    """

    # Required, keyword-only identification of the target.
    endpoint: str
    ledger: str
    # Optional parts of the transaction; all default to None (unset).
    context: dict[str, Any] | None = None
    where: WhereClause | None = None
    data: JsonObject | JsonArray | None = None
    delete_data: JsonObject | JsonArray | None = None

    def with_delete(
        self, data: JsonObject | JsonArray
    ) -> "TransactionReadyToCommitImpl":
        """Return a ready-to-commit transaction with ``delete_data`` set.

        Every attribute currently stored on this builder is forwarded
        unchanged as a keyword argument; only ``delete_data`` is replaced
        by ``data``. The builder itself is left untouched.
        """
        # Snapshot the instance attributes and override the delete payload
        # in a single expression; the later key wins.
        ready_kwargs = {**self.__dict__, "delete_data": data}
        return TransactionReadyToCommitImpl(**ready_kwargs)

42 

43 

@dataclass(frozen=True, kw_only=True)
class TransactionReadyToCommitImpl(
    RequestMixin,
    WithContextMixin["TransactionReadyToCommitImpl"],
    WithWhereMixin["TransactionReadyToCommitImpl"],
    CommitableMixin["TransactionReadyToCommitImpl"],
    TransactionReadyToCommit,
):
    """Immutable transaction whose state can be turned into a request payload."""

    # All fields are required keyword arguments (no defaults here).
    endpoint: str
    ledger: str
    context: dict[str, Any] | None
    where: WhereClause | None
    data: JsonObject | JsonArray | None
    delete_data: JsonObject | JsonArray | None

    def with_delete(
        self, data: JsonObject | JsonArray
    ) -> "TransactionReadyToCommitImpl":
        """Return a copy of this transaction with ``delete_data`` set to ``data``."""
        with_new_delete = replace(self, delete_data=data)
        return with_new_delete

    def get_url(self) -> str:
        """Return the URL this transaction is sent to (the ``endpoint`` field)."""
        return self.endpoint

    def build_request_payload(self) -> dict[str, Any]:
        """Assemble the request body for this transaction.

        ``ledger`` is always present. ``@context``, ``insert``, ``delete``
        and ``where`` are included only when the corresponding field is
        truthy, so ``None`` and empty containers are left out. Keys are
        inserted in the order ``@context``, ``ledger``, ``insert``,
        ``delete``, ``where``.
        """
        payload: dict[str, Any] = {}
        if self.context:
            payload["@context"] = self.context
        payload["ledger"] = self.ledger

        # Optional sections, listed in the order they must appear.
        optional_sections = (
            ("insert", self.data),
            ("delete", self.delete_data),
            ("where", self.where),
        )
        for key, section in optional_sections:
            if section:
                payload[key] = section
        return payload