@@ -2,6 +2,7 @@
 from __future__ import annotations
 
 import tempfile
+from contextlib import AsyncExitStack
 from datetime import timedelta
 from functools import partial
 from logging import getLogger
@@ -18,9 +19,13 @@
 from crawlee.autoscaling.system_status import SystemStatus
 from crawlee.basic_crawler.context_pipeline import (
     ContextPipeline,
+)
+from crawlee.basic_crawler.errors import (
     ContextPipelineInitializationError,
     ContextPipelineInterruptedError,
     RequestHandlerError,
+    SessionError,
+    UserDefinedErrorHandlerError,
 )
 from crawlee.basic_crawler.router import Router
 from crawlee.basic_crawler.types import (
@@ -34,12 +39,14 @@
 from crawlee.events.local_event_manager import LocalEventManager
 from crawlee.http_clients.httpx_client import HttpxClient
 from crawlee.request import BaseRequestData, Request, RequestState
+from crawlee.sessions import SessionPool
 from crawlee.storages.request_queue import RequestQueue
 
 if TYPE_CHECKING:
     import re
 
     from crawlee.http_clients.base_http_client import BaseHttpClient, HttpResponse
+    from crawlee.sessions.session import Session
     from crawlee.storages.request_provider import RequestProvider
 
 TCrawlingContext = TypeVar('TCrawlingContext', bound=BasicCrawlingContext, default=BasicCrawlingContext)
@@ -49,10 +56,6 @@
 logger = getLogger(__name__)
 
 
-class UserDefinedErrorHandlerError(Exception):
-    """Wraps an exception thrown from an user-defined error handler."""
-
-
 class BasicCrawler(Generic[TCrawlingContext]):
     """Provides a simple framework for parallel crawling of web pages.
 
@@ -72,8 +75,12 @@ def __init__(
         http_client: BaseHttpClient | None = None,
         concurrency_settings: ConcurrencySettings | None = None,
         max_request_retries: int = 3,
+        max_session_rotations: int = 10,
         configuration: Configuration | None = None,
         request_handler_timeout: timedelta = timedelta(minutes=1),
+        session_pool: SessionPool | None = None,
+        use_session_pool: bool = True,
+        retry_on_blocked: bool = True,
         _context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
     ) -> None:
         """Initialize the BasicCrawler.
@@ -84,8 +91,14 @@ def __init__(
             http_client: HTTP client to be used for `BasicCrawlingContext.send_request` and HTTP-only crawling.
             concurrency_settings: Allows fine-tuning concurrency levels
             max_request_retries: Maximum amount of attempts at processing a request
+            max_session_rotations: Maximum number of session rotations per request.
+                The crawler will automatically rotate the session in case of a proxy error or if it gets blocked by
+                the website.
             configuration: Crawler configuration
             request_handler_timeout: How long is a single request handler allowed to run
+            use_session_pool: Enables using the session pool for crawling
+            session_pool: A preconfigured SessionPool instance if you wish to use non-default configuration
+            retry_on_blocked: If set to True, the crawler will try to automatically bypass any detected bot protection
             _context_pipeline: Allows extending the request lifecycle and modifying the crawling context.
                 This parameter is meant to be used by child classes, not when BasicCrawler is instantiated directly.
         """
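For orientation, here is a minimal usage sketch of the options added above. It is not part of the diff: the `BasicCrawler` import path is an assumption, and the request handler registration a real crawler needs is omitted for brevity.

```python
import asyncio

from crawlee.basic_crawler.basic_crawler import BasicCrawler  # import path assumed
from crawlee.sessions import SessionPool


async def main() -> None:
    crawler = BasicCrawler(
        max_request_retries=3,
        max_session_rotations=5,     # rotate at most 5 sessions per request on session errors
        session_pool=SessionPool(),  # optional; a default pool is created when omitted
        use_session_pool=True,
        retry_on_blocked=True,
    )
    # run() accepts plain URLs or BaseRequestData instances.
    await crawler.run(['https://example.com'])


if __name__ == '__main__':
    asyncio.run(main())
```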
@@ -105,6 +118,7 @@ def __init__(
         self._failed_request_handler: FailedRequestHandler[TCrawlingContext] | None = None
 
         self._max_request_retries = max_request_retries
+        self._max_session_rotations = max_session_rotations
 
         self._request_provider = request_provider
         self._configuration = configuration or Configuration()
@@ -129,6 +143,11 @@ def __init__(
             concurrency_settings=concurrency_settings,
         )
 
+        self._use_session_pool = use_session_pool
+        self._session_pool: SessionPool = session_pool or SessionPool()
+
+        self._retry_on_blocked = retry_on_blocked
+
     @property
     def router(self) -> Router[TCrawlingContext]:
         """The router used to handle each individual crawling request."""
@@ -144,6 +163,20 @@ def router(self, router: Router[TCrawlingContext]) -> None:
 
         self._router = router
 
+    async def _get_session(self) -> Session | None:
+        """If session pool is being used, try to take a session from it."""
+        if not self._use_session_pool:
+            return None
+
+        return await wait_for(
+            self._session_pool.get_session,
+            timeout=self._internal_timeout,
+            timeout_message='Fetching a session from the pool timed out after '
+            f'{self._internal_timeout.total_seconds()} seconds',
+            max_retries=3,
+            logger=logger,
+        )
+
     async def get_request_provider(self) -> RequestProvider:
         """Return the configured request provider. If none is configured, open and return the default request queue."""
         if not self._request_provider:
@@ -188,19 +221,29 @@ async def run(self, requests: list[str | BaseRequestData] | None = None) -> FinalStatistics:
         if requests is not None:
             await self.add_requests(requests)
 
-        async with self._event_manager, self._snapshotter:
+        async with AsyncExitStack() as exit_stack:
+            await exit_stack.enter_async_context(self._event_manager)
+            await exit_stack.enter_async_context(self._snapshotter)
+
+            if self._use_session_pool:
+                await exit_stack.enter_async_context(self._session_pool)
+
             await self._pool.run()
 
         return FinalStatistics()
 
-    def _should_retry_request(self, crawling_context: BasicCrawlingContext) -> bool:
+    def _should_retry_request(self, crawling_context: BasicCrawlingContext, error: Exception) -> bool:
+        if crawling_context.request.no_retry:
+            return False
+
+        if isinstance(error, SessionError):
+            return ((crawling_context.request.session_rotation_count or 0) + 1) < self._max_session_rotations
+
         max_request_retries = crawling_context.request.max_retries
         if max_request_retries is None:
            max_request_retries = self._max_request_retries
 
-        return (
-            not crawling_context.request.no_retry and (crawling_context.request.retry_count + 1) < max_request_retries
-        )
+        return (crawling_context.request.retry_count + 1) < max_request_retries
 
     async def _check_url_after_redirects(
         self, crawling_context: TCrawlingContext
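To make the new retry arithmetic concrete: `no_retry` short-circuits everything, session errors are budgeted by `session_rotation_count` rather than by `retry_count`, and the comparison is against the next attempt number. A standalone sketch of the `SessionError` branch (`should_rotate` is a hypothetical helper, not part of the diff):

```python
def should_rotate(session_rotation_count: int | None, max_session_rotations: int = 10) -> bool:
    # A missing rotation count is treated as 0; the check asks whether the
    # *next* rotation would still be within the budget.
    return ((session_rotation_count or 0) + 1) < max_session_rotations


assert should_rotate(None) is True  # first session error: rotate to a fresh session
assert should_rotate(8) is True     # ninth rotation is still allowed
assert should_rotate(9) is False    # budget exhausted: the request fails instead
```

So with the default `max_session_rotations=10`, a request that raises a session error on every attempt is tried with up to ten sessions in total before failing.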
@@ -273,7 +316,7 @@ def _check_url_patterns(
     async def _handle_request_error(self, crawling_context: TCrawlingContext, error: Exception) -> None:
         request_provider = await self.get_request_provider()
 
-        if self._should_retry_request(crawling_context):
+        if self._should_retry_request(crawling_context, error):
             request = crawling_context.request
             request.retry_count += 1
@@ -307,9 +350,16 @@ async def _handle_failed_request(self, crawling_context: TCrawlingContext, error
         except Exception as e:
             raise UserDefinedErrorHandlerError('Exception thrown in user-defined failed request handler') from e
 
-    def _prepare_send_request_function(self) -> SendRequestFunction:
-        async def send_request(url: str, *, method: str = 'get', headers: dict[str, str] | None = None) -> HttpResponse:
-            return await self._http_client.send_request(url, method=method, headers=httpx.Headers(headers))
+    def _prepare_send_request_function(self, session: Session | None) -> SendRequestFunction:
+        async def send_request(
+            url: str,
+            *,
+            method: str = 'get',
+            headers: dict[str, str] | None = None,
+        ) -> HttpResponse:
+            return await self._http_client.send_request(
+                url, method=method, headers=httpx.Headers(headers), session=session
+            )
 
         return send_request
 
@@ -350,7 +400,7 @@ async def __is_task_ready_function(self) -> bool:
         request_provider = await self.get_request_provider()
         return not await request_provider.is_empty()
 
-    async def __run_task_function(self) -> None:
+    async def __run_task_function(self) -> None:  # noqa: PLR0912
         request_provider = await self.get_request_provider()
 
         request = await wait_for(
@@ -364,14 +414,13 @@ async def __run_task_function(self) -> None:
         if request is None:
             return
 
-        # TODO: fetch session from the session pool
-        # https://github.com/apify/crawlee-py/issues/110
-
+        session = await self._get_session()
         result = RequestHandlerRunResult()
 
         crawling_context = BasicCrawlingContext(
             request=request,
-            send_request=self._prepare_send_request_function(),
+            session=session,
+            send_request=self._prepare_send_request_function(session),
             add_requests=result.add_requests,
         )
 
@@ -398,6 +447,9 @@ async def __run_task_function(self) -> None:
             )
 
             request.state = RequestState.DONE
+
+            if crawling_context.session:
+                crawling_context.session.mark_good()
         except RequestHandlerError as primary_error:
             primary_error = cast(
                 RequestHandlerError[TCrawlingContext], primary_error
@@ -428,6 +480,34 @@ async def __run_task_function(self) -> None:
                 )
                 request.state = RequestState.ERROR
                 raise
+
+            if crawling_context.session:
+                crawling_context.session.mark_bad()
+        except SessionError as session_error:
+            if not crawling_context.session:
+                raise RuntimeError('SessionError raised in a crawling context without a session') from session_error
+
+            if self._should_retry_request(crawling_context, session_error):
+                logger.warning('Encountered a session error, rotating session and retrying')
+
+                crawling_context.session.retire()
+
+                if crawling_context.request.session_rotation_count is None:
+                    crawling_context.request.session_rotation_count = 0
+                crawling_context.request.session_rotation_count += 1
+
+                await request_provider.reclaim_request(request)
+            else:
+                logger.exception('Request failed and reached maximum retries', exc_info=session_error)
+
+                await wait_for(
+                    lambda: request_provider.mark_request_as_handled(crawling_context.request),
+                    timeout=self._internal_timeout,
+                    timeout_message='Marking request as handled timed out after '
+                    f'{self._internal_timeout.total_seconds()} seconds',
+                    logger=logger,
+                    max_retries=3,
+                )
         except ContextPipelineInterruptedError as interruped_error:
             logger.debug('The context pipeline was interrupted', exc_info=interruped_error)
 
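The diff itself does not show where `SessionError` originates; this hunk only defines how it is handled (retire the session, count a rotation, reclaim the request). Typically a crawler subclass or pipeline step would raise it when it detects blocking. A hypothetical detection helper illustrating the idea, with the status codes and wiring being assumptions, not part of this change:

```python
from crawlee.basic_crawler.errors import SessionError

# Assumed set of anti-bot status codes for this sketch.
BLOCKED_STATUS_CODES = (401, 403, 429)


def raise_for_blocked_status(status_code: int, *, retry_on_blocked: bool = True) -> None:
    """Raise SessionError for responses that look like a blocked session.

    When such an error propagates to the SessionError branch above, the
    crawler retires the session and retries the request with a fresh one.
    """
    if retry_on_blocked and status_code in BLOCKED_STATUS_CODES:
        raise SessionError(f'Assuming the session is blocked (HTTP {status_code})')
```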
@@ -440,9 +520,9 @@ async def __run_task_function(self) -> None:
                 max_retries=3,
             )
         except ContextPipelineInitializationError as initialization_error:
-            if self._should_retry_request(crawling_context):
+            if self._should_retry_request(crawling_context, initialization_error):
                 logger.debug(
-                    'An exception occured during the initialization of crawling context, a retry is in order',
+                    'An exception occurred during the initialization of crawling context, a retry is in order',
                     exc_info=initialization_error,
                 )
 
@@ -461,6 +541,9 @@ async def __run_task_function(self) -> None:
                     logger=logger,
                     max_retries=3,
                 )
+
+            if crawling_context.session:
+                crawling_context.session.mark_bad()
         except Exception as internal_error:
             logger.exception(
                 'An exception occurred during handling of a request. This places the crawler '