# Source code for stellium.analysis.batch

"""
Batch chart calculation for large-scale analysis.

BatchCalculator provides efficient calculation of many charts at once,
with support for progress tracking, filtering, and generator-based processing.
"""

from collections.abc import Callable, Generator, Iterable
from typing import Any

from stellium.core.builder import ChartBuilder
from stellium.core.models import CalculatedChart
from stellium.core.native import Native
from stellium.core.protocols import (
    AspectEngine,
    ChartAnalyzer,
    HouseSystemEngine,
    OrbEngine,
)
from stellium.data import get_notable_registry
from stellium.engines.aspects import ModernAspectEngine
from stellium.engines.houses import PlacidusHouses
from stellium.engines.orbs import SimpleOrbEngine


class BatchCalculator:
    """
    Efficient batch calculation of multiple charts.

    Supports calculation from:

    - NotableRegistry (with optional filters)
    - List of Native objects
    - Any iterable of chart data

    The sources are iterated once per ``calculate()`` call, so a one-shot
    iterator passed to ``from_iterable()`` can only be consumed once.

    Example::

        # From NotableRegistry
        charts = (BatchCalculator
            .from_registry(category="scientist", verified=True)
            .with_aspects()
            .calculate_all())

        # From list of Natives
        charts = BatchCalculator.from_natives(natives).calculate_all()

        # Generator for memory efficiency
        for chart in BatchCalculator.from_registry().calculate():
            process(chart)
    """

    def __init__(self, sources: Iterable[Native]) -> None:
        """
        Initialize with an iterable of Native objects.

        Use factory methods like `from_registry()` or `from_natives()` instead.

        Args:
            sources: Iterable yielding the Native objects to chart
        """
        self._sources = sources

        # Default engines (same as ChartBuilder)
        self._house_engines: list[HouseSystemEngine] = [PlacidusHouses()]
        self._aspect_engine: AspectEngine | None = None
        self._orb_engine: OrbEngine = SimpleOrbEngine()
        self._analyzers: list[ChartAnalyzer] = []

        # Progress callback
        self._progress_callback: Callable[[int, int, str], None] | None = None

        # Count for progress (None means "unknown", e.g. a streaming source)
        self._total_count: int | None = None

    # ---- Factory Methods ----

    @classmethod
    def from_registry(
        cls,
        *,
        category: str | None = None,
        event_type: str | None = None,
        verified: bool | None = None,
        data_quality: str | None = None,
        **filters: Any,
    ) -> "BatchCalculator":
        """
        Create BatchCalculator from NotableRegistry with optional filters.

        Args:
            category: Filter by category (e.g., "scientist", "artist")
            event_type: Filter by event type ("birth" or "event")
            verified: Filter by verified status
            data_quality: Filter by data quality ("AA", "A", "B", "C")
            **filters: Additional filters passed to registry.search()

        Returns:
            BatchCalculator ready to configure and run

        Example::

            # All verified scientists
            batch = BatchCalculator.from_registry(
                category="scientist",
                verified=True
            )

            # High-quality birth data only
            batch = BatchCalculator.from_registry(
                event_type="birth",
                data_quality="AA"
            )
        """
        registry = get_notable_registry()

        # Only forward the named filters the caller actually supplied;
        # extra keyword filters are merged in afterwards and may override them.
        named_filters: dict[str, Any] = {
            "category": category,
            "event_type": event_type,
            "verified": verified,
            "data_quality": data_quality,
        }
        filter_dict: dict[str, Any] = {
            key: value for key, value in named_filters.items() if value is not None
        }
        filter_dict.update(filters)

        # No filters at all means "every notable in the registry".
        if filter_dict:
            notables = registry.search(**filter_dict)
        else:
            notables = registry.get_all()

        batch = cls(notables)
        batch._total_count = len(notables)
        return batch

    @classmethod
    def from_natives(cls, natives: list[Native]) -> "BatchCalculator":
        """
        Create BatchCalculator from a list of Native objects.

        The list is copied, so mutating it afterwards affects neither the
        charts that get calculated nor the total reported to the progress
        callback.

        Args:
            natives: List of Native objects to calculate charts for

        Returns:
            BatchCalculator ready to configure and run

        Example::

            natives = [
                Native("2000-01-01 12:00", "New York, NY", name="Person 1"),
                Native("1990-06-15 08:30", "Los Angeles, CA", name="Person 2"),
            ]
            batch = BatchCalculator.from_natives(natives)
        """
        # Snapshot so the cached count can never disagree with the sources.
        snapshot = list(natives)
        batch = cls(snapshot)
        batch._total_count = len(snapshot)
        return batch

    @classmethod
    def from_iterable(cls, sources: Iterable[Native]) -> "BatchCalculator":
        """
        Create BatchCalculator from any iterable of Native objects.

        Use this for streaming data or custom data sources. The total count
        is left unknown, so progress callbacks receive ``-1`` as the total.

        Args:
            sources: Iterable yielding Native objects

        Returns:
            BatchCalculator ready to configure and run
        """
        return cls(sources)

    # ---- Configuration Methods (Fluent API) ----

    def with_house_systems(self, engines: list[HouseSystemEngine]) -> "BatchCalculator":
        """
        Set the house systems to calculate.

        Args:
            engines: Non-empty list of house system engines

        Returns:
            This BatchCalculator, for chaining

        Raises:
            ValueError: If ``engines`` is empty
        """
        if not engines:
            raise ValueError("House engine list cannot be empty")
        # Copy so later mutation of the caller's list cannot change our config.
        self._house_engines = list(engines)
        return self

    def with_aspects(self, engine: AspectEngine | None = None) -> "BatchCalculator":
        """
        Enable aspect calculation with optional custom engine.

        Args:
            engine: Aspect engine to use; a ModernAspectEngine is created
                when omitted

        Returns:
            This BatchCalculator, for chaining
        """
        # Compare against None rather than truthiness so a supplied engine is
        # never silently replaced just because it evaluates as falsy.
        self._aspect_engine = engine if engine is not None else ModernAspectEngine()
        return self

    def with_orbs(self, engine: OrbEngine | None = None) -> "BatchCalculator":
        """
        Set the orb calculation engine.

        Args:
            engine: Orb engine to use; a SimpleOrbEngine is created when omitted

        Returns:
            This BatchCalculator, for chaining
        """
        self._orb_engine = engine if engine is not None else SimpleOrbEngine()
        return self

    def add_analyzer(self, analyzer: ChartAnalyzer) -> "BatchCalculator":
        """
        Add a chart analyzer (e.g., PatternAnalysisEngine).

        Args:
            analyzer: Analyzer to attach to every chart that is built

        Returns:
            This BatchCalculator, for chaining
        """
        self._analyzers.append(analyzer)
        return self

    def with_progress(
        self, callback: Callable[[int, int, str], None]
    ) -> "BatchCalculator":
        """
        Set progress callback for tracking calculation progress.

        The callback receives:

        - current: Current chart number (1-based)
        - total: Total number of charts (or -1 if unknown)
        - name: Name of current chart being calculated

        It is invoked before each chart is built.

        Args:
            callback: Function to call with progress updates

        Returns:
            This BatchCalculator, for chaining

        Example::

            def show_progress(current, total, name):
                if total > 0:
                    print(f"Calculating {current}/{total}: {name}")
                else:
                    print(f"Calculating {current}: {name}")

            batch = BatchCalculator.from_registry().with_progress(show_progress)
        """
        self._progress_callback = callback
        return self

    # ---- Calculation Methods ----

    def calculate(self) -> Generator[CalculatedChart, None, None]:
        """
        Calculate charts as a generator (memory efficient).

        Yields charts one at a time, suitable for processing large datasets
        without loading all charts into memory.

        Yields:
            CalculatedChart for each source

        Example::

            for chart in BatchCalculator.from_registry().calculate():
                # Process one chart at a time
                print(chart.get_object("Sun").sign)
        """
        # count() already encodes "unknown" as -1, which is what the
        # progress callback contract expects.
        total = self.count()

        for current, source in enumerate(self._sources, start=1):
            # Sources without a usable name get a positional placeholder.
            name = getattr(source, "name", None) or f"Chart {current}"

            # Report progress
            if self._progress_callback is not None:
                self._progress_callback(current, total, name)

            # Build and calculate chart
            yield self._build_chart(source)

    def calculate_all(self) -> list[CalculatedChart]:
        """
        Calculate all charts and return as a list.

        Loads all charts into memory. Use `calculate()` generator for large
        datasets that don't fit in memory.

        Returns:
            List of all calculated charts

        Example::

            charts = BatchCalculator.from_registry(category="artist").calculate_all()
            print(f"Calculated {len(charts)} artist charts")
        """
        return list(self.calculate())

    def _build_chart(self, source: Native) -> CalculatedChart:
        """
        Build a single chart from a Native source.

        Args:
            source: Native to build the chart for

        Returns:
            The chart produced by the configured ChartBuilder
        """
        builder = ChartBuilder.from_native(source)

        # Configure house systems
        builder.with_house_systems(self._house_engines)

        # Configure aspects (if enabled)
        if self._aspect_engine is not None:
            builder.with_aspects(self._aspect_engine)
            # The orb engine is configured alongside aspects.
            # NOTE(review): confirm it should not also be applied when
            # aspects are disabled.
            builder.with_orbs(self._orb_engine)

        # Add analyzers
        for analyzer in self._analyzers:
            builder.add_analyzer(analyzer)

        return builder.calculate()

    def count(self) -> int:
        """
        Get the count of sources (if known).

        Returns:
            Number of sources, or -1 if unknown (for streaming iterables)
        """
        if self._total_count is None:
            return -1
        return self._total_count

    def __len__(self) -> int:
        """
        Get the count of sources.

        Raises:
            TypeError: If count is unknown (streaming iterable)
        """
        if self._total_count is None:
            raise TypeError("Cannot get length of streaming BatchCalculator")
        return self._total_count

    def __bool__(self) -> bool:
        """
        Truth value of the calculator.

        Without this, ``bool()`` would fall back to ``__len__`` and raise
        TypeError for streaming sources. A calculator is falsy only when its
        count is known to be zero; an unknown count is treated as truthy.
        """
        return self._total_count != 0

    def __repr__(self) -> str:
        """Return a short description including the source count if known."""
        count = self._total_count
        if count is None:
            return "<BatchCalculator: streaming>"
        return f"<BatchCalculator: {count} sources>"