Coverage for src/fluree_py/http/endpoint/transact.py: 86%
42 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-02 03:03 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-02 03:03 +0000
1from dataclasses import dataclass, replace
2from typing import Any
4from fluree_py.http.mixin import (
5 CommitableMixin,
6 RequestMixin,
7 WithContextMixin,
8 WithInsertMixin,
9 WithWhereMixin,
10)
11from fluree_py.http.protocol.endpoint import (
12 TransactionBuilder,
13 TransactionReadyToCommit,
14)
15from fluree_py.types.query.where import WhereClause
16from fluree_py.types.common import JsonArray, JsonObject
19@dataclass(frozen=True, kw_only=True)
20class TransactionBuilderImpl(
21 WithContextMixin["TransactionBuilderImpl"],
22 WithInsertMixin["TransactionReadyToCommitImpl"],
23 WithWhereMixin["TransactionBuilderImpl"],
24 TransactionBuilder,
25):
26 """Implementation of a transaction operation builder."""
28 endpoint: str
29 ledger: str
30 context: dict[str, Any] | None = None
31 where: WhereClause | None = None
32 data: JsonObject | JsonArray | None = None
33 delete_data: JsonObject | JsonArray | None = None
35 def with_delete(
36 self, data: JsonObject | JsonArray
37 ) -> "TransactionReadyToCommitImpl":
38 """Add delete operation to the transaction."""
39 updated_fields = self.__dict__.copy()
40 updated_fields["delete_data"] = data
41 return TransactionReadyToCommitImpl(**updated_fields)
44@dataclass(frozen=True, kw_only=True)
45class TransactionReadyToCommitImpl(
46 RequestMixin,
47 WithContextMixin["TransactionReadyToCommitImpl"],
48 WithWhereMixin["TransactionReadyToCommitImpl"],
49 CommitableMixin["TransactionReadyToCommitImpl"],
50 TransactionReadyToCommit,
51):
52 """Implementation of a transaction operation ready to be committed."""
54 endpoint: str
55 ledger: str
56 context: dict[str, Any] | None
57 where: WhereClause | None
58 data: JsonObject | JsonArray | None
59 delete_data: JsonObject | JsonArray | None
61 def with_delete(
62 self, data: JsonObject | JsonArray
63 ) -> "TransactionReadyToCommitImpl":
64 """Add delete operation to the transaction."""
65 return replace(self, delete_data=data)
67 def get_url(self) -> str:
68 """Get the endpoint URL for the transaction operation."""
69 return self.endpoint
71 def build_request_payload(self) -> dict[str, Any]:
72 """Build the request payload for the transaction operation."""
73 result: dict[str, Any] = {}
74 if self.context:
75 result["@context"] = self.context
76 result |= {"ledger": self.ledger}
77 if self.data:
78 result["insert"] = self.data
79 if self.delete_data:
80 result["delete"] = self.delete_data
81 if self.where:
82 result["where"] = self.where
83 return result