from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Generic, List, Optional, Union
from weaviate.collections.batch.base import (
_BatchBase,
_BatchDataWrapper,
_BatchMode,
_DynamicBatching,
_FixedSizeBatching,
_RateLimitedBatching,
)
from weaviate.collections.batch.batch_wrapper import _BatchWrapper, _ContextManagerWrapper
from weaviate.collections.classes.config import ConsistencyLevel, Vectorizers
from weaviate.collections.classes.internal import ReferenceInputs, ReferenceInput
from weaviate.collections.classes.types import Properties
from weaviate.connect.v4 import ConnectionSync
from weaviate.exceptions import UnexpectedStatusCodeError
from weaviate.types import UUID, VECTORS
if TYPE_CHECKING:
from weaviate.collections.config import _ConfigCollection
class _BatchCollection(Generic[Properties], _BatchBase):
    """Batch context bound to a single collection and (optionally) a single tenant.

    Objects and references added through this class are always sent with the
    collection name and tenant captured at construction time, so callers only
    supply the payload itself.
    """

    def __init__(
        self,
        executor: ThreadPoolExecutor,
        connection: ConnectionSync,
        consistency_level: Optional[ConsistencyLevel],
        results: _BatchDataWrapper,
        batch_mode: _BatchMode,
        name: str,
        tenant: Optional[str],
        vectorizer_batching: bool,
    ) -> None:
        """Create a collection-scoped batch.

        Args:
            executor: Executor handed to the base batching machinery.
            connection: Connection used by the base class to send batches.
            consistency_level: Consistency level forwarded to the base class.
            results: Container forwarded to the base class for batch results.
            batch_mode: Batching strategy (dynamic, fixed size or rate limited).
            name: Name of the collection every added object/reference targets.
            tenant: Tenant every added object/reference targets, or None.
            vectorizer_batching: Flag forwarded unchanged to the base class.
        """
        super().__init__(
            connection=connection,
            consistency_level=consistency_level,
            results=results,
            batch_mode=batch_mode,
            executor=executor,
            vectorizer_batching=vectorizer_batching,
        )
        self.__name = name
        self.__tenant = tenant

    def add_object(
        self,
        properties: Optional[Properties] = None,
        references: Optional[ReferenceInputs] = None,
        uuid: Optional[UUID] = None,
        vector: Optional[VECTORS] = None,
    ) -> UUID:
        """Add one object to this batch.

        NOTE: If the UUID of one of the objects already exists then the existing object will be replaced by the new object.

        Args:
            properties: The data properties of the object to be added as a dictionary.
            references: The references of the object to be added as a dictionary.
            uuid: The UUID of the object as an uuid.UUID object or str. If it is None an UUIDv4 will be generated, by default None
            vector: The embedding of the object. Can be used when a collection does not have a vectorization module or the given
                vector was generated using the _identical_ vectorization module that is configured for the class. In this
                case this vector takes precedence. Supported types are:
                - for single vectors: `list`, `numpy.ndarray`, `torch.Tensor` and `tf.Tensor`, by default None.
                - for named vectors: Dict[str, *list above*], where the string is the name of the vector.

        Returns:
            The UUID of the added object. If one was not provided a UUIDv4 will be auto-generated for you and returned here.

        Raises:
            WeaviateBatchValidationError: If the provided options are not in the format required by Weaviate.
        """
        # Collection name and tenant are fixed for this batch; only the payload varies.
        return self._add_object(
            collection=self.__name,
            properties=properties,
            references=references,
            uuid=uuid,
            vector=vector,
            tenant=self.__tenant,
        )

    def add_reference(
        self, from_uuid: UUID, from_property: str, to: Union[ReferenceInput, List[UUID]]
    ) -> None:
        """Add a reference to this batch.

        Args:
            from_uuid: The UUID of the object, as an uuid.UUID object or str, that should reference another object.
            from_property: The name of the property that contains the reference.
            to: The UUID of the referenced object, as an uuid.UUID object or str, that is actually referenced.
                For multi-target references use wvc.Reference.to_multi_target().

        Raises:
            WeaviateBatchValidationError: If the provided options are not in the format required by Weaviate.
        """
        # Arguments are passed positionally in the order the base-class helper expects:
        # source uuid, source collection, source property, target(s), tenant.
        self._add_reference(
            from_uuid,
            self.__name,
            from_property,
            to,
            self.__tenant,
        )
# Generic alias for a collection-scoped batch, parameterized by the properties type.
BatchCollection = _BatchCollection[Properties]
# Type of the context manager returned by the batching-mode methods of the collection batch wrapper.
CollectionBatchingContextManager = _ContextManagerWrapper[BatchCollection[Properties]]
class _BatchCollectionWrapper(Generic[Properties], _BatchWrapper):
    """Entry point for batching into a single collection (and optional tenant).

    Each of ``dynamic``, ``fixed_size`` and ``rate_limit`` selects a batching mode and
    returns a new context manager wrapping a ``_BatchCollection`` bound to this
    wrapper's collection name and tenant.
    """

    def __init__(
        self,
        connection: ConnectionSync,
        consistency_level: Optional[ConsistencyLevel],
        name: str,
        tenant: Optional[str],
        config: "_ConfigCollection",
    ) -> None:
        super().__init__(connection, consistency_level)
        self.__name = name
        self.__tenant = tenant
        self.__config = config
        # Resolved lazily on the first batch creation and cached afterwards.
        self._vectorizer_batching: Optional[bool] = None
        # One executor per wrapper instance, shared between all child batch contexts
        # created from it.
        # NOTE(review): it is never shut down in this class — confirm its lifecycle is
        # handled elsewhere.
        self.__executor = ThreadPoolExecutor()

    def __resolve_vectorizer_batching(self) -> bool:
        """Return whether the collection's configuration uses any vectorizer.

        With named vectors (``vector_config`` set), this is True if at least one named
        vector has a vectorizer other than ``Vectorizers.NONE``. Otherwise the
        collection-level ``vectorizer`` is checked.

        Raises:
            UnexpectedStatusCodeError: If fetching the config fails with any status other than 404.
        """
        try:
            config = self.__config.get(simple=True)
        except UnexpectedStatusCodeError as e:
            # collection does not have to exist if autoschema is enabled. Individual objects will be validated and might fail
            if e.status_code != 404:
                raise
            return False
        if config.vector_config is not None:
            return any(
                vec_config.vectorizer.vectorizer is not Vectorizers.NONE
                for vec_config in config.vector_config.values()
            )
        return config.vectorizer is not Vectorizers.NONE

    def __create_batch_and_reset(self) -> _ContextManagerWrapper[_BatchCollection[Properties]]:
        """Build a fresh collection batch using the currently selected ``_batch_mode``.

        On first use, this determines and caches whether vectorizer batching applies. It
        then replaces ``_batch_data`` with an empty ``_BatchDataWrapper`` and returns the
        new batch wrapped in a ``_ContextManagerWrapper``.
        """
        if self._vectorizer_batching is None:
            # If resolution raises, the cache stays None so the next call retries.
            self._vectorizer_batching = self.__resolve_vectorizer_batching()
        self._batch_data = _BatchDataWrapper()  # clear old data
        return _ContextManagerWrapper(
            _BatchCollection[Properties](
                connection=self._connection,
                consistency_level=self._consistency_level,
                results=self._batch_data,
                batch_mode=self._batch_mode,
                executor=self.__executor,
                name=self.__name,
                tenant=self.__tenant,
                vectorizer_batching=self._vectorizer_batching,
            )
        )

    def dynamic(self) -> CollectionBatchingContextManager[Properties]:
        """Configure dynamic batching.

        When you exit the context manager, the final batch will be sent automatically.
        """
        self._batch_mode: _BatchMode = _DynamicBatching()
        return self.__create_batch_and_reset()

    def fixed_size(
        self, batch_size: int = 100, concurrent_requests: int = 2
    ) -> CollectionBatchingContextManager[Properties]:
        """Configure fixed size batches. Note that the default is dynamic batching.

        When you exit the context manager, the final batch will be sent automatically.

        Args:
            batch_size: The number of objects/references to be sent in one batch. If not provided, the default value is 100.
            concurrent_requests: The number of concurrent requests when sending batches. This controls the number of concurrent requests
                made to Weaviate and not the speed of batch creation within Python.
        """
        self._batch_mode = _FixedSizeBatching(batch_size, concurrent_requests)
        return self.__create_batch_and_reset()

    def rate_limit(self, requests_per_minute: int) -> CollectionBatchingContextManager[Properties]:
        """Configure batches with a rate limited vectorizer.

        When you exit the context manager, the final batch will be sent automatically.

        Args:
            requests_per_minute: The number of requests that the vectorizer can process per minute.
        """
        self._batch_mode = _RateLimitedBatching(requests_per_minute)
        return self.__create_batch_and_reset()