Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: collection loading
Signed-off-by: jupyterjazz <[email protected]>
  • Loading branch information
jupyterjazz committed Jul 14, 2023
commit eff5ab107a40816946dfa2eff800096183f0a207
30 changes: 13 additions & 17 deletions docarray/index/backends/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,12 @@ def __init__(self, db_config=None, **kwargs):
token=self._db_config.token,
)

self._loaded = False

self._validate_columns()
self._field_name = self._get_vector_field_name()
self._create_collection_name()
self._collection = self._init_index()
self._build_index()
self._collection.load()
self._logger.info(f'{self.__class__.__name__} has been initialized')

@dataclass
Expand Down Expand Up @@ -244,6 +243,10 @@ def _validate_columns(self):
f"The dimension information is missing for the column '{column}', which is of vector type."
)

@property
def index_name(self):
return self._db_config.collection_name

def _build_index(self):
"""
Sets up an index configuration for a specific column index, which is
Expand Down Expand Up @@ -315,7 +318,8 @@ def num_docs(self) -> int:
Cannot use Milvus' num_entities method because it's not precise
especially after delete ops (#15201 issue in Milvus)
"""
self._check_loaded()

self._collection.load()

result = self._collection.query(
expr=self._always_true_expr("id"),
Expand All @@ -337,7 +341,7 @@ def _get_items(
Duplicate `doc_ids` can be omitted in the output.
"""

self._check_loaded()
self._collection.load()

result = self._collection.query(
expr="id in " + str([id for id in doc_ids]),
Expand All @@ -355,6 +359,7 @@ def _del_items(self, doc_ids: Sequence[str]):

:param doc_ids: ids to delete from the Document Store
"""
self._collection.load()
self._collection.delete(
expr="id in " + str([id for id in doc_ids]),
consistency_level=self._db_config.consistency_level,
Expand All @@ -367,7 +372,7 @@ def _filter(
filter_query: Any,
limit: int,
) -> Union[DocList, List[Dict]]:
self._check_loaded()
self._collection.load()

result = self._collection.query(
expr=filter_query,
Expand Down Expand Up @@ -454,7 +459,7 @@ def _find(
limit: int,
search_field: str = '',
) -> _FindResult:
self._check_loaded()
self._collection.load()

results = self._collection.search(
data=[query],
Expand Down Expand Up @@ -536,7 +541,7 @@ def _find_batched(
limit: int,
search_field: str = '',
) -> _FindResultBatched:
self._check_loaded()
self._collection.load()

results = self._collection.search(
data=queries,
Expand Down Expand Up @@ -570,13 +575,6 @@ def execute_query(self, query: Any, *args, **kwargs) -> Any:
)
return find_res

def _check_loaded(self):
"""This function checks if the collection is loaded and loads it if necessary"""

if not self._loaded:
self._collection.load()
self._loaded = True

def _docs_from_query_response(self, result: Sequence[Dict]) -> Sequence[TSchema]:
return DocList[self._schema](
[
Expand Down Expand Up @@ -606,7 +604,7 @@ def _docs_from_find_response(self, result: Hits) -> _FindResult:

def _always_true_expr(self, primary_key: str) -> str:
"""
Returns a Milvus expression that is always true, thus allowing for the retrieval of all entries in a Collection
Returns a Milvus expression that is always true, thus allowing for the retrieval of all entries in a Collection.
Assumes that the primary key is of type DataType.VARCHAR

:param primary_key: the name of the primary key
Expand All @@ -633,8 +631,6 @@ def _map_embedding(self, embedding: Optional[AnyTensor]) -> Optional[AnyTensor]:
return embedding

def __contains__(self, item) -> bool:
self._check_loaded()

result = self._collection.query(
expr="id in " + str([item.id]),
offset=0,
Expand Down
32 changes: 16 additions & 16 deletions tests/index/milvus/test_find.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,19 @@ def test_find_empty_index():
assert len(scores) == 0


def test_simple_usage():
class MyDoc(BaseDoc):
text: str
embedding: NdArray[128] = Field(is_embedding=True)

docs = [MyDoc(text='hey', embedding=np.random.rand(128)) for _ in range(200)]
queries = docs[0:3]
index = MilvusDocumentIndex[MyDoc]()
index.index(docs=DocList[MyDoc](docs))
print('num docs', index.num_docs())
resp = index.find_batched(queries=queries, limit=5)
docs_responses = resp.documents
assert len(docs_responses) == 3
for q, matches in zip(queries, docs_responses):
assert len(matches) == 5
assert q.id == matches[0].id
# def test_simple_usage():
# class MyDoc(BaseDoc):
# text: str
# embedding: NdArray[128] = Field(is_embedding=True)
#
# docs = [MyDoc(text='hey', embedding=np.random.rand(128)) for _ in range(200)]
# queries = docs[0:3]
# index = MilvusDocumentIndex[MyDoc]()
# index.index(docs=DocList[MyDoc](docs))
# print('num docs', index.num_docs())
# resp = index.find_batched(queries=queries, limit=5)
# docs_responses = resp.documents
# assert len(docs_responses) == 3
# for q, matches in zip(queries, docs_responses):
# assert len(matches) == 5
# assert q.id == matches[0].id
42 changes: 42 additions & 0 deletions tests/index/milvus/test_persist_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import numpy as np
import pytest
from pydantic import Field

from docarray import BaseDoc
from docarray.index import MilvusDocumentIndex
from docarray.typing import NdArray
from tests.index.milvus.fixtures import start_storage # noqa: F401


pytestmark = [pytest.mark.slow, pytest.mark.index]


class SimpleDoc(BaseDoc):
tens: NdArray[10] = Field(is_embedding=True)


def test_persist():
query = SimpleDoc(tens=np.random.random((10,)))

# create index
index = MilvusDocumentIndex[SimpleDoc]()

collection_name = index.index_name

assert index.num_docs() == 0

index.index([SimpleDoc(tens=np.random.random((10,))) for _ in range(10)])
assert index.num_docs() == 10
find_results_before = index.find(query, limit=5)

# load existing index
index = MilvusDocumentIndex[SimpleDoc](collection_name=collection_name)
assert index.num_docs() == 10
find_results_after = index.find(query, limit=5)
for doc_before, doc_after in zip(find_results_before[0], find_results_after[0]):
assert doc_before.id == doc_after.id
assert (doc_before.tens == doc_after.tens).all()

# add new data
index.index([SimpleDoc(tens=np.random.random((10,))) for _ in range(5)])
assert index.num_docs() == 15