
Commit 1bd0e09

Sketch of crawler lifecycle implementation

1 parent 2e9e749 commit 1bd0e09

13 files changed: 448 additions & 41 deletions


pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -52,6 +52,8 @@ pyee = "^11.1.0"
 typing-extensions = "^4.1.0"
 pydantic = "^2.6.3"
 eval-type-backport = "^0.1.3"
+pydantic-settings = "^2.2.1"
+httpx = "^0.27.0"

 [tool.poetry.group.dev.dependencies]
 build = "~1.1.0"

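The new pydantic-settings dependency suggests that the Configuration object consumed by BasicCrawler below is a settings model. The Configuration class itself is not part of this diff, so the following is only a minimal sketch of what crawlee.config.Configuration might look like; the CRAWLEE_ env prefix is an assumption, and internal_timeout is the one field this commit actually reads.

# Hypothetical sketch of crawlee.config.Configuration; not part of this commit.
from datetime import timedelta
from typing import Optional

from pydantic_settings import BaseSettings, SettingsConfigDict


class Configuration(BaseSettings):
    # The env prefix is an assumption; pydantic-settings would then read
    # e.g. CRAWLEE_INTERNAL_TIMEOUT from the environment.
    model_config = SettingsConfigDict(env_prefix='CRAWLEE_')

    # The only field this diff accesses: when None, BasicCrawler falls back to
    # max(2 * request_handler_timeout, timedelta(minutes=5)).
    internal_timeout: Optional[timedelta] = None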
src/crawlee/_utils/wait.py

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+import asyncio
+from datetime import timedelta
+from typing import Awaitable, TypeVar
+
+T = TypeVar('T')
+
+
+async def wait_for(fut: Awaitable[T], timeout: timedelta, timeout_message: str) -> T:
+    try:
+        return await asyncio.wait_for(fut, timeout.total_seconds())
+    except asyncio.TimeoutError as ex:
+        raise asyncio.TimeoutError(timeout_message) from ex
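The helper is a thin wrapper over asyncio.wait_for that re-raises the timeout with a caller-supplied message. A quick illustration (the slow coroutine is made up):

import asyncio
from datetime import timedelta


async def slow_fetch() -> str:
    # Stand-in for any awaitable that may take too long.
    await asyncio.sleep(10)
    return 'page body'


async def main() -> None:
    try:
        await wait_for(slow_fetch(), timedelta(seconds=1), 'Fetching took longer than 1 second')
    except asyncio.TimeoutError as e:
        print(e)  # -> Fetching took longer than 1 second


asyncio.run(main())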
src/crawlee/basic_crawler/basic_crawler.py

Lines changed: 204 additions & 18 deletions
@@ -1,17 +1,39 @@
 # Inspiration: https://github.com/apify/crawlee/blob/v3.7.3/packages/basic-crawler/src/internals/basic-crawler.ts
-
 from __future__ import annotations

-from typing import Awaitable, Callable
+from datetime import timedelta
+from logging import getLogger
+from typing import TYPE_CHECKING, Awaitable, Callable, Generic
+
+from typing_extensions import TypeVar

+from crawlee._utils.wait import wait_for
 from crawlee.autoscaling import AutoscaledPool
 from crawlee.autoscaling.snapshotter import Snapshotter
 from crawlee.autoscaling.system_status import SystemStatus
+from crawlee.basic_crawler.context_pipeline import RequestHandlerError
 from crawlee.basic_crawler.router import Router
-from crawlee.basic_crawler.types import BasicCrawlingContext
+from crawlee.basic_crawler.types import BasicCrawlingContext, CreateRequestSchema, FinalStatistics, RequestState
+from crawlee.config import Configuration
+from crawlee.events.local_event_manager import LocalEventManager
+
+if TYPE_CHECKING:
+    from crawlee.basic_crawler.context_pipeline import ContextPipeline
+    from crawlee.storages.request_provider import RequestProvider
+
+
+TCrawlingContext = TypeVar('TCrawlingContext', bound=BasicCrawlingContext, default=BasicCrawlingContext)
+ErrorHandler = Callable[[TCrawlingContext, Exception], Awaitable[TCrawlingContext]]
+FailedRequestHandler = Callable[[TCrawlingContext, Exception], Awaitable[None]]
+
+logger = getLogger(__name__)


-class BasicCrawler:
+class UserDefinedErrorHandlerError(Exception):
+    """Wraps an exception thrown from a user-defined error handler."""
+
+
+class BasicCrawler(Generic[TCrawlingContext]):
     """Provides a simple framework for parallel crawling of web pages.

     The URLs to crawl are fed either from a static list of URLs or from a dynamic queue of URLs enabling recursive
@@ -22,7 +44,19 @@ class BasicCrawler:
     one of its subclasses.
     """

-    def __init__(self: BasicCrawler, *, router: Callable[[BasicCrawlingContext], Awaitable] | None = None) -> None:
+    def __init__(
+        self,
+        *,
+        router: Callable[[TCrawlingContext], Awaitable] | None = None,
+        _context_pipeline: ContextPipeline[TCrawlingContext],
+        # TODO: make request_provider optional (and instantiate based on configuration if None is supplied)
+        request_provider: RequestProvider,
+        min_concurrency: int | None = None,
+        max_concurrency: int | None = None,
+        max_requests_per_minute: int | None = None,
+        configuration: Configuration | None = None,
+        request_handler_timeout: timedelta = timedelta(minutes=1),
+    ) -> None:
         if isinstance(router, Router):
             self._router = router
         elif router is not None:
@@ -31,33 +65,185 @@ def __init__(self: BasicCrawler, *, router: Callable[[BasicCrawlingContext], Awa
         else:
             self._router = None

+        self._context_pipeline = _context_pipeline
+
+        self._error_handler: ErrorHandler[BasicCrawlingContext] | None = None
+        self._failed_request_handler: FailedRequestHandler[BasicCrawlingContext] | None = None
+
+        pool_kwargs = {}
+        if min_concurrency is not None:
+            pool_kwargs['min_concurrency'] = min_concurrency
+        if max_concurrency is not None:
+            pool_kwargs['max_concurrency'] = max_concurrency
+        pool_kwargs['max_tasks_per_minute'] = max_requests_per_minute
+
+        self._request_provider = request_provider
+        self._configuration = configuration or Configuration()
+
+        self._request_handler_timeout = request_handler_timeout
+        self._internal_timeout = (
+            self._configuration.internal_timeout
+            if self._configuration.internal_timeout is not None
+            else max(2 * request_handler_timeout, timedelta(minutes=5))
+        )
+
+        self._event_manager = LocalEventManager()  # TODO: switch based on configuration
+        self._snapshotter = Snapshotter(self._event_manager)
         self._pool = AutoscaledPool(
-            system_status=SystemStatus(Snapshotter()),
-            is_finished_function=self._is_finished_function,
-            is_task_ready_function=self._is_task_ready_function,
-            run_task_function=self._run_task_function,
+            system_status=SystemStatus(self._snapshotter),
+            is_finished_function=self.__is_finished_function,
+            is_task_ready_function=self.__is_task_ready_function,
+            run_task_function=self.__run_task_function,
+            **pool_kwargs,
         )

     @property
-    def router(self: BasicCrawler) -> Router[BasicCrawlingContext]:
+    def router(self) -> Router[TCrawlingContext]:
         """The router used to handle each individual crawling request."""
         if self._router is None:
-            self._router = Router[BasicCrawlingContext]()
+            self._router = Router[TCrawlingContext]()

         return self._router

     @router.setter
-    def router(self: BasicCrawler, router: Router[BasicCrawlingContext]) -> None:
+    def router(self, router: Router[TCrawlingContext]) -> None:
         if self._router is not None:
             raise RuntimeError('A router is already set')

         self._router = router

-    def _is_finished_function(self: BasicCrawler) -> bool:
-        pass
+    def error_handler(self, handler: ErrorHandler[BasicCrawlingContext]) -> ErrorHandler[BasicCrawlingContext]:
+        """Decorator for configuring an error handler (called after a request handler error and before retrying)."""
+        self._error_handler = handler
+        return handler
+
+    def failed_request_handler(
+        self, handler: FailedRequestHandler[BasicCrawlingContext]
+    ) -> FailedRequestHandler[BasicCrawlingContext]:
+        """Decorator for configuring a failed request handler (called after max retries are reached)."""
+        self._failed_request_handler = handler
+        return handler
+
+    async def add_requests(
+        self,
+        requests: list[str | CreateRequestSchema],
+        *,
+        batch_size: int = 1000,
+        wait_for_all_requests_to_be_added: bool = False,
+        wait_time_between_batches: timedelta = timedelta(0),
+    ) -> None:
+        """Add requests to the underlying queue."""
+        await self._request_provider.add_requests_batched(
+            [
+                request
+                if isinstance(request, CreateRequestSchema)
+                else CreateRequestSchema(url=request, unique_key='')  # TODO: move unique key generation from sdk
+                for request in requests
+            ],
+            batch_size=batch_size,
+            wait_for_all_requests_to_be_added=wait_for_all_requests_to_be_added,
+            wait_time_between_batches=wait_time_between_batches,
+        )
+
+    async def run(self, requests: list[str | CreateRequestSchema] | None = None) -> FinalStatistics:
+        """Run the crawler until all requests are processed."""
+        if requests is not None:
+            await self.add_requests(requests)
+
+        async with self._event_manager:
+            await self._snapshotter.start()
+            await self._pool.run()
+            await self._snapshotter.stop()
+            return FinalStatistics()
+
+    def __is_finished_function(self) -> bool:
+        return self._request_provider.is_finished()
+
+    def __is_task_ready_function(self) -> bool:
+        # A task is ready whenever the request provider still has requests to offer.
+        return not self._request_provider.is_empty()
+
+    async def __run_task_function(self) -> None:
+        request = await wait_for(
+            self._request_provider.fetch_next_request(),
+            self._internal_timeout,
+            f'Fetching next request failed after {self._internal_timeout.total_seconds()} seconds',
+        )
+
+        if request is None:
+            return
+
+        # TODO: fetch session from the session pool
+
+        crawling_context = BasicCrawlingContext(request=request)
+
+        try:
+            request.state = RequestState.REQUEST_HANDLER
+
+            await wait_for(
+                self.__run_request_handler(crawling_context),
+                self._request_handler_timeout,
+                f'Request handler timed out after {self._request_handler_timeout.total_seconds()} seconds',
+            )
+
+            await wait_for(
+                self._request_provider.mark_request_handled(request),
+                self._internal_timeout,
+                f'Marking request as handled timed out after {self._internal_timeout.total_seconds()} seconds',
+            )  # TODO: retry on failure
+
+            request.state = RequestState.DONE
+        except RequestHandlerError as primary_error:
+            try:
+                request.state = RequestState.ERROR_HANDLER
+
+                await wait_for(
+                    self._handle_request_error(primary_error.crawling_context, primary_error.wrapped_exception),
+                    self._internal_timeout,
+                    f'Handling request failure timed out after {self._internal_timeout.total_seconds()} seconds',
+                )
+
+                request.state = RequestState.DONE
+            except UserDefinedErrorHandlerError:
+                request.state = RequestState.ERROR
+                raise
+            except Exception as secondary_error:
+                logger.exception(
+                    'An exception occurred during handling of failed request. This places the crawler '
+                    'and its underlying storages into an unknown state and crawling will be terminated.',
+                    exc_info=secondary_error,
+                )
+                request.state = RequestState.ERROR
+                raise
+        except Exception as internal_error:
+            logger.exception(
+                'An exception occurred during handling of a request. This places the crawler '
+                'and its underlying storages into an unknown state and crawling will be terminated.',
+                exc_info=internal_error,
+            )
+            raise
+
+    async def __run_request_handler(self, crawling_context: BasicCrawlingContext) -> None:
+        await self._context_pipeline(crawling_context, self.router)
+
+    async def _handle_request_error(self, crawling_context: TCrawlingContext, error: Exception) -> None:
+        should_retry_request = False  # TODO: handle and track retries
+
+        if should_retry_request:
+            if self._error_handler:
+                try:
+                    await self._error_handler(crawling_context, error)
+                except Exception as e:
+                    raise UserDefinedErrorHandlerError('Exception thrown in user-defined request error handler') from e
+
+            await self._request_provider.reclaim_request(crawling_context.request)
+        else:
+            await self._handle_failed_request(crawling_context, error)

-    def _is_task_ready_function(self: BasicCrawler) -> bool:
-        pass
+    async def _handle_failed_request(self, crawling_context: TCrawlingContext, error: Exception) -> None:
+        logger.exception('Request failed and reached maximum retries', exc_info=error)

-    async def _run_task_function(self: BasicCrawler) -> None:
-        pass
+        if self._failed_request_handler:
+            try:
+                await self._failed_request_handler(crawling_context, error)
+            except Exception as e:
+                raise UserDefinedErrorHandlerError('Exception thrown in user-defined failed request handler') from e
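Taken together, the new lifecycle is: fetch a request, run it through the context pipeline into the router, then mark it handled or route the failure through the error handlers. A sketch of how a caller might wire this up, assuming some RequestProvider implementation named request_queue (no concrete provider is part of this diff) and a BasicCrawlingContext with a request.url attribute:

import asyncio
from datetime import timedelta


async def main() -> None:
    crawler = BasicCrawler(
        request_provider=request_queue,       # assumed: any RequestProvider implementation
        _context_pipeline=ContextPipeline(),  # no middleware: contexts go straight to the router
        max_concurrency=10,
        request_handler_timeout=timedelta(seconds=30),
    )

    @crawler.router.default_handler
    async def default_handler(context: BasicCrawlingContext) -> None:
        print(f'Handling {context.request.url}')

    @crawler.failed_request_handler
    async def on_failed_request(context: BasicCrawlingContext, error: Exception) -> None:
        print(f'{context.request.url} failed permanently: {error}')

    await crawler.run(['https://example.com'])


asyncio.run(main())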
src/crawlee/basic_crawler/context_pipeline.py

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+from __future__ import annotations
+
+from typing import Awaitable, Callable, Generic, cast
+
+from typing_extensions import TypeVar
+
+from crawlee.basic_crawler.types import BasicCrawlingContext
+
+TCrawlingContext = TypeVar('TCrawlingContext', bound=BasicCrawlingContext, default=BasicCrawlingContext)
+TMiddlewareCrawlingContext = TypeVar('TMiddlewareCrawlingContext', bound=BasicCrawlingContext)
+MiddlewareCallNext = Callable[[TMiddlewareCrawlingContext], Awaitable[None]]
+
+
+class RequestHandlerError(Exception, Generic[TCrawlingContext]):
+    """Wraps an exception thrown from a request handler (router) and extends it with crawling context."""
+
+    def __init__(self, wrapped_exception: Exception, crawling_context: TCrawlingContext) -> None:
+        self.wrapped_exception = wrapped_exception
+        self.crawling_context = crawling_context
+
+
+class ContextPipeline(Generic[TCrawlingContext]):
+    def __init__(
+        self,
+        *,
+        _middleware: Callable[[BasicCrawlingContext, MiddlewareCallNext[BasicCrawlingContext]], Awaitable[None]]
+        | None = None,
+        _parent: ContextPipeline[BasicCrawlingContext] | None = None,
+    ) -> None:
+        self._middleware = _middleware
+        self._parent = _parent
+
+    async def __call__(
+        self,
+        crawling_context: BasicCrawlingContext,
+        final_context_consumer: Callable[[TCrawlingContext], Awaitable[None]],
+    ) -> None:
+        async def call_next(enhanced_context: BasicCrawlingContext) -> None:
+            try:
+                await final_context_consumer(cast(TCrawlingContext, enhanced_context))
+            except RequestHandlerError:
+                raise  # already wrapped by a pipeline deeper in the chain
+            except Exception as e:
+                raise RequestHandlerError(e, cast(TCrawlingContext, enhanced_context)) from e
+
+        async def run_own_middleware(context: BasicCrawlingContext) -> None:
+            if self._middleware:
+                await self._middleware(context, call_next)
+            else:
+                await call_next(context)
+
+        if self._parent:
+            # Earlier-composed middlewares run first, so this pipeline's middleware
+            # receives their enhanced context right before the final consumer.
+            await self._parent(crawling_context, run_own_middleware)
+        else:
+            await run_own_middleware(crawling_context)
+
+    def compose(
+        self, middleware: Callable[[TCrawlingContext, MiddlewareCallNext[TMiddlewareCrawlingContext]], Awaitable[None]]
+    ) -> ContextPipeline[TMiddlewareCrawlingContext]:
+        return ContextPipeline[TMiddlewareCrawlingContext](
+            _middleware=cast(
+                Callable[[BasicCrawlingContext, MiddlewareCallNext[BasicCrawlingContext]], Awaitable[None]], middleware
+            ),
+            _parent=cast(ContextPipeline[BasicCrawlingContext], self),
+        )
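The pipeline is meant to be extended via compose: each middleware receives the current context, enhances it, and hands the richer context to call_next. A sketch with a made-up context subclass (this assumes BasicCrawlingContext is a dataclass with a request field, which is how BasicCrawler constructs it above):

from dataclasses import dataclass


@dataclass
class ParsedCrawlingContext(BasicCrawlingContext):
    # Made-up enhanced context carrying a downloaded body.
    body: str


async def download_middleware(
    context: BasicCrawlingContext,
    call_next: MiddlewareCallNext[ParsedCrawlingContext],
) -> None:
    # An HTTP client (e.g. the newly added httpx dependency) would fetch
    # context.request.url here; the body is hard-coded for illustration.
    body = '<html>...</html>'
    await call_next(ParsedCrawlingContext(request=context.request, body=body))


pipeline: ContextPipeline[ParsedCrawlingContext] = ContextPipeline().compose(download_middleware)
# BasicCrawler then invokes it as: await pipeline(crawling_context, router)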

src/crawlee/basic_crawler/router.py

Lines changed: 5 additions & 3 deletions
@@ -13,9 +13,9 @@ class Router(Generic[TCrawlingContext]):

     def __init__(self: Router) -> None:
         self._default_handler = None
-        self._handlers_by_label = dict[str, RequestHandler]()
+        self._handlers_by_label = dict[str, RequestHandler[TCrawlingContext]]()

-    def default_handler(self: Router, handler: RequestHandler) -> None:
+    def default_handler(self: Router, handler: RequestHandler[TCrawlingContext]) -> RequestHandler[TCrawlingContext]:
         """A decorator used to register a default handler.

         The default handler is invoked for requests that have either no label or a label for which we have no matching
@@ -26,7 +26,9 @@ def default_handler(self: Router, handler: RequestHandler) -> None:

         self._default_handler = handler

-    def handler(self: Router, label: str) -> Callable[[RequestHandler], None]:
+        return handler
+
+    def handler(self: Router, label: str) -> Callable[[RequestHandler[TCrawlingContext]], None]:
         """A decorator used to register a label-based handler.

         The registered handler will be invoked only for requests with the exact same label.