Source code for zyte_api._retry

import asyncio
import logging

from aiohttp import client_exceptions
from tenacity import (
    AsyncRetrying,
    RetryCallState,
    after_log,
    before_log,
    before_sleep_log,
    retry_base,
    retry_if_exception,
    stop_after_attempt,
    stop_after_delay,
    wait_chain,
    wait_fixed,
    wait_random,
    wait_random_exponential,
)
from tenacity.stop import stop_never

from ._errors import RequestError

logger = logging.getLogger(__name__)


_NETWORK_ERRORS = (
    asyncio.TimeoutError,  # could happen while reading the response body
    client_exceptions.ClientResponseError,
    client_exceptions.ClientOSError,
    client_exceptions.ServerConnectionError,
    client_exceptions.ServerDisconnectedError,
    client_exceptions.ServerTimeoutError,
    client_exceptions.ClientPayloadError,
    client_exceptions.ClientConnectorSSLError,
    client_exceptions.ClientConnectorError,
)


def _is_network_error(exc: BaseException) -> bool:
    if isinstance(exc, RequestError):
        # RequestError is ClientResponseError, which is in the
        # _NETWORK_ERRORS list, but it should be handled
        # separately.
        return False
    return isinstance(exc, _NETWORK_ERRORS)


def _is_throttling_error(exc: BaseException) -> bool:
    return isinstance(exc, RequestError) and exc.status in (429, 503)


def _is_temporary_download_error(exc: BaseException) -> bool:
    return isinstance(exc, RequestError) and exc.status == 520


[docs] class RetryFactory: """Factory class that builds the :class:`tenacity.AsyncRetrying` object that defines the :ref:`default retry policy <default-retry-policy>`. To create a custom retry policy, you can subclass this factory class, modify it as needed, and then call :meth:`build` on your subclass to get the corresponding :class:`tenacity.AsyncRetrying` object. For example, to increase the maximum number of attempts for :ref:`temporary download errors <zyte-api-temporary-download-errors>` from 4 (i.e. 3 retries) to 10 (i.e. 9 retries): .. code-block:: python from tenacity import stop_after_attempt from zyte_api import RetryFactory class CustomRetryFactory(RetryFactory): temporary_download_error_stop = stop_after_attempt(10) CUSTOM_RETRY_POLICY = CustomRetryFactory().build() To retry :ref:`permanent download errors <zyte-api-permanent-download-errors>`, treating them the same as :ref:`temporary download errors <zyte-api-temporary-download-errors>`: .. code-block:: python from tenacity import RetryCallState, retry_if_exception, stop_after_attempt from zyte_api import RequestError, RetryFactory def is_permanent_download_error(exc: BaseException) -> bool: return isinstance(exc, RequestError) and exc.status == 521 class CustomRetryFactory(RetryFactory): retry_condition = RetryFactory.retry_condition | retry_if_exception( is_permanent_download_error ) def wait(self, retry_state: RetryCallState) -> float: if is_permanent_download_error(retry_state.outcome.exception()): return self.temporary_download_error_wait(retry_state=retry_state) return super().wait(retry_state) def stop(self, retry_state: RetryCallState) -> bool: if is_permanent_download_error(retry_state.outcome.exception()): return self.temporary_download_error_stop(retry_state) return super().stop(retry_state) CUSTOM_RETRY_POLICY = CustomRetryFactory().build() """ retry_condition: retry_base = ( retry_if_exception(_is_throttling_error) | retry_if_exception(_is_network_error) | retry_if_exception(_is_temporary_download_error) ) # throttling throttling_wait = wait_chain( # always wait 20-40s first wait_fixed(20) + wait_random(0, 20), # wait 20-40s again wait_fixed(20) + wait_random(0, 20), # wait from 30 to 630s, with full jitter and exponentially # increasing max wait time wait_fixed(30) + wait_random_exponential(multiplier=1, max=600), ) # connection errors, other client and server failures network_error_wait = ( # wait from 3s to ~1m wait_random(3, 7) + wait_random_exponential(multiplier=1, max=55) ) temporary_download_error_wait = network_error_wait throttling_stop = stop_never network_error_stop = stop_after_delay(15 * 60) temporary_download_error_stop = stop_after_attempt(4) def wait(self, retry_state: RetryCallState) -> float: assert retry_state.outcome, "Unexpected empty outcome" exc = retry_state.outcome.exception() assert exc, "Unexpected empty exception" if _is_throttling_error(exc): return self.throttling_wait(retry_state=retry_state) if _is_network_error(exc): return self.network_error_wait(retry_state=retry_state) assert _is_temporary_download_error(exc) # See retry_condition return self.temporary_download_error_wait(retry_state=retry_state) def stop(self, retry_state: RetryCallState) -> bool: assert retry_state.outcome, "Unexpected empty outcome" exc = retry_state.outcome.exception() assert exc, "Unexpected empty exception" if _is_throttling_error(exc): return self.throttling_stop(retry_state) if _is_network_error(exc): return self.network_error_stop(retry_state) assert _is_temporary_download_error(exc) # See retry_condition return self.temporary_download_error_stop(retry_state) def reraise(self) -> bool: return True def build(self) -> AsyncRetrying: return AsyncRetrying( wait=self.wait, retry=self.retry_condition, stop=self.stop, reraise=self.reraise(), before=before_log(logger, logging.DEBUG), after=after_log(logger, logging.DEBUG), before_sleep=before_sleep_log(logger, logging.DEBUG), )
zyte_api_retrying: AsyncRetrying = RetryFactory().build()