Advanced Usage

Advanced techniques and patterns for using numistalib.

§ 1 Async Operations

Concurrent Requests

Make multiple requests concurrently:

import asyncio
from numistalib.client import NumistaApiClient
from numistalib.config import Settings
from numistalib.services.types.service import TypeService

async def get_multiple_types(type_ids: list[int]):
    settings = Settings()
    
    async with NumistaApiClient(settings) as client:
        service = TypeService(client)
        
        # Fetch all types concurrently
        tasks = [service.get_type_async(type_id) for type_id in type_ids]
        results = await asyncio.gather(*tasks)
        
        return results

# Usage
type_ids = [95420, 95421, 95422]
results = asyncio.run(get_multiple_types(type_ids))

for coin_type in results:
    print(f"{coin_type.title}: {coin_type.weight}g")

Paginated Search with Processing

Process results as they arrive:

import asyncio
from numistalib.client import NumistaApiClient
from numistalib.services.types.service import TypeService

async def process_all_french_coins():
    settings = Settings()
    
    async with NumistaApiClient(settings) as client:
        service = TypeService(client)
        
        count = 0
        async for coin_type in service.paginated_search(
            issuer="france",
            category="coin"
        ):
            count += 1
            
            # Process each result
            print(f"{count}. {coin_type.title} ({coin_type.min_year})")
            
            # Could also fetch details asynchronously
            # details = await service.get_type_async(coin_type.numista_id)
        
        print(f"\nTotal: {count} coins")

asyncio.run(process_all_french_coins())

§ 2 Batch Operations

Bulk Data Collection

Collect data for multiple entities:

from numistalib.client import NumistaApiClient
from numistalib.services.types.service import TypeService
from numistalib.services.issues.service import IssuesService
from numistalib.services.prices.service import PricesService

def collect_complete_dataset(type_ids: list[int]):
    settings = Settings()
    dataset = []
    
    with NumistaApiClient(settings) as client:
        types_service = TypeService(client)
        issues_service = IssuesService(client)
        prices_service = PricesService(client)
        
        for type_id in type_ids:
            # Get type
            coin_type = types_service.get_type(type_id)
            
            # Get issues
            issues = issues_service.get_issues(type_id)
            
            # Get prices for each issue
            prices_data = []
            for issue in issues:
                prices = prices_service.get_prices(
                    type_id,
                    issue.id,
                    currency="USD"
                )
                prices_data.append({
                    "issue_id": issue.id,
                    "prices": prices
                })
            
            dataset.append({
                "type": coin_type,
                "issues": issues,
                "prices": prices_data
            })
    
    return dataset

# Collect data for specific types
type_ids = [95420, 95421, 95422]
data = collect_complete_dataset(type_ids)

§ 3 Custom Caching Strategies

Per-Query Cache Invalidation

Selectively invalidate cache entries:

from pathlib import Path
import shutil
from numistalib.config import Settings

def clear_cache_for_type(type_id: int):
    """Clear cached data for a specific type."""
    settings = Settings()
    cache_dir = Path(settings.cache_dir)
    
    # Cache keys follow a pattern
    # This is implementation-specific and may change
    pattern = f"*type*{type_id}*"
    
    for cache_file in cache_dir.glob(pattern):
        cache_file.unlink()
        print(f"Cleared: {cache_file}")

# Usage
clear_cache_for_type(95420)

Cache Warming

Pre-populate cache for commonly accessed data:

from numistalib.client import NumistaApiClient
from numistalib.services.types.service import TypeService

def warm_cache(type_ids: list[int]):
    """Pre-fetch and cache type data."""
    settings = Settings()
    
    with NumistaApiClient(settings) as client:
        service = TypeService(client)
        
        for type_id in type_ids:
            coin_type = service.get_type(type_id)
            print(f"Cached: {coin_type.title}")

# Warm cache for popular types
popular_types = [95420, 95421, 95422, 95423]
warm_cache(popular_types)

§ 4 Error Handling Patterns

Retry with Custom Logic

Implement custom retry behavior:

import time
from numistalib.client import NumistaApiClient
from numistalib.models.errors import NumistaError

def get_type_with_retry(type_id: int, max_attempts: int = 5):
    """Get type with custom retry logic."""
    settings = Settings()
    
    for attempt in range(max_attempts):
        try:
            with NumistaApiClient(settings) as client:
                service = TypeService(client)
                return service.get_type(type_id)
        
        except NumistaError as e:
            if e.status_code == 429:  # Rate limit
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
            elif e.status_code >= 500:  # Server error
                print(f"Server error. Attempt {attempt + 1}/{max_attempts}")
                time.sleep(1)
            else:
                raise  # Don't retry client errors
    
    raise NumistaError("Max retries exceeded")

Graceful Degradation

Handle missing data gracefully:

from numistalib.client import NumistaApiClient
from numistalib.services.types.service import TypeService
from numistalib.models.errors import NumistaError

def get_type_safe(type_id: int):
    """Get type with safe fallbacks."""
    settings = Settings()
    
    try:
        with NumistaApiClient(settings) as client:
            service = TypeService(client)
            coin_type = service.get_type(type_id)
            
            return {
                "id": coin_type.numista_id,
                "title": coin_type.title or "Unknown",
                "weight": coin_type.weight or 0.0,
                "composition": coin_type.composition or "Unknown",
                "issuer": coin_type.issuer or "Unknown",
            }
    
    except NumistaError as e:
        print(f"Error fetching type {type_id}: {e}")
        return {
            "id": type_id,
            "title": "Error loading data",
            "error": str(e)
        }

§ 5 Data Processing

Export to CSV

Export search results to CSV:

import csv
from numistalib.client import NumistaApiClient
from numistalib.services.types.service import TypeService

def export_search_to_csv(query: str, output_file: str):
    """Export search results to CSV."""
    settings = Settings()
    
    with NumistaApiClient(settings) as client:
        service = TypeService(client)
        results = service.search_types(query=query, count=50)
        
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=[
                'id', 'title', 'issuer', 'min_year', 'max_year', 'category'
            ])
            writer.writeheader()
            
            for coin_type in results:
                writer.writerow({
                    'id': coin_type.numista_id,
                    'title': coin_type.title,
                    'issuer': coin_type.issuer,
                    'min_year': coin_type.min_year,
                    'max_year': coin_type.max_year,
                    'category': coin_type.category
                })
    
    print(f"Exported {len(results)} results to {output_file}")

# Usage
export_search_to_csv("dollar", "dollar_coins.csv")

Export to JSON

Export complete type data:

import json
from numistalib.client import NumistaApiClient
from numistalib.services.types.service import TypeService

def export_type_to_json(type_id: int, output_file: str):
    """Export complete type data to JSON."""
    settings = Settings()
    
    with NumistaApiClient(settings) as client:
        service = TypeService(client)
        coin_type = service.get_type(type_id)
        
        # Convert to dict (Pydantic model)
        data = coin_type.model_dump()
        
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
    
    print(f"Exported type {type_id} to {output_file}")

# Usage
export_type_to_json(95420, "type_95420.json")

§ 6 Collection Management

Bulk Add to Collection

Add multiple items to a collection:

import asyncio
from numistalib.client import NumistaApiClient
from numistalib.services.collections.service import CollectionsService

async def bulk_add_to_collection(
    user_id: int,
    type_ids: list[int],
    collection_id: int
):
    """Add multiple types to a collection."""
    settings = Settings()
    
    async with NumistaApiClient(settings) as client:
        service = CollectionsService(client)
        
        for type_id in type_ids:
            await service.add_collected_item_async(
                user_id=user_id,
                type_id=type_id,
                collection_id=collection_id,
                quantity=1
            )
            print(f"Added type {type_id}")

# Usage
type_ids = [95420, 95421, 95422]
asyncio.run(bulk_add_to_collection(
    user_id=12345,
    type_ids=type_ids,
    collection_id=67890
))

Collection Statistics

Analyze your collection:

from collections import Counter
from numistalib.client import NumistaApiClient
from numistalib.services.collections.service import CollectionsService

def analyze_collection(user_id: int):
    """Analyze collection statistics."""
    settings = Settings()
    
    with NumistaApiClient(settings) as client:
        service = CollectionsService(client)
        items = service.get_collected_items(user_id)
        
        # Statistics
        total_items = len(items)
        total_quantity = sum(item.quantity for item in items)
        
        # By grade
        grades = Counter(item.grade for item in items if item.grade)
        
        # By category
        categories = Counter(item.category for item in items if item.category)
        
        print(f"Collection Statistics for User {user_id}")
        print(f"Total unique items: {total_items}")
        print(f"Total quantity: {total_quantity}")
        print(f"\nBy Grade:")
        for grade, count in grades.most_common():
            print(f"  {grade}: {count}")
        print(f"\nBy Category:")
        for category, count in categories.most_common():
            print(f"  {category}: {count}")

# Usage
analyze_collection(12345)

§ 7 Image Search Optimization


§ 8 Rate Limit Management

Dynamic Rate Limiting

Adjust rate limits based on response headers:

from numistalib.client import NumistaApiClient
from numistalib.config import Settings

class AdaptiveClient:
    """Client with adaptive rate limiting."""
    
    def __init__(self):
        self.settings = Settings()
        self.current_limit = self.settings.rate_limit
    
    def search_types(self, **kwargs):
        """Search with adaptive rate limiting."""
        with NumistaApiClient(self.settings) as client:
            service = TypeService(client)
            
            try:
                results = service.search_types(**kwargs)
                return results
            
            except Exception as e:
                if "429" in str(e):  # Rate limit error
                    # Reduce rate limit
                    self.current_limit = max(10, self.current_limit - 5)
                    self.settings = Settings(rate_limit=self.current_limit)
                    print(f"Reduced rate limit to {self.current_limit}")
                raise

# Usage
client = AdaptiveClient()
results = client.search_types(query="dollar")

§ 9 Testing Patterns

Mock Responses

Test code using numistalib:

from unittest.mock import Mock, patch
from numistalib.models.types import TypeBasic

def test_my_function():
    """Test function that uses numistalib."""
    
    # Create mock type
    mock_type = TypeBasic(
        numista_id=95420,
        title="Test Coin",
        issuer="france",
        min_year=2020,
        max_year=2020,
        category="coin"
    )
    
    # Mock service
    with patch('numistalib.services.types.service.TypeService') as MockService:
        mock_service = MockService.return_value
        mock_service.search_types.return_value = [mock_type]
        
        # Test your code
        results = mock_service.search_types(query="test")
        assert len(results) == 1
        assert results[0].title == "Test Coin"

§ 10 Performance Optimization

Connection Pooling

Reuse client connections:

from numistalib.client import NumistaApiClient
from numistalib.services.types.service import TypeService

class CoinDatabase:
    """Persistent client for batch operations."""
    
    def __init__(self):
        self.settings = Settings()
        self.client = None
    
    def __enter__(self):
        self.client = NumistaApiClient(self.settings)
        self.client.__enter__()
        return self
    
    def __exit__(self, *args):
        if self.client:
            self.client.__exit__(*args)
    
    def get_type(self, type_id: int):
        service = TypeService(self.client)
        return service.get_type(type_id)
    
    def search(self, **kwargs):
        service = TypeService(self.client)
        return service.search_types(**kwargs)

# Usage - single client for multiple operations
with CoinDatabase() as db:
    # Reuses connection
    results1 = db.search(query="dollar")
    results2 = db.search(query="euro")
    type1 = db.get_type(95420)
    type2 = db.get_type(95421)

Next Steps