Skip to content

Commit b155a9f

Browse files
janbuchar and vdusek authored
refactor!: RequestQueue and service management rehaul (#429)
- Closes #423, #174, #203, and #83 (locking in memory storage was not implemented — see #433).
- Related to #354 — let's investigate this, but I believe that it won't go away until #433 is resolved.

Co-authored-by: Vlada Dusek <[email protected]>
1 parent ada0990 commit b155a9f

17 files changed

Lines changed: 498 additions & 340 deletions

src/crawlee/_utils/lru_cache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
T = TypeVar('T')
99

1010

11-
class LRUCache(MutableMapping, Generic[T]):
11+
class LRUCache(MutableMapping[str, T], Generic[T]):
1212
"""Attempt to reimplement LRUCache from `@apify/datastructures` using `OrderedDict`."""
1313

1414
def __init__(self, max_length: int) -> None:

src/crawlee/base_storage_client/base_request_queue_client.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
ProcessedRequest,
1212
ProlongRequestLockResponse,
1313
Request,
14-
RequestListResponse,
1514
RequestQueueHead,
1615
RequestQueueHeadWithLocks,
1716
RequestQueueMetadata,
@@ -185,17 +184,3 @@ async def batch_delete_requests(self, requests: list[Request]) -> BatchRequestsO
185184
Args:
186185
requests: The requests to delete from the queue.
187186
"""
188-
189-
@abstractmethod
190-
async def list_requests(
191-
self,
192-
*,
193-
limit: int | None = None,
194-
exclusive_start_id: str | None = None,
195-
) -> RequestListResponse:
196-
"""List requests from the queue.
197-
198-
Args:
199-
limit: How many requests to retrieve.
200-
exclusive_start_id: All requests up to this one (including) are skipped from the result.
201-
"""

src/crawlee/basic_crawler/basic_crawler.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@
1818
from tldextract import TLDExtract
1919
from typing_extensions import NotRequired, TypedDict, TypeVar, Unpack, assert_never
2020

21-
from crawlee import Glob
21+
from crawlee import Glob, service_container
2222
from crawlee._utils.urls import convert_to_absolute_url, is_url_absolute
2323
from crawlee._utils.wait import wait_for
2424
from crawlee.autoscaling import AutoscaledPool, ConcurrencySettings
2525
from crawlee.autoscaling.snapshotter import Snapshotter
2626
from crawlee.autoscaling.system_status import SystemStatus
2727
from crawlee.basic_crawler.context_pipeline import ContextPipeline
2828
from crawlee.basic_crawler.router import Router
29-
from crawlee.configuration import Configuration
3029
from crawlee.enqueue_strategy import EnqueueStrategy
3130
from crawlee.errors import (
3231
ContextPipelineInitializationError,
@@ -35,7 +34,6 @@
3534
SessionError,
3635
UserDefinedErrorHandlerError,
3736
)
38-
from crawlee.events import LocalEventManager
3937
from crawlee.http_clients import HttpxHttpClient
4038
from crawlee.log_config import CrawleeLogFormatter
4139
from crawlee.models import BaseRequestData, DatasetItemsListPage, Request, RequestState
@@ -47,6 +45,8 @@
4745
if TYPE_CHECKING:
4846
import re
4947

48+
from crawlee.configuration import Configuration
49+
from crawlee.events.event_manager import EventManager
5050
from crawlee.http_clients import BaseHttpClient, HttpResponse
5151
from crawlee.proxy_configuration import ProxyConfiguration, ProxyInfo
5252
from crawlee.sessions import Session
@@ -77,6 +77,7 @@ class BasicCrawlerOptions(TypedDict, Generic[TCrawlingContext]):
7777
retry_on_blocked: NotRequired[bool]
7878
proxy_configuration: NotRequired[ProxyConfiguration]
7979
statistics: NotRequired[Statistics[StatisticsState]]
80+
event_manager: NotRequired[EventManager]
8081
configure_logging: NotRequired[bool]
8182
_context_pipeline: NotRequired[ContextPipeline[TCrawlingContext]]
8283
_additional_context_managers: NotRequired[Sequence[AsyncContextManager]]
@@ -111,6 +112,7 @@ def __init__(
111112
retry_on_blocked: bool = True,
112113
proxy_configuration: ProxyConfiguration | None = None,
113114
statistics: Statistics | None = None,
115+
event_manager: EventManager | None = None,
114116
configure_logging: bool = True,
115117
_context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
116118
_additional_context_managers: Sequence[AsyncContextManager] | None = None,
@@ -138,6 +140,7 @@ def __init__(
138140
retry_on_blocked: If set to True, the crawler will try to automatically bypass any detected bot protection
139141
proxy_configuration: A HTTP proxy configuration to be used for making requests
140142
statistics: A preconfigured `Statistics` instance if you wish to use non-default configuration
143+
event_manager: A custom `EventManager` instance if you wish to use a non-default one
141144
configure_logging: If set to True, the crawler will configure the logging infrastructure
142145
_context_pipeline: Allows extending the request lifecycle and modifying the crawling context.
143146
This parameter is meant to be used by child classes, not when BasicCrawler is instantiated directly.
@@ -164,7 +167,7 @@ def __init__(
164167
self._max_session_rotations = max_session_rotations
165168

166169
self._request_provider = request_provider
167-
self._configuration = configuration or Configuration.get_global_configuration()
170+
self._configuration = configuration or service_container.get_configuration()
168171

169172
self._request_handler_timeout = request_handler_timeout
170173
self._internal_timeout = (
@@ -175,8 +178,7 @@ def __init__(
175178

176179
self._tld_extractor = TLDExtract(cache_dir=tempfile.TemporaryDirectory().name)
177180

178-
self._event_manager = LocalEventManager() # TODO: switch based on configuration
179-
# https://github.com/apify/crawlee-py/issues/83
181+
self._event_manager = event_manager or service_container.get_event_manager()
180182
self._snapshotter = Snapshotter(self._event_manager)
181183
self._pool = AutoscaledPool(
182184
system_status=SystemStatus(self._snapshotter),

src/crawlee/configuration.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from __future__ import annotations
44

55
from datetime import timedelta
6-
from typing import Annotated, ClassVar, cast
6+
from typing import Annotated
77

88
from pydantic import AliasChoices, Field
99
from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -22,8 +22,6 @@ class Configuration(BaseSettings):
2222
purge_on_start: Whether to purge the storage on start.
2323
"""
2424

25-
_default_instance: ClassVar[Self | None] = None
26-
2725
model_config = SettingsConfigDict(populate_by_name=True)
2826

2927
internal_timeout: Annotated[timedelta | None, Field(alias='crawlee_internal_timeout')] = None
@@ -206,12 +204,19 @@ class Configuration(BaseSettings):
206204
),
207205
] = False
208206

209-
in_cloud: Annotated[bool, Field(alias='crawlee_in_cloud')] = False
210-
211207
@classmethod
212208
def get_global_configuration(cls) -> Self:
213209
"""Retrieve the global instance of the configuration."""
214-
if Configuration._default_instance is None:
215-
Configuration._default_instance = cls()
210+
from crawlee import service_container
211+
212+
if service_container.get_configuration_if_set() is None:
213+
service_container.set_configuration(cls())
214+
215+
global_instance = service_container.get_configuration()
216+
217+
if not isinstance(global_instance, cls):
218+
raise TypeError(
219+
f'Requested global configuration object of type {cls}, but {global_instance.__class__} was found'
220+
)
216221

217-
return cast(Self, Configuration._default_instance)
222+
return global_instance

src/crawlee/memory_storage_client/request_queue_client.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
ProcessedRequest,
3131
ProlongRequestLockResponse,
3232
Request,
33-
RequestListResponse,
3433
RequestQueueHead,
3534
RequestQueueHeadWithLocks,
3635
RequestQueueMetadata,
@@ -215,7 +214,14 @@ async def list_head(self, *, limit: int | None = None) -> RequestQueueHead:
215214

216215
@override
217216
async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> RequestQueueHeadWithLocks:
218-
raise NotImplementedError('This method is not supported in memory storage.')
217+
result = await self.list_head(limit=limit)
218+
return RequestQueueHeadWithLocks(
219+
lock_secs=lock_secs,
220+
limit=result.limit,
221+
had_multiple_clients=result.had_multiple_clients,
222+
queue_modified_at=result.queue_modified_at,
223+
items=result.items,
224+
)
219225

220226
@override
221227
async def add_request(
@@ -380,7 +386,7 @@ async def prolong_request_lock(
380386
forefront: bool = False,
381387
lock_secs: int,
382388
) -> ProlongRequestLockResponse:
383-
raise NotImplementedError('This method is not supported in memory storage.')
389+
return ProlongRequestLockResponse(lock_expires_at=datetime.now(timezone.utc))
384390

385391
@override
386392
async def delete_request_lock(
@@ -389,7 +395,7 @@ async def delete_request_lock(
389395
*,
390396
forefront: bool = False,
391397
) -> None:
392-
raise NotImplementedError('This method is not supported in memory storage.')
398+
return None
393399

394400
@override
395401
async def batch_add_requests(
@@ -431,15 +437,6 @@ async def batch_add_requests(
431437
async def batch_delete_requests(self, requests: list[Request]) -> BatchRequestsOperationResponse:
432438
raise NotImplementedError('This method is not supported in memory storage.')
433439

434-
@override
435-
async def list_requests(
436-
self,
437-
*,
438-
limit: int | None = None,
439-
exclusive_start_id: str | None = None,
440-
) -> RequestListResponse:
441-
raise NotImplementedError('This method is not supported in memory storage.')
442-
443440
async def update_timestamps(self, *, has_been_modified: bool) -> None:
444441
"""Update the timestamps of the request queue."""
445442
self._accessed_at = datetime.now(timezone.utc)

src/crawlee/models.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,6 @@ class RequestQueueHeadWithLocks(RequestQueueHead):
350350
"""Model for request queue head with locks."""
351351

352352
lock_secs: Annotated[int, Field(alias='lockSecs')]
353-
items: Annotated[list[Request], Field(alias='items', default_factory=list)]
354353

355354

356355
class BaseListPage(BaseModel):
@@ -449,13 +448,3 @@ class BatchRequestsOperationResponse(BaseModel):
449448

450449
processed_requests: Annotated[list[ProcessedRequest], Field(alias='processedRequests')]
451450
unprocessed_requests: Annotated[list[UnprocessedRequest], Field(alias='unprocessedRequests')]
452-
453-
454-
class RequestListResponse(BaseModel):
455-
"""Response to a request list call."""
456-
457-
model_config = ConfigDict(populate_by_name=True)
458-
459-
limit: Annotated[int, Field()]
460-
exclusive_start_key: Annotated[str | None, Field(alias='exclusiveStartId')]
461-
items: Annotated[list[Request], Field()]

src/crawlee/service_container.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING, Literal
4+
5+
from typing_extensions import NotRequired, TypedDict
6+
7+
from crawlee.configuration import Configuration
8+
from crawlee.events.local_event_manager import LocalEventManager
9+
from crawlee.memory_storage_client.memory_storage_client import MemoryStorageClient
10+
11+
if TYPE_CHECKING:
12+
from crawlee.base_storage_client.base_storage_client import BaseStorageClient
13+
from crawlee.events.event_manager import EventManager
14+
15+
16+
StorageClientType = Literal['cloud', 'local']
17+
18+
19+
class _Services(TypedDict):
    """Registry of the process-wide service singletons managed by this module.

    Every key is optional (``NotRequired``): a missing key means the service has
    not been registered or lazily created yet.
    """

    # Storage client used locally; created lazily (MemoryStorageClient) by get_storage_client().
    local_storage_client: NotRequired[BaseStorageClient]
    # Storage client for the cloud; must be provided explicitly via set_cloud_storage_client().
    cloud_storage_client: NotRequired[BaseStorageClient]
    # Global Configuration instance.
    configuration: NotRequired[Configuration]
    # Global EventManager instance.
    event_manager: NotRequired[EventManager]
24+
25+
26+
# Process-wide registry of service singletons; mutated only through the get_*/set_* helpers below.
_services = _Services()
# Fallback client type returned by get_storage_client() when the caller does not specify one.
_default_storage_client_type: StorageClientType = 'local'
28+
29+
30+
class ServiceConflictError(RuntimeError):
    """Raised when an already-registered service is being replaced by a different instance."""

    def __init__(self, service_name: str, new_value: object, old_value: object) -> None:
        # Build the message up front so the exception carries one fully formatted argument.
        message = (
            f"Service '{service_name}' was already set "
            f"(existing value is '{old_value}', new value is '{new_value}')."
        )
        super().__init__(message)
37+
38+
39+
def get_storage_client(*, client_type: StorageClientType | None = None) -> BaseStorageClient:
    """Get the storage client instance for the current environment.

    Args:
        client_type: Allows retrieving a specific storage client type, regardless of where we are running.

    Returns:
        The current storage client instance.
    """
    requested = _default_storage_client_type if client_type is None else client_type

    if requested == 'cloud':
        # A cloud client can only come from an explicit set_cloud_storage_client() call.
        cloud_client = _services.get('cloud_storage_client')
        if cloud_client is None:
            raise RuntimeError('Cloud client was not provided.')
        return cloud_client

    # The local client has a sensible default, so it is created lazily on first use.
    local_client = _services.get('local_storage_client')
    if local_client is None:
        local_client = MemoryStorageClient()
        _services['local_storage_client'] = local_client
    return local_client
60+
61+
62+
def set_local_storage_client(local_client: BaseStorageClient) -> None:
    """Set the local storage client instance.

    Args:
        local_client: The local storage client instance.

    Raises:
        ServiceConflictError: If a different local storage client was already registered.
    """
    existing = _services.get('local_storage_client')
    # Compare against None explicitly: the previous truthiness test would silently
    # skip the conflict check for a registered-but-falsy client object.
    if existing is not None and existing is not local_client:
        raise ServiceConflictError('local_storage_client', local_client, existing)

    _services['local_storage_client'] = local_client
72+
73+
74+
def set_cloud_storage_client(cloud_client: BaseStorageClient) -> None:
    """Set the cloud storage client instance.

    Args:
        cloud_client: The cloud storage client instance.

    Raises:
        ServiceConflictError: If a different cloud storage client was already registered.
    """
    existing = _services.get('cloud_storage_client')
    # Compare against None explicitly: the previous truthiness test would silently
    # skip the conflict check for a registered-but-falsy client object.
    if existing is not None and existing is not cloud_client:
        raise ServiceConflictError('cloud_storage_client', cloud_client, existing)

    _services['cloud_storage_client'] = cloud_client
84+
85+
86+
def set_default_storage_client_type(client_type: StorageClientType) -> None:
    """Set the default storage client type.

    Args:
        client_type: The type ('cloud' or 'local') that get_storage_client() falls back
            to when no explicit client_type is requested.
    """
    # Rebinding the module-level default requires a global declaration;
    # the noqa silences the lint rule flagging global assignments.
    global _default_storage_client_type  # noqa: PLW0603
    _default_storage_client_type = client_type
90+
91+
92+
def get_configuration() -> Configuration:
    """Get the configuration object."""
    try:
        return _services['configuration']
    except KeyError:
        # Lazily create and cache a default configuration on first access.
        configuration = Configuration()
        _services['configuration'] = configuration
        return configuration
98+
99+
100+
def get_configuration_if_set() -> Configuration | None:
    """Get the configuration object, or None if it hasn't been set yet."""
    try:
        return _services['configuration']
    except KeyError:
        # Unlike get_configuration(), a missing entry is reported, not created.
        return None
103+
104+
105+
def set_configuration(configuration: Configuration) -> None:
    """Set the configuration object.

    Args:
        configuration: The configuration instance to register globally.

    Raises:
        ServiceConflictError: If a different configuration was already registered.
    """
    existing = _services.get('configuration')
    # Compare against None explicitly: the previous truthiness test would silently
    # skip the conflict check for a registered-but-falsy configuration object.
    if existing is not None and existing is not configuration:
        raise ServiceConflictError('configuration', configuration, existing)

    _services['configuration'] = configuration
111+
112+
113+
def get_event_manager() -> EventManager:
    """Get the event manager."""
    try:
        return _services['event_manager']
    except KeyError:
        # Default to a local event manager, created lazily on first access.
        manager = LocalEventManager()
        _services['event_manager'] = manager
        return manager
119+
120+
121+
def set_event_manager(event_manager: EventManager) -> None:
    """Set the event manager.

    Args:
        event_manager: The event manager instance to register globally.

    Raises:
        ServiceConflictError: If a different event manager was already registered.
    """
    existing = _services.get('event_manager')
    # Compare against None explicitly: the previous truthiness test would silently
    # skip the conflict check for a registered-but-falsy event manager object.
    if existing is not None and existing is not event_manager:
        raise ServiceConflictError('event_manager', event_manager, existing)

    _services['event_manager'] = event_manager

src/crawlee/statistics/statistics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
from typing_extensions import Self, TypeVar
1010

11+
import crawlee.service_container
1112
from crawlee._utils.recurring_task import RecurringTask
12-
from crawlee.events import LocalEventManager
1313
from crawlee.events.types import Event, EventPersistStateData
1414
from crawlee.statistics import FinalStatistics, StatisticsPersistedState, StatisticsState
1515
from crawlee.statistics.error_tracker import ErrorTracker
@@ -85,7 +85,7 @@ def __init__(
8585
self.error_tracker = ErrorTracker()
8686
self.error_tracker_retry = ErrorTracker()
8787

88-
self._events = event_manager or LocalEventManager()
88+
self._events = event_manager or crawlee.service_container.get_event_manager()
8989

9090
self._requests_in_progress = dict[str, RequestProcessingRecord]()
9191

0 commit comments

Comments (0)