support aioredis v2 #259
Conversation
Codecov Report
@@ Coverage Diff @@
## master #259 +/- ##
==========================================
- Coverage 99.68% 98.74% -0.95%
==========================================
Files 11 11
Lines 947 953 +6
Branches 151 153 +2
==========================================
- Hits 944 941 -3
- Misses 1 6 +5
- Partials 2 6 +4
Continue to review full report at Codecov.
Thanks so much for working on this @Yolley. I really want to get it merged and released. Do you want to work on it and get it ready for full review? Then I'll review and we can go from there. I agree the decoding situation is a mess. I assume we'd have no luck asking the maintainers of aioredis to make decoding a per-command option, since they want to stick to the redis-py interface? I don't want to have two connections/pools just for this. I think we should implement our own encoder.
I want to work on it, but I am not sure when I'll get the time to polish this PR. I agree with implementing our own encoder.
Okay, it seems that it won't be that easy: they have two different parsers that can decode responses, PythonParser and HiredisParser.
@samuelcolvin so, I've decided to go with a context manager and a context variable, so that we can decode on a per-request basis (I stuck with context vars to avoid subclassing almost everything from aioredis-py). This change drops support for Python 3.6 (context vars were introduced in 3.7). I've also turned off type checks for parser.py.
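For illustration, a minimal sketch of the ContextVar-plus-context-manager pattern being described, reusing the `encoder_options_var` name from the diff further down; this is a sketch of the idea, not the actual arq implementation (arq exposes it as a method on the pool, e.g. `self.pool.encoder_context`):

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any, Dict, Iterator, Optional

# None means "use the connection's default encode/decode behaviour"
encoder_options_var: ContextVar[Optional[Dict[str, Any]]] = ContextVar(
    'encoder_kwargs', default=None
)

@contextmanager
def encoder_context(**options: Any) -> Iterator[None]:
    """Temporarily override encode/decode options for the current task."""
    token = encoder_options_var.set(options)
    try:
        yield
    finally:
        encoder_options_var.reset(token)

# a context-aware Encoder subclass would then read encoder_options_var.get()
# in encode()/decode() instead of relying only on its constructor arguments
```

Because ContextVar values are task-local, concurrent jobs sharing one pool don't see each other's overrides, which is what makes per-request decoding possible without subclassing the whole client.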
@samuelcolvin I've marked this PR ready for merge; please take a look when you get the chance. I at least need to understand whether my view of aioredis 2 support in arq coincides with yours.
Not a review of the code, but it seems you've forgotten to remove a few comments.
Just letting you know that I've implemented your fork in a project, and everything works very well so far!

EDIT: except one issue: serializing the result of a failed cron job with our msgpack serializer fails. The worker starts fine:

INFO [None] app.core.arq:44 Arq worker has started.

but then:

WARNING [None] arq.worker:484 557.76s ! cron:my_example_cron_task max retries 1 exceeded
WARNING [None] arq.jobs:228 error serializing result of cron:my_example_cron_task
Traceback (most recent call last):
  File ".../.venv/lib/python3.10/site-packages/arq/jobs.py", line 226, in serialize_result
    return serializer(data)
  File ".../.venv/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'JobExecutionFailed' object
I tried this PR and I have this issue: when I restart Redis/Sentinel (while the worker is still running) I get an error where it tries to read the content as UTF-8, but it's binary pickle data.
edit: I've found a way to work around this issue by fully re-creating the worker when Redis is interrupted.
Does it only happen with Sentinel? Did you restart every instance or only the main one? I tried restarting a Redis instance (without Sentinel) and the worker just failed. There are no handlers for these errors in the current arq release, so it's not a regression.
Yes, I use Sentinel (3 Redis instances, of which 2 are replicas, plus 3 Sentinels). edit: for people like me who have a lot of code using …
overall this looks great, a few fairly small things to change and one big decision on parser.py
arq/connections.py
Outdated
await pipe.unwatch()
await pipe.watch(job_key)
job_exists = await pipe.exists(job_key)
job_result_exists = await pipe.exists(result_key_prefix + job_id)
if job_exists or job_result_exists:
    await pipe.reset()
can we chain these commands to avoid unnecessary awaits?
Not sure what exactly you wanted to chain here; unwatch and watch should be executed one after another, so I just chained the two exists calls in a gather.
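For reference, a minimal sketch of what that change could look like; `redis`, `job_key`, `job_id`, and `result_key_prefix` are assumed to match the surrounding arq code, and the snippet runs inside a coroutine:

```python
import asyncio

# issue both EXISTS checks concurrently instead of awaiting them in sequence
job_exists, job_result_exists = await asyncio.gather(
    redis.exists(job_key),
    redis.exists(result_key_prefix + job_id),
)
```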
arq/parser.py
Outdated
encoder_options_var: ContextVar[Optional[Dict[str, Any]]] = ContextVar('encoder_kwargs', default=None)


class ContextAwareEncoder(Encoder):
Would be great to add docstrings that explain the purpose of these classes.
Added some description to the classes.
arq/worker.py
Outdated
@@ -409,7 +409,7 @@ async def start_jobs(self, job_ids: List[str]) -> None:
     async def run_job(self, job_id: str, score: int) -> None:  # noqa: C901
         start_ms = timestamp_ms()
         coros = (
-            self.pool.get(job_key_prefix + job_id, encoding=None),
+            self.pool.get(job_key_prefix + job_id),
we should use a pipeline here I think.
You are right, added the pipeline.
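A sketch of what the pipelined version could look like in the aioredis v2 (redis-py style) API; the exact commands, `self.queue_name`, and the key prefix follow arq's naming but should be treated as illustrative, and the snippet runs inside the worker coroutine:

```python
# buffer both reads and send them to Redis in a single round trip
async with self.pool.pipeline(transaction=True) as pipe:
    pipe.get(job_key_prefix + job_id)       # commands are queued, not sent yet
    pipe.zscore(self.queue_name, job_id)
    job_data, score = await pipe.execute()  # one round trip, results in order
```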
setup.cfg
Outdated
@@ -1,6 +1,6 @@ | |||
[tool:pytest] | |||
testpaths = tests | |||
timeout = 5 | |||
;timeout = 5 |
good to keep something here, are some tests really taking more than 5 seconds?
Oh, I overlooked it; I added it when I was working on the changes for v2 and needed to run the tests without a timeout. Currently, in my environment only test_redis_sentinel_failure fails because of the timeout, and I think that should be expected: with 5 retries and a 1-second sleep, test_redis_sentinel_failure will always run for at least 5 seconds. I think we need to fix this test (maybe decrease max retries?).
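One possible middle ground, assuming the `timeout` ini option above comes from pytest-timeout: keep the global 5-second limit and raise it only for the genuinely slow test via the per-test marker. The fixture name below is hypothetical:

```python
import pytest

@pytest.mark.timeout(10)  # 5 retries x 1s sleep legitimately exceeds the 5s default
async def test_redis_sentinel_failure(create_sentinel_pool):
    ...
```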
arq/parser.py
Outdated
@@ -0,0 +1,126 @@
+# type: ignore
This is my main question about this PR (and about migrating to aioredis v2 in general).

In general, it's a real shame that the developers of aioredis 2 decided to constrain the encode/decode decision to be made on the connection/pool, not on an individual command. I assume this was so they could do (or not do) the encoding in hiredis and improve performance?

What you've written here is very cool and a sophisticated approach, but it doesn't actually avoid backwards compatibility problems, and in some ways it makes the logic within the library more complex. I also wonder if it will incur a performance penalty?

It makes me sad to say it, but I wonder if we should remove this and just use .decode() everywhere that we want a string? Always returning bytes and forcing the user/developer to call .decode() is definitely the explicit keep-it-simple-stupid approach.

For the tests we can just compare to bytes; no need for either with self.pool.encoder_context... or .decode().

It will mean that in application logic users (including me!) have to add .decode() in a lot of places, but the alternative is to add with self.pool.encoder_context... in nearly as many places; there's no route that doesn't cause a big fat change to the API.
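For concreteness, a tiny sketch of the keep-it-simple alternative in application code; the key name is illustrative and `redis` is assumed to be an aioredis v2 client:

```python
# arq would always hand back bytes; callers decode only where a str is needed
raw = await redis.get('arq:job:some-id')  # -> Optional[bytes]
if raw is not None:
    text = raw.decode()                   # explicit, per-call decision
```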
I think their reasoning was just to port redis-py: because redis-py does not support decoding on a per-request basis, they decided not to add it either. But the redis-py maintainers actually thought about adding this feature; they even added to hiredis the ability to change encoding settings at runtime (redis/hiredis-py#96), and we use it in ContextAwareHiredisParser. Also, I am pretty sure I saw a branch in the redis-py repository where they started working on the encoding context, but it seems it was removed at some point (so maybe they decided against it).

In terms of performance, as I understand it, changing the encoding on hiredis should not be a huge overhead; the method just updates parameters. But I am not sure how much the context-variable handling influences performance. I can run some tests later.

But if you really think we should stick with the simpler approach, I can remove the context encoding and return to decode() calls (basically what I did with the first set of changes).
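For reference, a small sketch of the hiredis-py capability referenced above (redis/hiredis-py#96); the exact set_encoding signature should be double-checked against the installed hiredis version:

```python
import hiredis

reader = hiredis.Reader()          # returns raw bytes by default
reader.feed(b'$5\r\nhello\r\n')
assert reader.gets() == b'hello'

reader.set_encoding('utf-8')       # reconfigure decoding at runtime
reader.feed(b'$5\r\nhello\r\n')
assert reader.gets() == 'hello'
```

This runtime switch is the hook a context-aware parser can use to flip decoding per request without rebuilding the connection.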
They even mention an encoding context manager in lock.py: https://github.com/aio-libs/aioredis-py/blob/master/aioredis/lock.py#L235 (I guess it was just copied from redis-py).
arq/connections.py
Outdated
job_result_exists = pipe.exists(result_key_prefix + job_id)
await pipe.execute()
if await job_exists or await job_result_exists:
    await pipe.unwatch()
Maybe you can remove unwatch? It was required since, with aioredis<2, connections often still had a watch in place from previous commands.
Okay, ran tests several times, seems to be working fine without unwatch.
arq/connections.py
Outdated
-    await tr.execute()
-except MultiExecError:
+    await pipe.execute()
+except (ResponseError, WatchError):
Not sure we need to catch ResponseError here too? This case is specifically for when the watched key gets set.
I am pretty sure that at some point in my tests I got this error; now I can't reproduce it, so I removed it from the except block.
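For context, a sketch of the WATCH/MULTI/EXEC round being discussed, in the aioredis v2 (redis-py style) API; the function and key names are illustrative, not arq's actual code:

```python
from aioredis.exceptions import WatchError

async def set_if_absent(redis, job_key: str, payload: bytes) -> bool:
    """Set job_key only if it doesn't already exist, atomically."""
    async with redis.pipeline(transaction=True) as pipe:
        await pipe.watch(job_key)       # while watching, commands run immediately
        if await pipe.exists(job_key):
            return False                # already queued
        pipe.multi()                    # from here commands are buffered
        pipe.set(job_key, payload)
        try:
            await pipe.execute()        # raises WatchError if job_key changed
        except WatchError:
            return False
    return True
```

WatchError is the error EXEC is expected to raise for the watched-key race, which is why also catching ResponseError could mask unrelated failures.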
@PerAStad please can you move this to a new issue, optionally referencing this PR.
Just a note to say that I've been using arq + aioredis 2 in production for 2 months now and it works well.
I've added a conversion of the expire parameters to milliseconds, because aioredis-py v2 requires EX or PX to be provided as an integer, so I decided to convert floats to milliseconds and pass them as PX to aioredis calls. I've also added a test for it (it will fail without these changes).
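A minimal sketch of that conversion; the helper name is illustrative, not necessarily what the PR uses:

```python
from datetime import timedelta
from typing import Union

def to_ms(expires: Union[int, float, timedelta]) -> int:
    """Convert a seconds-based expiry (possibly fractional) to integer milliseconds."""
    if isinstance(expires, timedelta):
        expires = expires.total_seconds()
    return int(round(expires * 1000))

# e.g. await redis.set(key, value, px=to_ms(2.5))  # PX=2500, keeps sub-second precision
```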
This is my attempt at aioredis 2 support. I made these changes while aioredis was in beta, but they are 100% compatible with the release version. All tests pass, but I needed to change some of them: there was an issue with pipe, and I did not manage to reproduce the multi-exec error, so I commented out 2 asserts in test_multi_exec. Also, because there is no way to decode a response per call in aioredis 2, I added .decode() in some places. I am not happy with this solution; maybe we need two separate clients, one for fetching pickled objects and one for everything else, I am not sure. Also, with this PR we will totally drop support for aioredis v1. Overall I was making this feature in a bit of a rush, therefore it will be a draft PR; maybe it can become a base for the actual PR/release.