Prepare chunks in a worker process #8618

Merged (24 commits) on Nov 28, 2024
Changes from 1 commit
fixed comments
azhavoro committed Nov 11, 2024
commit 353a44e5d51c703c255a87f137e5d40242eae9bc
149 changes: 82 additions & 67 deletions cvat/apps/engine/cache.py
@@ -30,6 +30,7 @@
overload,
)

import attrs
import av
import cv2
import django_rq
@@ -75,13 +76,48 @@ class CvatCacheTimestampMismatchError(Exception):
pass


def _is_run_inside_rq() -> bool:
return rq.get_current_job() is not None


def _convert_args_for_callback(func_args: list[Any]) -> list[Any]:
result = []
for func_arg in func_args:
if _is_run_inside_rq():
result.append(func_arg)
else:
if isinstance(
func_arg,
(models.Task, models.Segment, models.Job, models.CloudStorage, models.Data),
):
result.append(func_arg.id)
elif isinstance(func_arg, list):
result.append(_convert_args_for_callback(func_arg))
else:
result.append(func_arg)

return result


@attrs.define
class Callback:
_callable: Callable[..., DataWithMime]
_args: list[Any] = attrs.field(converter=_convert_args_for_callback, factory=list)
_kwargs: dict[str, Any] = attrs.Factory(dict)

def __call__(
self,
) -> Any:
return self._callable(*self._args, **self._kwargs)
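
(Reviewer sketch, not part of the diff.) How the new Callback wrapper behaves: per _convert_args_for_callback above, model instances passed in args are replaced by their primary keys when the Callback is constructed outside an RQ worker, so the wrapped callable must accept either a model instance or an int id; kwargs are passed through unchanged. The names prepare_preview and db_segment below are hypothetical placeholders, and FrameQuality.COMPRESSED is assumed.

# Hypothetical usage sketch, not part of this commit.
cb = Callback(
    callable=prepare_preview,                     # must handle a models.Segment or an int pk
    args=[db_segment],                            # becomes db_segment.id outside an RQ worker
    kwargs={"quality": FrameQuality.COMPRESSED},  # kwargs are not converted
)
data, mime = cb()  # calls prepare_preview(db_segment_or_id, quality=FrameQuality.COMPRESSED)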


class MediaCache:
_QUEUE_NAME = settings.CVAT_QUEUES.CHUNKS.value
_QUEUE_JOB_PREFIX_TASK = "chunks:prepare-item-"
_SLEEP_TIMEOUT = settings.CVAT_CHUNK_CREATE_CHECK_INTERVAL
_CHUNK_CREATE_TIMEOUT = settings.CVAT_CHUNK_CREATE_TIMEOUT
_CACHE_NAME = "media"
_LOCK_TIMEOUT = 5
_LOCK_TIMEOUT = 50
_RQ_JOB_RESULT_TTL = 60
_RQ_JOB_FAILURE_TTL = 3600 * 24 * 14 # 2 weeks
_PREVIEW_TTL = settings.CVAT_PREVIEW_CACHE_TTL
@@ -97,10 +133,9 @@ def _get_checksum(value: bytes) -> int:
def _get_or_set_cache_item(
self,
key: str,
create_callback: Callable[..., DataWithMime],
*args: Any,
create_callback: Callback,
*,
cache_item_ttl: Optional[int] = None,
**kwargs: Any,
) -> _CacheItem:
item = self._get_cache_item(key)
if item:
@@ -109,9 +144,7 @@ def _get_or_set_cache_item(
return self._create_cache_item(
key,
create_callback,
*args,
cache_item_ttl=cache_item_ttl,
**kwargs,
)

def _get_queue(self) -> rq.Queue:
@@ -128,13 +161,11 @@ def _drop_return_value(func: Callable[..., DataWithMime], *args: Any, **kwargs:
def _create_and_set_cache_item(
cls,
key: str,
create_callback: Callable[..., DataWithMime],
*args: Any,
create_callback: Callback,
cache_item_ttl: Optional[int] = None,
**kwargs: Any,
) -> DataWithMime:
timestamp = django_tz.now()
item_data = create_callback(*args, **kwargs)
item_data = create_callback()
item_data_bytes = item_data[0].getvalue()
item = (item_data[0], item_data[1], cls._get_checksum(item_data_bytes), timestamp)
if item_data_bytes:
@@ -163,24 +194,21 @@ def _wait_for_rq_job(self, rq_job: rq.job.Job) -> bool:
def _create_cache_item(
self,
key: str,
create_callback: Callable[..., DataWithMime],
*args: Any,
create_callback: Callback,
*,
cache_item_ttl: Optional[int] = None,
**kwargs: Any,
) -> _CacheItem:

queue = self._get_queue()
rq_id = self._make_queue_job_id(key)

slogger.glob.info(f"Starting to prepare chunk: key {key}")
if self._is_run_inside_rq():
if _is_run_inside_rq():
with get_rq_lock_for_job(queue, rq_id, timeout=None, blocking_timeout=None):
item = self._create_and_set_cache_item(
key,
create_callback,
*args,
cache_item_ttl=cache_item_ttl,
**kwargs,
)
else:
try:
@@ -193,9 +221,7 @@ def _create_cache_item(
self._create_and_set_cache_item,
key,
create_callback,
*args,
cache_item_ttl=cache_item_ttl,
**kwargs,
job_id=rq_id,
result_ttl=self._RQ_JOB_RESULT_TTL,
failure_ttl=self._RQ_JOB_FAILURE_TTL,
@@ -237,12 +263,13 @@ def _get_cache_item(self, key: str) -> Optional[_CacheItem]:

return item

def _validate_cache_item_timestamp(self, item: _CacheItem, expected_timestamp: datetime):
if item:
if item[3] < expected_timestamp:
raise CvatCacheTimestampMismatchError(
f"Cache timestamp mismatch. Item_ts: {item[3]}, expected_ts: {expected_timestamp}"
)
def _validate_cache_item_timestamp(
self, item: _CacheItem, expected_timestamp: datetime
) -> _CacheItem:
if item[3] < expected_timestamp:
raise CvatCacheTimestampMismatchError(
f"Cache timestamp mismatch. Item_ts: {item[3]}, expected_ts: {expected_timestamp}"
)

return item

@@ -302,18 +329,17 @@ def _to_data_with_mime(self, cache_item: Optional[_CacheItem]) -> Optional[DataW

return cache_item[:2]

def _is_run_inside_rq(self) -> bool:
return rq.get_current_job() is not None

def get_or_set_segment_chunk(
self, db_segment: models.Segment, chunk_number: int, *, quality: FrameQuality
) -> DataWithMime:

item = self._get_or_set_cache_item(
self._make_chunk_key(db_segment, chunk_number, quality=quality),
self.prepare_segment_chunk,
db_segment,
chunk_number,
quality=quality,
Callback(
callable=self.prepare_segment_chunk,
args=[db_segment, chunk_number],
kwargs={"quality": quality},
),
)
db_segment.refresh_from_db(fields=["chunks_updated_date"])

@@ -334,22 +360,14 @@ def get_or_set_task_chunk(
self,
db_task: models.Task,
chunk_number: int,
set_callback: Callback,
*,
quality: FrameQuality,
set_callback: Callable[..., DataWithMime],
set_callback_args: Union[list[Any], None] = None,
set_callback_kwargs: Union[dict[str, Any], None] = None,
) -> DataWithMime:
if set_callback_args is None:
set_callback_args = []
if set_callback_kwargs is None:
set_callback_kwargs = {}

item = self._get_or_set_cache_item(
self._make_chunk_key(db_task, chunk_number, quality=quality),
set_callback,
*set_callback_args,
**set_callback_kwargs,
)
db_task.refresh_from_db(fields=["segment_set"])

@@ -372,20 +390,12 @@ def get_or_set_segment_task_chunk(
chunk_number: int,
*,
quality: FrameQuality,
set_callback: Callable[..., DataWithMime],
set_callback_args: Union[list[Any], None] = None,
set_callback_kwargs: Union[dict[str, Any], None] = None,
set_callback: Callback,
) -> DataWithMime:
if set_callback_args is None:
set_callback_args = []
if set_callback_kwargs is None:
set_callback_kwargs = {}

item = self._get_or_set_cache_item(
self._make_segment_task_chunk_key(db_segment, chunk_number, quality=quality),
set_callback,
*set_callback_args,
**set_callback_kwargs,
)
db_segment.refresh_from_db(fields=["chunks_updated_date"])

@@ -399,19 +409,24 @@ def get_or_set_selective_job_chunk(
return self._to_data_with_mime(
self._get_or_set_cache_item(
self._make_chunk_key(db_job, chunk_number, quality=quality),
self.prepare_masked_range_segment_chunk,
self._make_callback_db_object_arg(db_job.segment),
chunk_number,
quality=quality,
Callback(
callable=self.prepare_masked_range_segment_chunk,
args=[db_job.segment, chunk_number],
kwargs={
"quality": quality,
},
),
)
)

def get_or_set_segment_preview(self, db_segment: models.Segment) -> DataWithMime:
return self._to_data_with_mime(
self._get_or_set_cache_item(
self._make_preview_key(db_segment),
self._prepare_segment_preview,
self._make_callback_db_object_arg(db_segment),
Callback(
callable=self._prepare_segment_preview,
args=[db_segment],
),
cache_item_ttl=self._PREVIEW_TTL,
)
)
@@ -423,21 +438,17 @@ def remove_segment_chunk(
self._make_chunk_key(db_segment, chunk_number=chunk_number, quality=quality)
)

def _make_callback_db_object_arg(
self,
db_obj: Union[models.Task, models.Segment, models.Job, models.CloudStorage],
) -> Union[models.Task, models.Segment, models.Job, models.CloudStorage, int]:
return db_obj if self._is_run_inside_rq() else db_obj.id

def get_cloud_preview(self, db_storage: models.CloudStorage) -> Optional[DataWithMime]:
return self._to_data_with_mime(self._get_cache_item(self._make_preview_key(db_storage)))

def get_or_set_cloud_preview(self, db_storage: models.CloudStorage) -> DataWithMime:
return self._to_data_with_mime(
self._get_or_set_cache_item(
self._make_preview_key(db_storage),
self._prepare_cloud_preview,
self._make_callback_db_object_arg(db_storage),
Callback(
callable=self._prepare_cloud_preview,
args=[db_storage],
),
cache_item_ttl=self._PREVIEW_TTL,
)
)
@@ -448,9 +459,10 @@ def get_or_set_frame_context_images_chunk(
return self._to_data_with_mime(
self._get_or_set_cache_item(
self._make_context_image_preview_key(db_data, frame_number),
self.prepare_context_images_chunk,
self._make_callback_db_object_arg(db_data),
frame_number,
Callback(
callable=self.prepare_context_images_chunk,
args=[db_data, frame_number],
),
)
)

@@ -585,8 +597,11 @@ def _read_raw_frames(
yield from MediaCache._read_raw_images(db_task, frame_ids, manifest_path=manifest_path)

def prepare_segment_chunk(
self, db_segment: models.Segment, chunk_number: int, *, quality: FrameQuality
self, db_segment: Union[models.Segment, int], chunk_number: int, *, quality: FrameQuality
) -> DataWithMime:
if isinstance(db_segment, int):
db_segment = models.Segment.objects.get(pk=db_segment)

if db_segment.type == models.SegmentType.RANGE:
return self.prepare_range_segment_chunk(db_segment, chunk_number, quality=quality)
elif db_segment.type == models.SegmentType.SPECIFIC_FRAMES:
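For orientation (my own sketch, not part of the PR): the caller-side flow in cache.py after this change. get_or_set_segment_chunk wraps the preparer in a Callback; outside an RQ worker, _create_cache_item enqueues _create_and_set_cache_item on the chunks queue and waits (via _wait_for_rq_job) until the worker has stored the item, while inside a worker the chunk is produced in-process under a per-job lock. A minimal, hedged example of the entry point; the bare MediaCache() constructor and FrameQuality.COMPRESSED are assumptions.

# Rough illustration only; constructor arguments and enum members are assumed.
cache = MediaCache()
buffer, mime = cache.get_or_set_segment_chunk(
    db_segment,  # a models.Segment instance available to the caller
    chunk_number=0,
    quality=FrameQuality.COMPRESSED,
)
# Outside an RQ worker this blocks for up to CVAT_CHUNK_CREATE_TIMEOUT while a job
# on the chunks queue prepares and caches the chunk; inside a worker it runs inline.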
40 changes: 25 additions & 15 deletions cvat/apps/engine/frame_provider.py
@@ -37,7 +37,7 @@
from rest_framework.exceptions import ValidationError

from cvat.apps.engine import models
from cvat.apps.engine.cache import DataWithMime, MediaCache, prepare_chunk
from cvat.apps.engine.cache import Callback, DataWithMime, MediaCache, prepare_chunk
from cvat.apps.engine.media_extractors import (
FrameQuality,
IMediaReader,
@@ -315,20 +315,25 @@ def get_chunk(
self._db_task,
chunk_number,
quality=quality,
set_callback=self._get_chunk_create_callback,
set_callback_args=(
cache._make_callback_db_object_arg(self._db_task),
[cache._make_callback_db_object_arg(s) for s in matching_segments],
{f: self.get_rel_frame_number(f) for f in task_chunk_frame_set},
quality,
set_callback=Callback(
callable=self._get_chunk_create_callback,
args=[
self._db_task,
matching_segments,
{f: self.get_rel_frame_number(f) for f in task_chunk_frame_set},
quality,
],
),
)

return return_type(data=buffer, mime=mime_type)

@staticmethod
def _get_chunk_create_callback(
db_task: models.Task | int, matching_segments, task_chunk_frames_with_rel_numbers, quality
db_task: Union[models.Task, int],
matching_segments: list[models.Segment],
task_chunk_frames_with_rel_numbers: dict[int, int],
quality: FrameQuality,
) -> DataWithMime:
# Create and return a joined / cleaned chunk
task_chunk_frames = OrderedDict()
@@ -687,20 +692,25 @@ def get_chunk(
self._db_segment,
chunk_number,
quality=quality,
set_callback=self._get_chunk_create_callback,
set_callback_args=(
cache._make_callback_db_object_arg(self._db_segment),
segment_chunk_frame_ids,
chunk_number,
quality,
set_callback=Callback(
callable=self._get_chunk_create_callback,
args=[
self._db_segment,
segment_chunk_frame_ids,
chunk_number,
quality,
],
),
)

return return_type(data=buffer, mime=mime_type)

@staticmethod
def _get_chunk_create_callback(
db_segment: models.Segment | int, segment_chunk_frame_ids, chunk_number, quality
db_segment: Union[models.Segment, int],
segment_chunk_frame_ids: list[int],
chunk_number: int,
quality: FrameQuality,
) -> DataWithMime:
# Create and return a joined / cleaned chunk
if isinstance(db_segment, int):