 # Inspiration: https://github.com/apify/crawlee/blob/v3.7.3/packages/basic-crawler/src/internals/basic-crawler.ts
-
 from __future__ import annotations
 
-from typing import Awaitable, Callable
+from datetime import timedelta
+from logging import getLogger
+from typing import TYPE_CHECKING, Awaitable, Callable, Generic
+
+from typing_extensions import TypeVar
 
+from crawlee._utils.wait import wait_for
 from crawlee.autoscaling import AutoscaledPool
 from crawlee.autoscaling.snapshotter import Snapshotter
 from crawlee.autoscaling.system_status import SystemStatus
+from crawlee.basic_crawler.context_pipeline import RequestHandlerError
 from crawlee.basic_crawler.router import Router
-from crawlee.basic_crawler.types import BasicCrawlingContext
+from crawlee.basic_crawler.types import BasicCrawlingContext, CreateRequestSchema, FinalStatistics, RequestState
+from crawlee.config import Configuration
+from crawlee.events.local_event_manager import LocalEventManager
+
+if TYPE_CHECKING:
+    from crawlee.basic_crawler.context_pipeline import ContextPipeline
+    from crawlee.storages.request_provider import RequestProvider
+
+
+TCrawlingContext = TypeVar('TCrawlingContext', bound=BasicCrawlingContext, default=BasicCrawlingContext)
+ErrorHandler = Callable[[TCrawlingContext, Exception], Awaitable[TCrawlingContext]]
+FailedRequestHandler = Callable[[TCrawlingContext, Exception], Awaitable[None]]
+
+logger = getLogger(__name__)
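# ---------------------------------------------------------------------------
# Annotation (not part of the diff): a minimal sketch of handlers conforming
# to the two aliases above. The names `on_error` and `on_failed` are
# illustrative, and `context.request.url` assumes the request object exposes
# a `url` attribute, as CreateRequestSchema further down suggests.

async def on_error(context: BasicCrawlingContext, error: Exception) -> BasicCrawlingContext:
    # An ErrorHandler may adjust the crawling context before the request is retried.
    logger.warning('Retrying %s after error: %s', context.request.url, error)
    return context

async def on_failed(context: BasicCrawlingContext, error: Exception) -> None:
    # A FailedRequestHandler only observes a request that has exhausted its retries.
    logger.error('Giving up on %s: %s', context.request.url, error)
# ---------------------------------------------------------------------------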
 
 
-class BasicCrawler:
+class UserDefinedErrorHandlerError(Exception):
+    """Wraps an exception thrown from a user-defined error handler."""
+
+
+class BasicCrawler(Generic[TCrawlingContext]):
     """Provides a simple framework for parallel crawling of web pages.
 
     The URLs to crawl are fed either from a static list of URLs or from a dynamic queue of URLs enabling recursive
@@ -22,7 +44,19 @@ class BasicCrawler:
     one of its subclasses.
     """
 
-    def __init__(self: BasicCrawler, *, router: Callable[[BasicCrawlingContext], Awaitable] | None = None) -> None:
+    def __init__(
+        self,
+        *,
+        router: Callable[[TCrawlingContext], Awaitable] | None = None,
+        _context_pipeline: ContextPipeline[TCrawlingContext],
+        # TODO: make request_provider optional (and instantiate based on configuration if None is supplied)
+        request_provider: RequestProvider,
+        min_concurrency: int | None = None,
+        max_concurrency: int | None = None,
+        max_requests_per_minute: int | None = None,
+        configuration: Configuration | None = None,
+        request_handler_timeout: timedelta = timedelta(minutes=1),
+    ) -> None:
         if isinstance(router, Router):
             self._router = router
         elif router is not None:
@@ -31,33 +65,185 @@ def __init__(self: BasicCrawler, *, router: Callable[[BasicCrawlingContext], Awa
         else:
            self._router = None
 
+        self._context_pipeline = _context_pipeline
+
+        self._error_handler: ErrorHandler[BasicCrawlingContext] | None = None
+        self._failed_request_handler: FailedRequestHandler[BasicCrawlingContext] | None = None
+
+        pool_kwargs = {}
+        if min_concurrency is not None:
+            pool_kwargs['min_concurrency'] = min_concurrency
+        if max_concurrency is not None:
+            pool_kwargs['max_concurrency'] = max_concurrency
+        if max_requests_per_minute is not None:
+            pool_kwargs['max_tasks_per_minute'] = max_requests_per_minute
+
+        self._request_provider = request_provider
+        self._configuration = configuration or Configuration()
+
+        self._request_handler_timeout = request_handler_timeout
+        self._internal_timeout = (
+            self._configuration.internal_timeout
+            if self._configuration.internal_timeout is not None
+            else max(2 * request_handler_timeout, timedelta(minutes=5))
+        )
+
+        self._event_manager = LocalEventManager()  # TODO: switch based on configuration
+        self._snapshotter = Snapshotter(self._event_manager)
         self._pool = AutoscaledPool(
-            system_status=SystemStatus(Snapshotter()),
-            is_finished_function=self._is_finished_function,
-            is_task_ready_function=self._is_task_ready_function,
-            run_task_function=self._run_task_function,
+            system_status=SystemStatus(self._snapshotter),
+            is_finished_function=self.__is_finished_function,
+            is_task_ready_function=self.__is_task_ready_function,
+            run_task_function=self.__run_task_function,
+            **pool_kwargs,
        )
 
     @property
-    def router(self: BasicCrawler) -> Router[BasicCrawlingContext]:
+    def router(self) -> Router[TCrawlingContext]:
         """The router used to handle each individual crawling request."""
         if self._router is None:
-            self._router = Router[BasicCrawlingContext]()
+            self._router = Router[TCrawlingContext]()
 
         return self._router
 
     @router.setter
-    def router(self: BasicCrawler, router: Router[BasicCrawlingContext]) -> None:
+    def router(self, router: Router[TCrawlingContext]) -> None:
         if self._router is not None:
             raise RuntimeError('A router is already set')
 
         self._router = router
 
-    def _is_finished_function(self: BasicCrawler) -> bool:
-        pass
+    def error_handler(self, handler: ErrorHandler[BasicCrawlingContext]) -> ErrorHandler[BasicCrawlingContext]:
+        """Decorator for configuring an error handler (called after a request handler error and before retrying)."""
+        self._error_handler = handler
+        return handler
+
+    def failed_request_handler(
+        self, handler: FailedRequestHandler[BasicCrawlingContext]
+    ) -> FailedRequestHandler[BasicCrawlingContext]:
+        """Decorator for configuring a failed request handler (called after max retries are reached)."""
+        self._failed_request_handler = handler
+        return handler
+
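# ---------------------------------------------------------------------------
# Annotation (not part of the diff): both methods double as decorators, so
# hooks can be registered on an existing instance. `crawler` is assumed to be
# a constructed BasicCrawler (see the run() sketch further down).

@crawler.error_handler
async def retry_hook(context: BasicCrawlingContext, error: Exception) -> BasicCrawlingContext:
    # Invoked after a request handler error, before the request is retried.
    return context

@crawler.failed_request_handler
async def failure_hook(context: BasicCrawlingContext, error: Exception) -> None:
    # Invoked once the request has reached its maximum number of retries.
    logger.error('Request %s failed for good: %s', context.request.url, error)
# ---------------------------------------------------------------------------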
+    async def add_requests(
+        self,
+        requests: list[str | CreateRequestSchema],
+        *,
+        batch_size: int = 1000,
+        wait_for_all_requests_to_be_added: bool = False,
+        wait_time_between_batches: timedelta = timedelta(0),
+    ) -> None:
+        """Add requests to the underlying queue."""
+        await self._request_provider.add_requests_batched(
+            [
+                request
+                if isinstance(request, CreateRequestSchema)
+                else CreateRequestSchema(url=request, unique_key='')  # TODO: move unique key generation from sdk
+                for request in requests
+            ],
+            batch_size=batch_size,
+            wait_for_all_requests_to_be_added=wait_for_all_requests_to_be_added,
+            wait_time_between_batches=wait_time_between_batches,
+        )
+
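# ---------------------------------------------------------------------------
# Annotation (not part of the diff): plain URL strings and explicit
# CreateRequestSchema instances may be mixed in a single call; this must run
# inside a coroutine. The URLs and unique key are illustrative.

await crawler.add_requests(
    [
        'https://example.com',
        CreateRequestSchema(url='https://example.org', unique_key='example-org'),
    ],
    batch_size=500,  # forwarded to RequestProvider.add_requests_batched
    wait_for_all_requests_to_be_added=True,  # block until every batch is enqueued
)
# ---------------------------------------------------------------------------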
+    async def run(self, requests: list[str | CreateRequestSchema] | None = None) -> FinalStatistics:
+        """Run the crawler until all requests are processed."""
+        if requests is not None:
+            await self.add_requests(requests)
+
+        async with self._event_manager:
+            await self._snapshotter.start()
+            await self._pool.run()
+            await self._snapshotter.stop()
+            return FinalStatistics()
+
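# ---------------------------------------------------------------------------
# Annotation (not part of the diff): an end-to-end sketch. `pipeline` and
# `provider` stand in for a ContextPipeline and a RequestProvider, which this
# diff leaves to the caller; `Router.default_handler` is assumed from the
# Router API, which this diff does not show.

import asyncio

async def main() -> None:
    crawler = BasicCrawler(
        _context_pipeline=pipeline,
        request_provider=provider,
        max_concurrency=10,
        request_handler_timeout=timedelta(seconds=30),
    )

    @crawler.router.default_handler
    async def handle(context: BasicCrawlingContext) -> None:
        logger.info('Processing %s', context.request.url)

    await crawler.run(['https://example.com'])

asyncio.run(main())
# ---------------------------------------------------------------------------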
+    def __is_finished_function(self) -> bool:
+        return self._request_provider.is_finished()
+
+    def __is_task_ready_function(self) -> bool:
+        # A task is ready whenever the request queue is not empty.
+        return not self._request_provider.is_empty()
+
+    async def __run_task_function(self) -> None:
+        request = await wait_for(
+            self._request_provider.fetch_next_request(),
+            self._internal_timeout,
+            f'Fetching next request failed after {self._internal_timeout.total_seconds()} seconds',
+        )
+
+        if request is None:
+            return
+
+        # TODO: fetch session from the session pool
+
+        crawling_context = BasicCrawlingContext(request=request)
+
+        try:
+            request.state = RequestState.REQUEST_HANDLER
+
+            await wait_for(
+                self.__run_request_handler(crawling_context),
+                self._request_handler_timeout,
+                f'Request handler timed out after {self._request_handler_timeout.total_seconds()} seconds',
+            )
+
+            await wait_for(
+                self._request_provider.mark_request_handled(request),
+                self._internal_timeout,
+                f'Marking request as handled timed out after {self._internal_timeout.total_seconds()} seconds',
+            )  # TODO: retry on failure
+
+            request.state = RequestState.DONE
+        except RequestHandlerError as primary_error:
+            try:
+                request.state = RequestState.ERROR_HANDLER
+
+                await wait_for(
+                    self._handle_request_error(primary_error.crawling_context, primary_error.wrapped_exception),
+                    self._internal_timeout,
+                    f'Handling request failure timed out after {self._internal_timeout.total_seconds()} seconds',
+                )
+
+                request.state = RequestState.DONE
+            except UserDefinedErrorHandlerError:
+                request.state = RequestState.ERROR
+                raise
+            except Exception as secondary_error:
+                logger.exception(
+                    'An exception occurred during handling of failed request. This places the crawler '
+                    'and its underlying storages into an unknown state and crawling will be terminated.',
+                    exc_info=secondary_error,
+                )
+                request.state = RequestState.ERROR
+                raise
+        except Exception as internal_error:
+            logger.exception(
+                'An exception occurred during handling of a request. This places the crawler '
+                'and its underlying storages into an unknown state and crawling will be terminated.',
+                exc_info=internal_error,
+            )
+            raise
+
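# ---------------------------------------------------------------------------
# Annotation (not part of the diff): every timeout above goes through
# wait_for(coroutine, timeout, message). A plausible minimal implementation
# with those positional semantics, shown only to clarify the calls above
# (the real helper lives in crawlee._utils.wait):

import asyncio
from collections.abc import Coroutine
from typing import Any, TypeVar as _StdTypeVar

_T = _StdTypeVar('_T')

async def wait_for_sketch(operation: Coroutine[Any, Any, _T], timeout: timedelta, timeout_message: str) -> _T:
    try:
        return await asyncio.wait_for(operation, timeout=timeout.total_seconds())
    except asyncio.TimeoutError as e:
        # Re-raise with the caller-supplied message for easier debugging.
        raise asyncio.TimeoutError(timeout_message) from e
# ---------------------------------------------------------------------------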
+    async def __run_request_handler(self, crawling_context: BasicCrawlingContext) -> None:
+        await self._context_pipeline(crawling_context, self.router)
+
+    async def _handle_request_error(self, crawling_context: TCrawlingContext, error: Exception) -> None:
+        should_retry_request = False  # TODO: handle and track retries
+
+        if should_retry_request:
+            if self._error_handler:
+                try:
+                    await self._error_handler(crawling_context, error)
+                except Exception as e:
+                    raise UserDefinedErrorHandlerError('Exception thrown in user-defined request error handler') from e
+
+            await self._request_provider.reclaim_request(crawling_context.request)
+        else:
+            await self._handle_failed_request(crawling_context, error)
 
-    def _is_task_ready_function(self: BasicCrawler) -> bool:
-        pass
+    async def _handle_failed_request(self, crawling_context: TCrawlingContext, error: Exception) -> None:
+        logger.exception('Request failed and reached maximum retries', exc_info=error)
 
-    async def _run_task_function(self: BasicCrawler) -> None:
-        pass
+        if self._failed_request_handler:
+            try:
+                await self._failed_request_handler(crawling_context, error)
+            except Exception as e:
+                raise UserDefinedErrorHandlerError('Exception thrown in user-defined failed request handler') from e