import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Generic, List, TypedDict, TypeVar, Union
class ReplicationType(str, Enum):
"""Enum for replication types."""
COPY = "COPY"
MOVE = "MOVE"
class ReplicateOperationState(str, Enum):
"""Enum for replication operation states."""
REGISTERED = "REGISTERED"
HYDRATING = "HYDRATING"
FINALIZING = "FINALIZING"
DEHYDRATING = "DEHYDRATING"
READY = "READY"
CANCELLED = "CANCELLED"
@dataclass
class ReplicateOperationStatus:
"""Class representing the status of a replication operation."""
state: ReplicateOperationState
errors: List[str]
@classmethod
def _from_weaviate(cls, data: dict) -> "ReplicateOperationStatus":
return cls(
state=ReplicateOperationState(data["state"]),
errors=data["errors"] or [],
)
H = TypeVar("H", None, List[ReplicateOperationStatus])
@dataclass
class _ReplicateOperation(Generic[H]):
"""Class representing a replication operation."""
collection: str
shard: str
source_node: str
status: ReplicateOperationStatus
status_history: H
target_node: str
transfer_type: ReplicationType
uuid: uuid.UUID
@staticmethod
def _from_weaviate(
data: dict,
include_history: bool,
):
common = {
"collection": data["collection"],
"shard": data["shard"],
"source_node": data["sourceNode"],
"status": ReplicateOperationStatus._from_weaviate(data["status"]),
"target_node": data["targetNode"],
"transfer_type": ReplicationType(data["type"]),
"uuid": uuid.UUID(data["id"]),
}
if include_history and data["statusHistory"] is not None:
return _ReplicateOperation(
**common,
status_history=[
ReplicateOperationStatus._from_weaviate(status)
for status in data["statusHistory"]
],
)
return _ReplicateOperation(
**common,
status_history=None,
)
ReplicateOperationWithoutHistory = _ReplicateOperation[None]
ReplicateOperationWithHistory = _ReplicateOperation[List[ReplicateOperationStatus]]
ReplicateOperation = Union[ReplicateOperationWithoutHistory, ReplicateOperationWithHistory]
ReplicateOperations = Union[
List[ReplicateOperationWithoutHistory], List[ReplicateOperationWithHistory]
]
class _ReplicationShardReplicas(TypedDict):
shard: str
replicas: List[str]
class _ReplicationShardingState(TypedDict):
collection: str
shards: List[_ReplicationShardReplicas]
class _ReplicationShardingStateResponse(TypedDict):
shardingState: _ReplicationShardingState
[docs]
@dataclass
class ShardReplicas:
"""Class representing a shard replica."""
name: str
replicas: List[str]
[docs]
@staticmethod
def _from_weaviate(data: _ReplicationShardReplicas):
return ShardReplicas(
name=data["shard"],
replicas=data["replicas"],
)
[docs]
@dataclass
class ShardingState:
"""Class representing the sharding state of a collection."""
collection: str
shards: List[ShardReplicas]
[docs]
@staticmethod
def _from_weaviate(data: _ReplicationShardingStateResponse):
ss = data["shardingState"]
return ShardingState(
collection=ss["collection"],
shards=[ShardReplicas._from_weaviate(shard) for shard in ss["shards"]],
)
# --- RAFT cluster statistics ---
[docs]
@dataclass
class RaftConfigurationMember:
"""A member in the RAFT cluster's latest configuration."""
address: str
node_id: str
suffrage: int
[docs]
@staticmethod
def _from_weaviate(data: dict) -> "RaftConfigurationMember":
return RaftConfigurationMember(
address=data["address"],
node_id=data["id"],
suffrage=data["suffrage"],
)
[docs]
@dataclass
class RaftStats:
"""RAFT consensus statistics for a node."""
applied_index: str
commit_index: str
fsm_pending: str
last_contact: str
last_log_index: str
last_log_term: str
last_snapshot_index: str
last_snapshot_term: str
latest_configuration: List[RaftConfigurationMember]
latest_configuration_index: str
num_peers: str
protocol_version: str
protocol_version_max: str
protocol_version_min: str
snapshot_version_max: str
snapshot_version_min: str
state: str
term: str
[docs]
@staticmethod
def _from_weaviate(data: dict) -> "RaftStats":
return RaftStats(
applied_index=data.get("appliedIndex", ""),
commit_index=data.get("commitIndex", ""),
fsm_pending=data.get("fsmPending", ""),
last_contact=data.get("lastContact", ""),
last_log_index=data.get("lastLogIndex", ""),
last_log_term=data.get("lastLogTerm", ""),
last_snapshot_index=data.get("lastSnapshotIndex", ""),
last_snapshot_term=data.get("lastSnapshotTerm", ""),
latest_configuration=[
RaftConfigurationMember._from_weaviate(m)
for m in data.get("latestConfiguration", [])
],
latest_configuration_index=data.get("latestConfigurationIndex", ""),
num_peers=data.get("numPeers", ""),
protocol_version=data.get("protocolVersion", ""),
protocol_version_max=data.get("protocolVersionMax", ""),
protocol_version_min=data.get("protocolVersionMin", ""),
snapshot_version_max=data.get("snapshotVersionMax", ""),
snapshot_version_min=data.get("snapshotVersionMin", ""),
state=data.get("state", ""),
term=data.get("term", ""),
)
[docs]
@dataclass
class NodeStatistics:
"""RAFT cluster statistics for a single node."""
candidates: Dict[str, Any]
db_loaded: bool
initial_last_applied_index: int
is_voter: bool
leader_address: str
leader_id: str
name: str
is_open: bool
raft: RaftStats
ready: bool
status: str
[docs]
@staticmethod
def _from_weaviate(data: dict) -> "NodeStatistics":
return NodeStatistics(
candidates=data.get("candidates", {}),
db_loaded=data.get("dbLoaded", False),
initial_last_applied_index=data.get("initialLastAppliedIndex", 0),
is_voter=data.get("isVoter", False),
leader_address=data.get("leaderAddress", ""),
leader_id=data.get("leaderId", ""),
name=data.get("name", ""),
is_open=data.get("open", False),
raft=RaftStats._from_weaviate(data.get("raft", {})),
ready=data.get("ready", False),
status=data.get("status", ""),
)
[docs]
@dataclass
class ClusterStatistics:
"""Response from GET /v1/cluster/statistics (RAFT cluster statistics)."""
statistics: List[NodeStatistics]
synchronized: bool
[docs]
@staticmethod
def _from_weaviate(data: dict) -> "ClusterStatistics":
return ClusterStatistics(
statistics=[NodeStatistics._from_weaviate(s) for s in data.get("statistics", [])],
synchronized=data.get("synchronized", False),
)