Source code for zyte_api._sync

import asyncio
from typing import Generator, List, Optional, Union

from aiohttp import ClientSession
from tenacity import AsyncRetrying

from ._async import AsyncZyteAPI
from .constants import API_URL


def _get_loop():
    """Return an open event loop suitable for ``run_until_complete()``.

    The loop provided by ``asyncio.get_event_loop()`` is reused when that
    call succeeds and the loop is still open. If the call raises
    ``RuntimeError``, or if it hands back a loop that has already been
    closed, a new loop is created, installed as the current event loop and
    returned instead.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:  # pragma: no cover (tests always have a running loop)
        loop = None

    # A closed loop cannot run anything: run_until_complete() would raise
    # "Event loop is closed". Treat it the same as having no loop at all.
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop


class _Session:
    """Synchronous session object returned by the client's ``session()``.

    It keeps a reference to the client that created it together with one
    underlying network session, and forwards ``get()`` and ``iter()`` calls
    to the client with that network session filled in.
    """

    def __init__(self, client, **session_kwargs):
        """Create the underlying network session for *client*.

        *session_kwargs* are passed through unchanged to the ``session()``
        method of the client's asynchronous counterpart.
        """
        self._client = client

        # The session is instantiated from inside a coroutine that is driven
        # to completion on the event loop; see
        # https://github.com/aio-libs/aiohttp/pull/1468
        async def _open():
            async_session = client._async_client.session(**session_kwargs)
            return async_session._session

        self._session = _get_loop().run_until_complete(_open())

    def __enter__(self):
        """Return the session itself so it can be bound in a ``with`` block."""
        return self

    def __exit__(self, *exc_info):
        """Close the underlying network session when the ``with`` block ends."""
        self.close()

    def close(self):
        """Close the underlying network session, blocking until it is done."""
        _get_loop().run_until_complete(self._session.close())

    def get(
        self,
        query: dict,
        *,
        endpoint: str = "extract",
        handle_retries=True,
        retrying: Optional[AsyncRetrying] = None,
    ):
        """Send *query* through the client using this session.

        All parameters are forwarded as-is to the client's ``get()``; the
        value it returns is returned unchanged.
        """
        forwarded = {
            "endpoint": endpoint,
            "handle_retries": handle_retries,
            "retrying": retrying,
        }
        return self._client.get(query=query, session=self._session, **forwarded)

    def iter(
        self,
        queries: List[dict],
        *,
        endpoint: str = "extract",
        handle_retries=True,
        retrying: Optional[AsyncRetrying] = None,
    ) -> Generator[Union[dict, Exception], None, None]:
        """Send *queries* through the client using this session.

        All parameters are forwarded as-is to the client's ``iter()``; the
        iterator it returns is returned unchanged.
        """
        forwarded = {
            "endpoint": endpoint,
            "handle_retries": handle_retries,
            "retrying": retrying,
        }
        return self._client.iter(
            queries=queries, session=self._session, **forwarded
        )


class ZyteAPI:
    """:ref:`Synchronous Zyte API client <sync>`.

    *api_key* is your Zyte API key. If not specified, it is read from the
    ``ZYTE_API_KEY`` environment variable. See :ref:`api-key`.

    *api_url* is the Zyte API base URL.

    *n_conn* is the maximum number of concurrent requests to use. See
    :ref:`api-optimize`.

    *retrying* is the retry policy for requests. Defaults to
    :data:`~zyte_api.zyte_api_retrying`.

    *user_agent* is the user agent string reported to Zyte API. Defaults to
    ``python-zyte-api/<VERSION>``.

    .. tip:: To change the ``User-Agent`` header sent to a target website,
        use :http:`request:customHttpRequestHeaders` instead.
    """

    def __init__(
        self,
        *,
        api_key=None,
        api_url=API_URL,
        n_conn=15,
        retrying: Optional[AsyncRetrying] = None,
        user_agent: Optional[str] = None,
    ):
        # All the actual work is delegated to the asynchronous client; this
        # class only drives its awaitables to completion on an event loop.
        self._async_client = AsyncZyteAPI(
            api_key=api_key,
            api_url=api_url,
            n_conn=n_conn,
            retrying=retrying,
            user_agent=user_agent,
        )

    def get(
        self,
        query: dict,
        *,
        endpoint: str = "extract",
        session: Optional[ClientSession] = None,
        handle_retries: bool = True,
        retrying: Optional[AsyncRetrying] = None,
    ) -> dict:
        """Send *query* to Zyte API and return the result.

        *endpoint* is the Zyte API endpoint path relative to the client object
        *api_url*.

        *session* is the network session to use. Consider using
        :meth:`session` instead of this parameter.

        *handle_retries* determines whether or not a :ref:`retry policy
        <retry-policy>` should be used.

        *retrying* is the :ref:`retry policy <retry-policy>` to use, provided
        *handle_retries* is ``True``. If not specified, the :ref:`default
        retry policy <default-retry-policy>` is used.
        """
        loop = _get_loop()
        future = self._async_client.get(
            query=query,
            endpoint=endpoint,
            session=session,
            handle_retries=handle_retries,
            retrying=retrying,
        )
        return loop.run_until_complete(future)

    def iter(
        self,
        queries: List[dict],
        *,
        endpoint: str = "extract",
        session: Optional[ClientSession] = None,
        handle_retries: bool = True,
        retrying: Optional[AsyncRetrying] = None,
    ) -> Generator[Union[dict, Exception], None, None]:
        """Send multiple *queries* to Zyte API in parallel and iterate over
        their results as they come.

        The number of *queries* can exceed the *n_conn* parameter set on the
        client object. Extra queries will be queued; there will be only up to
        *n_conn* requests being processed in parallel at a time.

        Results may come in a different order from the original list of
        *queries*. You can use :http:`request:echoData` to attach metadata to
        queries, and later use that metadata to restore their original order.

        When exceptions occur, they are yielded, not raised.

        The remaining parameters work the same as in :meth:`get`.
        """
        loop = _get_loop()
        for future in self._async_client.iter(
            queries=queries,
            endpoint=endpoint,
            session=session,
            handle_retries=handle_retries,
            retrying=retrying,
        ):
            # Only the request itself is guarded. Keeping the ``yield``
            # statements outside the ``try`` body ensures that an exception
            # thrown into this generator by its consumer propagates instead
            # of being mistaken for a request failure and yielded back.
            try:
                result = loop.run_until_complete(future)
            except Exception as exception:
                yield exception
            else:
                yield result

    def session(self, **kwargs):
        """:ref:`Context manager <context-managers>` to create a session.

        A session is an object that has the same API as the client object,
        except:

        -   :meth:`get` and :meth:`iter` do not have a *session* parameter,
            the session creates an :class:`aiohttp.ClientSession` object and
            passes it to :meth:`get` and :meth:`iter` automatically.

        -   It does not have a :meth:`session` method.

        Using the same :class:`aiohttp.ClientSession` object for all Zyte API
        requests improves performance by keeping a pool of reusable
        connections to Zyte API.

        The :class:`aiohttp.ClientSession` object is created with sane
        defaults for Zyte API, but you can use *kwargs* to pass additional
        parameters to :class:`aiohttp.ClientSession` and even override those
        sane defaults.

        You do not need to use :meth:`session` as a context manager as long as
        you call ``close()`` on the object it returns when you are done:

        .. code-block:: python

            session = client.session()
            try:
                ...
            finally:
                session.close()
        """
        return _Session(client=self, **kwargs)