Source code for weaviate.collections.batch.client

from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Optional, Type, Union

from typing_extensions import deprecated as typing_deprecated

from weaviate.collections.batch.async_ import _BatchBaseAsync
from weaviate.collections.batch.base import (
    _BatchBase,
    _BatchDataWrapper,
    _DynamicBatching,
    _FixedSizeBatching,
    _RateLimitedBatching,
    _ServerSideBatching,
)
from weaviate.collections.batch.batch_wrapper import (
    BatchClientProtocol,
    BatchClientProtocolAsync,
    _BatchMode,
    _BatchWrapper,
    _BatchWrapperAsync,
    _ContextManagerAsync,
    _ContextManagerSync,
)
from weaviate.collections.batch.sync import _BatchBaseSync
from weaviate.collections.classes.config import ConsistencyLevel, Vectorizers
from weaviate.collections.classes.internal import ReferenceInput, ReferenceInputs
from weaviate.collections.classes.tenants import Tenant
from weaviate.collections.classes.types import WeaviateProperties
from weaviate.connect.v4 import ConnectionAsync, ConnectionSync
from weaviate.exceptions import UnexpectedStatusCodeError, WeaviateUnsupportedFeatureError
from weaviate.types import UUID, VECTORS
from weaviate.util import docstring_deprecated

if TYPE_CHECKING:
    from weaviate.collections.collections.sync import _Collections


class _BatchClient(_BatchBase):
    """Client-level batch whose methods take the target collection name explicitly.

    Both public methods are thin adapters: they normalise the ``tenant`` argument and
    forward everything to the corresponding underscore-prefixed method of the base class.
    """

    def add_object(
        self,
        collection: str,
        properties: Optional[WeaviateProperties] = None,
        references: Optional[ReferenceInputs] = None,
        uuid: Optional[UUID] = None,
        vector: Optional[VECTORS] = None,
        tenant: Optional[Union[str, Tenant]] = None,
    ) -> UUID:
        """Hand one object over to the base class's ``_add_object``.

        Args:
            collection: Name of the collection the object belongs to.
            properties: Properties of the object, if any.
            references: References of the object, if any.
            uuid: UUID to use for the object, if any.
            vector: Vector(s) to store with the object, if any.
            tenant: Tenant name, or a `Tenant` instance whose ``name`` is used.

        Returns:
            Whatever ``_add_object`` returns (annotated as a UUID).
        """
        # The base class is given the plain tenant name, never the Tenant wrapper.
        tenant_name = tenant.name if isinstance(tenant, Tenant) else tenant
        return super()._add_object(
            collection=collection,
            properties=properties,
            references=references,
            uuid=uuid,
            vector=vector,
            tenant=tenant_name,
        )

    def add_reference(
        self,
        from_uuid: UUID,
        from_collection: str,
        from_property: str,
        to: ReferenceInput,
        tenant: Optional[Union[str, Tenant]] = None,
    ) -> None:
        """Hand one reference over to the base class's ``_add_reference``.

        Args:
            from_uuid: UUID of the object the reference starts from.
            from_collection: Name of the collection of the source object.
            from_property: Name of the reference property on the source object.
            to: Target(s) of the reference.
            tenant: Tenant name, or a `Tenant` instance whose ``name`` is used.
        """
        # The base class is given the plain tenant name, never the Tenant wrapper.
        tenant_name = tenant.name if isinstance(tenant, Tenant) else tenant
        super()._add_reference(
            from_object_uuid=from_uuid,
            from_object_collection=from_collection,
            from_property_name=from_property,
            to=to,
            tenant=tenant_name,
        )
class _BatchClientSync(_BatchBaseSync):
    """Client-level batch built on `_BatchBaseSync`, addressed by collection name.

    The public methods only resolve ``tenant`` to a name and delegate to the matching
    underscore-prefixed method of the base class.
    """

    def add_object(
        self,
        collection: str,
        properties: Optional[WeaviateProperties] = None,
        references: Optional[ReferenceInputs] = None,
        uuid: Optional[UUID] = None,
        vector: Optional[VECTORS] = None,
        tenant: Optional[Union[str, Tenant]] = None,
    ) -> UUID:
        """Delegate a single object to ``_add_object`` of the base class.

        Args:
            collection: Name of the collection the object belongs to.
            properties: Properties of the object, if any.
            references: References of the object, if any.
            uuid: UUID to use for the object, if any.
            vector: Vector(s) to store with the object, if any.
            tenant: Tenant name, or a `Tenant` instance whose ``name`` is used.

        Returns:
            Whatever ``_add_object`` returns (annotated as a UUID).
        """
        # Unwrap a Tenant instance; strings and None pass through untouched.
        if isinstance(tenant, Tenant):
            resolved_tenant = tenant.name
        else:
            resolved_tenant = tenant
        return super()._add_object(
            collection=collection,
            properties=properties,
            references=references,
            uuid=uuid,
            vector=vector,
            tenant=resolved_tenant,
        )

    def add_reference(
        self,
        from_uuid: UUID,
        from_collection: str,
        from_property: str,
        to: ReferenceInput,
        tenant: Optional[Union[str, Tenant]] = None,
    ) -> None:
        """Delegate a single reference to ``_add_reference`` of the base class.

        Args:
            from_uuid: UUID of the object the reference starts from.
            from_collection: Name of the collection of the source object.
            from_property: Name of the reference property on the source object.
            to: Target(s) of the reference.
            tenant: Tenant name, or a `Tenant` instance whose ``name`` is used.
        """
        # Unwrap a Tenant instance; strings and None pass through untouched.
        if isinstance(tenant, Tenant):
            resolved_tenant = tenant.name
        else:
            resolved_tenant = tenant
        super()._add_reference(
            from_object_uuid=from_uuid,
            from_object_collection=from_collection,
            from_property_name=from_property,
            to=to,
            tenant=resolved_tenant,
        )
class _BatchClientAsync(_BatchBaseAsync):
    """Client-level batch built on `_BatchBaseAsync`, addressed by collection name.

    Both coroutines resolve ``tenant`` to a name and await the matching
    underscore-prefixed coroutine of the base class.
    """

    async def add_object(
        self,
        collection: str,
        properties: Optional[WeaviateProperties] = None,
        references: Optional[ReferenceInputs] = None,
        uuid: Optional[UUID] = None,
        vector: Optional[VECTORS] = None,
        tenant: Optional[Union[str, Tenant]] = None,
    ) -> UUID:
        """Await ``_add_object`` of the base class for a single object.

        Args:
            collection: Name of the collection the object belongs to.
            properties: Properties of the object, if any.
            references: References of the object, if any.
            uuid: UUID to use for the object, if any.
            vector: Vector(s) to store with the object, if any.
            tenant: Tenant name, or a `Tenant` instance whose ``name`` is used.

        Returns:
            Whatever ``_add_object`` resolves to (annotated as a UUID).
        """
        # Only the tenant's name is passed down, not the Tenant object itself.
        tenant_name = tenant.name if isinstance(tenant, Tenant) else tenant
        object_uuid = await super()._add_object(
            collection=collection,
            properties=properties,
            references=references,
            uuid=uuid,
            vector=vector,
            tenant=tenant_name,
        )
        return object_uuid

    async def add_reference(
        self,
        from_uuid: UUID,
        from_collection: str,
        from_property: str,
        to: ReferenceInput,
        tenant: Optional[Union[str, Tenant]] = None,
    ) -> None:
        """Await ``_add_reference`` of the base class for a single reference.

        Args:
            from_uuid: UUID of the object the reference starts from.
            from_collection: Name of the collection of the source object.
            from_property: Name of the reference property on the source object.
            to: Target(s) of the reference.
            tenant: Tenant name, or a `Tenant` instance whose ``name`` is used.
        """
        # Only the tenant's name is passed down, not the Tenant object itself.
        tenant_name = tenant.name if isinstance(tenant, Tenant) else tenant
        await super()._add_reference(
            from_object_uuid=from_uuid,
            from_object_collection=from_collection,
            from_property_name=from_property,
            to=to,
            tenant=tenant_name,
        )
# Public aliases for the client-level batch classes defined above.
BatchClient = _BatchClient
BatchClientSync = _BatchClientSync
BatchClientAsync = _BatchClientAsync
# Context-manager types used as the return annotations of the batch wrapper methods below.
# The sync variant may wrap either of the two synchronous batch clients.
ClientBatchingContextManager = _ContextManagerSync[
    Union[BatchClient, BatchClientSync], BatchClientProtocol
]
ClientBatchingContextManagerAsync = _ContextManagerAsync[BatchClientProtocolAsync]
class _BatchClientWrapper(_BatchWrapper):
    """Factory for synchronous, client-level batching context managers.

    Every public method selects a batch mode, stores the requested consistency level and
    returns a context manager wrapping a newly constructed batch client.
    """

    def __init__(
        self,
        connection: ConnectionSync,
        config: "_Collections",
        consistency_level: Optional[ConsistencyLevel],
    ):
        super().__init__(connection, consistency_level)
        self.__config = config
        # None means "not determined yet"; see __create_batch_and_reset.
        self._vectorizer_batching: Optional[bool] = None
        # One executor per client, shared between all child batch contexts.
        self.__executor = ThreadPoolExecutor()

    def __any_vectorizer_configured(self) -> bool:
        """Report whether the listed collection configs indicate a configured vectorizer."""
        all_configs = self.__config.list_all(simple=True)
        for collection_config in all_configs.values():
            named_vectors = collection_config.vector_config
            if named_vectors is None:
                # No `vector_config` on this collection: fall back to checking
                # `vectorizer_config` across *all* listed collections.
                found = any(
                    other.vectorizer_config is not None for other in all_configs.values()
                )
            else:
                found = any(
                    named.vectorizer.vectorizer is not Vectorizers.NONE
                    for named in named_vectors.values()
                )
            if found:
                return True
        return False

    def __create_batch_and_reset(
        self, batch_client: Union[Type[_BatchClient], Type[_BatchClientSync]]
    ):
        # A cached True is reused as-is; None and False both trigger a fresh lookup.
        if not self._vectorizer_batching:
            try:
                self._vectorizer_batching = self.__any_vectorizer_configured()
            except UnexpectedStatusCodeError as err:
                # we might not have the rights to query all collections
                if err.status_code != 403:
                    raise err
                self._vectorizer_batching = False

        self._batch_data = _BatchDataWrapper()  # clear old data
        client = batch_client(
            connection=self._connection,
            consistency_level=self._consistency_level,
            results=self._batch_data,
            batch_mode=self._batch_mode,
            executor=self.__executor,
            vectorizer_batching=self._vectorizer_batching,
        )
        return _ContextManagerSync(client)

    def dynamic(
        self, consistency_level: Optional[ConsistencyLevel] = None
    ) -> ClientBatchingContextManager:
        """Set up dynamic batching.

        The final batch is sent automatically when the context manager exits.

        Args:
            consistency_level: Consistency level used when sending batches. Defaults to `None`.
        """
        self._batch_mode: _BatchMode = _DynamicBatching()
        self._consistency_level = consistency_level
        context_manager = self.__create_batch_and_reset(_BatchClient)
        return context_manager

    def fixed_size(
        self,
        batch_size: int = 100,
        concurrent_requests: int = 2,
        consistency_level: Optional[ConsistencyLevel] = None,
    ) -> ClientBatchingContextManager:
        """Set up batches of a fixed size.

        Dynamic batching is the default; use this method to opt into fixed-size batches.
        The final batch is sent automatically when the context manager exits.

        Args:
            batch_size: Number of objects/references sent in one batch. Defaults to 100.
            concurrent_requests: Number of concurrent requests made to Weaviate when sending
                batches. This does not affect how fast batches are created within Python.
            consistency_level: Consistency level used when sending batches. Defaults to `None`.
        """
        self._batch_mode = _FixedSizeBatching(batch_size, concurrent_requests)
        self._consistency_level = consistency_level
        context_manager = self.__create_batch_and_reset(_BatchClient)
        return context_manager

    def rate_limit(
        self,
        requests_per_minute: int,
        consistency_level: Optional[ConsistencyLevel] = None,
    ) -> ClientBatchingContextManager:
        """Set up batching for a rate-limited vectorizer.

        The final batch is sent automatically when the context manager exits.

        Args:
            requests_per_minute: Number of requests the vectorizer can process per minute.
            consistency_level: Consistency level used when sending batches. Defaults to `None`.
        """
        self._batch_mode = _RateLimitedBatching(requests_per_minute)
        self._consistency_level = consistency_level
        context_manager = self.__create_batch_and_reset(_BatchClient)
        return context_manager

    @docstring_deprecated(
        details="Use the 'stream' method instead. This method will be removed in 4.21.0",
        deprecated_in="4.20.0",
    )
    @typing_deprecated("Use the 'stream' method instead. This method will be removed in 4.21.0")
    def experimental(
        self,
        *,
        concurrency: Optional[int] = None,
        consistency_level: Optional[ConsistencyLevel] = None,
    ) -> ClientBatchingContextManager:
        # Deprecated alias kept for backwards compatibility: forwards verbatim to `stream`.
        return self.stream(concurrency=concurrency, consistency_level=consistency_level)

    def stream(
        self,
        *,
        concurrency: Optional[int] = None,
        consistency_level: Optional[ConsistencyLevel] = None,
    ) -> ClientBatchingContextManager:
        """Set up the batching context manager for batch streaming.

        The final batch is sent automatically when the context manager exits.

        Args:
            concurrency: Intended number of concurrent streams. Currently ignored: the
                batch mode is always created with a concurrency of one.
            consistency_level: Consistency level used when inserting data. Defaults to `None`.

        Raises:
            WeaviateUnsupportedFeatureError: If the connected Weaviate version is lower
                than 1.36.0.
        """
        server_version = self._connection._weaviate_version
        if server_version.is_lower_than(1, 36, 0):
            raise WeaviateUnsupportedFeatureError(
                "Server-side batching", str(self._connection._weaviate_version), "1.36.0"
            )
        # `concurrency` is deliberately not forwarded: the value is hard-coded to 1
        # until client-side multi-threading is fixed.
        self._batch_mode = _ServerSideBatching(concurrency=1)
        self._consistency_level = consistency_level
        context_manager = self.__create_batch_and_reset(_BatchClientSync)
        return context_manager
class _BatchClientWrapperAsync(_BatchWrapperAsync):
    """Factory for asynchronous, client-level batching context managers."""

    def __init__(
        self,
        connection: ConnectionAsync,
    ):
        # No consistency level is known at construction time; `stream` sets it later.
        super().__init__(connection, None)
        self._vectorizer_batching: Optional[bool] = None

    def __create_batch_and_reset(self):
        self._batch_data = _BatchDataWrapper()  # clear old data
        client = BatchClientAsync(
            connection=self._connection,
            consistency_level=self._consistency_level,
            results=self._batch_data,
        )
        return _ContextManagerAsync(client)

    @docstring_deprecated(
        details="Use the 'stream' method instead. This method will be removed in 4.21.0",
        deprecated_in="4.20.0",
    )
    @typing_deprecated("Use the 'stream' method instead. This method will be removed in 4.21.0")
    def experimental(
        self,
        *,
        concurrency: Optional[int] = None,
        consistency_level: Optional[ConsistencyLevel] = None,
    ) -> ClientBatchingContextManagerAsync:
        # Deprecated alias kept for backwards compatibility: forwards verbatim to `stream`.
        return self.stream(concurrency=concurrency, consistency_level=consistency_level)

    def stream(
        self,
        *,
        concurrency: Optional[int] = None,
        consistency_level: Optional[ConsistencyLevel] = None,
    ) -> ClientBatchingContextManagerAsync:
        """Set up the batching context manager for batch streaming.

        The final batch is sent automatically when the context manager exits.

        Args:
            concurrency: Intended number of concurrent streams. Currently ignored: the
                batch mode is always created with a concurrency of one.
            consistency_level: Consistency level used when inserting data. Defaults to `None`.

        Raises:
            WeaviateUnsupportedFeatureError: If the connected Weaviate version is lower
                than 1.36.0.
        """
        server_version = self._connection._weaviate_version
        if server_version.is_lower_than(1, 36, 0):
            raise WeaviateUnsupportedFeatureError(
                "Server-side batching", str(self._connection._weaviate_version), "1.36.0"
            )
        # `concurrency` is deliberately not forwarded: the value is hard-coded to 1
        # until client-side multi-threading is fixed.
        self._batch_mode = _ServerSideBatching(concurrency=1)
        self._consistency_level = consistency_level
        context_manager = self.__create_batch_and_reset()
        return context_manager