Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: some changes
Signed-off-by: jupyterjazz <[email protected]>
  • Loading branch information
jupyterjazz committed Jul 13, 2023
commit d1edd960d74abd650ee317f5311eefa188fa2552
2 changes: 1 addition & 1 deletion docarray/index/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ def filter(
self._logger.debug(f'Executing `filter` for the query {filter_query}')
docs = self._filter(filter_query, limit=limit, **kwargs)

if isinstance(docs, List):
if isinstance(docs, List) and not isinstance(docs, DocList):
docs = self._dict_list_to_docarray(docs)

return docs
Expand Down
164 changes: 138 additions & 26 deletions docarray/index/backends/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@
cast,
)

import numpy as np

from docarray import BaseDoc, DocList
from docarray.index.abstract import BaseDocIndex
from docarray.index.backends.helper import _execute_find_and_filter_query
from docarray.typing import AnyTensor
from docarray.typing.id import ID
from docarray.typing.tensor.abstract_tensor import AbstractTensor
from docarray.utils._internal._typing import safe_issubclass
from docarray.utils._internal.misc import import_library
from docarray.utils.find import _FindResult, _FindResultBatched
from docarray.utils.find import (
_FindResult,
_FindResultBatched,
FindResult,
FindResultBatched,
)

if TYPE_CHECKING:
import numpy as np
from pymilvus import (
Collection,
CollectionSchema,
Expand All @@ -38,7 +43,6 @@
utility,
)
else:
np = import_library('numpy', raise_error=False)
from pymilvus import (
Collection,
CollectionSchema,
Expand All @@ -55,7 +59,7 @@


class MilvusDocumentIndex(BaseDocIndex, Generic[TSchema]):
def __init__(self, db_config=None, index_name=None, **kwargs):
def __init__(self, db_config=None, **kwargs):
"""Initialize MilvusDocumentIndex"""
super().__init__(db_config=db_config, **kwargs)
self._db_config: MilvusDocumentIndex.DBConfig = cast(
Expand All @@ -72,8 +76,8 @@ def __init__(self, db_config=None, index_name=None, **kwargs):

self._loaded = False

self._db_config.index_name = index_name
self._validate_columns()
self._field_name = self._get_vector_field_name()
self._create_collection_name()
self._collection = self._init_index()
self._build_index()
Expand All @@ -90,7 +94,6 @@ class DBConfig(BaseDocIndex.DBConfig):
user: Optional[str] = ""
password: Optional[str] = ""
token: Optional[str] = ""
index_name: str = None
index_type: str = "IVF_FLAT"
index_metric: str = "L2"
index_params: Dict = field(default_factory=lambda: {"nlist": 1024})
Expand Down Expand Up @@ -158,7 +161,7 @@ def _init_index(self) -> Collection:
dtype=DataType.VARCHAR,
is_primary=True,
max_length=ID_VARCHAR_LEN,
), # id represents the document id
),
]
fields.extend(
[
Expand All @@ -172,16 +175,16 @@ def _init_index(self) -> Collection:
else {}
),
**(
{'dim': info.n_dim}
{'dim': info.n_dim or info.config.get('dim')}
if info.db_type == DataType.FLOAT_VECTOR
else {}
),
)
for column_name, info in self._column_infos.items()
if column_name != "id"
if column_name != 'id'
and not (
info.db_type == DataType.FLOAT_VECTOR
and column_name != self._db_config.index_name
and column_name != self._field_name
) # Only store one vector field in column
]
)
Expand Down Expand Up @@ -213,26 +216,29 @@ def _create_collection_name(self):

def _validate_columns(self):
"""
Validates if the DataFrame contains at least one vector column
(Milvus' requirement) and checks that each vector column has
dimension information specified.
Validates whether the data schema includes at least one vector column used
for embedding (as required by Milvus), and ensures that dimension information
is specified for that column.
"""
vector_columns_exist = any(
vector_columns = sum(
safe_issubclass(info.docarray_type, AbstractTensor)
and info.config.get('is_embedding', False)
for info in self._column_infos.values()
)

if not vector_columns_exist:
if vector_columns == 0:
raise ValueError(
"No vector columns found. Ensure that at least one column is of a vector type"
"Unable to find any vector columns. Please make sure that at least one "
"column is of a vector type with the is_embedding=True attribute specified."
)
elif vector_columns > 1:
raise ValueError("Specifying multiple vector fields is not supported.")

for column_name, info in self._column_infos.items():
if info.n_dim is None and safe_issubclass(
info.docarray_type, AbstractTensor
for column, info in self._column_infos.items():
if info.config.get('is_embedding') and (
not info.n_dim and not info.config.get('dim')
):
raise ValueError(
f"Dimension information is missing for column '{column_name}' which is vector type"
f"The dimension information is missing for the column '{column}', which is of vector type."
)

def _build_index(self):
Expand All @@ -247,11 +253,19 @@ def _build_index(self):
"params": self._db_config.index_params,
}

self._collection.create_index(self._db_config.index_name, index)
self._collection.create_index(self._field_name, index)
self._logger.info(
f"Index '{self._db_config.index_name}' has been successfully created"
f"Index for the field '{self._field_name}' has been successfully created"
)

def _get_vector_field_name(self):
    """Return the name of the column that serves as the embedding field.

    Walks `self._column_infos` in iteration order and picks the first column
    whose `db_type` is `DataType.FLOAT_VECTOR` and whose config carries a
    truthy `is_embedding` entry.

    :return: the matching column name, or an empty string if no column matches
    """
    embedding_columns = (
        name
        for name, col_info in self._column_infos.items()
        if col_info.db_type == DataType.FLOAT_VECTOR
        and col_info.config.get('is_embedding')
    )
    # Fall back to '' so callers always receive a string.
    return next(embedding_columns, '')

def index(self, docs: Union[BaseDoc, Sequence[BaseDoc]], **kwargs):
"""Index Documents into the index.

Expand Down Expand Up @@ -279,7 +293,7 @@ def index(self, docs: Union[BaseDoc, Sequence[BaseDoc]], **kwargs):
for column_name, info in self._column_infos.items():
column_value = self._get_values_by_column([docs[i]], column_name)[0]
if info.db_type == DataType.FLOAT_VECTOR:
if column_name != self._db_config.index_name:
if column_name != self._field_name:
continue
column_value = self._map_embedding(column_value)

Expand Down Expand Up @@ -391,6 +405,46 @@ def _text_search_batched(
) -> _FindResultBatched:
raise NotImplementedError(f'{type(self)} does not support text search.')

def find(
    self,
    query: Union[AnyTensor, BaseDoc],
    search_field: str = '',
    limit: int = 10,
    **kwargs,
) -> FindResult:
    """Find documents in the index using nearest neighbor search.

    The search always runs against the index's single embedding field
    (`self._field_name`); the field cannot be chosen per call.

    :param query: query vector for KNN/ANN search.
        Can be either a tensor-like (np.array, torch.Tensor, etc.)
        with a single axis, or a Document. If a Document is passed, the
        value of its embedding field is used as the query vector.
    :param search_field: must be left as an empty string; any other value
        is rejected.
    :param limit: maximum number of documents to return
    :param kwargs: forwarded unchanged to `_find`
    :return: a named tuple containing `documents` and `scores`
    :raises ValueError: if `search_field` is not an empty string
    """
    self._logger.debug(f'Executing `find` for search field {search_field}')
    if search_field != '':
        # The trailing space keeps the two sentences apart once the adjacent
        # literals are concatenated.
        raise ValueError(
            'Argument search_field is not supported for MilvusDocumentIndex. '
            'Set search_field to an empty string to proceed.'
        )

    search_field = self._field_name
    if isinstance(query, BaseDoc):
        # Pull the query vector out of the document's embedding field.
        query_vec = self._get_values_by_column([query], search_field)[0]
    else:
        query_vec = query
    query_vec_np = self._to_numpy(query_vec)
    docs, scores = self._find(
        query_vec_np, search_field=search_field, limit=limit, **kwargs
    )

    # A plain list (as opposed to a DocList) still has to be converted.
    if isinstance(docs, List) and not isinstance(docs, DocList):
        docs = self._dict_list_to_docarray(docs)

    return FindResult(documents=docs, scores=scores)

def _find(
self,
query: np.ndarray,
Expand All @@ -404,6 +458,7 @@ def _find(
anns_field=search_field,
param=self._db_config.search_params,
limit=limit,
offset=0,
expr=None,
output_fields=["serialized"],
consistency_level=self._db_config.consistency_level,
Expand All @@ -415,6 +470,63 @@ def _find(

return self._docs_from_find_response(results)

def find_batched(
    self,
    queries: Union[AnyTensor, DocList],
    search_field: str = '',
    limit: int = 10,
    **kwargs,
) -> FindResultBatched:
    """Find documents in the index using nearest neighbor search.

    The search on this index always runs against its single embedding field
    (`self._field_name`). A `search_field` of the form `sub__field` whose
    first component is an `AnyDocArray` field is delegated to the matching
    subindex; every other non-empty value is rejected.

    :param queries: query vectors for KNN/ANN search.
        Can be either a tensor-like (np.array, torch.Tensor, etc.)
        or a DocList.
        If a tensor-like is passed, it should have shape (batch_size, vector_dim)
    :param search_field: empty string, or a `__`-separated path into a
        subindex.
    :param limit: maximum number of documents to return per query
    :param kwargs: forwarded unchanged to `_find_batched` (or to the
        subindex's `find_batched`)
    :return: a named tuple containing `documents` and `scores`
    :raises ValueError: if `search_field` is non-empty and does not resolve
        to a subindex
    """
    self._logger.debug(f'Executing `find_batched` for search field {search_field}')

    if search_field:
        if '__' in search_field:
            # NOTE(review): AnyDocArray is not among the module-level imports
            # visible for this file, so it is imported here to keep this
            # branch from failing with a NameError.
            from docarray.array.any_array import AnyDocArray

            fields = search_field.split('__')
            # safe_issubclass tolerates field types that are not classes,
            # where plain issubclass would raise TypeError.
            if safe_issubclass(self._schema._get_field_type(fields[0]), AnyDocArray):  # type: ignore
                return self._subindices[fields[0]].find_batched(
                    queries,
                    search_field='__'.join(fields[1:]),
                    limit=limit,
                    **kwargs,
                )
    if search_field != '':
        # The trailing space keeps the two sentences apart once the adjacent
        # literals are concatenated.
        raise ValueError(
            'Argument search_field is not supported for MilvusDocumentIndex. '
            'Set search_field to an empty string to proceed.'
        )
    search_field = self._field_name
    if isinstance(queries, Sequence):
        # Sequence of documents: extract each embedding and stack them into
        # one array.
        query_vec_list = self._get_values_by_column(queries, search_field)
        query_vec_np = np.stack(
            tuple(self._to_numpy(query_vec) for query_vec in query_vec_list)
        )
    else:
        query_vec_np = self._to_numpy(queries)

    da_list, scores = self._find_batched(
        query_vec_np, search_field=search_field, limit=limit, **kwargs
    )
    # Per-query results that are plain lists (not DocLists) still have to be
    # converted; the first entry is used as the representative.
    if (
        len(da_list) > 0
        and isinstance(da_list[0], List)
        and not isinstance(da_list[0], DocList)
    ):
        da_list = [self._dict_list_to_docarray(docs) for docs in da_list]

    return FindResultBatched(documents=da_list, scores=scores)  # type: ignore

def _find_batched(
self,
queries: np.ndarray,
Expand All @@ -425,7 +537,7 @@ def _find_batched(

results = self._collection.search(
data=queries,
anns_field=search_field,
anns_field=self._field_name,
param=self._db_config.search_params,
limit=limit,
expr=None,
Expand Down
2 changes: 1 addition & 1 deletion tests/index/milvus/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ services:

standalone:
container_name: milvus-standalone
image: milvusdb/milvus:v2.2.10
image: milvusdb/milvus:v2.2.11
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
Expand Down
10 changes: 5 additions & 5 deletions tests/index/milvus/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import os
import pytest
import time
import os

import pytest

# Absolute path of the directory containing this fixtures module.
cur_dir = os.path.dirname(os.path.abspath(__file__))
# Absolute path of the docker-compose.yml file that sits next to this module.
milvus_yml = os.path.abspath(os.path.join(cur_dir, 'docker-compose.yml'))


@pytest.fixture(scope='session', autouse=True)
def start_storage():
os.system(f"docker-compose -f {milvus_yml} up -d --remove-orphans")
time.sleep(1)
os.system(f"docker compose -f {milvus_yml} up -d --remove-orphans")
time.sleep(10)

yield
os.system(f"docker-compose -f {milvus_yml} down --remove-orphans")
os.system(f"docker compose -f {milvus_yml} down --remove-orphans")
56 changes: 56 additions & 0 deletions tests/index/milvus/test_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import numpy as np
import pytest
from pydantic import Field

from docarray import BaseDoc
from docarray.index import MilvusDocumentIndex
from docarray.typing import NdArray
from tests.index.milvus.fixtures import start_storage # noqa: F401


# Apply the `slow` and `index` pytest markers to every test in this module.
pytestmark = [pytest.mark.slow, pytest.mark.index]


def test_configure_dim():
    """Check the two ways of declaring the embedding dimension.

    The dimension may come from the parametrized tensor type (`NdArray[10]`)
    or from the `dim` field option; with neither, building the index must
    fail with a `ValueError`.
    """

    # Dimension taken from the tensor type itself.
    class Schema1(BaseDoc):
        tens: NdArray[10] = Field(is_embedding=True)

    doc_index = MilvusDocumentIndex[Schema1]()
    doc_index.index([Schema1(tens=np.random.random((10,))) for _ in range(10)])
    assert doc_index.num_docs() == 10

    # Dimension supplied through the field configuration.
    class Schema2(BaseDoc):
        tens: NdArray = Field(is_embedding=True, dim=10)

    doc_index = MilvusDocumentIndex[Schema2]()
    doc_index.index([Schema2(tens=np.random.random((10,))) for _ in range(10)])
    assert doc_index.num_docs() == 10

    # No dimension anywhere: construction is rejected.
    class Schema3(BaseDoc):
        tens: NdArray = Field(is_embedding=True)

    expected_error = 'The dimension information is missing'
    with pytest.raises(ValueError, match=expected_error):
        MilvusDocumentIndex[Schema3]()


def test_incorrect_vector_field():
    """Check that schemas without exactly one embedding field are rejected.

    A schema with no field marked `is_embedding=True` and a schema with two
    such fields must both raise a `ValueError` when the index is built.
    """

    # No field is flagged as the embedding.
    class Schema1(BaseDoc):
        tens: NdArray[10]

    no_vector_error = 'Unable to find any vector columns'
    with pytest.raises(ValueError, match=no_vector_error):
        MilvusDocumentIndex[Schema1]()

    # Two fields are flagged as embeddings.
    class Schema2(BaseDoc):
        tens1: NdArray[10] = Field(is_embedding=True)
        tens2: NdArray[20] = Field(is_embedding=True)

    multiple_vectors_error = 'Specifying multiple vector fields is not supported'
    with pytest.raises(ValueError, match=multiple_vectors_error):
        MilvusDocumentIndex[Schema2]()
Loading