Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
3d45f57
feat: add apply function
Feb 28, 2023
9a07808
test: add benchmark tests
Feb 28, 2023
5709f97
fix: apply
Feb 28, 2023
366b6df
fix: benchmark test
Mar 1, 2023
6e8ff7c
test: benchmark
Mar 2, 2023
89eaf62
fix: apply
Mar 2, 2023
545d9ec
fix: clean up
Mar 2, 2023
d871363
chore: remove benchmark tests from general tests
Mar 2, 2023
8a436ae
chore: fix ci
Mar 2, 2023
c581b58
feat: add threading option and benchmark test
Mar 2, 2023
8bf57fb
test: use both backend options in tests
Mar 2, 2023
0c3524c
feat: add batching to abstract array
Mar 2, 2023
c335895
feat: add apply_batch and _map_batch and tests
Mar 2, 2023
cc23e4e
test: fix load from da
Mar 2, 2023
73c0d84
docs: update docstrings
Mar 2, 2023
b7c2cae
docs: add example for apply
Mar 2, 2023
3eb0c30
fix: mypy
Mar 2, 2023
7c6cb2f
refactor: clean up
Mar 2, 2023
afa5837
refactor: make batch method private
Mar 2, 2023
c69585a
fix: apply
Mar 2, 2023
8a3437a
Test: add for apply batch
Mar 2, 2023
66b78b3
fix: benchmark test increase ndocs
Mar 3, 2023
35e090a
test: clean up
Mar 3, 2023
3019522
test: try to fix
Mar 3, 2023
313d318
test: try to fix test
Mar 3, 2023
0afd5bd
fix: test
Mar 3, 2023
fdcfa23
fix: test
Mar 3, 2023
fc91dbf
fix: apply suggestions from code review
Mar 3, 2023
0d7cd1b
fix: remove print statements
Mar 3, 2023
b4c672b
fix: apply samis suggestion
Mar 3, 2023
18a377b
fix: add tests for func da to doc and da to other len da
Mar 3, 2023
245283f
fix: revert last commit
Mar 3, 2023
76fe8b7
test: add len assert
Mar 3, 2023
34b7f9c
test: add assertions
Mar 3, 2023
c7a968d
test: add test for da extend in batch apply
Mar 3, 2023
6cf8ed2
test: extend with only one doc
Mar 3, 2023
5dc9e6d
test: fix
Mar 3, 2023
d3fc203
fix: test
Mar 3, 2023
45cdc4a
fix: test
Mar 3, 2023
9839602
fix: set docs in apply
Mar 3, 2023
87a93ff
fix: indices
Mar 3, 2023
eeb7fae
fix: indices
Mar 3, 2023
72aaf21
fix: indices
Mar 3, 2023
c0f8029
fix: indices
Mar 3, 2023
9b83c1f
fix: test
Mar 3, 2023
7638d86
fix: mypy
Mar 3, 2023
4a3a290
fix: type hint
Mar 3, 2023
38aae7a
fix: remove apply, only keep map
Mar 3, 2023
01900c9
refactor: map to map_docs
Mar 3, 2023
f6921e0
fix: apply suggestion
Mar 3, 2023
c3fb041
docs: add example usage
Mar 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: map to map_docs
Signed-off-by: anna-charlotte <[email protected]>
  • Loading branch information
anna-charlotte committed Mar 3, 2023
commit 01900c9944cd0235e8ad147339d8bb1113e3e4ed
2 changes: 1 addition & 1 deletion docarray/utils/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
T_doc = TypeVar('T_doc', bound=BaseDocument)


def map(
def map_docs(
da: T,
func: Callable[[T_doc], T_doc],
backend: str = 'thread',
Expand Down
24 changes: 9 additions & 15 deletions tests/benchmark_tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from docarray import BaseDocument, DocumentArray
from docarray.documents import Image
from docarray.typing import NdArray
from docarray.utils.map import map, map_batch
from docarray.utils.map import map_batch, map_docs
from tests.units.typing.test_bytes import IMAGE_PATHS

pytestmark = [pytest.mark.benchmark, pytest.mark.slow]
Expand All @@ -25,7 +25,7 @@ def cpu_intensive(doc: MyMatrix) -> MyMatrix:
return doc


def test_map_multiprocessing():
def test_map_docs_multiprocessing():
if os.cpu_count() > 1:

def time_multiprocessing(num_workers: int) -> float:
Expand All @@ -35,16 +35,14 @@ def time_multiprocessing(num_workers: int) -> float:
da = DocumentArray[MyMatrix]([MyMatrix(matrix=m) for m in matrices])
start_time = time()
list(
map(
map_docs(
da=da, func=cpu_intensive, backend='process', num_worker=num_workers
)
)
return time() - start_time

time_1_cpu = time_multiprocessing(num_workers=1)
print(f"time_1_cpu = {time_1_cpu}")
time_2_cpu = time_multiprocessing(num_workers=2)
print(f"time_2_cpu = {time_2_cpu}")

assert time_2_cpu < time_1_cpu

Expand All @@ -58,7 +56,7 @@ def cpu_intensive_batch(da: DocumentArray[MyMatrix]) -> DocumentArray[MyMatrix]:
return da


def test_map_batch_multiprocessing():
def test_map_docs_batch_multiprocessing():
if os.cpu_count() > 1:

def time_multiprocessing(num_workers: int) -> float:
Expand All @@ -79,9 +77,7 @@ def time_multiprocessing(num_workers: int) -> float:
return time() - start_time

time_1_cpu = time_multiprocessing(num_workers=1)
print(f"time_1_cpu = {time_1_cpu}")
time_2_cpu = time_multiprocessing(num_workers=2)
print(f"time_2_cpu = {time_2_cpu}")

assert time_2_cpu < time_1_cpu

Expand All @@ -92,20 +88,20 @@ def io_intensive(img: Image) -> Image:
return img


def test_map_multithreading():
def test_map_docs_multithreading():
def time_multithreading(num_workers: int) -> float:
n_docs = 100
da = DocumentArray[Image](
[Image(url=IMAGE_PATHS['png']) for _ in range(n_docs)]
)
start_time = time()
list(map(da=da, func=io_intensive, backend='thread', num_worker=num_workers))
list(
map_docs(da=da, func=io_intensive, backend='thread', num_worker=num_workers)
)
return time() - start_time

time_1_thread = time_multithreading(num_workers=1)
print(f"time_1_thread = {time_1_thread}")
time_2_thread = time_multithreading(num_workers=2)
print(f"time_2_thread = {time_2_thread}")

assert time_2_thread < time_1_thread

Expand All @@ -117,7 +113,7 @@ def io_intensive_batch(da: DocumentArray[Image]) -> DocumentArray[Image]:
return da


def test_map_batch_multithreading():
def test_map_docs_batch_multithreading():
def time_multithreading_batch(num_workers: int) -> float:
n_docs = 100
da = DocumentArray[Image](
Expand All @@ -136,8 +132,6 @@ def time_multithreading_batch(num_workers: int) -> float:
return time() - start_time

time_1_thread = time_multithreading_batch(num_workers=1)
print(f"time_1_thread = {time_1_thread}")
time_2_thread = time_multithreading_batch(num_workers=2)
print(f"time_2_thread = {time_2_thread}")

assert time_2_thread < time_1_thread
10 changes: 5 additions & 5 deletions tests/units/util/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from docarray import BaseDocument, DocumentArray
from docarray.documents import Image
from docarray.typing import ImageUrl, NdArray
from docarray.utils.map import map, map_batch
from docarray.utils.map import map_batch, map_docs
from tests.units.typing.test_bytes import IMAGE_PATHS

N_DOCS = 2
Expand All @@ -28,7 +28,7 @@ def test_map(da, backend):
for tensor in da.tensor:
assert tensor is None

docs = list(map(da=da, func=load_from_doc, backend=backend))
docs = list(map_docs(da=da, func=load_from_doc, backend=backend))

assert len(docs) == N_DOCS
for doc in docs:
Expand All @@ -37,22 +37,22 @@ def test_map(da, backend):

def test_map_multiprocessing_lambda_func_raise_exception(da):
with pytest.raises(ValueError, match='Multiprocessing does not allow'):
list(map(da=da, func=lambda x: x, backend='process'))
list(map_docs(da=da, func=lambda x: x, backend='process'))


def test_map_multiprocessing_local_func_raise_exception(da):
def local_func(x):
return x

with pytest.raises(ValueError, match='Multiprocessing does not allow'):
list(map(da=da, func=local_func, backend='process'))
list(map_docs(da=da, func=local_func, backend='process'))


@pytest.mark.parametrize('backend', ['thread', 'process'])
def test_check_order(backend):
da = DocumentArray[Image]([Image(id=i) for i in range(N_DOCS)])

docs = list(map(da=da, func=load_from_doc, backend=backend))
docs = list(map_docs(da=da, func=load_from_doc, backend=backend))

assert len(docs) == N_DOCS
for i, doc in enumerate(docs):
Expand Down