Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
azhavoro committed Nov 14, 2024
1 parent 06125ca commit 1831cc4
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 81 deletions.
124 changes: 76 additions & 48 deletions cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import rq
from django.conf import settings
from django.core.cache import caches
from django.db import models as django_models
from django.utils import timezone as django_tz
from redis.exceptions import LockError
from rest_framework.exceptions import NotFound, ValidationError
Expand Down Expand Up @@ -76,6 +77,46 @@ class CvatCacheTimestampMismatchError(Exception):
pass


def enqueue_create_chunk_job(
    queue: rq.Queue,
    rq_job_id: str,
    create_callback: Callback,
    *,
    blocking_timeout: int = 50,
    rq_job_result_ttl: int = 60,
    rq_job_failure_ttl: int = 3600 * 24 * 14,  # 2 weeks
):
    """Enqueue a chunk-creation RQ job (unless one already exists) and wait for it.

    Enqueueing is guarded by a per-job Redis lock so that concurrent requests
    for the same chunk share a single RQ job instead of each creating one.
    After that, the job status is polled until it finishes, fails, or the
    configured timeout elapses.

    NOTE(review): elsewhere in this changeset the function is imported as
    ``enqueue_chunk_create_job`` — confirm the name matches at both sites.

    Raises:
        TimeoutError: when the lock cannot be acquired within
            ``blocking_timeout`` seconds, or the job does not complete within
            ``CVAT_CHUNK_CREATE_TIMEOUT``.
        Exception: the worker-side exception type recorded in the job meta,
            re-raised here when the RQ job failed.
    """
    try:
        with get_rq_lock_for_job(queue, rq_job_id, blocking_timeout=blocking_timeout):
            existing_job = queue.fetch_job(rq_job_id)
            rq_job = existing_job or queue.enqueue(
                create_callback,
                job_id=rq_job_id,
                result_ttl=rq_job_result_ttl,
                failure_ttl=rq_job_failure_ttl,
            )
    except LockError:
        raise TimeoutError(f"Cannot acquire lock for {rq_job_id}")

    poll_interval = settings.CVAT_CHUNK_CREATE_CHECK_INTERVAL
    # "or 1" guarantees at least one status check even for a zero quotient.
    max_checks = settings.CVAT_CHUNK_CREATE_TIMEOUT // poll_interval or 1
    for _ in range(max_checks):
        status = rq_job.get_status()
        if status == "finished":
            return
        if status == "failed":
            # Surface the failure to the caller by re-raising the exception
            # the worker stored in the job meta.
            meta = rq_job.get_meta()
            exc_type = meta.get(RQJobMetaField.EXCEPTION_TYPE, Exception)
            exc_args = meta.get(RQJobMetaField.EXCEPTION_ARGS, ("Cannot create chunk",))
            raise exc_type(*exc_args)

        time.sleep(poll_interval)

    raise TimeoutError(f"Chunk processing takes too long {rq_job_id}")


def _is_run_inside_rq() -> bool:
    """Tell whether this code is currently executing inside an RQ worker job."""
    current_job = rq.get_current_job()
    return current_job is not None

Expand All @@ -88,7 +129,7 @@ def _convert_args_for_callback(func_args: list[Any]) -> list[Any]:
else:
if isinstance(
func_arg,
(models.Task, models.Segment, models.Job, models.CloudStorage, models.Data),
django_models.Model,
):
result.append(func_arg.id)
elif isinstance(func_arg, list):
Expand All @@ -98,24 +139,34 @@ def _convert_args_for_callback(func_args: list[Any]) -> list[Any]:

return result


@attrs.define
class Callback:
# A picklable description of a deferred call (callable + positional args +
# keyword args) used to ship chunk-creation work to RQ workers.
# NOTE(review): this span is scraped from a unified diff — the first three
# field lines below look like the pre-change declarations and the
# attrs.field(...) declarations that follow are their replacements; confirm
# against the actual file before treating both sets as live code.
_callable: Callable[..., DataWithMime]
_args: list[Any] = attrs.field(converter=_convert_args_for_callback, factory=list)
_kwargs: dict[str, Any] = attrs.Factory(dict)
# The wrapped callable must produce (data, mime type); validated as callable.
_callable: Callable[..., DataWithMime] = attrs.field(
validator=attrs.validators.is_callable(),
)
# Positional args; the converter replaces Django model instances with their
# ids so the callback stays serializable across the RQ boundary.
_args: list[Any] = attrs.field(
factory=list,
validator=attrs.validators.instance_of(list),
converter=_convert_args_for_callback,
)
# Keyword args are restricted to JSON-like scalars (bool/int/float/str/None)
# so the callback can be safely round-tripped through the queue.
_kwargs: dict[str, Union[bool, int, float, str, None]] = attrs.field(
factory=dict,
validator=attrs.validators.deep_mapping(
key_validator=attrs.validators.instance_of(str),
value_validator=attrs.validators.instance_of((bool, int, float, str, type(None))),
mapping_validator=attrs.validators.instance_of(dict),
),
)

def __call__(self) -> DataWithMime:
"""Invoke the stored callable with the stored args and kwargs."""
return self._callable(*self._args, **self._kwargs)


class MediaCache:
_QUEUE_NAME = settings.CVAT_QUEUES.CHUNKS.value
_QUEUE_JOB_PREFIX_TASK = "chunks:prepare-item-"
_SLEEP_TIMEOUT = settings.CVAT_CHUNK_CREATE_CHECK_INTERVAL
_CHUNK_CREATE_TIMEOUT = settings.CVAT_CHUNK_CREATE_TIMEOUT
_CACHE_NAME = "media"
_LOCK_TIMEOUT = 50
_RQ_JOB_RESULT_TTL = 60
_RQ_JOB_FAILURE_TTL = 3600 * 24 * 14 # 2 weeks
_PREVIEW_TTL = settings.CVAT_PREVIEW_CACHE_TTL

@staticmethod
Expand Down Expand Up @@ -170,23 +221,6 @@ def _create_and_set_cache_item(

return item

def _wait_for_rq_job(self, rq_job: rq.job.Job) -> bool:
"""Poll an RQ job until it finishes, fails, or the timeout elapses.

Returns True when the job finished, False on timeout.
Raises the worker-side exception recorded in the job meta if the job failed.
NOTE(review): this diff appears to delete this method in favor of the
module-level enqueue/wait helper — confirm no remaining callers.
"""
# "or 1" guarantees at least one status check even for a zero quotient.
retries = self._CHUNK_CREATE_TIMEOUT // self._SLEEP_TIMEOUT or 1
while retries > 0:
job_status = rq_job.get_status()
if job_status in ("finished",):
return True
elif job_status in ("failed",):
# Re-raise the exception the worker stored in the job meta.
job_meta = rq_job.get_meta()
exc_type = job_meta.get(RQJobMetaField.EXCEPTION_TYPE, Exception)
exc_args = job_meta.get(RQJobMetaField.EXCEPTION_ARGS, ("Cannot create chunk",))
raise exc_type(*exc_args)

time.sleep(self._SLEEP_TIMEOUT)
retries -= 1

return False

def _create_cache_item(
self,
key: str,
Expand All @@ -207,28 +241,22 @@ def _create_cache_item(
cache_item_ttl=cache_item_ttl,
)
else:
try:
with get_rq_lock_for_job(queue, rq_id, blocking_timeout=self._LOCK_TIMEOUT):
rq_job = queue.fetch_job(rq_id)

if not rq_job:
rq_job = queue.enqueue(
self._drop_return_value,
self._create_and_set_cache_item,
key,
create_callback,
cache_item_ttl=cache_item_ttl,
job_id=rq_id,
result_ttl=self._RQ_JOB_RESULT_TTL,
failure_ttl=self._RQ_JOB_FAILURE_TTL,
)
except LockError:
raise TimeoutError(f"Cannot acquire lock for {key}")

if self._wait_for_rq_job(rq_job):
item = self._get_cache_item(key)
else:
raise TimeoutError(f"Chunk processing takes too long {key}")
enqueue_create_chunk_job(
queue=queue,
rq_job_id=rq_id,
create_callback=Callback(
callable=self._drop_return_value,
args=[
self._create_and_set_cache_item,
key,
create_callback,
],
kwargs={
"cache_item_ttl": cache_item_ttl,
},
),
)
item = self._get_cache_item(key)

slogger.glob.info(f"Ending to prepare chunk: key {key}")

Expand Down
98 changes: 65 additions & 33 deletions cvat/apps/engine/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import re
import shutil
import string
import django_rq
import rq.defaults as rq_defaults

from tempfile import NamedTemporaryFile
Expand All @@ -22,13 +23,14 @@
from decimal import Decimal

from rest_framework import serializers, exceptions
from django.conf import settings
from django.contrib.auth.models import User, Group
from django.db import transaction
from django.utils import timezone
from numpy import random

from cvat.apps.dataset_manager.formats.utils import get_label_color
from cvat.apps.engine.frame_provider import TaskFrameProvider
from cvat.apps.engine.frame_provider import TaskFrameProvider, FrameQuality
from cvat.apps.engine.utils import format_list, parse_exception_message
from cvat.apps.engine import field_validation, models
from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status
Expand Down Expand Up @@ -980,8 +982,8 @@ def validate(self, attrs):

@transaction.atomic
def update(self, instance: models.Job, validated_data: dict[str, Any]) -> models.Job:
from cvat.apps.engine.cache import MediaCache
from cvat.apps.engine.frame_provider import FrameQuality, JobFrameProvider, prepare_chunk
from cvat.apps.engine.cache import MediaCache, Callback, enqueue_chunk_create_job
from cvat.apps.engine.frame_provider import JobFrameProvider
from cvat.apps.dataset_manager.task import JobAnnotation, AnnotationManager

db_job = instance
Expand Down Expand Up @@ -1129,7 +1131,6 @@ def _to_rel_frame(abs_frame: int) -> int:
job_annotation.delete(job_annotation_manager.data)

# Update chunks
task_frame_provider = TaskFrameProvider(db_task)
job_frame_provider = JobFrameProvider(db_job)
updated_segment_chunk_ids = set(
job_frame_provider.get_chunk_number(updated_segment_frame_id)
Expand All @@ -1138,44 +1139,34 @@ def _to_rel_frame(abs_frame: int) -> int:
segment_frames = sorted(segment_frame_set)
segment_frame_map = dict(zip(segment_honeypots, requested_frames))

media_cache = MediaCache()
queue = django_rq.get_queue(settings.CVAT_QUEUES.CHUNKS.value)
for chunk_id in sorted(updated_segment_chunk_ids):
chunk_frames = segment_frames[
chunk_id * db_data.chunk_size :
(chunk_id + 1) * db_data.chunk_size
]

for quality in FrameQuality.__members__.values():
def _write_updated_static_chunk():
def _iterate_chunk_frames():
for chunk_frame in chunk_frames:
db_frame = all_task_frames[chunk_frame]
chunk_real_frame = segment_frame_map.get(chunk_frame, chunk_frame)
yield (
task_frame_provider.get_frame(
chunk_real_frame, quality=quality
).data,
os.path.basename(db_frame.path),
chunk_frame,
)

with closing(_iterate_chunk_frames()) as frame_iter:
chunk, _ = prepare_chunk(
frame_iter, quality=quality, db_task=db_task, dump_unchanged=True,
)

get_chunk_path = {
FrameQuality.COMPRESSED: db_data.get_compressed_segment_chunk_path,
FrameQuality.ORIGINAL: db_data.get_original_segment_chunk_path,
}[quality]

with open(get_chunk_path(chunk_id, db_segment.id), 'wb') as f:
f.write(chunk.getvalue())

if db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM:
_write_updated_static_chunk()
rq_id = f"job_{db_segment.id}_write_chunk_{chunk_id}_{quality}"
enqueue_chunk_create_job(
queue=queue,
rq_job_id=rq_id,
create_callback=Callback(
callable=self._write_updated_static_chunk,
args=[
db_task.id,
db_segment.id,
chunk_id,
chunk_frames,
quality,
{chunk_frame: all_task_frames[chunk_frame].path for chunk_frame in chunk_frames},
segment_frame_map,
],
),
)

media_cache.remove_segment_chunk(db_segment, chunk_id, quality=quality)
MediaCache().remove_segment_chunk(db_segment, chunk_id, quality=quality)

db_segment.chunks_updated_date = timezone.now()
db_segment.save(update_fields=['chunks_updated_date'])
Expand All @@ -1199,6 +1190,47 @@ def _iterate_chunk_frames():

return instance

@staticmethod
def _write_updated_static_chunk(
    db_task_id: int,
    db_segment_id: int,
    chunk_id: int,
    chunk_frames: list[int],
    quality: FrameQuality,
    frame_path_map: dict[int, str],
    segment_frame_map: dict[int,int]
):
    """Regenerate one static (file-system stored) segment chunk on disk.

    Designed to run inside an RQ worker: it receives plain ids and paths
    instead of ORM objects and re-fetches the task itself.

    Args:
        db_task_id: id of the task owning the chunk.
        db_segment_id: id of the segment the chunk belongs to.
        chunk_id: index of the chunk within the segment.
        chunk_frames: frame ids that make up the chunk.
        quality: which chunk variant to rebuild (compressed or original).
        frame_path_map: frame id -> stored frame path (basename is reused).
        segment_frame_map: honeypot frame id -> real source frame id.
    """
    from cvat.apps.engine.frame_provider import prepare_chunk

    task = models.Task.objects.get(pk=db_task_id)
    frame_provider = TaskFrameProvider(task)
    data = task.data

    def _frames():
        # Yield (image data, file basename, frame id) triples for the chunk.
        for frame_id in chunk_frames:
            # Honeypot frames are sourced from their mapped real frame.
            source_frame = segment_frame_map.get(frame_id, frame_id)
            frame = frame_provider.get_frame(source_frame, quality=quality)
            yield frame.data, os.path.basename(frame_path_map[frame_id]), frame_id

    with closing(_frames()) as frame_iter:
        chunk, _ = prepare_chunk(
            frame_iter, quality=quality, db_task=task, dump_unchanged=True,
        )

    # Dict dispatch (rather than if/else) so an unexpected quality value
    # still fails loudly with a KeyError.
    path_getters = {
        FrameQuality.COMPRESSED: data.get_compressed_segment_chunk_path,
        FrameQuality.ORIGINAL: data.get_original_segment_chunk_path,
    }
    chunk_path = path_getters[quality](chunk_id, db_segment_id)

    with open(chunk_path, 'wb') as f:
        f.write(chunk.getvalue())

class JobValidationLayoutReadSerializer(serializers.Serializer):
honeypot_count = serializers.IntegerField(min_value=0, required=False)
honeypot_frames = serializers.ListField(
Expand Down

0 comments on commit 1831cc4

Please sign in to comment.