Skip to content

Commit 4918cb3

Browse files
authored
feat(array): add post interface (#181)
1 parent 774d0ce commit 4918cb3

File tree

7 files changed

+237
-1
lines changed

7 files changed

+237
-1
lines changed

docarray/array/mixins/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from .embed import EmbedMixin
66
from .empty import EmptyMixin
77
from .evaluation import EvaluationMixin
8+
from .find import FindMixin
89
from .getattr import GetAttributeMixin
910
from .getitem import GetItemMixin
1011
from .group import GroupMixin
@@ -16,9 +17,9 @@
1617
from .io.json import JsonIOMixin
1718
from .io.pushpull import PushPullMixin
1819
from .match import MatchMixin
19-
from .find import FindMixin
2020
from .parallel import ParallelMixin
2121
from .plot import PlotMixin
22+
from .post import PostMixin
2223
from .pydantic import PydanticMixin
2324
from .reduce import ReduceMixin
2425
from .sample import SampleMixin
@@ -50,6 +51,7 @@ class AllMixins(
5051
TraverseMixin,
5152
PlotMixin,
5253
SampleMixin,
54+
PostMixin,
5355
TextToolsMixin,
5456
EvaluationMixin,
5557
ReduceMixin,

docarray/array/mixins/post.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from typing import TYPE_CHECKING
2+
3+
if TYPE_CHECKING:
4+
from ... import DocumentArray
5+
6+
7+
class PostMixin:
    """Helper functions for posting DocumentArray to Jina Flow."""

    def post(self, host: str, show_progress: bool = False) -> 'DocumentArray':
        """Post itself to a remote Flow/Sandbox and get the modified DocumentArray back.

        :param host: a host string. Can be one of the following:
            - `grpc://192.168.0.123:8080/endpoint`
            - `websocket://192.168.0.123:8080/endpoint`
            - `http://192.168.0.123:8080/endpoint`
            - `jinahub://Hello/endpoint`
            - `jinahub+docker://Hello/endpoint`
            - `jinahub+sandbox://Hello/endpoint`

        :param show_progress: if to show a progressbar
        :return: the new DocumentArray returned from remote
        :raises ValueError: if the scheme is not supported, or if a
            `grpc`/`http`/`websocket` host string carries no port
        """

        from urllib.parse import urlparse

        r = urlparse(host)
        # The URL path doubles as the endpoint to call; default to the root.
        _on = r.path or '/'
        _port = r.port or None

        if r.scheme.startswith('jinahub'):
            from jina import Flow

            # Hub URIs are passed to the Flow without port and without path.
            # `netloc` (not `hostname`) is used so the Executor name keeps its
            # original letter case.
            netloc = r.netloc
            if r.port is not None:
                # A parsed port always comes from the last ':'-separated
                # segment of netloc, so cut exactly that segment instead of
                # replacing every ':<port>' substring.
                netloc = netloc.rpartition(':')[0]
            standardized_host = r._replace(netloc=netloc, path='').geturl()

            f = Flow(quiet=True).add(uses=standardized_host)
            with f:
                return f.post(_on, inputs=self, show_progress=show_progress)
        elif r.scheme in ('grpc', 'http', 'websocket'):
            if _port is None:
                raise ValueError(f'can not determine port from {host}')

            from jina import Client

            c = Client(host=r.hostname, port=_port, protocol=r.scheme)
            return c.post(_on, inputs=self, show_progress=show_progress)
        else:
            raise ValueError(f'unsupported scheme: {r.scheme}')

docs/fundamentals/documentarray/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,5 @@ matching
4040
evaluation
4141
parallelization
4242
visualization
43+
post
4344
```
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
(da-post)=
2+
# Process via External Flow or Executor
3+
4+
```{tip}
5+
This feature requires the `jina` dependency. Please install Jina via `pip install jina`.
6+
```
7+
8+
You can call an external Flow/Sandbox/Executor to "process" a DocumentArray via {meth}`~docarray.array.mixins.post.PostMixin.post`. The external Flow/Executor can be local, remote, or inside a Docker container.
9+
10+
For example, to use an existing Flow on `192.168.2.3` on port `12345` to process a DocumentArray:
11+
12+
```python
13+
from docarray import DocumentArray
14+
15+
da = DocumentArray.empty(10)
16+
17+
r = da.post('grpc://192.168.2.3:12345')
18+
r.summary()
19+
```
20+
21+
One can also use any [Executor from Jina Hub](https://hub.jina.ai), e.g.
22+
```python
23+
from docarray import DocumentArray, Document
24+
25+
da = DocumentArray([Document(text='Hi Alex, are you with your laptop?')])
26+
r = da.post('jinahub+sandbox://CoquiTTS7', show_progress=True)
27+
28+
r.summary()
29+
```
30+
31+
```text
32+
Documents Summary
33+
34+
Length 1
35+
Homogenous Documents True
36+
Common Attributes ('id', 'mime_type', 'text', 'uri')
37+
38+
Attributes Summary
39+
40+
Attribute Data type #Unique values Has empty value
41+
──────────────────────────────────────────────────────────
42+
id ('str',) 1 False
43+
mime_type ('str',) 1 False
44+
text ('str',) 1 False
45+
uri ('str',) 1 False
46+
```
47+
48+
## Accepted schemes
49+
50+
{meth}`~docarray.array.mixins.post.PostMixin.post` accepts a URI-like host string that covers a wide range of Flows and Hub Executors. Its format is described below:
51+
52+
```text
53+
scheme://netloc[:port][/path]
54+
```
55+
56+
| Attribute | Supported Values | Meaning |
57+
|-----------|---------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|
58+
| `scheme` | 1. One of `grpc`, `websocket`, `http` | `protocol` of the connected Flow |
59+
| | 2. One of `jinahub`, `jinahub+docker`, `jinahub+sandbox` | Jina hub executor in source code, Docker container, sandbox |
60+
| `netloc` | 1. Host address | `host` of the connected Flow |
61+
| | 2. Hub Executor name | Any Executor [listed here](https://hub.jina.ai) |
62+
| `:port` | e.g. `:55566` | `port` of the connected Flow. This is required when using `scheme` type (1) ; it is ignored when using hub-related `scheme` type (2) |
63+
| `/path` | e.g. `/foo` | The endpoint of the Executor you want to call. |
64+
65+
66+
Some examples:
67+
- `.post('websocket://localhost:8081/foo')`: call the `/foo` endpoint of the Flow on `localhost` port `8081` with `websocket` protocol to process the DocumentArray; processing is on local.
68+
- `.post('grpc://192.168.12.2:12345/foo')`: call the `/foo` endpoint of the Flow on `192.168.12.2` port `12345` with `grpc` protocol to process the DocumentArray; processing is on remote.
69+
- `.post('jinahub://Hello/foo')`: call the `/foo` endpoint of the Hub Executor `Hello` to process the DocumentArray; processing happens locally.
70+
- `.post('jinahub+sandbox://Hello/foo')`: call the `/foo` endpoint of the Hub Sandbox `Hello` to process the DocumentArray; processing happens remotely.
71+
72+
## Read more
73+
74+
For more explanation of Flow, Hub Executor and Sandbox, please refer to [Jina docs](https://docs.jina.ai).

docs/fundamentals/jina-support/index.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ They work exactly same. You will be using the same install of `docarray` in your
3636

3737
You can update DocArray package without updating Jina via `pip install -U docarray`. This often works unless otherwise specified in the release note of Jina.
3838

39+
## Directly invoke a Jina/Hub Executor
40+
41+
As described {ref}`here <da-post>`, one can simply use an external Jina Flow/Executor as a regular function to process a DocumentArray.
42+
3943
## Local code as a service
4044

4145
Considering the example below, where we use DocArray to pre-process an image DocumentArray:

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
'transformers==4.16.2',
9292
'weaviate-client~=3.3.0',
9393
'annlite>=0.3.0',
94+
'jina',
9495
],
9596
},
9697
classifiers=[
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import multiprocessing
2+
import time
3+
4+
import pytest
5+
6+
from docarray import DocumentArray
7+
from docarray.helper import random_port
8+
9+
10+
@pytest.mark.parametrize(
    'conn_config',
    [
        (dict(protocol='grpc'), 'grpc://127.0.0.1:$port/'),
        (dict(protocol='grpc'), 'grpc://127.0.0.1:$port'),
        (dict(protocol='websocket'), 'websocket://127.0.0.1:$port'),
        # (dict(protocol='http'), 'http://127.0.0.1:$port'), this somehow does not work on GH workflow
    ],
)
@pytest.mark.parametrize('show_pbar', [True, False])
def test_post_to_a_flow(show_pbar, conn_config):
    """Post a DocumentArray to a Flow running in a separate process.

    ``conn_config`` is a pair of (Flow keyword arguments, host string template);
    ``$port`` in the template is replaced with the randomly chosen port.
    """
    from jina import Flow

    # NOTE(review): a locally defined function is used as the process target;
    # this presumably relies on the ``fork`` start method -- confirm on CI.
    def start_flow(stop_event, **kwargs):
        """start a blocking Flow."""
        with Flow(**kwargs) as f:
            f.block(stop_event=stop_event)
            print('bye')

    e = multiprocessing.Event()  # set by the test to unblock the Flow

    p = random_port()
    t = multiprocessing.Process(
        name='Blocked-Flow',
        target=start_flow,
        args=(e,),
        kwargs={**conn_config[0], 'port': p},
    )
    t.start()

    # Give the Flow a moment to start before posting to it.
    time.sleep(1)

    da = DocumentArray.empty(100)
    try:
        da.post(conn_config[1].replace('$port', str(p)))
    finally:
        # Always stop the background Flow, whether or not the post succeeded;
        # any exception propagates on its own, no re-raise clause is needed.
        e.set()
        t.join()
        time.sleep(1)
51+
52+
53+
@pytest.mark.parametrize(
    'hub_uri',
    [
        'jinahub://Hello',
        'jinahub+docker://Hello',
        'jinahub+sandbox://Hello',
    ],
)
def test_post_with_jinahub(hub_uri):
    """Post 100 empty Documents to the ``Hello`` Hub Executor via each hub scheme."""
    docs = DocumentArray.empty(100)
    docs.post(hub_uri)
59+
60+
61+
def test_post_bad_scheme():
    """A host string without a supported scheme must raise ``ValueError``."""
    docs = DocumentArray.empty(100)
    bad_host = 'haha'
    with pytest.raises(ValueError):
        docs.post(bad_host)
65+
66+
67+
def test_endpoint():
    """Check that the path of the host string selects the Executor endpoint."""
    from jina import Executor, requests, Flow

    class MyExec(Executor):
        @requests(on='/foo')
        def foo(self, docs: DocumentArray, **kwargs):
            docs.texts = ['foo'] * len(docs)

        @requests(on='/bar')
        def bar(self, docs: DocumentArray, **kwargs):
            docs.texts = ['bar'] * len(docs)

    # NOTE(review): a locally defined function is used as the process target;
    # this presumably relies on the ``fork`` start method -- confirm on CI.
    def start_flow(stop_event, **kwargs):
        """start a blocking Flow."""
        with Flow(**kwargs).add(uses=MyExec) as f:
            f.block(stop_event=stop_event)

    e = multiprocessing.Event()  # set by the test to unblock the Flow

    p = random_port()
    t = multiprocessing.Process(
        name='Blocked-Flow', target=start_flow, args=(e,), kwargs={'port': p}
    )
    t.start()

    # Give the Flow a moment to start before posting to it.
    time.sleep(1)
    N = 100
    da = DocumentArray.empty(N)
    try:
        # The root endpoint is bound to neither handler, so texts stay empty.
        assert da.post(f'grpc://0.0.0.0:{p}/')[:, 'text'] == [''] * N
        assert da.post(f'grpc://0.0.0.0:{p}/foo').texts == ['foo'] * N
        assert da.post(f'grpc://0.0.0.0:{p}/bar').texts == ['bar'] * N
    finally:
        # Always stop the background Flow; failures propagate on their own,
        # so no explicit re-raise clause is needed.
        e.set()
        t.join()

0 commit comments

Comments
 (0)