"""Collection service implementation."""
from collections.abc import Mapping
from typing import Any, cast
from numistalib import logger
from numistalib.client import AsyncClientProtocol, NumistaResponse, SyncClientProtocol
from numistalib.models.collections import CollectedItem, GradingDetails, Picture, TypeDetail, UserCollection
from numistalib.services.collections.base import CollectionServiceBase
[docs]
class CollectionService(CollectionServiceBase):
"""Unified collection service supporting both sync and async clients.
Requires OAuth authentication with appropriate scopes.
"""
[docs]
def __init__(self, client: SyncClientProtocol | AsyncClientProtocol) -> None:
"""Initialize collection service.
Parameters
----------
client : SyncClientProtocol | AsyncClientProtocol
HTTP client instance (sync or async)
"""
super().__init__(client)
[docs]
def to_models( # noqa: PLR6301
self, items: list[Mapping[str, Any]], user_id: int | None = None, **kwargs: Any # noqa: ARG002
) -> list[CollectedItem]:
"""Convert API response items to CollectedItem models.
Parameters
----------
items : list[Mapping[str, Any]]
Raw API response items
user_id : int | None
Associated user ID (not used, kept for interface compatibility)
**kwargs : Any
Additional context
Returns
-------
list[CollectedItem]
Parsed collected item models
"""
collected_items: list[CollectedItem] = []
for item in items:
type_dict = cast(Mapping[str, Any], item["type"])
type_obj = TypeDetail(**type_dict)
issue_obj = None
if item.get("issue"):
issue_obj = cast(dict[str, object], item["issue"])
price_obj = None
if item.get("price"):
price_obj = cast(dict[str, object], item["price"])
collection_obj = None
if item.get("collection"):
collection_dict = cast(Mapping[str, Any], item["collection"])
collection_obj = UserCollection(**collection_dict)
pictures_obj = None
if item.get("pictures"):
pictures_list = cast(list[Mapping[str, Any]], item["pictures"])
pictures_obj = [Picture(**pic) for pic in pictures_list]
grading_obj = None
if item.get("grading_details"):
grading_dict = cast(Mapping[str, Any], item["grading_details"])
grading_obj = GradingDetails(**grading_dict)
grade_val = cast(str | None, item.get("grade"))
collected_items.append(
CollectedItem(
id=cast(int, item["id"]),
quantity=cast(int, item.get("quantity", 1)),
type=type_obj,
for_swap=bool(item.get("for_swap", False)),
issue=issue_obj,
grade=grade_val, # type: ignore[arg-type]
private_comment=cast(str | None, item.get("private_comment")),
public_comment=cast(str | None, item.get("public_comment")),
price=price_obj,
collection=collection_obj,
pictures=pictures_obj,
storage_location=cast(str | None, item.get("storage_location")),
acquisition_place=cast(str | None, item.get("acquisition_place")),
acquisition_date=cast(Any, item.get("acquisition_date")), # type: ignore[arg-type]
serial_number=cast(str | None, item.get("serial_number")),
internal_id=cast(str | None, item.get("internal_id")),
weight=cast(float | None, item.get("weight")),
size=cast(float | None, item.get("size")),
axis=cast(int | None, item.get("axis")),
grading_details=grading_obj,
)
)
return collected_items
[docs]
def get_collected_items(
self,
user_id: int,
category: str | None = None,
type_id: int | None = None,
collection_id: int | None = None,
) -> list[CollectedItem]:
"""Get all items in a user's collection.
Requires OAuth 2.0 authentication with 'view_collection' scope.
Supports both sync and async clients via duck-typing.
Parameters
----------
user_id : int
Numista user ID
category : str | None
Filter by category (coin, banknote, exonumia)
type_id : int | None
Filter by type ID
collection_id : int | None
Filter by collection ID
Returns
-------
list[CollectedItem]
List of collected items
Raises
------
httpx.HTTPStatusError
If user not found, unauthorized, or API error
"""
logger.debug(
"→ get_collected_items(user_id=%s, category=%s, type_id=%s, collection_id=%s)",
user_id,
category,
type_id,
collection_id,
)
params = self._build_params(
category=category, type=type_id, collection=collection_id
)
response = cast(
NumistaResponse,
self._client.get(f"/users/{user_id}/collected_items", params=params),
)
response.raise_for_status()
self._track_response(response)
data = cast(Mapping[str, Any], response.json())
items_raw = cast(list[Mapping[str, Any]], data.get("collected_items", []))
items = self.to_models(items_raw, user_id=user_id)
logger.info(
f"Retrieved {len(items)} collected items for user {user_id} {response.cached_indicator}"
)
return items
[docs]
def get_collected_item(self, user_id: int, item_id: int) -> CollectedItem:
"""Get details about a specific collected item.
Requires OAuth 2.0 authentication with 'view_collection' scope.
Parameters
----------
user_id : int
Numista user ID
item_id : int
Collected item ID
Returns
-------
CollectedItem
Collected item details
Raises
------
httpx.HTTPStatusError
If item not found, unauthorized, or API error
"""
logger.debug(
"→ get_collected_item(user_id=%s, item_id=%s)",
user_id,
item_id,
)
response = cast(
NumistaResponse,
self._client.get(f"/users/{user_id}/collected_items/{item_id}"),
)
response.raise_for_status()
self._track_response(response)
data = cast(Mapping[str, Any], response.json())
item = self.to_models([data], user_id=user_id)[0]
logger.info(f"Retrieved collected item {item_id} {response.cached_indicator}")
return item
[docs]
def get_collections(self, user_id: int) -> list[UserCollection]:
"""Get all collections for a user.
Requires OAuth 2.0 authentication with 'view_collection' scope.
Parameters
----------
user_id : int
Numista user ID
Returns
-------
list[UserCollection]
List of user collections
Raises
------
httpx.HTTPStatusError
If user not found, unauthorized, or API error
"""
logger.debug("→ get_collections(user_id=%s)", user_id)
response = cast(
NumistaResponse,
self._client.get(f"/users/{user_id}/collections"),
)
response.raise_for_status()
self._track_response(response)
data = cast(Mapping[str, Any], response.json())
collections_raw = cast(list[Mapping[str, Any]], data.get("collections", []))
collections = [
UserCollection(
numista_id=cast(int, col["id"]),
name=cast(str, col["name"]),
)
for col in collections_raw
]
logger.info(
f"Retrieved {len(collections)} collections for user {user_id} {response.cached_indicator}"
)
return collections
[docs]
async def get_collected_items_async(
self,
user_id: int,
category: str | None = None,
type_id: int | None = None,
collection_id: int | None = None,
) -> list[CollectedItem]:
"""Get all items in a user's collection (async).
Parameters
----------
user_id : int
Numista user ID
category : str | None
Filter by category (coin, banknote, exonumia)
type_id : int | None
Filter by type ID
collection_id : int | None
Filter by collection ID
Returns
-------
list[CollectedItem]
List of collected items
Raises
------
httpx.HTTPStatusError
If user not found, unauthorized, or API error
"""
logger.debug(
"→ get_collected_items_async(user_id=%s, category=%s, type_id=%s, collection_id=%s)",
user_id,
category,
type_id,
collection_id,
)
params = self._build_params(
category=category, type=type_id, collection=collection_id
)
response = await self._aget(f"/users/{user_id}/collected_items", params=params)
response.raise_for_status()
self._track_response(response)
data = cast(Mapping[str, Any], response.json())
items_raw = cast(list[Mapping[str, Any]], data.get("collected_items", []))
items = self.to_models(items_raw, user_id=user_id)
logger.info(
f"Retrieved {len(items)} collected items for user {user_id} {response.cached_indicator}"
)
return items
[docs]
async def get_collected_item_async(self, user_id: int, item_id: int) -> CollectedItem:
"""Get details about a specific collected item (async).
Parameters
----------
user_id : int
Numista user ID
item_id : int
Collected item ID
Returns
-------
CollectedItem
Collected item details
Raises
------
httpx.HTTPStatusError
If item not found, unauthorized, or API error
"""
logger.debug(
"→ get_collected_item_async(user_id=%s, item_id=%s)",
user_id,
item_id,
)
response = await self._aget(f"/users/{user_id}/collected_items/{item_id}")
response.raise_for_status()
self._track_response(response)
data = cast(Mapping[str, Any], response.json())
item = self.to_models([data], user_id=user_id)[0]
logger.info(f"Retrieved collected item {item_id} {response.cached_indicator}")
return item
[docs]
async def get_collections_async(self, user_id: int) -> list[UserCollection]:
"""Get all collections for a user (async).
Parameters
----------
user_id : int
Numista user ID
Returns
-------
list[UserCollection]
List of user collections
Raises
------
httpx.HTTPStatusError
If user not found, unauthorized, or API error
"""
logger.debug("→ get_collections_async(user_id=%s)", user_id)
response = await self._aget(f"/users/{user_id}/collections")
response.raise_for_status()
self._track_response(response)
data = cast(Mapping[str, Any], response.json())
collections_raw = cast(list[Mapping[str, Any]], data.get("collections", []))
collections = [
UserCollection(
numista_id=cast(int, col["id"]),
name=cast(str, col["name"]),
)
for col in collections_raw
]
logger.info(
f"Retrieved {len(collections)} collections for user {user_id} {response.cached_indicator}"
)
return collections
[docs]
def add_collected_item(self, user_id: int, item_data: dict[str, object]) -> CollectedItem:
"""Add item to user's collection.
Requires OAuth 2.0 authentication with 'edit_collection' scope.
Parameters
----------
user_id : int
Numista user ID
item_data : dict
Item data payload
Returns
-------
CollectedItem
Created collected item
Raises
------
httpx.HTTPStatusError
If API returns error
"""
logger.debug("→ add_collected_item(user_id=%s)", user_id)
response = cast(
NumistaResponse,
self._client.post(f"/users/{user_id}/collected_items", json=item_data),
)
response.raise_for_status()
self._track_response(response)
data = cast(Mapping[str, Any], response.json())
item = self.to_models([data], user_id=user_id)[0]
logger.info(f"Added collected item {item.id} {response.cached_indicator}")
return item
[docs]
async def add_collected_item_async(self, user_id: int, item_data: dict[str, object]) -> CollectedItem:
"""Add item to user's collection (async).
Requires OAuth 2.0 authentication with 'edit_collection' scope.
Parameters
----------
user_id : int
Numista user ID
item_data : dict
Item data payload
Returns
-------
CollectedItem
Created collected item
Raises
------
httpx.HTTPStatusError
If API returns error
"""
logger.debug("→ add_collected_item_async(user_id=%s)", user_id)
response = await self._apost(f"/users/{user_id}/collected_items", json=item_data)
response.raise_for_status()
self._track_response(response)
data = cast(Mapping[str, Any], response.json())
item = self.to_models([data], user_id=user_id)[0]
logger.info(f"Added collected item {item.id} {response.cached_indicator}")
return item
[docs]
def edit_collected_item(self, user_id: int, item_id: int, item_data: dict[str, object]) -> CollectedItem:
"""Edit item in user's collection.
Requires OAuth 2.0 authentication with 'edit_collection' scope.
Parameters
----------
user_id : int
Numista user ID
item_id : int
Collected item ID
item_data : dict
Item data payload (all fields optional)
Returns
-------
CollectedItem
Updated collected item
Raises
------
httpx.HTTPStatusError
If API returns error
"""
logger.debug("→ edit_collected_item(user_id=%s, item_id=%s)", user_id, item_id)
response = cast(
NumistaResponse,
self._client.patch(
f"/users/{user_id}/collected_items/{item_id}", json=item_data
),
)
response.raise_for_status()
self._track_response(response)
data = cast(Mapping[str, Any], response.json())
item = self.to_models([data], user_id=user_id)[0]
logger.info(f"Edited collected item {item_id} {response.cached_indicator}")
return item
[docs]
async def edit_collected_item_async(
self, user_id: int, item_id: int, item_data: dict[str, object]
) -> CollectedItem:
"""Edit item in user's collection (async).
Requires OAuth 2.0 authentication with 'edit_collection' scope.
Parameters
----------
user_id : int
Numista user ID
item_id : int
Collected item ID
item_data : dict
Item data payload (all fields optional)
Returns
-------
CollectedItem
Updated collected item
Raises
------
httpx.HTTPStatusError
If API returns error
"""
logger.debug("→ edit_collected_item_async(user_id=%s, item_id=%s)", user_id, item_id)
response = await self._apatch(
f"/users/{user_id}/collected_items/{item_id}", json=item_data
)
response.raise_for_status()
self._track_response(response)
data = cast(Mapping[str, Any], response.json())
item = self.to_models([data], user_id=user_id)[0]
logger.info(f"Edited collected item {item_id} {response.cached_indicator}")
return item
[docs]
def delete_collected_item(self, user_id: int, item_id: int) -> None:
"""Delete item from user's collection.
Requires OAuth 2.0 authentication with 'edit_collection' scope.
Parameters
----------
user_id : int
Numista user ID
item_id : int
Collected item ID
Raises
------
httpx.HTTPStatusError
If API returns error
"""
logger.debug("→ delete_collected_item(user_id=%s, item_id=%s)", user_id, item_id)
response = cast(
NumistaResponse,
self._client.delete(f"/users/{user_id}/collected_items/{item_id}"),
)
response.raise_for_status()
self._track_response(response)
logger.info(f"Deleted collected item {item_id} {response.cached_indicator}")
[docs]
async def delete_collected_item_async(self, user_id: int, item_id: int) -> None:
"""Delete item from user's collection (async).
Requires OAuth 2.0 authentication with 'edit_collection' scope.
Parameters
----------
user_id : int
Numista user ID
item_id : int
Collected item ID
Raises
------
httpx.HTTPStatusError
If API returns error
"""
logger.debug("→ delete_collected_item_async(user_id=%s, item_id=%s)", user_id, item_id)
response = await self._adelete(f"/users/{user_id}/collected_items/{item_id}")
response.raise_for_status()
self._track_response(response)
logger.info(f"Deleted collected item {item_id} {response.cached_indicator}")