# Source code for weaviate.collections.classes.config_methods

import datetime
from typing import Any, Dict, List, Optional, Union, cast

from weaviate.collections.classes.config import (
    DataType,
    GenerativeSearches,
    PQEncoderDistribution,
    PQEncoderType,
    ReplicationDeletionStrategy,
    Rerankers,
    StopwordsPreset,
    Tokenization,
    VectorDistances,
    VectorFilterStrategy,
    VectorIndexType,
    Vectorizers,
    _BM25Config,
    _BQConfig,
    _CollectionConfig,
    _CollectionConfigSimple,
    _GenerativeConfig,
    _InvertedIndexConfig,
    _MultiTenancyConfig,
    _MultiVectorConfig,
    _MuveraConfig,
    _NamedVectorConfig,
    _NamedVectorizerConfig,
    _NestedProperty,
    _ObjectTTLConfig,
    _PQConfig,
    _PQEncoderConfig,
    _Property,
    _PropertyVectorizerConfig,
    _ReferenceProperty,
    _ReplicationConfig,
    _RerankerConfig,
    _RQConfig,
    _ShardingConfig,
    _SQConfig,
    _StopwordsConfig,
    _VectorIndexConfigDynamic,
    _VectorIndexConfigFlat,
    _VectorIndexConfigHNSW,
    _VectorizerConfig,
)


def _is_primitive(d_type: Union[str, List[str]]) -> bool:
    """Report whether a property's data type denotes a primitive rather than a reference.

    Only the first character of the first entry is inspected: when that character is left
    unchanged by ``str.lower()`` the type counts as primitive, otherwise it counts as a
    reference (the reference parser in this module keeps exactly the properties for which
    this function returns ``False``).

    Args:
        d_type: The ``dataType`` value of a property. Callers in this module pass the list
            stored under ``prop["dataType"]``; a plain string is accepted as well, in which
            case its own first character is inspected.

    Returns:
        ``True`` if the inspected character equals its lowercase form, ``False`` otherwise.

    Raises:
        IndexError: If ``d_type`` or its first entry is empty.
    """
    first_char = d_type[0][0]
    return first_char.lower() == first_char
def __get_rerank_config(schema: Dict[str, Any]) -> Optional[_RerankerConfig]:
    """Extract the reranker module configuration from a collection schema.

    A configuration is produced only when exactly one key of ``moduleConfig`` contains the
    substring ``"reranker"``; with zero or several such keys the result is ``None``.

    Args:
        schema: The collection definition as a JSON-like dictionary.

    Returns:
        A ``_RerankerConfig`` whose ``model`` is the module's settings dictionary, or ``None``.
    """
    module_names = [name for name in schema.get("moduleConfig", {}).keys() if "reranker" in name]
    if len(module_names) != 1:
        return None

    module_name = module_names[0]
    reranker: Union[Rerankers, str]
    try:
        reranker = Rerankers(module_name)
    except ValueError:
        # Module names that are not members of the enum are kept as raw strings.
        reranker = module_name
    return _RerankerConfig(
        model=schema["moduleConfig"][module_name],
        reranker=reranker,
    )
def __get_generative_config(schema: Dict[str, Any]) -> Optional[_GenerativeConfig]:
    """Extract the generative module configuration from a collection schema.

    A configuration is produced only when exactly one key of ``moduleConfig`` contains the
    substring ``"generative"``; with zero or several such keys the result is ``None``.

    Args:
        schema: The collection definition as a JSON-like dictionary.

    Returns:
        A ``_GenerativeConfig`` whose ``model`` is the module's settings dictionary, or ``None``.
    """
    module_names = [name for name in schema.get("moduleConfig", {}).keys() if "generative" in name]
    if len(module_names) != 1:
        return None

    module_name = module_names[0]
    generative: Union[GenerativeSearches, str]
    try:
        generative = GenerativeSearches(module_name)
    except ValueError:
        # Module names that are not members of the enum are kept as raw strings.
        generative = module_name
    return _GenerativeConfig(
        generative=generative,
        model=schema["moduleConfig"][module_name],
    )
def __get_vectorizer_config(schema: Dict[str, Any]) -> Optional[_VectorizerConfig]:
    """Build the legacy (single-vector) vectorizer configuration of a collection.

    Args:
        schema: The collection definition as a JSON-like dictionary. It is not modified.

    Returns:
        ``None`` when the schema has a ``vectorConfig`` entry (named vectors take precedence)
        or when ``vectorizer`` is absent or equal to ``"none"``; otherwise a
        ``_VectorizerConfig`` describing the configured vectorizer module.

    Raises:
        KeyError: If a vectorizer is set but ``moduleConfig`` has no entry for it.
    """
    # The helper returns a bool, so it must be tested for truthiness; comparing it against
    # ``None`` always succeeds and would never skip schemas that use named vectors.
    if not __is_vectorizer_present(schema):
        return None
    vectorizer_name = schema.get("vectorizer", "none")
    if vectorizer_name == "none":
        return None

    # Work on a shallow copy so the caller's schema stays intact and parsing the same schema
    # twice gives the same result instead of failing on the second pass.
    vec_config: Dict[str, Any] = dict(schema["moduleConfig"][vectorizer_name])
    vectorize_collection_name = vec_config.pop("vectorizeClassName", False)

    vectorizer: Union[str, Vectorizers]
    try:
        vectorizer = Vectorizers(vectorizer_name)
    except ValueError:
        # Module names that are not members of the enum are kept as raw strings.
        vectorizer = vectorizer_name
    return _VectorizerConfig(
        vectorize_collection_name=vectorize_collection_name,
        model=vec_config,
        vectorizer=vectorizer,
    )
def __is_vectorizer_present(schema: Dict[str, Any]) -> bool:
    """Tell whether the single-vectorizer settings of ``schema`` should be read.

    Args:
        schema: The collection definition as a JSON-like dictionary.

    Returns:
        ``False`` when the schema has a ``vectorConfig`` key, ``True`` otherwise.
    """
    # ignore single vectorizer config if named vectors are present
    return "vectorConfig" not in schema
def __get_vector_index_type(schema: Dict[str, Any]) -> Optional[VectorIndexType]:
    """Return the vector index type declared in ``schema``, if any.

    Args:
        schema: The collection definition as a JSON-like dictionary.

    Returns:
        The ``VectorIndexType`` built from ``schema["vectorIndexType"]``, or ``None`` when
        that key is missing.
    """
    if "vectorIndexType" not in schema:
        return None
    return VectorIndexType(schema["vectorIndexType"])
def __get_quantizer_config(
    config: Dict[str, Any],
) -> Optional[Union[_PQConfig, _BQConfig, _SQConfig, _RQConfig]]:
    """Build the quantizer configuration found in a vector index configuration.

    The sections are checked in the order ``bq``, ``sq``, ``pq``, ``rq`` and the first one
    that is present and enabled wins. A section without an ``enabled`` key is treated as
    disabled for every quantizer kind.

    Args:
        config: The ``vectorIndexConfig`` dictionary of an index.

    Returns:
        The configuration object of the first enabled quantizer, or ``None`` when no section
        is enabled.

    Raises:
        ValueError: If PQ is enabled and its encoder ``type`` or ``distribution`` is rejected
            by the corresponding enum (this includes a missing value).
    """
    if "bq" in config and config["bq"].get("enabled"):
        bq = config["bq"]
        # Optional fields are read with ``.get``: per the original note they are not present
        # for bq+hnsw.
        return _BQConfig(
            cache=bq.get("cache"),
            rescore_limit=bq.get("rescoreLimit"),
        )

    if "sq" in config and config["sq"].get("enabled"):
        sq = config["sq"]
        # Same tolerant lookups as for BQ; absent fields become ``None``.
        return _SQConfig(
            cache=sq.get("cache"),
            rescore_limit=sq.get("rescoreLimit"),
            training_limit=sq.get("trainingLimit"),
        )

    if "pq" in config and config["pq"].get("enabled"):
        pq = config["pq"]
        encoder = pq.get("encoder", {})
        return _PQConfig(
            internal_bit_compression=pq.get("bitCompression"),
            segments=pq.get("segments"),
            centroids=pq.get("centroids"),
            training_limit=pq.get("trainingLimit"),
            encoder=_PQEncoderConfig(
                type_=PQEncoderType(encoder.get("type")),
                distribution=PQEncoderDistribution(encoder.get("distribution")),
            ),
        )

    if "rq" in config and config["rq"].get("enabled"):
        rq = config["rq"]
        return _RQConfig(
            cache=rq.get("cache"),
            bits=rq.get("bits"),
            rescore_limit=rq.get("rescoreLimit"),
        )

    return None
def __get_multivector_encoding(config: Dict[str, Any]) -> Optional[_MuveraConfig]:
    """Build the MUVERA encoding settings from a multi-vector configuration.

    Args:
        config: Dictionary that may carry a ``muvera`` section.

    Returns:
        ``None`` when the ``muvera`` section is missing, ``None``, or not enabled; otherwise a
        ``_MuveraConfig`` filled from the section's ``enabled``, ``ksim``, ``dprojections``
        and ``repetitions`` entries.
    """
    muvera = config.get("muvera")
    if muvera is None or not muvera.get("enabled"):
        return None
    return _MuveraConfig(
        enabled=muvera["enabled"],
        ksim=muvera["ksim"],
        dprojections=muvera["dprojections"],
        repetitions=muvera["repetitions"],
    )
def __get_multivector(config: Dict[str, Any]) -> Optional[_MultiVectorConfig]:
    """Build the multi-vector settings of a vector index configuration.

    Args:
        config: The ``vectorIndexConfig`` dictionary of an index.

    Returns:
        ``None`` when the ``multivector`` section is missing, ``None``, or not enabled;
        otherwise a ``_MultiVectorConfig`` holding the section's ``aggregation`` value and
        its optional MUVERA encoding.
    """
    multivector = config.get("multivector")
    if multivector is None or not multivector.get("enabled"):
        return None

    if multivector.get("muvera") is None:
        encoding = None
    else:
        encoding = __get_multivector_encoding(multivector)
    return _MultiVectorConfig(
        encoding=encoding,
        aggregation=multivector["aggregation"],
    )
def __get_hnsw_config(config: Dict[str, Any]) -> _VectorIndexConfigHNSW:
    """Convert an HNSW ``vectorIndexConfig`` dictionary into ``_VectorIndexConfigHNSW``.

    Args:
        config: The HNSW index settings as a JSON-like dictionary.

    Returns:
        The populated ``_VectorIndexConfigHNSW``. When ``filterStrategy`` is absent the
        filter strategy defaults to ``VectorFilterStrategy.SWEEPING``.

    Raises:
        KeyError: If one of the mandatory keys read below is missing.
    """
    # Values are gathered one by one, in the order the fields are passed on below.
    quantizer = __get_quantizer_config(config)
    cleanup_interval_seconds = config["cleanupIntervalSeconds"]
    distance_metric = VectorDistances(config.get("distance"))
    dynamic_ef_min = config["dynamicEfMin"]
    dynamic_ef_max = config["dynamicEfMax"]
    dynamic_ef_factor = config["dynamicEfFactor"]
    ef = config["ef"]
    ef_construction = config["efConstruction"]
    if "filterStrategy" in config:
        filter_strategy = VectorFilterStrategy(config["filterStrategy"])
    else:
        filter_strategy = VectorFilterStrategy.SWEEPING
    flat_search_cutoff = config["flatSearchCutoff"]
    max_connections = config["maxConnections"]
    skip = config["skip"]
    vector_cache_max_objects = config["vectorCacheMaxObjects"]
    multi_vector = __get_multivector(config)

    return _VectorIndexConfigHNSW(
        cleanup_interval_seconds=cleanup_interval_seconds,
        distance_metric=distance_metric,
        dynamic_ef_min=dynamic_ef_min,
        dynamic_ef_max=dynamic_ef_max,
        dynamic_ef_factor=dynamic_ef_factor,
        ef=ef,
        ef_construction=ef_construction,
        filter_strategy=filter_strategy,
        flat_search_cutoff=flat_search_cutoff,
        max_connections=max_connections,
        quantizer=quantizer,
        skip=skip,
        vector_cache_max_objects=vector_cache_max_objects,
        multi_vector=multi_vector,
    )
def __get_flat_config(config: Dict[str, Any]) -> _VectorIndexConfigFlat:
    """Convert a flat ``vectorIndexConfig`` dictionary into ``_VectorIndexConfigFlat``.

    Args:
        config: The flat index settings as a JSON-like dictionary.

    Returns:
        The populated ``_VectorIndexConfigFlat``.

    Raises:
        KeyError: If ``distance`` or ``vectorCacheMaxObjects`` is missing.
    """
    quantizer = __get_quantizer_config(config)
    distance_metric = VectorDistances(config["distance"])
    vector_cache_max_objects = config["vectorCacheMaxObjects"]
    multi_vector = __get_multivector(config)
    return _VectorIndexConfigFlat(
        distance_metric=distance_metric,
        quantizer=quantizer,
        vector_cache_max_objects=vector_cache_max_objects,
        multi_vector=multi_vector,
    )
def __get_vector_index_config(
    schema: Dict[str, Any],
) -> Union[_VectorIndexConfigHNSW, _VectorIndexConfigFlat, _VectorIndexConfigDynamic, None]:
    """Build the vector index configuration matching ``schema["vectorIndexType"]``.

    Args:
        schema: A dictionary carrying ``vectorIndexType`` and ``vectorIndexConfig`` (a
            collection definition or a single named-vector entry).

    Returns:
        The HNSW, flat or dynamic configuration object, or ``None`` when
        ``vectorIndexConfig`` is missing or the index type is none of ``"hnsw"``, ``"flat"``
        and ``"dynamic"``.

    Raises:
        KeyError: If ``vectorIndexConfig`` is present but ``vectorIndexType`` is not.
    """
    if "vectorIndexConfig" not in schema:
        return None

    index_type = schema["vectorIndexType"]
    index_config = schema["vectorIndexConfig"]

    if index_type == "hnsw":
        return __get_hnsw_config(index_config)
    if index_type == "flat":
        return __get_flat_config(index_config)
    if index_type == "dynamic":
        # A dynamic index nests one full HNSW and one full flat configuration.
        return _VectorIndexConfigDynamic(
            distance_metric=VectorDistances(index_config["distance"]),
            threshold=index_config.get("threshold"),
            hnsw=__get_hnsw_config(index_config["hnsw"]),
            flat=__get_flat_config(index_config["flat"]),
        )
    return None
def __get_vector_config(
    schema: Dict[str, Any], simple: bool
) -> Optional[Dict[str, _NamedVectorConfig]]:
    """Build the named-vector configurations of a collection.

    Args:
        schema: The collection definition as a JSON-like dictionary. It is not modified.
        simple: Accepted for interface compatibility; the body does not read it.

    Returns:
        ``None`` when the schema has no ``vectorConfig`` key; otherwise a dictionary mapping
        each vector name to its ``_NamedVectorConfig``.

    Raises:
        AssertionError: If a named vector does not declare exactly one vectorizer module or
            no vector index configuration can be built for it.
    """
    if "vectorConfig" not in schema:
        return None

    named_vectors: Dict[str, _NamedVectorConfig] = {}
    for name, named_vector in schema["vectorConfig"].items():
        vectorizer_names = list(named_vector["vectorizer"].keys())
        assert len(vectorizer_names) == 1
        vectorizer_str: str = str(vectorizer_names[0])

        raw_config = named_vector["vectorizer"][vectorizer_str]
        # Copy before popping: removing ``properties`` from the caller's dictionary would make
        # a second parse of the same schema silently lose the source properties.
        vec_config: Dict[str, Any] = {} if raw_config is None else dict(raw_config)
        props = vec_config.pop("properties", None)

        vector_index_config = __get_vector_index_config(named_vector)
        assert vector_index_config is not None

        vec: Union[str, Vectorizers]
        try:
            vec = Vectorizers(vectorizer_str)
        except ValueError:
            # Module names that are not members of the enum are kept as raw strings.
            vec = vectorizer_str

        named_vectors[name] = _NamedVectorConfig(
            vectorizer=_NamedVectorizerConfig(
                vectorizer=vec,
                model=vec_config,
                source_properties=props,
            ),
            vector_index_config=vector_index_config,
        )
    return named_vectors
def __get_vectorizer(schema: Dict[str, Any]) -> Optional[Union[str, Vectorizers]]:
    """Return the legacy vectorizer of a collection.

    Args:
        schema: The collection definition as a JSON-like dictionary.

    Returns:
        ``None`` when the schema has a ``vectorConfig`` key; otherwise the ``Vectorizers``
        member matching ``schema["vectorizer"]``, or its string form when the enum does not
        know the value.

    Raises:
        KeyError: If neither ``vectorConfig`` nor ``vectorizer`` is present.
    """
    if "vectorConfig" not in schema:
        raw_name = str(schema["vectorizer"])
        resolved: Union[str, Vectorizers]
        try:
            resolved = Vectorizers(raw_name)
        except ValueError:
            resolved = raw_name
        return resolved
    return None
def _collection_config_simple_from_json(schema: Dict[str, Any]) -> _CollectionConfigSimple:
    """Convert one collection definition into a ``_CollectionConfigSimple``.

    Args:
        schema: The collection definition as a JSON-like dictionary.

    Returns:
        The simplified configuration. ``properties`` and ``references`` are empty lists when
        the schema's ``properties`` entry is missing or ``None``.

    Raises:
        KeyError: If ``class`` is missing from the schema.
    """
    # The pieces are computed in the same order in which they are handed to the constructor.
    collection_name = schema["class"]
    description = schema.get("description")
    generative_config = __get_generative_config(schema)
    object_ttl_config = _get_object_ttl_config(schema)

    properties: List[_Property] = []
    references: List[_ReferenceProperty] = []
    if schema.get("properties") is not None:
        properties = _properties_from_config(schema)
        references = _references_from_config(schema)

    reranker_config = __get_rerank_config(schema)
    vectorizer_config = __get_vectorizer_config(schema)
    vectorizer = __get_vectorizer(schema)
    vector_config = __get_vector_config(schema, simple=True)

    return _CollectionConfigSimple(
        name=collection_name,
        description=description,
        generative_config=generative_config,
        object_ttl_config=object_ttl_config,
        properties=properties,
        references=references,
        reranker_config=reranker_config,
        vectorizer_config=vectorizer_config,
        vectorizer=vectorizer,
        vector_config=vector_config,
    )
def _collection_config_from_json(schema: Dict[str, Any]) -> _CollectionConfig:
    """Convert one collection definition into a full ``_CollectionConfig``.

    Args:
        schema: The collection definition as a JSON-like dictionary.

    Returns:
        The complete configuration. ``sharding_config`` is ``None`` when multi-tenancy is
        enabled; ``properties`` and ``references`` are empty lists when the schema's
        ``properties`` entry is missing or ``None``.

    Raises:
        KeyError: If a mandatory key (for example ``class``, ``invertedIndexConfig`` or
            ``replicationConfig``) is missing.
    """
    # The pieces are computed in the same order in which they are handed to the constructor.
    collection_name = schema["class"]
    description = schema.get("description")
    generative_config = __get_generative_config(schema)

    # --- inverted index ---
    inverted: Dict[str, Any] = schema["invertedIndexConfig"]
    bm25 = _BM25Config(
        b=inverted["bm25"]["b"],
        k1=inverted["bm25"]["k1"],
    )
    cleanup_interval_seconds = inverted["cleanupIntervalSeconds"]
    # Only a literal ``True`` switches these flags on; missing or other values yield False.
    index_null_state = inverted.get("indexNullState") is True
    index_property_length = inverted.get("indexPropertyLength") is True
    index_timestamps = inverted.get("indexTimestamps") is True
    stopwords_section = inverted["stopwords"]
    stopwords = _StopwordsConfig(
        preset=StopwordsPreset(stopwords_section["preset"]),
        additions=stopwords_section["additions"],
        removals=stopwords_section["removals"],
    )
    inverted_index_config = _InvertedIndexConfig(
        bm25=bm25,
        cleanup_interval_seconds=cleanup_interval_seconds,
        index_null_state=index_null_state,
        index_property_length=index_property_length,
        index_timestamps=index_timestamps,
        stopwords=stopwords,
    )

    # --- multi tenancy ---
    multi_tenancy: Dict[str, Any] = schema.get("multiTenancyConfig", {})
    multi_tenancy_config = _MultiTenancyConfig(
        enabled=multi_tenancy.get("enabled", False),
        auto_tenant_creation=multi_tenancy.get("autoTenantCreation", False),
        auto_tenant_activation=multi_tenancy.get("autoTenantActivation", False),
    )

    object_ttl_config = _get_object_ttl_config(schema)

    # --- properties and references ---
    properties: List[_Property] = []
    references: List[_ReferenceProperty] = []
    if schema.get("properties") is not None:
        properties = _properties_from_config(schema)
        references = _references_from_config(schema)

    # --- replication ---
    replication: Dict[str, Any] = schema["replicationConfig"]
    replication_factor = replication["factor"]
    async_enabled = replication.get("asyncEnabled", False)
    if "deletionStrategy" in replication:
        deletion_strategy = ReplicationDeletionStrategy(replication["deletionStrategy"])
    else:
        deletion_strategy = ReplicationDeletionStrategy.NO_AUTOMATED_RESOLUTION
    replication_config = _ReplicationConfig(
        factor=replication_factor,
        async_enabled=async_enabled,
        deletion_strategy=deletion_strategy,
    )

    reranker_config = __get_rerank_config(schema)

    # --- sharding (not reported for multi-tenant collections) ---
    sharding_config: Optional[_ShardingConfig]
    if multi_tenancy.get("enabled", False):
        sharding_config = None
    else:
        sharding: Dict[str, Any] = schema["shardingConfig"]
        sharding_config = _ShardingConfig(
            virtual_per_physical=sharding["virtualPerPhysical"],
            desired_count=sharding["desiredCount"],
            actual_count=sharding["actualCount"],
            desired_virtual_count=sharding["desiredVirtualCount"],
            actual_virtual_count=sharding["actualVirtualCount"],
            key=sharding["key"],
            strategy=sharding["strategy"],
            function=sharding["function"],
        )

    # --- vectors ---
    vector_index_config = __get_vector_index_config(schema)
    vector_index_type = __get_vector_index_type(schema)
    vectorizer_config = __get_vectorizer_config(schema)
    vectorizer = __get_vectorizer(schema)
    vector_config = __get_vector_config(schema, simple=False)

    return _CollectionConfig(
        name=collection_name,
        description=description,
        generative_config=generative_config,
        inverted_index_config=inverted_index_config,
        multi_tenancy_config=multi_tenancy_config,
        object_ttl_config=object_ttl_config,
        properties=properties,
        references=references,
        replication_config=replication_config,
        reranker_config=reranker_config,
        sharding_config=sharding_config,
        vector_index_config=vector_index_config,
        vector_index_type=vector_index_type,
        vectorizer_config=vectorizer_config,
        vectorizer=vectorizer,
        vector_config=vector_config,
    )
def _get_object_ttl_config(schema: Dict[str, Any]) -> Optional[_ObjectTTLConfig]:
    """Build the object time-to-live configuration of a collection.

    Args:
        schema: The collection definition as a JSON-like dictionary.

    Returns:
        ``None`` when ``objectTtlConfig`` is missing or not enabled; otherwise an
        ``_ObjectTTLConfig``. An integer ``defaultTtl`` is converted to a ``timedelta`` of
        that many seconds, and the ``deleteOn`` values ``_lastUpdateTimeUnix`` and
        ``_creationTimeUnix`` are translated to ``updateTime`` and ``creationTime``; any
        other value is passed through unchanged.

    Raises:
        KeyError: If TTL is enabled but ``deleteOn`` or ``filterExpiredObjects`` is missing.
    """
    if "objectTtlConfig" not in schema:
        return None
    ttl_section = schema["objectTtlConfig"]
    if not ttl_section.get("enabled", False):
        return None

    time_to_live = ttl_section.get("defaultTtl")
    # ``None`` is never an ``int``, so a single isinstance check covers the missing case too.
    if isinstance(time_to_live, int):
        time_to_live = datetime.timedelta(seconds=time_to_live)

    delete_on = ttl_section["deleteOn"]
    if delete_on == "_lastUpdateTimeUnix":
        delete_on = "updateTime"
    elif delete_on == "_creationTimeUnix":
        delete_on = "creationTime"

    return _ObjectTTLConfig(
        enabled=True,
        delete_on=delete_on,
        filter_expired_objects=ttl_section["filterExpiredObjects"],
        time_to_live=time_to_live,
    )
def _collection_configs_from_json(schema: Dict[str, Any]) -> Dict[str, _CollectionConfig]:
    """Convert a whole schema into full collection configurations keyed by collection name.

    Args:
        schema: Dictionary whose ``classes`` entry lists the collection definitions.

    Returns:
        A dictionary ordered by collection name. If a name occurs more than once, the last
        definition wins.
    """
    parsed: Dict[str, _CollectionConfig] = {}
    for class_schema in schema["classes"]:
        collection_name = class_schema["class"]
        parsed[collection_name] = _collection_config_from_json(class_schema)
    # Dictionary keys are unique, so ordering by key alone matches ordering the items.
    return {collection_name: parsed[collection_name] for collection_name in sorted(parsed)}
def _collection_configs_simple_from_json(
    schema: Dict[str, Any],
) -> Dict[str, _CollectionConfigSimple]:
    """Convert a whole schema into simplified collection configurations keyed by name.

    Args:
        schema: Dictionary whose ``classes`` entry lists the collection definitions.

    Returns:
        A dictionary ordered by collection name. If a name occurs more than once, the last
        definition wins.
    """
    parsed: Dict[str, _CollectionConfigSimple] = {}
    for class_schema in schema["classes"]:
        collection_name = class_schema["class"]
        parsed[collection_name] = _collection_config_simple_from_json(class_schema)
    # Dictionary keys are unique, so ordering by key alone matches ordering the items.
    return {collection_name: parsed[collection_name] for collection_name in sorted(parsed)}
def _nested_properties_from_config(props: List[Dict[str, Any]]) -> List[_NestedProperty]:
    """Convert a list of nested property definitions, recursing into their children.

    Args:
        props: The nested property definitions as JSON-like dictionaries.

    Returns:
        One ``_NestedProperty`` per entry, in input order. ``nested_properties`` and
        ``tokenization`` are ``None`` when the corresponding entry is missing or ``None``.

    Raises:
        KeyError: If ``dataType``, ``indexFilterable``, ``indexSearchable`` or ``name`` is
            missing from an entry.
    """
    converted: List[_NestedProperty] = []
    for prop in props:
        data_type = DataType(prop["dataType"][0])
        description = prop.get("description")
        index_filterable = prop["indexFilterable"]
        index_searchable = prop["indexSearchable"]
        name = prop["name"]

        children: Optional[List[_NestedProperty]] = None
        if prop.get("nestedProperties") is not None:
            children = _nested_properties_from_config(prop["nestedProperties"])

        tokenization: Optional[Tokenization] = None
        if prop.get("tokenization") is not None:
            tokenization = Tokenization(prop["tokenization"])

        converted.append(
            _NestedProperty(
                data_type=data_type,
                description=description,
                index_filterable=index_filterable,
                index_searchable=index_searchable,
                name=name,
                nested_properties=children,
                tokenization=tokenization,
            )
        )
    return converted
def _properties_from_config(schema: Dict[str, Any]) -> List[_Property]:
    """Convert the primitive (non-reference) properties of a collection definition.

    Args:
        schema: The collection definition as a JSON-like dictionary.

    Returns:
        One ``_Property`` per entry of ``schema["properties"]`` for which ``_is_primitive``
        holds, in input order.

    Raises:
        KeyError: If ``properties`` is missing, if a mandatory property key is missing, or if
            a legacy vectorizer is configured and a property's ``moduleConfig`` has no entry
            for it.
    """
    all_props = schema["properties"]
    legacy_vectorizer = schema.get("vectorizer", "none")
    has_named_vectors = "vectorConfig" in schema

    converted: List[_Property] = []
    for prop in all_props:
        # Reference properties are handled by the reference parser, not here.
        if not _is_primitive(prop["dataType"]):
            continue

        data_type = DataType(prop["dataType"][0])
        description = prop.get("description")
        index_filterable = prop["indexFilterable"]
        index_range_filters = prop.get("indexRangeFilters", False)
        index_searchable = prop["indexSearchable"]
        name = prop["name"]

        nested: Optional[List[_NestedProperty]] = None
        if prop.get("nestedProperties") is not None:
            nested = _nested_properties_from_config(prop["nestedProperties"])

        tokenization: Optional[Tokenization] = None
        if prop.get("tokenization") is not None:
            tokenization = Tokenization(prop["tokenization"])

        # Per-property settings of the legacy (single) vectorizer.
        vectorizer_config: Optional[_PropertyVectorizerConfig] = None
        if legacy_vectorizer != "none" and prop.get("moduleConfig", None) is not None:
            module_settings = prop["moduleConfig"][legacy_vectorizer]
            vectorizer_config = _PropertyVectorizerConfig(
                skip=module_settings.get("skip", False),
                vectorize_property_name=module_settings.get("vectorizePropertyName", False),
            )

        # Per-module settings, reported only when the collection uses named vectors.
        vectorizer_configs: Optional[Dict[str, _PropertyVectorizerConfig]] = None
        if has_named_vectors:
            vectorizer_configs = {
                module_name: _PropertyVectorizerConfig(
                    skip=settings.get("skip", False),
                    vectorize_property_name=settings.get("vectorizePropertyName", False),
                )
                for module_name, settings in prop.get("moduleConfig", {}).items()
            }

        converted.append(
            _Property(
                data_type=data_type,
                description=description,
                index_filterable=index_filterable,
                index_range_filters=index_range_filters,
                index_searchable=index_searchable,
                name=name,
                nested_properties=nested,
                tokenization=tokenization,
                vectorizer_config=vectorizer_config,
                vectorizer_configs=vectorizer_configs,
                vectorizer=None if has_named_vectors else legacy_vectorizer,
            )
        )
    return converted
def _references_from_config(schema: Dict[str, Any]) -> List[_ReferenceProperty]:
    """Convert the reference properties of a collection definition.

    Args:
        schema: The collection definition as a JSON-like dictionary.

    Returns:
        One ``_ReferenceProperty`` per entry of ``schema["properties"]`` for which
        ``_is_primitive`` does not hold, in input order. The entry's ``dataType`` list is
        used as the target collections.

    Raises:
        KeyError: If ``properties`` is missing or an entry lacks ``dataType`` or ``name``.
    """
    references: List[_ReferenceProperty] = []
    for prop in schema["properties"]:
        if _is_primitive(prop["dataType"]):
            continue
        references.append(
            _ReferenceProperty(
                target_collections=prop["dataType"],
                description=prop.get("description"),
                name=prop["name"],
            )
        )
    return references