Source code for weaviate.collections.collections.async_

from typing import (
    Optional,
    Type,
)

from weaviate.collections.classes.config import (
    CollectionConfig,
)
from weaviate.collections.classes.internal import References
from weaviate.collections.classes.types import (
    Properties,
)
from weaviate.collections.collection import CollectionAsync
from weaviate.collections.collections.base import _CollectionsBase
from weaviate.connect import executor
from weaviate.connect.v4 import ConnectionAsync


@executor.wrap("async")
class _CollectionsAsync(_CollectionsBase[ConnectionAsync]):
    """Async variant of the collections namespace, parameterised with ``ConnectionAsync``.

    Every method here delegates to an underscore-prefixed helper (``_use``,
    ``_create_from_dict``, ``_create_from_config``) that is not defined in
    this class -- presumably inherited from ``_CollectionsBase``; confirm
    there. Each method asserts that the helper's result is a
    ``CollectionAsync`` before returning it.
    """

    @executor.no_wrapping
    def use(
        self,
        name: str,
        data_model_properties: Optional[Type[Properties]] = None,
        data_model_references: Optional[Type[References]] = None,
        skip_argument_validation: bool = False,
    ) -> CollectionAsync[Properties, References]:
        """Return the ``CollectionAsync`` that ``self._use`` yields for these arguments.

        This is a plain (non-``async``) method: nothing is awaited here.

        Args:
            name: Forwarded unchanged to ``_use`` as ``name``.
            data_model_properties: Forwarded unchanged to ``_use``.
            data_model_references: Forwarded unchanged to ``_use``.
            skip_argument_validation: Forwarded unchanged to ``_use``.

        Returns:
            The object returned by ``_use``.

        Raises:
            AssertionError: If ``_use`` returns something that is not a
                ``CollectionAsync`` (only when assertions are enabled).
        """
        handle = self._use(
            name=name,
            data_model_properties=data_model_properties,
            data_model_references=data_model_references,
            skip_argument_validation=skip_argument_validation,
        )
        # Sanity check on the helper's result; also narrows the type for checkers.
        assert isinstance(handle, CollectionAsync)
        return handle

    @executor.no_wrapping
    async def create_from_dict(self, config: dict) -> CollectionAsync:
        """Await the result of ``self._create_from_dict(config)`` and return it.

        Args:
            config: Passed straight through to ``_create_from_dict``.

        Returns:
            The awaited result, checked to be a ``CollectionAsync``.

        Raises:
            AssertionError: If the awaited result is not a ``CollectionAsync``
                (only when assertions are enabled).
        """
        pending = self._create_from_dict(config)
        created = await executor.aresult(pending)
        assert isinstance(created, CollectionAsync)
        return created

    @executor.no_wrapping
    async def create_from_config(self, config: CollectionConfig) -> CollectionAsync:
        """Await the result of ``self._create_from_config(config)`` and return it.

        Args:
            config: Passed straight through to ``_create_from_config``.

        Returns:
            The awaited result, checked to be a ``CollectionAsync``.

        Raises:
            AssertionError: If the awaited result is not a ``CollectionAsync``
                (only when assertions are enabled).
        """
        pending = self._create_from_config(config)
        created = await executor.aresult(pending)
        assert isinstance(created, CollectionAsync)
        return created