from abc import abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Sequence, TypedDict, Union
from pydantic import BaseModel
from weaviate.cluster.types import Verbosity
[docs]
class PermissionData(TypedDict):
collection: str
object: str # noqa: A003
tenant: str
[docs]
class PermissionCollections(TypedDict):
collection: str
tenant: str
[docs]
class PermissionsTenants(TypedDict):
collection: str
tenant: str
[docs]
class PermissionNodes(TypedDict):
collection: str
verbosity: Verbosity
[docs]
class PermissionBackup(TypedDict):
collection: str
[docs]
class PermissionRoles(TypedDict):
role: str
# action is always present in WeaviatePermission
[docs]
class WeaviatePermissionRequired(TypedDict):
action: str
[docs]
class WeaviatePermission(
WeaviatePermissionRequired, total=False
): # Add total=False to make all fields optional
data: Optional[PermissionData]
collections: Optional[PermissionCollections]
nodes: Optional[PermissionNodes]
backups: Optional[PermissionBackup]
roles: Optional[PermissionRoles]
tenants: Optional[PermissionsTenants]
[docs]
class WeaviateRole(TypedDict):
name: str
permissions: List[WeaviatePermission]
class _Action:
pass
[docs]
class CollectionsAction(str, _Action, Enum):
CREATE = "create_collections"
READ = "read_collections"
UPDATE = "update_collections"
DELETE = "delete_collections"
MANAGE = "manage_collections"
[docs]
@staticmethod
def values() -> List[str]:
return [action.value for action in CollectionsAction]
[docs]
class TenantsAction(str, _Action, Enum):
CREATE = "create_tenants"
READ = "read_tenants"
UPDATE = "update_tenants"
DELETE = "delete_tenants"
[docs]
@staticmethod
def values() -> List[str]:
return [action.value for action in TenantsAction]
[docs]
class DataAction(str, _Action, Enum):
CREATE = "create_data"
READ = "read_data"
UPDATE = "update_data"
DELETE = "delete_data"
MANAGE = "manage_data"
[docs]
@staticmethod
def values() -> List[str]:
return [action.value for action in DataAction]
[docs]
class RolesAction(str, _Action, Enum):
MANAGE = "manage_roles"
READ = "read_roles"
[docs]
@staticmethod
def values() -> List[str]:
return [action.value for action in RolesAction]
[docs]
class UsersAction(str, _Action, Enum):
MANAGE = "manage_users"
[docs]
@staticmethod
def values() -> List[str]:
return [action.value for action in UsersAction]
[docs]
class ClusterAction(str, _Action, Enum):
READ = "read_cluster"
[docs]
@staticmethod
def values() -> List[str]:
return [action.value for action in ClusterAction]
[docs]
class NodesAction(str, _Action, Enum):
READ = "read_nodes"
[docs]
@staticmethod
def values() -> List[str]:
return [action.value for action in NodesAction]
[docs]
class BackupsAction(str, _Action, Enum):
MANAGE = "manage_backups"
[docs]
@staticmethod
def values() -> List[str]:
return [action.value for action in BackupsAction]
class _Permission(BaseModel):
@abstractmethod
def _to_weaviate(self) -> WeaviatePermission:
raise NotImplementedError()
class _CollectionsPermission(_Permission):
collection: str
tenant: str
action: CollectionsAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"collections": {
"collection": self.collection,
"tenant": self.tenant,
},
}
[docs]
class TenantsPermission(_Permission):
collection: str
action: TenantsAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"tenants": {
"collection": self.collection,
"tenant": "*",
},
}
class _NodesPermission(_Permission):
verbosity: Verbosity
collection: str
action: NodesAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"nodes": {
"collection": self.collection,
"verbosity": self.verbosity,
},
}
class _RolesPermission(_Permission):
role: str
action: RolesAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"roles": {
"role": self.role,
},
}
class _UsersPermission(_Permission):
action: UsersAction
def _to_weaviate(self) -> WeaviatePermission:
return {"action": self.action}
class _BackupsPermission(_Permission):
collection: str
action: BackupsAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"backups": {
"collection": self.collection,
},
}
class _ClusterPermission(_Permission):
action: ClusterAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
}
class _DataPermission(_Permission):
collection: str
tenant: str
object_: str
action: DataAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"data": {
"collection": self.collection,
"object": self.object_,
"tenant": self.tenant,
},
}
[docs]
@dataclass
class CollectionsPermission:
collection: str
action: CollectionsAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"collections": {
"collection": self.collection,
"tenant": "*",
},
}
[docs]
@dataclass
class DataPermission:
collection: str
action: DataAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"data": {
"collection": self.collection,
"object": "*",
"tenant": "*",
},
}
[docs]
@dataclass
class RolesPermission:
role: str
action: RolesAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"roles": {
"role": self.role,
},
}
[docs]
@dataclass
class UsersPermission:
action: UsersAction
def _to_weaviate(self) -> WeaviatePermission:
return {"action": self.action}
[docs]
@dataclass
class ClusterPermission:
action: ClusterAction
def _to_weaviate(self) -> WeaviatePermission:
return {"action": self.action}
[docs]
@dataclass
class BackupsPermission:
collection: str
action: BackupsAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"backups": {
"collection": self.collection,
},
}
[docs]
@dataclass
class NodesPermission:
collection: Optional[str]
verbosity: Verbosity
action: NodesAction
def _to_weaviate(self) -> WeaviatePermission:
return {
"action": self.action,
"nodes": {
"collection": self.collection or "*",
"verbosity": self.verbosity,
},
}
PermissionsOutputType = Union[
ClusterPermission,
CollectionsPermission,
DataPermission,
RolesPermission,
UsersPermission,
BackupsPermission,
NodesPermission,
TenantsPermission,
]
[docs]
@dataclass
class Role:
name: str
cluster_permissions: List[ClusterPermission]
collections_permissions: List[CollectionsPermission]
data_permissions: List[DataPermission]
roles_permissions: List[RolesPermission]
users_permissions: List[UsersPermission]
backups_permissions: List[BackupsPermission]
nodes_permissions: List[NodesPermission]
tenants_permissions: List[TenantsPermission]
@property
def permissions(self) -> List[PermissionsOutputType]:
permissions: List[PermissionsOutputType] = []
permissions.extend(self.cluster_permissions)
permissions.extend(self.collections_permissions)
permissions.extend(self.data_permissions)
permissions.extend(self.roles_permissions)
permissions.extend(self.users_permissions)
permissions.extend(self.backups_permissions)
permissions.extend(self.nodes_permissions)
permissions.extend(self.tenants_permissions)
return permissions
@classmethod
def _from_weaviate_role(cls, role: WeaviateRole) -> "Role":
cluster_permissions: List[ClusterPermission] = []
users_permissions: List[UsersPermission] = []
collections_permissions: List[CollectionsPermission] = []
roles_permissions: List[RolesPermission] = []
data_permissions: List[DataPermission] = []
backups_permissions: List[BackupsPermission] = []
nodes_permissions: List[NodesPermission] = []
tenants_permissions: List[TenantsPermission] = []
for permission in role["permissions"]:
if permission["action"] in ClusterAction.values():
cluster_permissions.append(
ClusterPermission(action=ClusterAction(permission["action"]))
)
elif permission["action"] in UsersAction.values():
users_permissions.append(UsersPermission(action=UsersAction(permission["action"])))
elif permission["action"] in CollectionsAction.values():
collections = permission.get("collections")
if collections is not None:
collections_permissions.append(
CollectionsPermission(
collection=collections["collection"],
action=CollectionsAction(permission["action"]),
)
)
elif permission["action"] in TenantsAction.values():
tenants = permission.get("tenants")
if tenants is not None:
tenants_permissions.append(
TenantsPermission(
collection=tenants["collection"],
action=TenantsAction(permission["action"]),
)
)
elif permission["action"] in RolesAction.values():
roles = permission.get("roles")
if roles is not None:
roles_permissions.append(
RolesPermission(
role=roles["role"], action=RolesAction(permission["action"])
)
)
elif permission["action"] in DataAction.values():
data = permission.get("data")
if data is not None:
data_permissions.append(
DataPermission(
collection=data["collection"],
action=DataAction(permission["action"]),
)
)
elif permission["action"] in BackupsAction.values():
backups = permission.get("backups")
if backups is not None:
backups_permissions.append(
BackupsPermission(
collection=backups["collection"],
action=BackupsAction(permission["action"]),
)
)
elif permission["action"] in NodesAction.values():
nodes = permission.get("nodes")
if nodes is not None:
nodes_permissions.append(
NodesPermission(
collection=nodes.get("collection"),
verbosity=nodes["verbosity"],
action=NodesAction(permission["action"]),
)
)
else:
raise ValueError(
f"The actions of role {role['name']} are mixed between levels somehow!"
)
return cls(
name=role["name"],
cluster_permissions=cluster_permissions,
users_permissions=users_permissions,
collections_permissions=collections_permissions,
roles_permissions=roles_permissions,
data_permissions=data_permissions,
backups_permissions=backups_permissions,
nodes_permissions=nodes_permissions,
tenants_permissions=tenants_permissions,
)
[docs]
@dataclass
class User:
name: str
ActionsType = Union[_Action, Sequence[_Action]]
PermissionsInputType = Union[
_Permission,
Sequence[_Permission],
Sequence[Sequence[_Permission]],
Sequence[Union[_Permission, Sequence[_Permission]]],
]
PermissionsCreateType = List[_Permission]
class _DataFactory:
@staticmethod
def create(*, collection: str) -> _DataPermission:
return _DataPermission(
collection=collection, tenant="*", object_="*", action=DataAction.CREATE
)
@staticmethod
def read(*, collection: str) -> _DataPermission:
return _DataPermission(
collection=collection, tenant="*", object_="*", action=DataAction.READ
)
@staticmethod
def update(*, collection: str) -> _DataPermission:
return _DataPermission(
collection=collection, tenant="*", object_="*", action=DataAction.UPDATE
)
@staticmethod
def delete(*, collection: str) -> _DataPermission:
return _DataPermission(
collection=collection, tenant="*", object_="*", action=DataAction.DELETE
)
class _CollectionsFactory:
@staticmethod
def create(*, collection: Optional[str] = None) -> _CollectionsPermission:
return _CollectionsPermission(
collection=collection or "*", tenant="*", action=CollectionsAction.CREATE
)
@staticmethod
def read(*, collection: Optional[str] = None) -> _CollectionsPermission:
return _CollectionsPermission(
collection=collection or "*", tenant="*", action=CollectionsAction.READ
)
@staticmethod
def update(*, collection: Optional[str] = None) -> _CollectionsPermission:
return _CollectionsPermission(
collection=collection or "*", tenant="*", action=CollectionsAction.UPDATE
)
@staticmethod
def delete(*, collection: Optional[str] = None) -> _CollectionsPermission:
return _CollectionsPermission(
collection=collection or "*", tenant="*", action=CollectionsAction.DELETE
)
class _TenantsFactory:
@staticmethod
def create(*, collection: Optional[str] = None) -> TenantsPermission:
return TenantsPermission(collection=collection or "*", action=TenantsAction.CREATE)
@staticmethod
def read(*, collection: Optional[str] = None) -> TenantsPermission:
return TenantsPermission(collection=collection or "*", action=TenantsAction.READ)
@staticmethod
def update(*, collection: Optional[str] = None) -> TenantsPermission:
return TenantsPermission(collection=collection or "*", action=TenantsAction.UPDATE)
@staticmethod
def delete(*, collection: Optional[str] = None) -> TenantsPermission:
return TenantsPermission(collection=collection or "*", action=TenantsAction.DELETE)
class _RolesFactory:
@staticmethod
def manage(*, role: Optional[str] = None) -> _RolesPermission:
return _RolesPermission(role=role or "*", action=RolesAction.MANAGE)
@staticmethod
def read(*, role: Optional[str] = None) -> _RolesPermission:
return _RolesPermission(role=role or "*", action=RolesAction.READ)
class _UsersFactory:
@staticmethod
def manage() -> _UsersPermission:
return _UsersPermission(action=UsersAction.MANAGE)
class _ClusterFactory:
@staticmethod
def read() -> _ClusterPermission:
return _ClusterPermission(action=ClusterAction.READ)
class _NodesFactory:
@staticmethod
def read(
*, collection: Optional[str] = None, verbosity: Verbosity = "minimal"
) -> _NodesPermission:
return _NodesPermission(
collection=collection or "*", action=NodesAction.READ, verbosity=verbosity
)
class _BackupsFactory:
@staticmethod
def manage(*, collection: Optional[str] = None) -> _BackupsPermission:
return _BackupsPermission(collection=collection or "*", action=BackupsAction.MANAGE)
[docs]
class Actions:
Data = DataAction
Collections = CollectionsAction
Roles = RolesAction
Cluster = ClusterAction
Nodes = NodesAction
Backups = BackupsAction
Tenants = TenantsAction
[docs]
class Permissions:
[docs]
@staticmethod
def data(
*,
collection: Union[str, Sequence[str]],
create: bool = False,
read: bool = False,
update: bool = False,
delete: bool = False,
) -> PermissionsCreateType:
permissions: List[_Permission] = []
if isinstance(collection, str):
collection = [collection]
for c in collection:
if create:
permissions.append(_DataFactory.create(collection=c))
if read:
permissions.append(_DataFactory.read(collection=c))
if update:
permissions.append(_DataFactory.update(collection=c))
if delete:
permissions.append(_DataFactory.delete(collection=c))
return permissions
[docs]
@staticmethod
def collections(
*,
collection: Union[str, Sequence[str]],
create_collection: bool = False,
read_config: bool = False,
update_config: bool = False,
delete_collection: bool = False,
) -> PermissionsCreateType:
permissions: List[_Permission] = []
if isinstance(collection, str):
collection = [collection]
for c in collection:
if create_collection:
permissions.append(_CollectionsFactory.create(collection=c))
if read_config:
permissions.append(_CollectionsFactory.read(collection=c))
if update_config:
permissions.append(_CollectionsFactory.update(collection=c))
if delete_collection:
permissions.append(_CollectionsFactory.delete(collection=c))
return permissions
[docs]
@staticmethod
def tenants(
*,
collection: Union[str, Sequence[str]],
create: bool = False,
read: bool = False,
update: bool = False,
delete: bool = False,
) -> PermissionsCreateType:
permissions: List[_Permission] = []
if isinstance(collection, str):
collection = [collection]
for c in collection:
if create:
permissions.append(_TenantsFactory.create(collection=c))
if read:
permissions.append(_TenantsFactory.read(collection=c))
if update:
permissions.append(_TenantsFactory.update(collection=c))
if delete:
permissions.append(_TenantsFactory.delete(collection=c))
return permissions
[docs]
@staticmethod
def roles(
*, role: Union[str, Sequence[str]], read: bool = False, manage: bool = False
) -> PermissionsCreateType:
permissions: List[_Permission] = []
if isinstance(role, str):
role = [role]
for r in role:
if read:
permissions.append(_RolesFactory.read(role=r))
if manage:
permissions.append(_RolesFactory.manage(role=r))
return permissions
[docs]
@staticmethod
def backup(
*, collection: Union[str, Sequence[str]], manage: bool = False
) -> PermissionsCreateType:
permissions: List[_Permission] = []
if isinstance(collection, str):
collection = [collection]
for c in collection:
if manage:
permissions.append(_BackupsFactory.manage(collection=c))
return permissions
[docs]
@staticmethod
def nodes(
*,
collection: Union[str, Sequence[str]],
verbosity: Verbosity = "minimal",
read: bool = False,
) -> PermissionsCreateType:
permissions: List[_Permission] = []
if isinstance(collection, str):
collection = [collection]
for c in collection:
if read:
permissions.append(_NodesFactory.read(collection=c, verbosity=verbosity))
return permissions
[docs]
@staticmethod
def cluster(*, read: bool = False) -> PermissionsCreateType:
permissions: List[_Permission] = []
if read:
permissions.append(_ClusterFactory.read())
return permissions