from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Optional, Type, Union
from weaviate.collections.batch.base import (
_BatchBase,
_BatchBaseNew,
_BatchDataWrapper,
_DynamicBatching,
_FixedSizeBatching,
_RateLimitedBatching,
_ServerSideBatching,
)
from weaviate.collections.batch.batch_wrapper import (
BatchClientProtocol,
_BatchMode,
_BatchWrapper,
_ContextManagerWrapper,
)
from weaviate.collections.classes.config import ConsistencyLevel, Vectorizers
from weaviate.collections.classes.internal import ReferenceInput, ReferenceInputs
from weaviate.collections.classes.tenants import Tenant
from weaviate.collections.classes.types import WeaviateProperties
from weaviate.connect.v4 import ConnectionSync
from weaviate.exceptions import UnexpectedStatusCodeError, WeaviateUnsupportedFeatureError
from weaviate.types import UUID, VECTORS
if TYPE_CHECKING:
from weaviate.collections.collections.sync import _Collections
[docs]
class _BatchClient(_BatchBase):
[docs]
def add_object(
self,
collection: str,
properties: Optional[WeaviateProperties] = None,
references: Optional[ReferenceInputs] = None,
uuid: Optional[UUID] = None,
vector: Optional[VECTORS] = None,
tenant: Optional[Union[str, Tenant]] = None,
) -> UUID:
"""Add one object to this batch.
NOTE: If the UUID of one of the objects already exists then the existing object will be
replaced by the new object.
Args:
collection: The name of the collection this object belongs to.
properties: The data properties of the object to be added as a dictionary.
references: The references of the object to be added as a dictionary.
uuid: The UUID of the object as an uuid.UUID object or str. It can be a Weaviate beacon or Weaviate href.
If it is None an UUIDv4 will generated, by default None
vector: The embedding of the object. Can be used when a collection does not have a vectorization module or the given
vector was generated using the _identical_ vectorization module that is configured for the class. In this
case this vector takes precedence.
Supported types are:
- for single vectors: `list`, 'numpy.ndarray`, `torch.Tensor` and `tf.Tensor`, by default None.
- for named vectors: Dict[str, *list above*], where the string is the name of the vector.
tenant: The tenant name or Tenant object to be used for this request.
Returns:
The UUID of the added object. If one was not provided a UUIDv4 will be auto-generated for you and returned here.
Raises:
WeaviateBatchValidationError: If the provided options are in the format required by Weaviate.
"""
return super()._add_object(
collection=collection,
properties=properties,
references=references,
uuid=uuid,
vector=vector,
tenant=tenant.name if isinstance(tenant, Tenant) else tenant,
)
[docs]
def add_reference(
self,
from_uuid: UUID,
from_collection: str,
from_property: str,
to: ReferenceInput,
tenant: Optional[Union[str, Tenant]] = None,
) -> None:
"""Add one reference to this batch.
Args:
from_uuid: The UUID of the object, as an uuid.UUID object or str, that should reference another object.
from_collection: The name of the collection that should reference another object.
from_property: The name of the property that contains the reference.
to: The UUID of the referenced object, as an uuid.UUID object or str, that is actually referenced.
For multi-target references use wvc.Reference.to_multi_target().
tenant: The tenant name or Tenant object to be used for this request.
Raises:
WeaviateBatchValidationError: If the provided options are in the format required by Weaviate.
"""
super()._add_reference(
from_object_uuid=from_uuid,
from_object_collection=from_collection,
from_property_name=from_property,
to=to,
tenant=tenant.name if isinstance(tenant, Tenant) else tenant,
)
[docs]
class _BatchClientNew(_BatchBaseNew):
[docs]
def add_object(
self,
collection: str,
properties: Optional[WeaviateProperties] = None,
references: Optional[ReferenceInputs] = None,
uuid: Optional[UUID] = None,
vector: Optional[VECTORS] = None,
tenant: Optional[Union[str, Tenant]] = None,
) -> UUID:
"""Add one object to this batch.
NOTE: If the UUID of one of the objects already exists then the existing object will be
replaced by the new object.
Args:
collection: The name of the collection this object belongs to.
properties: The data properties of the object to be added as a dictionary.
references: The references of the object to be added as a dictionary.
uuid: The UUID of the object as an uuid.UUID object or str. It can be a Weaviate beacon or Weaviate href.
If it is None an UUIDv4 will generated, by default None
vector: The embedding of the object. Can be used when a collection does not have a vectorization module or the given
vector was generated using the _identical_ vectorization module that is configured for the class. In this
case this vector takes precedence.
Supported types are:
- for single vectors: `list`, 'numpy.ndarray`, `torch.Tensor` and `tf.Tensor`, by default None.
- for named vectors: Dict[str, *list above*], where the string is the name of the vector.
tenant: The tenant name or Tenant object to be used for this request.
Returns:
The UUID of the added object. If one was not provided a UUIDv4 will be auto-generated for you and returned here.
Raises:
WeaviateBatchValidationError: If the provided options are in the format required by Weaviate.
"""
return super()._add_object(
collection=collection,
properties=properties,
references=references,
uuid=uuid,
vector=vector,
tenant=tenant.name if isinstance(tenant, Tenant) else tenant,
)
[docs]
def add_reference(
self,
from_uuid: UUID,
from_collection: str,
from_property: str,
to: ReferenceInput,
tenant: Optional[Union[str, Tenant]] = None,
) -> None:
"""Add one reference to this batch.
Args:
from_uuid: The UUID of the object, as an uuid.UUID object or str, that should reference another object.
from_collection: The name of the collection that should reference another object.
from_property: The name of the property that contains the reference.
to: The UUID of the referenced object, as an uuid.UUID object or str, that is actually referenced.
For multi-target references use wvc.Reference.to_multi_target().
tenant: The tenant name or Tenant object to be used for this request.
Raises:
WeaviateBatchValidationError: If the provided options are in the format required by Weaviate.
"""
super()._add_reference(
from_object_uuid=from_uuid,
from_object_collection=from_collection,
from_property_name=from_property,
to=to,
tenant=tenant.name if isinstance(tenant, Tenant) else tenant,
)
BatchClient = _BatchClient
BatchClientNew = _BatchClientNew
ClientBatchingContextManager = _ContextManagerWrapper[
Union[BatchClient, BatchClientNew], BatchClientProtocol
]
[docs]
class _BatchClientWrapper(_BatchWrapper):
def __init__(
self,
connection: ConnectionSync,
config: "_Collections",
consistency_level: Optional[ConsistencyLevel],
):
super().__init__(connection, consistency_level)
self.__config = config
self._vectorizer_batching: Optional[bool] = None
self.__executor = ThreadPoolExecutor()
# define one executor per client with it shared between all child batch contexts
def __create_batch_and_reset(
self, batch_client: Union[Type[_BatchClient], Type[_BatchClientNew]]
):
if self._vectorizer_batching is None or not self._vectorizer_batching:
try:
configs = self.__config.list_all(simple=True)
vectorizer_batching = False
for config in configs.values():
if config.vector_config is not None:
vectorizer_batching = False
for vec_config in config.vector_config.values():
if vec_config.vectorizer.vectorizer is not Vectorizers.NONE:
vectorizer_batching = True
break
vectorizer_batching = vectorizer_batching
else:
vectorizer_batching = any(
config.vectorizer_config is not None for config in configs.values()
)
if vectorizer_batching:
break
self._vectorizer_batching = vectorizer_batching
except UnexpectedStatusCodeError as e:
# we might not have the rights to query all collections
if e.status_code != 403:
raise e
self._vectorizer_batching = False
self._batch_data = _BatchDataWrapper() # clear old data
return _ContextManagerWrapper(
batch_client(
connection=self._connection,
consistency_level=self._consistency_level,
results=self._batch_data,
batch_mode=self._batch_mode,
executor=self.__executor,
vectorizer_batching=self._vectorizer_batching,
)
)
[docs]
def dynamic(
self, consistency_level: Optional[ConsistencyLevel] = None
) -> ClientBatchingContextManager:
"""Configure dynamic batching.
When you exit the context manager, the final batch will be sent automatically.
Args:
consistency_level: The consistency level to be used to send batches. If not provided, the default value is `None`.
"""
self._batch_mode: _BatchMode = _DynamicBatching()
self._consistency_level = consistency_level
return self.__create_batch_and_reset(_BatchClient)
[docs]
def fixed_size(
self,
batch_size: int = 100,
concurrent_requests: int = 2,
consistency_level: Optional[ConsistencyLevel] = None,
) -> ClientBatchingContextManager:
"""Configure fixed size batches. Note that the default is dynamic batching.
When you exit the context manager, the final batch will be sent automatically.
Args:
batch_size: The number of objects/references to be sent in one batch. If not provided, the default value is 100.
concurrent_requests: The number of concurrent requests when sending batches. This controls the number of concurrent requests
made to Weaviate and not the speed of batch creation within Python.
consistency_level: The consistency level to be used to send batches. If not provided, the default value is `None`.
"""
self._batch_mode = _FixedSizeBatching(batch_size, concurrent_requests)
self._consistency_level = consistency_level
return self.__create_batch_and_reset(_BatchClient)
[docs]
def rate_limit(
self,
requests_per_minute: int,
consistency_level: Optional[ConsistencyLevel] = None,
) -> ClientBatchingContextManager:
"""Configure batches with a rate limited vectorizer.
When you exit the context manager, the final batch will be sent automatically.
Args:
requests_per_minute: The number of requests that the vectorizer can process per minute.
consistency_level: The consistency level to be used to send batches. If not provided, the default value is `None`.
"""
self._batch_mode = _RateLimitedBatching(requests_per_minute)
self._consistency_level = consistency_level
return self.__create_batch_and_reset(_BatchClient)
[docs]
def experimental(
self,
*,
concurrency: Optional[int] = None,
consistency_level: Optional[ConsistencyLevel] = None,
) -> ClientBatchingContextManager:
"""Configure the batching context manager using the experimental server-side batching mode.
When you exit the context manager, the final batch will be sent automatically.
"""
if self._connection._weaviate_version.is_lower_than(1, 34, 0):
raise WeaviateUnsupportedFeatureError(
"Server-side batching", str(self._connection._weaviate_version), "1.34.0"
)
self._batch_mode = _ServerSideBatching(
# concurrency=concurrency
# if concurrency is not None
# else len(self._cluster.get_nodes_status())
concurrency=1, # hard-code until client-side multi-threading is fixed
)
self._consistency_level = consistency_level
return self.__create_batch_and_reset(_BatchClientNew)