Source code for stellium.engines.aspects

"""
Aspect calculation engines.

These engines are responsible for finding angular relationships (aspects)
between celestial objects. They follow the `AspectEngine` protocol.
"""

from itertools import combinations

from stellium.core.config import AspectConfig
from stellium.core.models import Aspect, CelestialPosition, ObjectType
from stellium.core.protocols import OrbEngine
from stellium.core.registry import get_aspect_by_alias, get_aspect_info

# --- Helper Functions (Shared Logic) ---


def _are_axis_pair(obj1: CelestialPosition, obj2: CelestialPosition) -> bool:
    """
    Check if two objects are an axis pair that shouldn't aspect each other.

    Axis pairs:
    - ASC/DSC (Ascendant-Descendant axis)
    - MC/IC (Midheaven-Imum Coeli axis)
    - True Node/South Node (Nodal axis)

    These pairs are always in exact opposition by definition, so calculating
    aspects between them is redundant and clutters the aspect list.

    Returns:
        True if the pair is an axis pair that should be excluded
    """
    # Define axis pairs (order doesn't matter)
    axis_pairs = {
        frozenset(["ASC", "DSC"]),
        frozenset(["MC", "IC"]),
        frozenset(["True Node", "South Node"]),
    }

    # Check if this pair is an axis pair
    pair = frozenset([obj1.name, obj2.name])
    return pair in axis_pairs


def _angular_distance(long1: float, long2: float) -> float:
    """Calculate shortest angular distance between two longitudes."""
    diff = abs(long1 - long2)
    if diff > 180:
        diff = 360 - diff
    return diff


# Threshold for stationary detection (degrees/day).
# A planet moving slower than this is effectively stationary.
_STATIONARY_THRESHOLD = 0.005


def _is_applying(
    obj1: CelestialPosition,
    obj2: CelestialPosition,
    aspect_angle: float,
    current_distance: float,
) -> bool | None:
    """
    Determine if aspect is applying or separating using relative velocity.

    Uses the analytical approach: compares the relative angular velocity
    of the two bodies against the aspect axis. This correctly handles the
    0°/360° zodiac seam without numerical integration artifacts.

    Returns:
        True if applying, False if separating,
        None if speed is unavailable or either body is stationary.
    """
    # Speed genuinely absent
    if obj1.speed_longitude is None or obj2.speed_longitude is None:
        return None

    # Either body is stationary — applying/separating is undefined
    if abs(obj1.speed_longitude) < _STATIONARY_THRESHOLD:
        return None
    if abs(obj2.speed_longitude) < _STATIONARY_THRESHOLD:
        return None

    # Analytical approach: relative velocity tells us directly whether
    # the angular distance is increasing or decreasing, without needing
    # to compute future positions (which can fail at the 0°/360° seam).
    #
    # The signed angular distance from obj1 to obj2 (shortest arc):
    delta = (obj2.longitude - obj1.longitude + 180) % 360 - 180

    # Relative velocity: how fast this delta is changing
    relative_speed = obj2.speed_longitude - obj1.speed_longitude

    # If they're on the "positive side" of the aspect angle,
    # applying means the gap is shrinking toward aspect_angle.
    # If on the "negative side", applying means the gap is growing toward it.
    current_orb = abs(abs(delta) - aspect_angle)

    # A small perturbation using relative speed
    perturbed_delta = delta + relative_speed * (1.0 / (24.0 * 60.0))
    perturbed_orb = abs(abs(perturbed_delta) - aspect_angle)

    return perturbed_orb < current_orb


[docs] class ModernAspectEngine: """ Calculates standard aspects (conjunction, square, trine, etc.) based on a provided AspectConfig. """ def __init__(self, config: AspectConfig | None = None): """ Initialize the engine. Args: config: An AspectConfig object defining which aspect angles and object types to use. If None, a default AspectConfig is created. """ self._config = config or AspectConfig()
[docs] def calculate_aspects( self, positions: list[CelestialPosition], orb_engine: OrbEngine ) -> list[Aspect]: """ Calculate aspects based on the engine's config and the provided orb engine. Args: positions: The list of CelestialPosition objects to check. orb_engine: The OrbEngine that will provide the orb allowance for each potential aspect. Returns: A list of found Aspect objects. """ aspects = [] # 1. Filter the list of positions based on our config valid_types = {ObjectType.PLANET, ObjectType.NODE, ObjectType.POINT} if self._config.include_angles: valid_types.add(ObjectType.ANGLE) if self._config.include_asteroids: valid_types.add(ObjectType.ASTEROID) valid_objects = [p for p in positions if p.object_type in valid_types] # 2. Iterate over every unique pair of objects for obj1, obj2 in combinations(valid_objects, 2): # Skip axis pairs (ASC/DSC, MC/IC, True Node/South Node) if _are_axis_pair(obj1, obj2): continue # Skip aspects TO Dsc/IC (but allow aspects TO Asc/MC) if obj2.name in ["DSC", "IC"] or obj1.name in ["DSC", "IC"]: continue # Skip Asc-MC aspect (angle to angle) if {obj1.name, obj2.name} == {"ASC", "MC"}: continue distance = _angular_distance(obj1.longitude, obj2.longitude) # 3. Check against each aspect in our config for aspect_name in self._config.aspects: # Look up the aspect angle from the registry aspect_info = get_aspect_info(aspect_name) if not aspect_info: # Try as alias aspect_info = get_aspect_by_alias(aspect_name) if not aspect_info: # Skip unknown aspects continue aspect_angle = aspect_info.angle actual_orb = abs(distance - aspect_angle) # 4. Ask the OrbEngine for the allowance orb_allowance = orb_engine.get_orb_allowance(obj1, obj2, aspect_name) # 5. If it's a match, create the Aspect object if actual_orb <= orb_allowance: is_applying = _is_applying(obj1, obj2, aspect_angle, distance) aspect = Aspect( object1=obj1, object2=obj2, aspect_name=aspect_name, aspect_degree=aspect_angle, orb=actual_orb, is_applying=is_applying, ) aspects.append(aspect) # Only one aspect per pair break return aspects
[docs] class HarmonicAspectEngine: """ Calculates harmonic aspects (eg H5, H7, H9). This engine does *not* use AspectConfig, as it defines its own angles. It *does* use the OrbEngine, which can be configured to give different orbs for different harmonics. """ def __init__(self, harmonic: int) -> None: """ Initialize the harmonic engine. Args: harmonic: The harmonic number (eg. 7 for septiles) """ if harmonic <= 1: raise ValueError("Harmonic must be greater than 1.") self.harmonic = harmonic self.aspect_name = f"H{harmonic}" # Generate the aspect angles for this harmonic # e.g., H7 = [51.42, 102.85, 154.28] # We skip the 0/360 conjunction base_angle = 360.0 / harmonic self.aspect_angles = [(i * base_angle) for i in range(1, harmonic // 2 + 1)]
[docs] def calculate_aspects( self, positions: list[CelestialPosition], orb_engine: OrbEngine ) -> list[Aspect]: """ Calculate harmonic aspects for the configured harmonic number. Currently only calculates between ObjectType=Planet objects. Args: positions: The list of CelestialPositions objects to check. orb_engine: The OrbEngine that will provide the orb allowance. Returns: A list of found Aspect objects. """ aspects = [] # Harmonics are typically only calculated between planets valid_objects = [p for p in positions if p.object_type == ObjectType.PLANET] for obj1, obj2 in combinations(valid_objects, 2): distance = _angular_distance(obj1.longitude, obj2.longitude) # Check against each harmonic angle (e.g., 51.4, 102.8 for H7) for aspect_angle in self.aspect_angles: actual_orb = abs(distance - aspect_angle) # Ask the OrbEngine for allowance for "H7", etc. orb_allowance = orb_engine.get_orb_allowance( obj1, obj2, self.aspect_name ) if actual_orb <= orb_allowance: is_applying = _is_applying(obj1, obj2, aspect_angle, distance) aspect = Aspect( object1=obj1, object2=obj2, aspect_name=self.aspect_name, aspect_degree=round(aspect_angle), orb=actual_orb, is_applying=is_applying, ) aspects.append(aspect) # Only use one harmonic aspect per pair break return aspects
[docs] class CrossChartAspectEngine: """ Calculate aspects between two separate charts. Unlike ModernAspectEngine which finds all aspects within a single chart (using combinations of all positions), this engine specifically handles cross-chart scenarios where we want aspects BETWEEN chart1 objects and chart2 objects (but not within each chart). Use cases: - Synastry: Person A's planets aspecting Person B's planets - Transits: Current sky aspecting natal chart - Progressions: Progressed chart aspecting natal chart The key difference: controlled iteration. We only check pairs where one object is from chart1 and the other is from chart2. This prevents: - Object identity collision (same planet in both charts) - Redundant calculation (internal aspects already calculated separately) - Incorrect filtering (can't distinguish sources after merging lists) """ def __init__(self, config: AspectConfig | None = None): """ Initialize the cross-chart aspect engine. Args: config: An AspectConfig object defining which aspect angles and object types to use. If None, a default AspectConfig is created. """ self._config = config or AspectConfig()
[docs] def calculate_cross_aspects( self, chart1_positions: list[CelestialPosition], chart2_positions: list[CelestialPosition], orb_engine: OrbEngine, ) -> list[Aspect]: """ Calculate aspects between two sets of positions. This only calculates aspects WHERE one object is from chart1 and the other is from chart2. Internal aspects within each chart are NOT calculated by this method. Args: chart1_positions: Positions from first chart (e.g., natal/inner) chart2_positions: Positions from second chart (e.g., transit/outer) orb_engine: The OrbEngine that will provide orb allowances Returns: List of Aspect objects representing cross-chart aspects Example: >>> engine = CrossChartAspectEngine() >>> orb_engine = SimpleOrbEngine() >>> aspects = engine.calculate_cross_aspects( ... natal_chart.positions, ... transit_chart.positions, ... orb_engine ... ) >>> # Gets natal Sun trine transit Jupiter, etc. >>> # Does NOT get natal Sun trine natal Moon (internal) """ aspects = [] # 1. Filter positions based on config valid_types = {ObjectType.PLANET, ObjectType.NODE, ObjectType.POINT} if self._config.include_angles: valid_types.add(ObjectType.ANGLE) if self._config.include_asteroids: valid_types.add(ObjectType.ASTEROID) chart1_objects = [p for p in chart1_positions if p.object_type in valid_types] chart2_objects = [p for p in chart2_positions if p.object_type in valid_types] # 2. Controlled iteration: chart1 × chart2 only for obj1 in chart1_objects: for obj2 in chart2_objects: distance = _angular_distance(obj1.longitude, obj2.longitude) # 3. Check each aspect from config for aspect_name in self._config.aspects: # Look up aspect angle from registry aspect_info = get_aspect_info(aspect_name) if not aspect_info: # Try as alias aspect_info = get_aspect_by_alias(aspect_name) if not aspect_info: # Skip unknown aspects continue aspect_angle = aspect_info.angle actual_orb = abs(distance - aspect_angle) # 4. Ask OrbEngine for allowance orb_allowance = orb_engine.get_orb_allowance( obj1, obj2, aspect_name ) # 5. If close enough, create the aspect if actual_orb <= orb_allowance: is_applying = _is_applying(obj1, obj2, aspect_angle, distance) aspect = Aspect( object1=obj1, object2=obj2, aspect_name=aspect_name, aspect_degree=aspect_angle, orb=actual_orb, is_applying=is_applying, ) aspects.append(aspect) # Only one aspect per pair break return aspects
[docs] class DeclinationAspectEngine: """ Calculates declination aspects (Parallel and Contraparallel). Declination aspects are based on celestial equatorial coordinates: - Parallel: Two bodies at the SAME declination (both north or both south). Interpreted similarly to a conjunction - blending of energies. - Contraparallel: Two bodies at the SAME declination magnitude but OPPOSITE hemispheres. Interpreted similarly to an opposition - polarity. Unlike longitude-based aspects which use variable orbs by planet, declination aspects traditionally use a fixed tight orb (1.0-1.5 degrees). Example: >>> engine = DeclinationAspectEngine(orb=1.0) >>> aspects = engine.calculate_aspects(chart.positions) >>> for asp in aspects: ... print(asp) Sun Parallel Moon (orb: 0.45°) Mars Contraparallel Saturn (orb: 0.78°) """ def __init__( self, orb: float = 1.0, include_types: set[ObjectType] | None = None, ) -> None: """ Initialize the declination aspect engine. Args: orb: Maximum orb allowance in degrees (default 1.0°). Traditional range is 1.0-1.5°. Declination aspects use tighter orbs than longitude aspects. include_types: Which ObjectTypes to calculate aspects for. Default: PLANET, NODE. Can also include ANGLE, ASTEROID, POINT. """ self.orb = orb self.include_types = include_types or {ObjectType.PLANET, ObjectType.NODE}
[docs] def calculate_aspects( self, positions: list[CelestialPosition], orb_engine: OrbEngine | None = None, # Ignored but included for protocol compatibility ) -> list[Aspect]: """ Calculate parallel and contraparallel aspects. Args: positions: List of CelestialPosition objects to check. Only positions with non-None declination are used. orb_engine: Ignored. Declination aspects use fixed orb. Included for compatibility with AspectEngine protocol. Returns: List of Aspect objects for detected declination aspects. """ aspects = [] # Filter to valid objects with declination data valid_objects = [ p for p in positions if p.object_type in self.include_types and p.declination is not None ] for obj1, obj2 in combinations(valid_objects, 2): # Skip axis pairs if _are_axis_pair(obj1, obj2): continue dec1, dec2 = obj1.declination, obj2.declination # Determine if same hemisphere (both north or both south) same_hemisphere = (dec1 >= 0) == (dec2 >= 0) # Calculate orb as difference in declination magnitude orb = abs(abs(dec1) - abs(dec2)) if orb > self.orb: continue # Parallel = same hemisphere, same declination magnitude # Contraparallel = opposite hemispheres, same declination magnitude aspect_name = "Parallel" if same_hemisphere else "Contraparallel" aspect_degree = 0 if same_hemisphere else 180 aspects.append( Aspect( object1=obj1, object2=obj2, aspect_name=aspect_name, aspect_degree=aspect_degree, orb=orb, is_applying=None, # Would need declination velocity to determine ) ) return aspects