support aioredis v2 #259

Merged
merged 22 commits into from
Mar 9, 2022

Conversation

Yolley
Contributor

@Yolley Yolley commented Aug 7, 2021

This is my attempt at aioredis v2 support. I made these changes while aioredis was in beta, but they are 100% compatible with the release version. All tests pass, but I needed to change some of them: there was an issue with pipe, and I did not manage to reproduce the multi-exec error, so I commented out 2 asserts in test_multi_exec. Also, because there is no way to decode a response per call in aioredis v2, I added .decode() in some places. I am not happy with this solution; maybe we need two separate clients, one for fetching pickled objects and one for everything else, I am not sure. Note that with this PR we will drop support for aioredis v1 entirely. Overall I was making this feature in a bit of a rush, so it will be a draft PR; maybe it can become a base for the actual PR/release.

@codecov

codecov bot commented Sep 2, 2021

Codecov Report

Merging #259 (c3319b7) into master (fe185b7) will decrease coverage by 0.94%.
The diff coverage is 91.56%.

❗ Current head c3319b7 differs from pull request most recent head c9d6b64. Consider uploading reports for the commit c9d6b64 to get more accurate results

@@            Coverage Diff             @@
##           master     #259      +/-   ##
==========================================
- Coverage   99.68%   98.74%   -0.95%     
==========================================
  Files          11       11              
  Lines         947      953       +6     
  Branches      151      153       +2     
==========================================
- Hits          944      941       -3     
- Misses          1        6       +5     
- Partials        2        6       +4     
Impacted Files Coverage Δ
arq/jobs.py 97.90% <66.66%> (-1.39%) ⬇️
arq/connections.py 95.52% <87.50%> (-4.48%) ⬇️
arq/worker.py 99.33% <100.00%> (-0.23%) ⬇️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@samuelcolvin
Member

Thanks so much for working on this @Yolley. I really want to get it merged and released.

Do you want to work on it and get it ready for full review? Then I'll review and we can go from there.

I agree the decoding situation is a mess, I assume we'd have no luck asking the maintainers of aioredis to make decoding a per-command option since they want to stick to the redis-py interface?

I don't want to have two connections/pools just for this. I think we should implement our own get command with an encoding argument, but I haven't played with aioredis 2 enough yet to really be sure.
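One possible shape for such a per-call get is sketched below. This is a hedged illustration, not arq's actual API: `FakeRedis` stands in for an aioredis v2 client created with decode_responses=False, and `ArqRedis.get_str` is an invented name for the idea of a get that takes an encoding argument.

```python
import asyncio


class FakeRedis:
    """Stand-in for a bytes-only aioredis v2 client (decode_responses=False)."""

    def __init__(self, store: dict):
        self._store = store  # key -> raw bytes, playing the role of Redis

    async def get(self, key: str):
        return self._store.get(key)


class ArqRedis(FakeRedis):
    async def get_str(self, key: str, encoding: str = 'utf-8'):
        # Decode only when the caller asks for text; pickled payloads
        # fetched via plain .get() stay as raw bytes.
        raw = await self.get(key)
        if raw is None or encoding is None:
            return raw
        return raw.decode(encoding)


r = ArqRedis({'k': b'hello'})
assert asyncio.run(r.get_str('k')) == 'hello'
assert asyncio.run(r.get_str('k', encoding=None)) == b'hello'
```

The appeal of this shape is that the pool itself stays bytes-only, so pickled job data is never decoded by accident.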

@Yolley
Contributor Author

Yolley commented Sep 5, 2021

> Thanks so much for working on this @Yolley. I really want to get it merged and released.
>
> Do you want to work on it and get it ready for full review? Then I'll review and we can go from there.
>
> I agree the decoding situation is a mess, I assume we'd have no luck asking the maintainers of aioredis to make decoding a per-command option since they want to stick to the redis-py interface?
>
> I don't want to have two connections/pools just for this. I think we should implement our own get command with an encoding argument, but I haven't played with aioredis 2 enough yet to really be sure.

I want to work on it, but I am not sure when I get the time to polish this PR. I agree with implementing our own get method in ArqRedis (at least until aioredis-py supports it in some way, if they agree to do so).

@Yolley
Contributor Author

Yolley commented Sep 6, 2021

Okay, it seems that it won't be that easy. They have 2 different parsers that can decode responses. PythonParser uses Encoder (https://github.com/aio-libs/aioredis-py/blob/master/aioredis/connection.py#L342); Encoder has a force flag (https://github.com/aio-libs/aioredis-py/blob/master/aioredis/connection.py#L131) that we could try to utilize, but read_response in PythonParser (which decodes the response and calls Encoder) does not accept any parameters, so it seems we will need to write a subclass of this parser.
There is also HiredisParser (https://github.com/aio-libs/aioredis-py/blob/master/aioredis/connection.py#L412), which relies on hiredis for encoding, so it seems we will also need to write our own wrapper for hiredis responses that actually decodes the response (that's how it was done in aioredis-py v1: https://github.com/aio-libs/aioredis-py/blob/62a75ba8cac8236eff6836248e1be7b68af1b37a/aioredis/parser.py#L103).

@sondrelg

This comment has been minimized.

@Yolley
Contributor Author

Yolley commented Oct 5, 2021

@samuelcolvin so, I've decided to go with a context manager and a context variable, so that we can decode on a per-request basis (I chose context vars to avoid subclassing almost everything from aioredis-py). This change drops support for Python 3.6, because context vars were introduced in 3.7. I've also turned off type checks for parser.py, because it contains subclasses of the aioredis parser and aioredis is currently poorly annotated. All tests pass, but I will probably need to add some more (maybe for hiredis and for the context encoding itself). First, please take a look at my changes and tell me if you are okay with this approach.
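The mechanism described above can be sketched without aioredis at all. This is a hedged stand-in: the real PR subclasses aioredis's Encoder and parsers, while the names `encoding_var`, `encoder_context`, and `read_response` here are illustrative only, showing how a context variable lets decoding be toggled per call site without touching the pool.

```python
import contextvars
from contextlib import contextmanager

# Context variable consulted at response-read time; default is "no decoding".
encoding_var: contextvars.ContextVar = contextvars.ContextVar('encoding', default=None)


@contextmanager
def encoder_context(encoding: str = 'utf-8'):
    # Temporarily switch decoding on for the enclosed calls only.
    token = encoding_var.set(encoding)
    try:
        yield
    finally:
        encoding_var.reset(token)


def read_response(raw: bytes):
    # A parser subclass would check the context variable here, instead of a
    # connection-wide decode_responses flag.
    enc = encoding_var.get()
    return raw.decode(enc) if enc else raw


assert read_response(b'abc') == b'abc'       # default: raw bytes
with encoder_context('utf-8'):
    assert read_response(b'abc') == 'abc'    # decoded inside the context
assert read_response(b'abc') == b'abc'       # restored after the context
```

Because context variables are task-local under asyncio, concurrent jobs don't interfere with each other's encoding choice, which is also why this approach needs Python 3.7+.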

samtx added a commit to samtx/passpredict-app that referenced this pull request Oct 19, 2021
@Yolley Yolley marked this pull request as ready for review November 8, 2021 11:23
@Yolley
Contributor Author

Yolley commented Nov 8, 2021

@samuelcolvin I've marked this PR as ready for review; please take a look when you get the chance. I at least need to understand whether my view of aioredis v2 support in arq coincides with yours.

Collaborator

@JonasKs JonasKs left a comment

Not a review of the code, but it seems you've forgotten to remove a few comments.

@JonasKs
Collaborator

JonasKs commented Nov 10, 2021

Just letting you know that I've implemented your fork in a project, and everything works very well so far!

EDIT: Except using msgpack when tasks fail, but I don't think that's connected to this PR:

INFO      [None] app.core.arq:44   Arq worker has started.
WARNING   [None] arq.worker:484    557.76s ! cron:my_example_cron_task max retries 1 exceeded
WARNING   [None] arq.jobs:228      error serializing result of cron:my_example_cron_task
Traceback (most recent call last):
  File ".../.venv/lib/python3.10/site-packages/arq/jobs.py", line 226, in serialize_result
    return serializer(data)
  File ".../.venv/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'JobExecutionFailed' object

@elrik75

elrik75 commented Nov 21, 2021

I tried this PR and I hit this issue: when I restart Redis/Sentinel (while the worker is still running) I get this error:

Traceback (most recent call last):
  File "./catwalk/redis/rpc.py", line 105, in launch_worker
    await worker.main()
  File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/arq/worker.py", line 308, in main
    await self._poll_iteration()
  File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/arq/worker.py", line 347, in _poll_iteration
    t.result()
  File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/arq/worker.py", line 419, in run_job
    v, job_try, _ = await asyncio.gather(*coros)
  File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/aioredis/client.py", line 1064, in execute_command
    return await self.parse_response(conn, command_name, **options)
  File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/aioredis/client.py", line 1080, in parse_response
    response = await connection.read_response()
  File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/aioredis/sentinel.py", line 65, in read_response
    return await super().read_response()
  File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/aioredis/connection.py", line 854, in read_response
    response = await self._parser.read_response()
  File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/arq/parser.py", line 110, in read_response
    response = self._reader.gets()
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte

It tries to read the content as UTF-8, but it's binary pickle data.

edit: I've found a way to work around this issue by fully re-creating the worker when Redis is interrupted.

@Yolley
Contributor Author

Yolley commented Nov 24, 2021

> I tried this PR and I hit this issue: when I restart Redis/Sentinel (while the worker is still running) I get this error:
>
> Traceback (most recent call last):
>   File "./catwalk/redis/rpc.py", line 105, in launch_worker
>     await worker.main()
>   File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/arq/worker.py", line 308, in main
>     await self._poll_iteration()
>   File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/arq/worker.py", line 347, in _poll_iteration
>     t.result()
>   File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/arq/worker.py", line 419, in run_job
>     v, job_try, _ = await asyncio.gather(*coros)
>   File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/aioredis/client.py", line 1064, in execute_command
>     return await self.parse_response(conn, command_name, **options)
>   File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/aioredis/client.py", line 1080, in parse_response
>     response = await connection.read_response()
>   File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/aioredis/sentinel.py", line 65, in read_response
>     return await super().read_response()
>   File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/aioredis/connection.py", line 854, in read_response
>     response = await self._parser.read_response()
>   File "/home/elrik/Info/SecretSauce/cat-walk/venv/lib/python3.7/site-packages/arq/parser.py", line 110, in read_response
>     response = self._reader.gets()
> UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
>
> It tries to read the content as UTF-8, but it's binary pickle data.
>
> edit: I've found a way to work around this issue by fully re-creating the worker when Redis is interrupted.

Does it only happen with Sentinel? Did you restart every instance or only the main one? I tried restarting a Redis instance (without Sentinel), and the worker just failed with

aioredis.exceptions.ConnectionError: Error connecting to localhost:6379. Multiple exceptions: [Errno 61] Connect call failed ('::1', 6379, 0, 0), [Errno 61] Connect call failed ('127.0.0.1', 6379).

There are no handlers for these errors in the current arq release, so it's not a regression.

@elrik75

elrik75 commented Nov 24, 2021

Yes, I use Sentinel (3 Redis instances, of which 2 are replicas, and 3 sentinels).
Basically, I cannot stop my process just because redis/sentinel fails; I have 3 instances, Sentinel will do its job, and I will have a new Redis master soon. So I have a system that tries to reconnect to Redis every second. It worked well until now: with arq, I used a task that relaunches worker.main() after a Redis error, but then I hit this issue. Since @samuelcolvin wrote about the encoding issue with aioredis v2, I thought it might be an issue with this PR. In fact, I did not try with aioredis 1.3, as all my other Redis code would fail (I use Redis a lot, not only for arq).
As I said, I found an approach that works well: re-creating all the workers after a Redis disconnection.

edit: people like me who have a lot of code using aioredis v2 depend on this PR, so thank you very much!
edit: note that I haven't noticed any other issues with your PR so far

@euri10 euri10 mentioned this pull request Dec 2, 2021
Member

@samuelcolvin samuelcolvin left a comment

Overall this looks great; a few fairly small things to change and one big decision on parser.py.

Comment on lines 148 to 153
await pipe.unwatch()
await pipe.watch(job_key)
job_exists = await pipe.exists(job_key)
job_result_exists = await pipe.exists(result_key_prefix + job_id)
if job_exists or job_result_exists:
await pipe.reset()
Member

can we chain these commands to avoid unnecessary awaits?

Contributor Author

I'm not sure what exactly you wanted to chain here; unwatch and watch should be executed one after another, so I chained just the 2 exists calls in a gather.
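The gather of the two independent EXISTS checks can be sketched as below. This is a hedged stand-in, not arq's actual code: `exists` plays the role of pipe.exists() on a watching pipeline, a plain dict plays the role of Redis, and `job_already_known` is an invented name.

```python
import asyncio


async def exists(store: dict, key: str) -> bool:
    await asyncio.sleep(0)  # simulate a network round trip
    return key in store


async def job_already_known(store: dict, job_key: str, result_key: str) -> bool:
    # The two checks are independent, so they can run concurrently with
    # gather instead of being awaited sequentially.
    job_exists, result_exists = await asyncio.gather(
        exists(store, job_key),
        exists(store, result_key),
    )
    return job_exists or result_exists


store = {'arq:job:abc': b'...'}
assert asyncio.run(job_already_known(store, 'arq:job:abc', 'arq:result:abc'))
assert not asyncio.run(job_already_known({}, 'arq:job:abc', 'arq:result:abc'))
```

unwatch and watch still have to stay sequential, since watch must be registered before the reads it protects.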

arq/parser.py Outdated
encoder_options_var: ContextVar[Optional[Dict[str, Any]]] = ContextVar('encoder_kwargs', default=None)


class ContextAwareEncoder(Encoder):
Member

Would be great to add some docstring that explain the purpose of these classes.

Contributor Author

Added some description to the classes.

arq/worker.py Outdated
@@ -409,7 +409,7 @@ async def start_jobs(self, job_ids: List[str]) -> None:
async def run_job(self, job_id: str, score: int) -> None: # noqa: C901
start_ms = timestamp_ms()
coros = (
self.pool.get(job_key_prefix + job_id, encoding=None),
self.pool.get(job_key_prefix + job_id),
Member

we should use a pipeline here I think.

Contributor Author

You are right, added the pipeline.

setup.cfg Outdated
@@ -1,6 +1,6 @@
[tool:pytest]
testpaths = tests
timeout = 5
;timeout = 5
Member

good to keep something here, are some tests really taking more than 5 seconds?

Contributor Author

Oh, I overlooked that; I added it while I was working on the changes for v2 and needed to run tests without a timeout. Currently, in my environment only test_redis_sentinel_failure fails because of the timeout, and I think that is expected: with 5 retries and a 1-second sleep, test_redis_sentinel_failure will always run for at least 5 seconds. I think we need to fix this test (maybe decrease max retries?).

arq/parser.py Outdated
@@ -0,0 +1,126 @@
# type: ignore
Member

This is my main question about this PR (and about migrating to aioredis v2 in general).

In general, it's a real shame that the developers of aioredis 2 decided to constrain the encode/decode decision to the connection/pool rather than an individual command. I assume this was so they could do (or not do) the encoding in hiredis and improve performance?

What you've written here is very cool and a sophisticated approach, but it doesn't actually avoid backwards compatibility problems, and in some ways makes the logic within the library more complex. I also wonder if it will incur a performance penalty?

It makes me sad to say it, but I wonder if we should remove this and just use .decode() everywhere that we want a string?

Always returning bytes and forcing the user/developer to call .decode() is definitely the explicit keep-it-simple-stupid approach.

For the tests we can just compare to bytes, no need for either with self.pool.encoder_context... or .decode().

It will mean in application logic that users (including me!) have to add .decode() in a lot of places, but the alternative is to add with self.pool.encoder_context... in nearly as many places - there's no route that doesn't cause a big fat change to the API.
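The keep-it-simple alternative looks roughly like this in application code. A hedged sketch with illustrative values, not arq's actual keys: the pool always returns bytes, and call sites decode explicitly wherever text is expected.

```python
# What a bytes-only pool would return for a string-valued key:
raw_status = b'complete'
status = raw_status.decode()  # explicit decode at the call site
assert status == 'complete'

# Pickled payloads stay untouched as bytes; no decoding ever happens to them.
raw_job = b'\x80\x04\x95...'  # illustrative pickle-like prefix
assert isinstance(raw_job, bytes)
```

The cost is many small .decode() calls scattered through user code; the benefit is that nothing is ever decoded implicitly, so binary payloads can never hit a UnicodeDecodeError inside the parser.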

Contributor Author

I think their reasoning was simply to port redis-py. Because redis-py does not support decoding on a per-request basis, they decided not to add it either. The redis-py maintainers actually considered adding this feature; they even added to hiredis the ability to change encoding settings at runtime (redis/hiredis-py#96), which we use in ContextAwareHiredisParser. I am also pretty sure I saw a branch in the redis-py repository where they started working on the encoding context, but it seems it was removed at some point (so maybe they decided against it).

In terms of performance, as I understand it, changing the encoding on hiredis should not be a huge overhead; the method just updates parameters. But I am not sure how much the context variable handling can influence performance. I can run some tests later.

That said, if you really think we should stick with the simpler approach, I can remove the context encoding and return to .decode() calls (basically what I did with the first set of changes).

Contributor Author

They even mention an encoding context manager in lock.py (https://github.com/aio-libs/aioredis-py/blob/master/aioredis/lock.py#L235); I guess it was just copied from redis-py.

job_result_exists = pipe.exists(result_key_prefix + job_id)
await pipe.execute()
if await job_exists or await job_result_exists:
await pipe.unwatch()
Member

Maybe you can remove unwatch? It was required since with aioredis<2 connections often still had a watch in place from previous commands

Contributor Author

Okay, I ran the tests several times; everything seems to work fine without unwatch.

await tr.execute()
except MultiExecError:
await pipe.execute()
except (ResponseError, WatchError):
Member

Not sure we need to catch ResponseError here too? This case is specifically for when the watched key gets set.

Contributor Author

I am pretty sure that at some point in my tests I got this error; now I can't reproduce it, so I removed it from the except block.

@ghost

This comment has been minimized.

@samuelcolvin samuelcolvin mentioned this pull request Jan 26, 2022
@samuelcolvin
Member

@PerAStad please can you move this to a new issue, optionally referencing this PR.

@ghost ghost mentioned this pull request Jan 26, 2022
@elrik75

elrik75 commented Jan 26, 2022

Just a note to say that I've used arq + aioredis v2 in production for 2 months now and it works well.

@Yolley
Contributor Author

Yolley commented Feb 1, 2022

I've added a conversion of expire parameters to milliseconds, because aioredis-py v2 requires EX or PX to be an integer. So I decided to convert float expiries to milliseconds and pass them as PX in aioredis calls. I also added a test for this (it fails without these changes).
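The conversion described above amounts to something like the following. A hedged sketch: `to_ms` is an illustrative helper name, and the usage comment shows an assumed call shape rather than arq's exact code.

```python
from typing import Optional


def to_ms(expires: Optional[float]) -> Optional[int]:
    # aioredis v2 requires EX/PX as int, so convert float seconds to
    # integer milliseconds; None means "no expiry" and passes through.
    return None if expires is None else int(round(expires * 1000))


assert to_ms(1.5) == 1500
assert to_ms(None) is None
# usage (assumption): await redis.set(key, value, px=to_ms(expires))
```

Passing milliseconds via PX preserves sub-second precision that would be truncated if a float were simply cast to whole seconds for EX.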

@JonasKs JonasKs mentioned this pull request Mar 5, 2022
@samuelcolvin samuelcolvin merged commit 77031fd into python-arq:master Mar 9, 2022
@samuelcolvin
Member

Thank you so much @Yolley for all your work on this. Since it's now passing, I've merged this PR, although there's a bit more work to do before a release. I'll comment on #250.
