Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
df47474
feat: support redis
jupyterjazz May 17, 2023
0920adc
chore: merge main
jupyterjazz Jun 12, 2023
51558b7
fix: index creation
jupyterjazz Jun 15, 2023
12da714
feat: 1st draft, needs polishing
jupyterjazz Jun 21, 2023
be7bed7
feat: query builder, tests
jupyterjazz Jun 28, 2023
9365327
Merge branch 'main' into feat-add-redis
jupyterjazz Jun 28, 2023
cb25869
chore: update poetry lock
jupyterjazz Jun 28, 2023
341fa9a
chore: run tests
jupyterjazz Jun 28, 2023
d37b28f
Merge branch 'main' into feat-add-redis
jupyterjazz Jun 28, 2023
abca1dd
fix: defaultdict for column config
jupyterjazz Jun 28, 2023
edc8a34
chore: update branch
jupyterjazz Jun 28, 2023
c5abd80
style: ignore mypy errors
jupyterjazz Jun 28, 2023
22434f1
refactor: put vectorfield args in column info
jupyterjazz Jun 28, 2023
4a96194
chore: remove unused code
jupyterjazz Jun 28, 2023
a52cdfd
docs: add docstrings
jupyterjazz Jun 28, 2023
9ac683f
test: add tensorflow test
jupyterjazz Jun 28, 2023
a7f54c3
fix: tensorflow test
jupyterjazz Jun 28, 2023
be9d771
fix: tf tst
jupyterjazz Jun 28, 2023
4dc99e6
refactor: reduce ignore types
jupyterjazz Jun 28, 2023
d2718b4
style: remove other type ignores
jupyterjazz Jun 28, 2023
9f58474
style: try removing import ignores
jupyterjazz Jun 28, 2023
7e791da
style: i think mypy hates me
jupyterjazz Jun 28, 2023
8ed3dbb
feat: batch indexing
jupyterjazz Jun 29, 2023
83afb5f
chore: bump redis version
jupyterjazz Jun 29, 2023
4b0bc73
feat: subindex not fully finished
jupyterjazz Jul 2, 2023
0b15adc
feat: finalize subindex
jupyterjazz Jul 5, 2023
e8fbc47
docs: update readme
jupyterjazz Jul 5, 2023
848e95d
chore: commits not showing
jupyterjazz Jul 5, 2023
c00abe2
Merge branch 'main' into feat-add-redis
jupyterjazz Jul 5, 2023
27dd29a
feat: del and get batched
jupyterjazz Jul 6, 2023
162c6a8
docs: update batchsize docstring
jupyterjazz Jul 6, 2023
16c4323
Merge branch 'main' into feat-add-redis
jupyterjazz Jul 6, 2023
56829d6
refactor: index name
jupyterjazz Jul 9, 2023
c414836
Merge branch 'main' into feat-add-redis
jupyterjazz Jul 9, 2023
7a5ed5e
refactor: default index name following schema
jupyterjazz Jul 9, 2023
0fecb48
chore: update branch
jupyterjazz Jul 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: index name
Signed-off-by: jupyterjazz <[email protected]>
  • Loading branch information
jupyterjazz committed Jul 9, 2023
commit 56829d6986ef5d21e48e097397b506c3ca028f63
57 changes: 28 additions & 29 deletions docarray/index/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,14 @@
class RedisDocumentIndex(BaseDocIndex, Generic[TSchema]):
def __init__(self, db_config=None, **kwargs):
"""Initialize RedisDocumentIndex"""
if db_config is not None and getattr(db_config, 'index_name'):
self._index_name = db_config.index_name
elif kwargs.get('index_name'):
self._index_name = kwargs.get('index_name')
else:
self._index_name = 'index_name__' + self._random_name()

self._index_name = None
super().__init__(db_config=db_config, **kwargs)
self._db_config = cast(RedisDocumentIndex.DBConfig, self._db_config)

self._runtime_config: RedisDocumentIndex.RuntimeConfig = cast(
RedisDocumentIndex.RuntimeConfig, self._runtime_config
)
self._prefix = self._index_name + ':'
self._prefix = self.index_name + ':'
self._text_scorer = self._db_config.text_scorer
# initialize Redis client
self._client = redis.Redis(
Expand All @@ -108,7 +102,7 @@ def _random_name() -> str:

def _create_index(self) -> None:
"""Create a new index in the Redis database if it doesn't already exist."""
if not self._check_index_exists(self._index_name):
if not self._check_index_exists(self.index_name):
schema = []
for column, info in self._column_infos.items():
if issubclass(info.docarray_type, AnyDocArray):
Expand Down Expand Up @@ -153,16 +147,16 @@ def _create_index(self) -> None:
schema.append(info.db_type('$.' + column, as_name=column))

# Create Redis Index
self._client.ft(self._index_name).create_index(
self._client.ft(self.index_name).create_index(
schema,
definition=IndexDefinition(
prefix=[self._prefix], index_type=IndexType.JSON
),
)

self._logger.info(f'index {self._index_name} has been created')
self._logger.info(f'index {self.index_name} has been created')
else:
self._logger.info(f'connected to existing {self._index_name} index')
self._logger.info(f'connected to existing {self.index_name} index')

def _check_index_exists(self, index_name: str) -> bool:
"""
Expand All @@ -181,6 +175,11 @@ def _check_index_exists(self, index_name: str) -> bool:

@property
def index_name(self) -> str:
    """Return the name of the Redis index, resolving it lazily.

    On first access, falls back to the user-configured
    ``DBConfig.index_name`` if set, otherwise generates a random
    ``'index_name__<random>'`` default; the result is cached in
    ``self._index_name`` so subsequent accesses are stable.

    :return: The resolved index name.
    """
    if not self._index_name:
        self._index_name = (
            self._db_config.index_name or 'index_name__' + self._random_name()
        )
    # Log the cached attribute, not a chained local: the original
    # `self._index_name = index_name = (...)` only bound `index_name`
    # inside the branch, so logging it on the cached path would raise
    # NameError on every access after the first.
    self._logger.debug(f'Retrieved index name: {self._index_name}')
    return self._index_name

@property
Expand Down Expand Up @@ -290,39 +289,39 @@ def _generate_items(
batch_size: int,
) -> Iterator[List[Dict[str, Any]]]:
"""
Given a dictionary of generators, yield a list of dictionaries where each
item consists of a key and a single item from the corresponding generator.
Given a dictionary of data generators, yield a list of dictionaries where each
item consists of a column name and a single item from the corresponding generator.

:param column_to_data: A dictionary where each key is a column and each value
:param column_to_data: A dictionary where each key is a column name and each value
is a generator.
:param batch_size: Size of batch to generate each time.

:yield: A list of dictionaries where each item consists of a column name and
an item from the corresponding generator. Yields until all generators
are exhausted.
"""
keys = list(column_to_data.keys())
iterators = [iter(column_to_data[key]) for key in keys]
column_names = list(column_to_data.keys())
data_generators = [iter(column_to_data[name]) for name in column_names]
batch: List[Dict[str, Any]] = []

while True:
item_dict = {}
for key, it in zip(keys, iterators):
item = next(it, None)
data_dict = {}
for name, generator in zip(column_names, data_generators):
item = next(generator, None)

if key == 'id' and not item:
if name == 'id' and not item:
if batch:
yield batch
return

if isinstance(item, AbstractTensor):
item_dict[key] = item._docarray_to_ndarray().tolist()
data_dict[name] = item._docarray_to_ndarray().tolist()
elif isinstance(item, ndarray):
item_dict[key] = item.astype(np.float32).tolist()
data_dict[name] = item.astype(np.float32).tolist()
elif item is not None:
item_dict[key] = item
data_dict[name] = item

batch.append(item_dict)
batch.append(data_dict)
if len(batch) == batch_size:
yield batch
batch = []
Expand Down Expand Up @@ -355,7 +354,7 @@ def num_docs(self) -> int:

:return: Number of documents in the index.
"""
num_docs = self._client.ft(self._index_name).info()['num_docs']
num_docs = self._client.ft(self.index_name).info()['num_docs']
return int(num_docs)

def _del_items(self, doc_ids: Sequence[str]) -> None:
Expand Down Expand Up @@ -469,7 +468,7 @@ def _hybrid_search(
'vec': np.array(query, dtype=np.float32).tobytes()
}
results = (
self._client.ft(self._index_name).search(redis_query, query_params).docs # type: ignore[arg-type]
self._client.ft(self.index_name).search(redis_query, query_params).docs # type: ignore[arg-type]
)

scores: NdArray = NdArray._docarray_from_native(
Expand Down Expand Up @@ -527,7 +526,7 @@ def _filter(self, filter_query: Any, limit: int) -> Union[DocList, List[Dict]]:
q = Query(filter_query)
q.paging(0, limit)

results = self._client.ft(index_name=self._index_name).search(q).docs
results = self._client.ft(index_name=self.index_name).search(q).docs
docs = [json.loads(doc.json) for doc in results]
return docs

Expand Down Expand Up @@ -574,7 +573,7 @@ def _text_search(
.paging(0, limit)
)

results = self._client.ft(index_name=self._index_name).search(q).docs
results = self._client.ft(index_name=self.index_name).search(q).docs

scores: NdArray = NdArray._docarray_from_native(
np.array([document['score'] for document in results])
Expand Down