Source code for numistalib.services.issues.service

"""Issue service implementation."""

from collections.abc import AsyncGenerator, Mapping
from typing import Any, cast

from numistalib import logger
from numistalib.client import AsyncClientProtocol, NumistaResponse, SyncClientProtocol
from numistalib.models.issues import Issue, Mark, Signature
from numistalib.models.references import Catalogue, Reference
from numistalib.services.issues.base import IssueServiceBase


class IssueService(IssueServiceBase):
    """Unified issue service supporting both sync and async clients.

    ``get_issues`` and ``add_issue`` call ``self._client`` directly and use
    the response synchronously; the ``*_async`` methods and the pagination
    generators await ``self._aget`` / ``self._apost``.
    """

    # Key under which a dict-shaped API payload carries its list of issues.
    CLASS_ITEMS_KEY = "issues"

    def __init__(self, client: SyncClientProtocol | AsyncClientProtocol) -> None:
        """Initialize issue service.

        Parameters
        ----------
        client : SyncClientProtocol | AsyncClientProtocol
            HTTP client instance (sync or async)
        """
        super().__init__(client)

    def _issue_items_from_payload(self, payload: Any) -> list[Mapping[str, Any]]:
        """Extract the raw issue items from a decoded JSON payload.

        Parameters
        ----------
        payload : Any
            Decoded response body: either a bare list of issue items or a
            mapping holding them under ``CLASS_ITEMS_KEY``

        Returns
        -------
        list[Mapping[str, Any]]
            Raw issue items (empty when the key is missing or null)
        """
        if isinstance(payload, list):
            return cast(list[Mapping[str, Any]], payload)
        # ``or []`` also covers a key that is present but explicitly null.
        return cast(
            list[Mapping[str, Any]], payload.get(self.CLASS_ITEMS_KEY) or []
        )

    def to_models(  # noqa: PLR6301
        self,
        items: list[Mapping[str, Any]],
        type_id: int | None = None,
        **kwargs: Any,  # noqa: ARG002
    ) -> list[Issue]:
        """Convert API response items to Issue models.

        Parameters
        ----------
        items : list[Mapping[str, Any]]
            Raw API response items
        type_id : int | None
            Numista type ID (required context)
        **kwargs : Any
            Additional context (accepted but unused)

        Returns
        -------
        list[Issue]
            Parsed issue models, in the same order as ``items``

        Raises
        ------
        ValueError
            If ``type_id`` is None
        KeyError
            If an item lacks ``id``, a mark lacks ``id``, a signature lacks
            ``signer_name``, or a reference lacks ``number`` or its
            ``catalogue`` ``id``/``code``
        """
        if type_id is None:
            raise ValueError("type_id is required for issue conversion")

        issues: list[Issue] = []
        for item in items:
            # Nested arrays are optional; ``or []`` treats both a missing key
            # and an explicit null as "no entries" instead of failing on None.
            marks = [
                Mark(
                    mark_id=mark["id"],
                    title=mark.get("title"),
                    picture=mark.get("picture"),
                    letters=mark.get("letters"),
                )
                for mark in item.get("marks") or []
            ]
            signatures = [
                Signature(
                    signer_name=sig["signer_name"],
                    signer_title=sig.get("signer_title"),
                )
                for sig in item.get("signatures") or []
            ]
            references = [
                Reference(
                    catalogue=Catalogue(
                        id=ref["catalogue"]["id"],
                        code=ref["catalogue"]["code"],
                    ),
                    number=ref["number"],
                    url=ref.get("url"),
                )
                for ref in item.get("references") or []
            ]

            issues.append(
                Issue(
                    numista_id=cast(int, item["id"]),
                    type_id=type_id,
                    # A missing or null flag is stored as False.
                    is_dated=cast(bool | None, item.get("is_dated")) or False,
                    year=cast(int | None, item.get("year")),
                    gregorian_year=cast(int | None, item.get("gregorian_year")),
                    min_year=cast(int | None, item.get("min_year")),
                    max_year=cast(int | None, item.get("max_year")),
                    mint_letter=cast(str | None, item.get("mint_letter")),
                    marks=marks,
                    signatures=signatures,
                    mintage=cast(int | None, item.get("mintage")),
                    references=references,
                    comment=cast(str | None, item.get("comment")),
                )
            )
        return issues

    def get_issues(self, type_id: int, lang: str = "en") -> list[Issue]:
        """Get all issues for a specific type (single call).

        NOTE(review): the result of ``self._client.get`` is used immediately
        as a response object, so this presumably requires a synchronous
        client; with an async client use ``get_issues_async`` -- confirm.

        Parameters
        ----------
        type_id : int
            Numista type ID
        lang : str
            Language code; an empty string sends no query parameters

        Returns
        -------
        list[Issue]
            List of issues

        Raises
        ------
        httpx.HTTPStatusError
            If API returns error
        """
        logger.debug("→ get_issues(type_id=%s, lang=%s)", type_id, lang)
        params = self._build_params(lang=lang) if lang else None
        response = cast(
            NumistaResponse,
            self._client.get(f"/types/{type_id}/issues", params=params),
        )
        response.raise_for_status()
        self._track_response(response)

        items = self._issue_items_from_payload(response.json())
        issues = self.to_models(items, type_id=type_id)
        logger.info(
            f"Retrieved {len(issues)} issues for type {type_id} {response.cached_indicator}"
        )
        return issues

    def add_issue(
        self,
        type_id: int,
        issue_data: Mapping[str, Any],
        lang: str | None = None,
    ) -> Issue:
        """Add a new issue to a type.

        Parameters
        ----------
        type_id : int
            Numista type ID
        issue_data : Mapping[str, Any]
            Issue data payload (copied into a plain dict before sending)
        lang : str | None
            Language code; None or empty sends no query parameters

        Returns
        -------
        Issue
            Created issue, parsed from the response body

        Raises
        ------
        httpx.HTTPStatusError
            If API returns error
        """
        logger.debug("→ add_issue(type_id=%s, lang=%s)", type_id, lang)
        params = self._build_params(lang=lang) if lang else None
        response = cast(
            NumistaResponse,
            self._client.post(
                f"/types/{type_id}/issues", params=params, json=dict(issue_data)
            ),
        )
        response.raise_for_status()
        self._track_response(response)

        data = cast(Mapping[str, Any], response.json())
        # The response body is a single issue object; reuse the list parser.
        issue = self.to_models([data], type_id=type_id)[0]
        logger.info(
            f"Added issue {issue.numista_id} for type {type_id} {response.cached_indicator}"
        )
        return issue

    async def get_issues_async(self, type_id: int, lang: str = "en") -> list[Issue]:
        """Get all issues for a specific type (async).

        Parameters
        ----------
        type_id : int
            Numista type ID
        lang : str
            Language code; an empty string sends no query parameters

        Returns
        -------
        list[Issue]
            List of issues

        Raises
        ------
        httpx.HTTPStatusError
            If API returns error
        """
        logger.debug("→ get_issues_async(type_id=%s, lang=%s)", type_id, lang)
        params = self._build_params(lang=lang) if lang else None
        response = await self._aget(f"/types/{type_id}/issues", params=params)
        response.raise_for_status()
        self._track_response(response)

        items = self._issue_items_from_payload(response.json())
        issues = self.to_models(items, type_id=type_id)
        logger.info(
            f"Retrieved {len(issues)} issues for type {type_id} {response.cached_indicator}"
        )
        return issues

    async def add_issue_async(
        self,
        type_id: int,
        issue_data: Mapping[str, Any],
        lang: str | None = None,
    ) -> Issue:
        """Add a new issue to a type (async).

        Parameters
        ----------
        type_id : int
            Numista type ID
        issue_data : Mapping[str, Any]
            Issue data payload (copied into a plain dict before sending)
        lang : str | None
            Language code; None or empty sends no query parameters

        Returns
        -------
        Issue
            Created issue, parsed from the response body

        Raises
        ------
        httpx.HTTPStatusError
            If API returns error
        """
        logger.debug("→ add_issue_async(type_id=%s, lang=%s)", type_id, lang)
        params = self._build_params(lang=lang) if lang else None
        response = await self._apost(
            f"/types/{type_id}/issues", params=params, json=dict(issue_data)
        )
        response.raise_for_status()
        self._track_response(response)

        data = cast(Mapping[str, Any], response.json())
        # The response body is a single issue object; reuse the list parser.
        issue = self.to_models([data], type_id=type_id)[0]
        logger.info(
            f"Added issue {issue.numista_id} for type {type_id} {response.cached_indicator}"
        )
        return issue

    async def paginated_issues(
        self,
        type_id: int,
        lang: str = "en",
        limit: int = 100,
    ) -> AsyncGenerator[Issue]:
        """Lazily iterate issues by following pagination links.

        Each page is fetched only when the consumer asks for more items. A
        dict payload supplies the following page through its ``next_url``
        key; a bare-list payload is treated as the final page.

        Parameters
        ----------
        type_id : int
            Numista type ID
        lang : str
            Language code
        limit : int
            Results per page (max 100); sent as-is, not clamped here

        Yields
        ------
        Issue
            Individual issue records

        Raises
        ------
        httpx.HTTPStatusError
            If API returns error for any page
        """
        from urllib.parse import urlencode

        logger.debug(
            "→ paginated_issues(type_id=%s, lang=%s, limit=%s)", type_id, lang, limit
        )
        # Encode the query so reserved characters in ``lang`` cannot corrupt
        # the URL; ordinary values still yield ``lang=en&count=100``.
        query = urlencode({"lang": lang, "count": limit})
        url: str | None = f"/types/{type_id}/issues?{query}"
        page_count = 0

        while url:
            page_count += 1
            logger.debug("Fetching issues page %s: %s", page_count, url)
            response = await self._aget(url)
            response.raise_for_status()
            self._track_response(response)

            data = response.json()
            items = self._issue_items_from_payload(data)
            if isinstance(data, list):
                # A bare list carries no link to a further page.
                next_url = None
            else:
                next_url = cast(
                    str | None, cast(Mapping[str, Any], data).get("next_url")
                )

            for issue in self.to_models(items, type_id=type_id):
                yield issue
            url = next_url

        logger.info("Finished paginating through %s pages of issues", page_count)

    async def get_issues_paginated(
        self,
        type_id: int,
        lang: str = "en",
        limit: int = 100,
    ) -> AsyncGenerator[Issue]:
        """Paginate through issues for a type (alias for paginated_issues).

        Parameters
        ----------
        type_id : int
            Numista type ID
        lang : str
            Language code
        limit : int
            Results per page (max 100)

        Yields
        ------
        Issue
            Individual issue records
        """
        async for issue in self.paginated_issues(type_id=type_id, lang=lang, limit=limit):
            yield issue
# Backward compatibility exports IssueServiceAsync = IssueService