import asyncio
import time
from asyncio import Future
from functools import partial
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional
import aiohttp
from tenacity import AsyncRetrying
from ._errors import RequestError
from ._retry import zyte_api_retrying
from ._utils import _AIO_API_TIMEOUT, create_session
from .apikey import get_apikey
from .constants import API_URL
from .stats import AggStats, ResponseStats
from .utils import USER_AGENT, _process_query
# Static type checkers see the parametrized future type; at runtime the plain
# class is used instead so that the module keeps working on Python 3.8.
if not TYPE_CHECKING:
    _ResponseFuture = Future  # Python 3.8 support
else:
    _ResponseFuture = Future[Dict[str, Any]]
def _post_func(session):
    """Return a callable that sends a POST request.

    When *session* is ``None`` the callable is ``aiohttp.request`` pre-bound
    with ``method="POST"`` and ``timeout=_AIO_API_TIMEOUT``; otherwise it is
    the ``post`` attribute of *session*.
    """
    if session is not None:
        return session.post
    return partial(aiohttp.request, method="POST", timeout=_AIO_API_TIMEOUT)
class _AsyncSession:
    """Bind a client to an HTTP session created for it.

    Every request issued through :meth:`get` or :meth:`iter` is delegated to
    the client, passing along the session created in the constructor. The
    object can be used as an async context manager, in which case the session
    is closed on exit; otherwise :meth:`close` must be awaited explicitly.
    """

    def __init__(self, client, **session_kwargs):
        self._client = client
        # The session is created from the client's ``n_conn`` setting; any
        # extra keyword arguments are handed to ``create_session`` untouched.
        self._session = create_session(client.n_conn, **session_kwargs)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        await self._session.close()

    async def close(self):
        """Close the underlying session."""
        await self._session.close()

    async def get(
        self,
        query: dict,
        *,
        endpoint: str = "extract",
        handle_retries=True,
        retrying: Optional[AsyncRetrying] = None,
    ):
        """Send *query* through the client's ``get`` using this session."""
        forwarded = dict(
            query=query,
            endpoint=endpoint,
            handle_retries=handle_retries,
            retrying=retrying,
            session=self._session,
        )
        return await self._client.get(**forwarded)

    def iter(
        self,
        queries: List[dict],
        *,
        endpoint: str = "extract",
        handle_retries=True,
        retrying: Optional[AsyncRetrying] = None,
    ) -> Iterator[Future]:
        """Send *queries* through the client's ``iter`` using this session."""
        forwarded = dict(
            queries=queries,
            endpoint=endpoint,
            session=self._session,
            handle_retries=handle_retries,
            retrying=retrying,
        )
        return self._client.iter(**forwarded)
class AsyncZyteAPI:
    """:ref:`Asynchronous Zyte API client <asyncio_api>`.

    Parameters work the same as for :class:`ZyteAPI`.
    """

    def __init__(
        self,
        *,
        api_key=None,
        api_url=API_URL,
        n_conn=15,
        retrying: Optional[AsyncRetrying] = None,
        user_agent: Optional[str] = None,
    ):
        self.api_key = get_apikey(api_key)
        self.api_url = api_url
        self.n_conn = n_conn
        self.agg_stats = AggStats()
        self.retrying = retrying or zyte_api_retrying
        self.user_agent = user_agent or USER_AGENT
        # One semaphore per client: every request attempt acquires it, so no
        # more than ``n_conn`` attempts are in flight at the same time.
        self._semaphore = asyncio.Semaphore(n_conn)

    async def get(
        self,
        query: dict,
        *,
        endpoint: str = "extract",
        session=None,
        handle_retries=True,
        retrying: Optional[AsyncRetrying] = None,
    ) -> Dict[str, Any]:
        """Asynchronous equivalent to :meth:`ZyteAPI.get`.

        :param query: Request parameters; they go through ``_process_query``
            and are sent as the JSON body of the POST request.
        :param endpoint: Path appended to ``api_url`` to build the request
            URL.
        :param session: Object whose ``post`` is used to send the request.
            When ``None``, ``aiohttp.request`` is used instead.
        :param handle_retries: Whether to wrap each attempt with the retrying
            policy.
        :param retrying: Retrying policy for this call; defaults to the one
            configured on the client.
        :return: The decoded JSON body of the response.
        :raises RequestError: If the response status is 400 or higher (and
            the retrying policy, if any, gives up).

        ``agg_stats`` is updated along the way: ``n_attempts`` once per
        attempt, ``n_errors`` for every attempt failing with something other
        than :exc:`RequestError`, and finally either ``n_success`` or
        ``n_fatal_errors`` once per call.
        """
        retrying = retrying or self.retrying
        post = _post_func(session)
        auth = aiohttp.BasicAuth(self.api_key)
        headers = {"User-Agent": self.user_agent, "Accept-Encoding": "br"}

        # Shared by all attempts, so per-attempt stats are created relative to
        # the start of the whole call.
        start_global = time.perf_counter()

        async def request():
            """Perform a single attempt and return the decoded JSON body."""
            stats = ResponseStats.create(start_global)
            self.agg_stats.n_attempts += 1

            safe_query = _process_query(query)
            post_kwargs = dict(
                url=self.api_url + endpoint,
                json=safe_query,
                auth=auth,
                headers=headers,
            )

            try:
                async with self._semaphore:
                    async with post(**post_kwargs) as resp:
                        stats.record_connected(resp.status, self.agg_stats)
                        if resp.status >= 400:
                            # Read the error body first so that it can be
                            # attached to the exception raised below.
                            content = await resp.read()
                            resp.release()
                            stats.record_read()
                            stats.record_request_error(content, self.agg_stats)

                            raise RequestError(
                                request_info=resp.request_info,
                                history=resp.history,
                                status=resp.status,
                                message=resp.reason,
                                headers=resp.headers,
                                response_content=content,
                                query=safe_query,
                            )

                        response = await resp.json()
                        stats.record_read(self.agg_stats)
                        return response
            except RequestError:
                # Already recorded through record_request_error() above.
                raise
            except Exception as e:
                self.agg_stats.n_errors += 1
                stats.record_exception(e, agg_stats=self.agg_stats)
                raise

        if handle_retries:
            request = retrying.wraps(request)

        try:
            # Try to make a request
            result = await request()
        except Exception:
            self.agg_stats.n_fatal_errors += 1
            raise
        else:
            self.agg_stats.n_success += 1
        return result

    def iter(
        self,
        queries: List[dict],
        *,
        endpoint: str = "extract",
        session: Optional[aiohttp.ClientSession] = None,
        handle_retries=True,
        retrying: Optional[AsyncRetrying] = None,
    ) -> Iterator[_ResponseFuture]:
        """Asynchronous equivalent to :meth:`ZyteAPI.iter`.

        .. note:: Yielded futures, when awaited, do raise their exceptions,
            instead of only returning them.
        """

        def _request(query):
            return self.get(
                query,
                endpoint=endpoint,
                session=session,
                handle_retries=handle_retries,
                retrying=retrying,
            )

        # One coroutine per query is created up front; as_completed() yields
        # awaitables in the order in which the requests finish.
        return asyncio.as_completed([_request(query) for query in queries])

    def session(self, **kwargs):
        """Asynchronous equivalent to :meth:`ZyteAPI.session`.

        You do not need to use :meth:`~AsyncZyteAPI.session` as an async
        context manager as long as you await ``close()`` on the object it
        returns when you are done:

        .. code-block:: python

            session = client.session()
            try:
                ...
            finally:
                await session.close()
        """
        return _AsyncSession(client=self, **kwargs)