Source code for numistalib.client

"""Numista API client implementations (sync/async) with caching and throttling.

Provides common logging, dependency injection, cache indicators, rate limiting,
and retry logic for resilient access to the Numista API.

Legal & Attribution
-------------------

- Unofficial: This library is an independent, community project and is not affiliated with Numista.
- Attribution: Numista is a trademark/service of Numista. Please attribute Numista where their data is displayed.
- Compliance: Users must comply with Numista's published terms:
    * Conditions of use: https://en.numista.com/conditions.php
    * Legal information: https://en.numista.com/legal.php
    * Pricing API terms: https://en.numista.com/api/pricing.php
- Pricing API usage considerations: Respect any restrictions on caching, retention, and redistribution of
    pricing responses. Prefer conservative TTLs and avoid republishing price data beyond what Numista permits.
    Configure cache behavior (`default_ttl`, `refresh_ttl_on_access`) accordingly.
"""

import asyncio
import logging
import random
import time
from abc import ABC, abstractmethod
from typing import Any, Protocol

import httpx
from hishel import AsyncSqliteStorage, BaseFilter, FilterPolicy, Request, SyncSqliteStorage
from hishel.httpx import AsyncCacheClient, SyncCacheClient
from pyrate_limiter import Limiter, Rate

# Local constants to avoid circular imports
CACHE_HIT_ICON = "💾"
CACHE_MISS_ICON = "🌐"
DEFAULT_CACHE_DB = "hishel_cache.db"
DEFAULT_CACHE_DIR = ".cache/numistalib/hishel"
DEFAULT_CACHE_REFRESH_ON_ACCESS = True
DEFAULT_CACHE_TTL = 604800  # 7 days
DEFAULT_RATE_LIMIT_PERIOD = 60  # seconds
DEFAULT_RATE_LIMIT_REQUESTS = 45  # requests
DEFAULT_TIMEOUT = 30.0  # seconds
LOGGER_NAME = "numistalib"
DEFAULT_RETRY_ATTEMPTS = 3
DEFAULT_BACKOFF_BASE = 0.5  # seconds
DEFAULT_BACKOFF_MAX = 5.0   # seconds

logger = logging.getLogger(LOGGER_NAME)
_CLIENT_REGISTRY: list["NumistaClient"] = []


class SyncClientProtocol(Protocol):
    """Synchronous client protocol returning NumistaResponse."""

    def get(self, url: str, **kwargs: Any) -> "NumistaResponse": ...
    def post(self, url: str, **kwargs: Any) -> "NumistaResponse": ...
    def patch(self, url: str, **kwargs: Any) -> "NumistaResponse": ...
    def put(self, url: str, **kwargs: Any) -> "NumistaResponse": ...
    def delete(self, url: str, **kwargs: Any) -> "NumistaResponse": ...


class AsyncClientProtocol(Protocol):
    """Asynchronous client protocol returning NumistaResponse."""

    async def get(self, url: str, **kwargs: Any) -> "NumistaResponse": ...
    async def post(self, url: str, **kwargs: Any) -> "NumistaResponse": ...
    async def patch(self, url: str, **kwargs: Any) -> "NumistaResponse": ...
    async def put(self, url: str, **kwargs: Any) -> "NumistaResponse": ...
    async def delete(self, url: str, **kwargs: Any) -> "NumistaResponse": ...


class CacheAllGETRequests(BaseFilter[Request]):
    """Filter to cache all GET requests regardless of response headers.

    This is designed for legacy APIs (like Numista) that don't send proper
    HTTP caching headers (Cache-Control, ETag, Expires). Per hishel docs,
    FilterPolicy is recommended for legacy APIs that don't implement
    HTTP caching headers correctly.
    """

    def needs_body(self) -> bool:  # noqa: PLR6301
        """Indicate that we don't need to inspect the request body."""
        return False

    def apply(self, item: Request, body: bytes | None) -> bool:  # noqa: ARG002, PLR6301
        """Apply filter: cache all GET requests.

        Parameters
        ----------
        item : Request
            The HTTP request to filter
        body : bytes | None
            Request body (not used)

        Returns
        -------
        bool
            True if request method is GET
        """
        return item.method == "GET"


[docs] class NumistaResponse(httpx.Response): """Custom HTTP response to expose caching info.""" @property def cached(self) -> bool: """Determine if response was served from cache.""" return bool(self.extensions.get("hishel_from_cache", False)) @property def cached_indicator(self) -> str: """Get cache indicator based on response.""" return str(CACHE_HIT_ICON if self.cached else CACHE_MISS_ICON)
class NumistaClient(ABC): """Abstract base for Numista API clients (sync and async). Provides caching, rate limiting, and cache indicator tracking. """ def __init__( self, **kwargs: Any, ) -> None: """Initialize Numista client base. Parameters ---------- **kwargs : Any Configuration parameters """ self.api_key = kwargs.get("api_key") self.api_base_url = kwargs.get("api_base_url", "https://api.numista.com/v3") self.rate_limit_period = kwargs.get("rate_limit_period", DEFAULT_RATE_LIMIT_PERIOD) self.rate_limit_requests = int(kwargs.get("rate_limit_requests", DEFAULT_RATE_LIMIT_REQUESTS)) self.timeout = int(kwargs.get("timeout", DEFAULT_TIMEOUT)) self.database_cache_dir = kwargs.get("database_cache_dir", DEFAULT_CACHE_DIR) self.database_cache_db = kwargs.get("database_cache_db", DEFAULT_CACHE_DB) self.default_ttl = int(kwargs.get("default_ttl", DEFAULT_CACHE_TTL)) self.refresh_ttl_on_access = kwargs.get("refresh_ttl_on_access", DEFAULT_CACHE_REFRESH_ON_ACCESS) self._client: httpx.Client | httpx.AsyncClient | None = None if not self.api_key: raise ValueError("Numista API key is required via parameter or NUMISTA_API_KEY environment variable") self.headers = { "Numista-API-Key": self.api_key, # "User-Agent": f"numistalib/{__version__}", } logger.debug(f"{self.__class__.__name__} initialized") _CLIENT_REGISTRY.append(self) def close(self) -> None: """Close underlying HTTP client if open. Subclasses may override.""" try: if self._client and hasattr(self._client, "close"): self._client.close() # type: ignore[attr-defined] finally: self._client = None @property def database_full_path(self) -> str: """Get the full path to the cache database. Creates the cache directory if it doesn't exist. """ from pathlib import Path cache_dir = Path(self.database_cache_dir) cache_dir.mkdir(parents=True, exist_ok=True) return str(cache_dir / self.database_cache_db) def _build_url(self, path: str) -> str: """Build full URL from base and path. Parameters ---------- path : str API path (relative or absolute URL) Returns ------- str Full URL """ if path.startswith(("http://", "https://")): return path return f"{self.api_base_url.rstrip('/')}/{path.lstrip('/')}" def _wrap_response(self, response: httpx.Response) -> NumistaResponse: # noqa: PLR6301 """Wrap httpx.Response as NumistaResponse to expose cache indicators. Parameters ---------- response : httpx.Response Raw HTTP response Returns ------- NumistaResponse Response with cache indicator support """ # Cast the response to NumistaResponse by setting its class # This avoids re-reading the response body response.__class__ = NumistaResponse return response # type: ignore @staticmethod def _jitter_delay(attempt: int) -> float: """Compute exponential backoff with jitter. Parameters ---------- attempt : int Zero-based attempt number Returns ------- float Sleep duration in seconds """ base = DEFAULT_BACKOFF_BASE * (2 ** attempt) delay = min(base, DEFAULT_BACKOFF_MAX) return float(delay * (0.5 + random.random())) @property def rate(self) -> Rate: """Get the rate limit configuration.""" return Rate(self.rate_limit_requests, self.rate_limit_period) @property def limiter(self) -> Limiter: """Get the rate limiter instance.""" return Limiter([self.rate], raise_when_fail=False) @property @abstractmethod def storage(self) -> SyncSqliteStorage | AsyncSqliteStorage: """Get the cache storage instance. Must be implemented by subclasses. Returns ------- SyncSqliteStorage | AsyncSqliteStorage Cache storage instance """ pass @property @abstractmethod def client(self) -> httpx.Client | httpx.AsyncClient: """Get the HTTP client instance. Must be implemented by subclasses. Returns ------- httpx.Client | httpx.AsyncClient HTTP client instance """ pass class NumistaClientSync(NumistaClient): """Synchronous Numista API client with caching and rate limiting.""" @property def storage(self) -> SyncSqliteStorage: """Get the synchronous cache storage instance.""" return SyncSqliteStorage( database_path=self.database_full_path, default_ttl=self.default_ttl, refresh_ttl_on_access=self.refresh_ttl_on_access, ) @property def client(self) -> httpx.Client: """Get the synchronous HTTP client instance. Caches the client to prevent creating new instances on each request. """ if self._client is None: policy = FilterPolicy(request_filters=[CacheAllGETRequests()]) storage = self.storage self._client = SyncCacheClient( storage=storage, headers=self.headers, timeout=self.timeout, policy=policy, ) # Keep a reference to storage so we can close it explicitly self._storage = storage # type: ignore[attr-defined] return self._client # type: ignore def close(self) -> None: try: if self._client and hasattr(self._client, "close"): self._client.close() # type: ignore[attr-defined] finally: self._client = None # Close hishel storage connection if present storage = getattr(self, "_storage", None) try: if storage is not None and hasattr(storage, "close"): storage.close() finally: if hasattr(self, "_storage"): self._storage = None # type: ignore[assignment] def get(self, url: str, **kwargs: Any) -> NumistaResponse: """Make a synchronous GET request. Parameters ---------- url : str Request URL (relative or absolute) **kwargs : Any Additional request parameters Returns ------- NumistaResponse HTTP response with cache indicator """ full_url = self._build_url(url) # Retry with exponential backoff and jitter for attempt in range(DEFAULT_RETRY_ATTEMPTS): try: response = self.client.get(full_url, **kwargs) return self._wrap_response(response) except httpx.HTTPError as err: if attempt == DEFAULT_RETRY_ATTEMPTS - 1: raise err time.sleep(self._jitter_delay(attempt)) raise AssertionError("Unreachable: retry loop must return or raise") def post(self, url: str, **kwargs: Any) -> NumistaResponse: """Make a synchronous POST request. Parameters ---------- url : str Request URL (relative or absolute) **kwargs : Any Additional request parameters Returns ------- NumistaResponse HTTP response with cache indicator """ full_url = self._build_url(url) for attempt in range(DEFAULT_RETRY_ATTEMPTS): try: response = self.client.post(full_url, **kwargs) return self._wrap_response(response) except httpx.HTTPError as err: if attempt == DEFAULT_RETRY_ATTEMPTS - 1: raise err time.sleep(self._jitter_delay(attempt)) raise AssertionError("Unreachable: retry loop must return or raise") def patch(self, url: str, **kwargs: Any) -> NumistaResponse: """Make a synchronous PATCH request. Parameters ---------- url : str Request URL (relative or absolute) **kwargs : Any Additional request parameters Returns ------- NumistaResponse HTTP response with cache indicator """ full_url = self._build_url(url) for attempt in range(DEFAULT_RETRY_ATTEMPTS): try: response = self.client.patch(full_url, **kwargs) return self._wrap_response(response) except httpx.HTTPError as err: if attempt == DEFAULT_RETRY_ATTEMPTS - 1: raise err time.sleep(self._jitter_delay(attempt)) raise AssertionError("Unreachable: loop must return or raise") def put(self, url: str, **kwargs: Any) -> NumistaResponse: """Make a synchronous PUT request. Parameters ---------- url : str Request URL (relative or absolute) **kwargs : Any Additional request parameters Returns ------- NumistaResponse HTTP response with cache indicator """ full_url = self._build_url(url) for attempt in range(DEFAULT_RETRY_ATTEMPTS): try: response = self.client.put(full_url, **kwargs) return self._wrap_response(response) except httpx.HTTPError as err: if attempt == DEFAULT_RETRY_ATTEMPTS - 1: raise err time.sleep(self._jitter_delay(attempt)) raise AssertionError("Unreachable: loop must return or raise") def delete(self, url: str, **kwargs: Any) -> NumistaResponse: """Make a synchronous DELETE request. Parameters ---------- url : str Request URL (relative or absolute) **kwargs : Any Additional request parameters Returns ------- NumistaResponse HTTP response with cache indicator """ full_url = self._build_url(url) for attempt in range(DEFAULT_RETRY_ATTEMPTS): try: response = self.client.delete(full_url, **kwargs) return self._wrap_response(response) except httpx.HTTPError as err: if attempt == DEFAULT_RETRY_ATTEMPTS - 1: raise err time.sleep(self._jitter_delay(attempt)) raise AssertionError("Unreachable: loop must return or raise") class NumistaClientAsync(NumistaClient): """Asynchronous Numista API client with caching and rate limiting.""" @property def storage(self) -> AsyncSqliteStorage: """Get the asynchronous cache storage instance.""" return AsyncSqliteStorage( database_path=self.database_full_path, default_ttl=self.default_ttl, refresh_ttl_on_access=self.refresh_ttl_on_access, ) @property def client(self) -> httpx.AsyncClient: """Get the asynchronous HTTP client instance. Caches the client to prevent creating new instances on each request. """ if self._client is None: policy = FilterPolicy(request_filters=[CacheAllGETRequests()]) self._client = AsyncCacheClient( storage=self.storage, headers=self.headers, timeout=self.timeout, policy=policy, ) return self._client # type: ignore async def get(self, url: str, **kwargs: Any) -> NumistaResponse: """Make an asynchronous GET request. Parameters ---------- url : str Request URL (relative or absolute) **kwargs : Any Additional request parameters Returns ------- NumistaResponse HTTP response with cache indicator """ full_url = self._build_url(url) for attempt in range(DEFAULT_RETRY_ATTEMPTS): try: response = await self.client.get(full_url, **kwargs) return self._wrap_response(response) except httpx.HTTPError as err: if attempt == DEFAULT_RETRY_ATTEMPTS - 1: raise err await asyncio.sleep(self._jitter_delay(attempt)) raise AssertionError("Unreachable: retry loop must return or raise") async def post(self, url: str, **kwargs: Any) -> NumistaResponse: """Make an asynchronous POST request. Parameters ---------- url : str Request URL (relative or absolute) **kwargs : Any Additional request parameters Returns ------- NumistaResponse HTTP response with cache indicator """ full_url = self._build_url(url) for attempt in range(DEFAULT_RETRY_ATTEMPTS): try: response = await self.client.post(full_url, **kwargs) return self._wrap_response(response) except httpx.HTTPError as err: if attempt == DEFAULT_RETRY_ATTEMPTS - 1: raise err await asyncio.sleep(self._jitter_delay(attempt)) raise AssertionError("Unreachable: retry loop must return or raise") async def patch(self, url: str, **kwargs: Any) -> NumistaResponse: """Make an asynchronous PATCH request. Parameters ---------- url : str Request URL (relative or absolute) **kwargs : Any Additional request parameters Returns ------- NumistaResponse HTTP response with cache indicator """ full_url = self._build_url(url) for attempt in range(DEFAULT_RETRY_ATTEMPTS): try: response = await self.client.patch(full_url, **kwargs) return self._wrap_response(response) except httpx.HTTPError as err: if attempt == DEFAULT_RETRY_ATTEMPTS - 1: raise err await asyncio.sleep(self._jitter_delay(attempt)) raise AssertionError("Unreachable: loop must return or raise") async def put(self, url: str, **kwargs: Any) -> NumistaResponse: """Make an asynchronous PUT request. Parameters ---------- url : str Request URL (relative or absolute) **kwargs : Any Additional request parameters Returns ------- NumistaResponse HTTP response with cache indicator """ full_url = self._build_url(url) for attempt in range(DEFAULT_RETRY_ATTEMPTS): try: response = await self.client.put(full_url, **kwargs) return self._wrap_response(response) except httpx.HTTPError as err: if attempt == DEFAULT_RETRY_ATTEMPTS - 1: raise err await asyncio.sleep(self._jitter_delay(attempt)) raise AssertionError("Unreachable: loop must return or raise") async def delete(self, url: str, **kwargs: Any) -> NumistaResponse: """Make an asynchronous DELETE request. Parameters ---------- url : str Request URL (relative or absolute) **kwargs : Any Additional request parameters Returns ------- NumistaResponse HTTP response with cache indicator """ full_url = self._build_url(url) for attempt in range(DEFAULT_RETRY_ATTEMPTS): try: response = await self.client.delete(full_url, **kwargs) return self._wrap_response(response) except httpx.HTTPError as err: if attempt == DEFAULT_RETRY_ATTEMPTS - 1: raise err await asyncio.sleep(self._jitter_delay(attempt)) raise AssertionError("Unreachable: loop must return or raise") async def aclose(self) -> None: """Close underlying HTTP client and storage if open.""" try: if self._client and hasattr(self._client, "aclose"): await self._client.aclose() # type: ignore[attr-defined] finally: self._client = None storage = getattr(self, "_storage", None) if storage is not None and hasattr(storage, "close"): await storage.close() # type: ignore[attr-defined]
[docs] class NumistaApiClient: """Unified client factory for both sync and async HTTP operations. Wraps NumistaClientSync/Async and provides a convenient interface. Uses dependency injection pattern for services. """
[docs] def __init__(self, client: NumistaClientSync | NumistaClientAsync) -> None: """Initialize with a concrete client instance. Parameters ---------- client : NumistaClientSync | NumistaClientAsync Pre-configured sync or async HTTP client """ self._client = client
@property def is_async(self) -> bool: """Check if this is an async client.""" return isinstance(self._client, NumistaClientAsync) @property def raw_client(self) -> NumistaClientSync | NumistaClientAsync: """Get the underlying raw client.""" return self._client def __enter__(self) -> "NumistaApiClient": return self def __exit__(self, *args: object) -> None: if isinstance(self._client, NumistaClientSync): self._client.close() else: # Best-effort async close in sync context loop = asyncio.new_event_loop() try: loop.run_until_complete(self._client.aclose()) # type: ignore[attr-defined] except RuntimeError: # Suppress "Event loop is closed" during cleanup pass finally: loop.close()
def close_all_clients() -> None: """Close all registered clients and clear registry (tests teardown).""" for client in list(_CLIENT_REGISTRY): if isinstance(client, NumistaClientSync): client.close() else: loop = asyncio.new_event_loop() try: loop.run_until_complete(client.aclose()) # type: ignore[attr-defined] except RuntimeError: # Suppress "Event loop is closed" during cleanup pass finally: loop.close() _CLIENT_REGISTRY.clear()