Merged
Changes from 1 commit
51 commits
3d45f57 feat: add apply function (Feb 28, 2023)
9a07808 test: add benchmark tests (Feb 28, 2023)
5709f97 fix: apply (Feb 28, 2023)
366b6df fix: benchmark test (Mar 1, 2023)
6e8ff7c test: benchmark (Mar 2, 2023)
89eaf62 fix: apply (Mar 2, 2023)
545d9ec fix: clean up (Mar 2, 2023)
d871363 chore: remove benchmark tests from general tests (Mar 2, 2023)
8a436ae chore: fix ci (Mar 2, 2023)
c581b58 feat: add threading option and benchmark test (Mar 2, 2023)
8bf57fb test: use both backend options in tests (Mar 2, 2023)
0c3524c feat: add batching to abstract array (Mar 2, 2023)
c335895 feat: add apply_batch and _map_batch and tests (Mar 2, 2023)
cc23e4e test: fix load from da (Mar 2, 2023)
73c0d84 docs: update docstrings (Mar 2, 2023)
b7c2cae docs: add example for apply (Mar 2, 2023)
3eb0c30 fix: mypy (Mar 2, 2023)
7c6cb2f refactor: clean up (Mar 2, 2023)
afa5837 refactor: make batch method private (Mar 2, 2023)
c69585a fix: apply (Mar 2, 2023)
8a3437a Test: add for apply batch (Mar 2, 2023)
66b78b3 fix: benchmark test increase ndocs (Mar 3, 2023)
35e090a test: clean up (Mar 3, 2023)
3019522 test: try to fix (Mar 3, 2023)
313d318 test: try to fix test (Mar 3, 2023)
0afd5bd fix: test (Mar 3, 2023)
fdcfa23 fix: test (Mar 3, 2023)
fc91dbf fix: apply suggestions from code review (Mar 3, 2023)
0d7cd1b fix: remove print statemetns (Mar 3, 2023)
b4c672b fix: apply samis suggestion (Mar 3, 2023)
18a377b fix: add tests for func da to doc and da to other len da (Mar 3, 2023)
245283f fix: revert last commit (Mar 3, 2023)
76fe8b7 test: add len assert (Mar 3, 2023)
34b7f9c test: add assertions (Mar 3, 2023)
c7a968d test: add test to for da extend in batch apply (Mar 3, 2023)
6cf8ed2 test: extend with only one doc (Mar 3, 2023)
5dc9e6d test: fix (Mar 3, 2023)
d3fc203 fix: test (Mar 3, 2023)
45cdc4a fix: test (Mar 3, 2023)
9839602 fix: set docs in apply (Mar 3, 2023)
87a93ff fix: indices (Mar 3, 2023)
eeb7fae fix: indices (Mar 3, 2023)
72aaf21 fix: indices (Mar 3, 2023)
c0f8029 fix: indices (Mar 3, 2023)
9b83c1f fix:test (Mar 3, 2023)
7638d86 fix: mypy (Mar 3, 2023)
4a3a290 fix: type hint (Mar 3, 2023)
38aae7a fix: remove apply, only keep map (Mar 3, 2023)
01900c9 refactor: map to map_docs (Mar 3, 2023)
f6921e0 fix: apply suggestion (Mar 3, 2023)
c3fb041 docs: add example usage (Mar 3, 2023)
fix: remove apply, only keep map
Signed-off-by: anna-charlotte <[email protected]>
anna-charlotte committed Mar 3, 2023
commit 38aae7afb4e3ddf44ab4a90a87dc4d6e3b0f74b2
161 changes: 2 additions & 159 deletions docarray/utils/apply.py → docarray/utils/map.py
@@ -13,71 +13,7 @@
T_doc = TypeVar('T_doc', bound=BaseDocument)


def apply(
da: T,
func: Callable[[T_doc], T_doc],
backend: str = 'thread',
num_worker: Optional[int] = None,
pool: Optional[Union[Pool, ThreadPool]] = None,
show_progress: bool = False,
) -> None:
"""
Apply `func` to every Document of the given DocumentArray in-place, using
multithreading or multiprocessing.

EXAMPLE USAGE

.. code-block:: python

from docarray import DocumentArray
from docarray.documents import Image
from docarray.utils.apply import apply


def load_url_to_tensor(img: Image) -> Image:
img.tensor = img.url.load()
return img


da = DocumentArray[Image]([Image(url='path/to/img.png') for _ in range(100)])
apply(
da, load_url_to_tensor, backend='thread'
) # threading is usually a good option for IO-bound tasks such as loading an image from url

for doc in da:
assert doc.tensor is not None

:param da: DocumentArray to apply function to
:param func: a function that takes a :class:`BaseDocument` as input and outputs
a :class:`BaseDocument`.
:param backend: `thread` for multithreading and `process` for multiprocessing.
Defaults to `thread`.
In general, if `func` is IO-bound then `thread` is a good choice.
On the other hand, if `func` is CPU-bound, then you may use `process`.
In practice, you should experiment to figure out the best option.
However, if you wish to modify the elements in-place, regardless of IO/CPU-bound,
you should always use `thread` backend.
Note that computation that is offloaded to non-python code (e.g. through np/torch/tf)
falls under the "IO-bound" category.

.. warning::
When using `process` backend, your `func` should not modify elements in-place.
This is because the multiprocessing backend passes the variable via pickle
and works in another process.
The passed object and the original object do **not** share the same memory.

:param num_worker: the number of parallel workers. If not given, the number of
CPUs in the system will be used.
:param pool: use an existing/external process or thread pool. If given, you will
be responsible for closing the pool.
:param show_progress: show a progress bar. Defaults to False.

"""
for i, doc in enumerate(_map(da, func, backend, num_worker, pool, show_progress)):
da[i] = doc


def _map(
def map(
da: T,
func: Callable[[T_doc], T_doc],
backend: str = 'thread',
@@ -89,9 +89,6 @@ def _map(
Return an iterator that applies `func` to every Document in `da` in parallel,
yielding the results.

.. seealso::
- To return :class:`DocumentArray`, please use :func:`apply`.

:param da: DocumentArray to apply function to
:param func: a function that takes a :class:`BaseDocument` as input and outputs
a :class:`BaseDocument`.
@@ -139,94 +72,7 @@ def _map
yield x


def apply_batch(
da: T,
func: Union[Callable[[T], T], Callable[[T], T_doc]],
batch_size: int,
backend: str = 'thread',
num_worker: Optional[int] = None,
shuffle: bool = False,
pool: Optional[Union[Pool, ThreadPool]] = None,
show_progress: bool = False,
) -> None:
"""
Batches the DocumentArray into mini-batches and applies `func` to every mini-batch in-place.

EXAMPLE USAGE

.. code-block:: python

from docarray import BaseDocument, DocumentArray
from docarray.utils.apply import apply_batch


class MyDoc(BaseDocument):
name: str


def upper_case_name(da: DocumentArray[MyDoc]) -> DocumentArray[MyDoc]:
da.name = [n.upper() for n in da.name]
return da


da = DocumentArray[MyDoc]([MyDoc(name='my orange cat') for _ in range(100)])
apply_batch(da, upper_case_name, batch_size=10)
print(da.name[:3])

.. code-block:: text

['MY ORANGE CAT', 'MY ORANGE CAT', 'MY ORANGE CAT']

:param da: DocumentArray to apply function to
:param func: a function that takes an :class:`AnyDocumentArray` as input and outputs
an :class:`AnyDocumentArray` or a :class:`BaseDocument`.
:param batch_size: size of each generated batch (except the last batch, which might
be smaller).
:param backend: `thread` for multithreading and `process` for multiprocessing.
Defaults to `thread`.
In general, if `func` is IO-bound then `thread` is a good choice.
On the other hand, if `func` is CPU-bound, then you may use `process`.
In practice, you should experiment to figure out the best option.
However, if you wish to modify the elements in-place, regardless of IO/CPU-bound,
you should always use `thread` backend.
Note that computation that is offloaded to non-python code (e.g. through np/torch/tf)
falls under the "IO-bound" category.

.. warning::
When using `process` backend, your `func` should not modify elements in-place.
This is because the multiprocessing backend passes the variable via pickle
and works in another process.
The passed object and the original object do **not** share the same memory.

:param num_worker: the number of parallel workers. If not given, the number of CPUs
in the system will be used.
:param shuffle: If set, shuffle the Documents before dividing into minibatches.
:param pool: use an existing/external process or thread pool. If given, you will
be responsible for closing the pool.
:param show_progress: show a progress bar. Defaults to False.
"""
diff = 0
for i, batch in enumerate(
_map_batch(
da, func, batch_size, backend, num_worker, shuffle, pool, show_progress
)
):
if i == 0:
if isinstance(batch, AnyDocumentArray):
diff = len(batch) - batch_size
else:
diff = 1 - batch_size

start = i * (batch_size + diff)
stop = (i + 1) * batch_size + (i * diff)

if isinstance(batch, da.__class__):
da[start:stop] = batch
else:
da[start:stop] = da.__class_getitem__(da.document_type)([batch])


def _map_batch(
def map_batch(
da: T,
func: Callable[[T], Union[T, T_doc]],
batch_size: int,
@@ -241,9 +87,6 @@ def _map_batch(
yielding the results.
Each element in the returned iterator is an :class:`AnyDocumentArray`.

.. seealso::
- To return :class:`DocumentArray`, please use :func:`apply_batch`.

:param batch_size: Size of each generated batch (except the last one, which might
be smaller).
:param shuffle: If set, shuffle the Documents before dividing into mini-batches.
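For reference, a minimal usage sketch of the renamed `map`. The import path `docarray.utils.map` matches the test changes below; `load_url_to_tensor` is carried over from the removed `apply` example, and the write-back loop mirrors what `apply` used to do internally:

.. code-block:: python

    from docarray import DocumentArray
    from docarray.documents import Image
    from docarray.utils.map import map  # shadows the builtin map in this scope


    def load_url_to_tensor(img: Image) -> Image:
        img.tensor = img.url.load()
        return img


    da = DocumentArray[Image]([Image(url='path/to/img.png') for _ in range(100)])

    # map yields processed Documents lazily; write them back to keep the
    # in-place behavior that apply used to provide.
    for i, doc in enumerate(map(da, load_url_to_tensor, backend='thread')):
        da[i] = doc

    for doc in da:
        assert doc.tensor is not None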
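Likewise for `map_batch`, reusing the `MyDoc`/`upper_case_name` setup from the removed `apply_batch` example. Since `map_batch` yields mini-batches instead of writing back in-place, the caller collects the results; this sketch assumes the batches are yielded in order:

.. code-block:: python

    from docarray import BaseDocument, DocumentArray
    from docarray.utils.map import map_batch


    class MyDoc(BaseDocument):
        name: str


    def upper_case_name(da: DocumentArray[MyDoc]) -> DocumentArray[MyDoc]:
        da.name = [n.upper() for n in da.name]
        return da


    da = DocumentArray[MyDoc]([MyDoc(name='my orange cat') for _ in range(100)])

    # Collect the yielded mini-batches back into a single DocumentArray.
    docs = []
    for batch in map_batch(da, upper_case_name, batch_size=10):
        docs.extend(batch)
    da = DocumentArray[MyDoc](docs)

    print(da.name[:3])  # ['MY ORANGE CAT', 'MY ORANGE CAT', 'MY ORANGE CAT']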
@@ -7,7 +7,7 @@
from docarray import BaseDocument, DocumentArray
from docarray.documents import Image
from docarray.typing import NdArray
from docarray.utils.apply import apply, apply_batch
from docarray.utils.map import map, map_batch
from tests.units.typing.test_bytes import IMAGE_PATHS

pytestmark = [pytest.mark.benchmark, pytest.mark.slow]
@@ -25,7 +25,7 @@ def cpu_intensive(doc: MyMatrix) -> MyMatrix:
return doc


def test_apply_multiprocessing():
def test_map_multiprocessing():
if os.cpu_count() > 1:

def time_multiprocessing(num_workers: int) -> float:
@@ -34,11 +34,17 @@ def time_multiprocessing(num_workers: int) -> float:
matrices = [rng.random(size=(1000, 1000)) for _ in range(n_docs)]
da = DocumentArray[MyMatrix]([MyMatrix(matrix=m) for m in matrices])
start_time = time()
apply(da=da, func=cpu_intensive, backend='process', num_worker=num_workers)
list(
map(
da=da, func=cpu_intensive, backend='process', num_worker=num_workers
)
)
return time() - start_time

time_1_cpu = time_multiprocessing(num_workers=1)
print(f"time_1_cpu = {time_1_cpu}")
time_2_cpu = time_multiprocessing(num_workers=2)
print(f"time_2_cpu = {time_2_cpu}")

assert time_2_cpu < time_1_cpu

@@ -52,7 +58,7 @@ def cpu_intensive_batch(da: DocumentArray[MyMatrix]) -> DocumentArray[MyMatrix]:
return da


def test_apply_batch_multiprocessing():
def test_map_batch_multiprocessing():
if os.cpu_count() > 1:

def time_multiprocessing(num_workers: int) -> float:
@@ -61,17 +67,21 @@ def time_multiprocessing(num_workers: int) -> float:
matrices = [rng.random(size=(1000, 1000)) for _ in range(n_docs)]
da = DocumentArray[MyMatrix]([MyMatrix(matrix=m) for m in matrices])
start_time = time()
apply_batch(
da=da,
func=cpu_intensive_batch,
batch_size=8,
backend='process',
num_worker=num_workers,
list(
map_batch(
da=da,
func=cpu_intensive_batch,
batch_size=8,
backend='process',
num_worker=num_workers,
)
)
return time() - start_time

time_1_cpu = time_multiprocessing(num_workers=1)
print(f"time_1_cpu = {time_1_cpu}")
time_2_cpu = time_multiprocessing(num_workers=2)
print(f"time_2_cpu = {time_2_cpu}")

assert time_2_cpu < time_1_cpu

@@ -82,18 +92,20 @@ def io_intensive(img: Image) -> Image:
return img


def test_apply_multithreading():
def test_map_multithreading():
def time_multithreading(num_workers: int) -> float:
n_docs = 100
da = DocumentArray[Image](
[Image(url=IMAGE_PATHS['png']) for _ in range(n_docs)]
)
start_time = time()
apply(da=da, func=io_intensive, backend='thread', num_worker=num_workers)
list(map(da=da, func=io_intensive, backend='thread', num_worker=num_workers))
return time() - start_time

time_1_thread = time_multithreading(num_workers=1)
print(f"time_1_thread = {time_1_thread}")
time_2_thread = time_multithreading(num_workers=2)
print(f"time_2_thread = {time_2_thread}")

assert time_2_thread < time_1_thread

@@ -105,23 +117,27 @@ def io_intensive_batch(da: DocumentArray[Image]) -> DocumentArray[Image]:
return da


def test_apply_batch_multithreading():
def test_map_batch_multithreading():
def time_multithreading_batch(num_workers: int) -> float:
n_docs = 100
da = DocumentArray[Image](
[Image(url=IMAGE_PATHS['png']) for _ in range(n_docs)]
)
start_time = time()
apply_batch(
da=da,
func=io_intensive_batch,
backend='thread',
num_worker=num_workers,
batch_size=10,
list(
map_batch(
da=da,
func=io_intensive_batch,
backend='thread',
num_worker=num_workers,
batch_size=10,
)
)
return time() - start_time

time_1_thread = time_multithreading_batch(num_workers=1)
print(f"time_1_thread = {time_1_thread}")
time_2_thread = time_multithreading_batch(num_workers=2)
print(f"time_2_thread = {time_2_thread}")

assert time_2_thread < time_1_thread
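
One note on the `list(...)` wrappers introduced throughout these benchmarks: unlike the removed `apply`/`apply_batch`, `map` and `map_batch` are generators, so no work happens until the iterator is consumed. Wrapping the call in `list(...)` forces the parallel execution to run inside the timed region. A minimal sketch, reusing `da` and `io_intensive` from the tests above:

.. code-block:: python

    start_time = time()
    lazy = map(da=da, func=io_intensive, backend='thread', num_worker=2)
    # No work has happened yet; consuming the iterator triggers it.
    docs = list(lazy)
    elapsed = time() - start_time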