Source code for weaviate.collections.batch.rest

from typing import Dict, List, Optional

from httpx import Response

from weaviate.collections.classes.batch import (
    ErrorReference,
    BatchReference,
    _BatchReference,
    BatchReferenceReturn,
)
from weaviate.collections.classes.config import ConsistencyLevel
from weaviate.connect import executor
from weaviate.connect.v4 import Connection, _ExpectedStatusCodes
from weaviate.util import _decode_json_response_list


[docs] class _BatchREST: def __init__(self, consistency_level: Optional[ConsistencyLevel]) -> None: self.__consistency_level = consistency_level
[docs] def references( self, connection: Connection, *, references: List[_BatchReference] ) -> executor.Result[BatchReferenceReturn]: params: Dict[str, str] = {} if self.__consistency_level is not None: params["consistency_level"] = self.__consistency_level.value refs = [ ( {"from": ref.from_, "to": ref.to} if ref.tenant is None else {"from": ref.from_, "to": ref.to, "tenant": ref.tenant} ) for ref in references ] def resp(res: Response) -> BatchReferenceReturn: payload = _decode_json_response_list(res, "batch ref") assert payload is not None errors = { idx: ErrorReference( message=entry["result"]["errors"]["error"][0]["message"], reference=BatchReference._from_internal(references[idx]), ) for idx, entry in enumerate(payload) if entry["result"]["status"] == "FAILED" } return BatchReferenceReturn( elapsed_seconds=res.elapsed.total_seconds(), errors=errors, has_errors=len(errors) > 0, ) return executor.execute( response_callback=resp, method=connection.post, path="/batch/references", weaviate_object=refs, params=params, status_codes=_ExpectedStatusCodes(ok_in=200, error="Send ref batch"), )