Source code for stellium.engines.search

"""
Longitude search engine for finding when celestial objects reach specific positions.

This module provides efficient search functions for finding exact times when
planets and other celestial objects cross specific zodiac degrees. Uses a hybrid
Newton-Raphson / bisection algorithm for fast, reliable convergence.

Key features:
- Fast convergence using planetary speed from Swiss Ephemeris
- Handles retrograde motion and stations gracefully
- Proper 360°/0° wraparound handling
- Forward and backward search directions
- Find single crossing or all crossings in a date range
"""

from dataclasses import dataclass
from datetime import datetime
from typing import Literal

import swisseph as swe

from stellium.engines.ephemeris import SWISS_EPHEMERIS_IDS, _set_ephemeris_path

# Sign boundaries in longitude (0° of each sign)
SIGN_BOUNDARIES = {
    "Aries": 0.0,
    "Taurus": 30.0,
    "Gemini": 60.0,
    "Cancer": 90.0,
    "Leo": 120.0,
    "Virgo": 150.0,
    "Libra": 180.0,
    "Scorpio": 210.0,
    "Sagittarius": 240.0,
    "Capricorn": 270.0,
    "Aquarius": 300.0,
    "Pisces": 330.0,
}

SIGN_ORDER = list(SIGN_BOUNDARIES.keys())


[docs] @dataclass(frozen=True) class LongitudeCrossing: """Result of a longitude search. Attributes: julian_day: Julian day of the crossing datetime_utc: UTC datetime of the crossing longitude: Exact longitude at crossing (should be very close to target) speed: Longitude speed at crossing (degrees/day, negative = retrograde) is_retrograde: True if object was moving retrograde at crossing object_name: Name of the celestial object """ julian_day: float datetime_utc: datetime longitude: float speed: float is_retrograde: bool object_name: str @property def is_direct(self) -> bool: """True if object was moving direct (not retrograde) at crossing.""" return not self.is_retrograde
[docs] @dataclass(frozen=True) class SignIngress: """Result of a sign ingress search. Attributes: julian_day: Julian day of the ingress datetime_utc: UTC datetime of the ingress object_name: Name of the celestial object sign: Sign being entered from_sign: Sign being left longitude: Exact longitude at ingress (should be very close to 0° of sign) speed: Longitude speed at ingress (degrees/day, negative = retrograde) is_retrograde: True if object was moving retrograde at ingress """ julian_day: float datetime_utc: datetime object_name: str sign: str from_sign: str longitude: float speed: float is_retrograde: bool @property def is_direct(self) -> bool: """True if object was moving direct (not retrograde) at ingress.""" return not self.is_retrograde def __str__(self) -> str: direction = "Rx" if self.is_retrograde else "" return ( f"{self.object_name} {direction}enters {self.sign} " f"on {self.datetime_utc.strftime('%Y-%m-%d %H:%M')}" )
[docs] @dataclass(frozen=True) class Station: """Result of a planetary station search. A station occurs when a planet's apparent motion changes direction - either from direct to retrograde, or retrograde to direct. Attributes: julian_day: Julian day of the station datetime_utc: UTC datetime of the station object_name: Name of the celestial object station_type: "retrograde" (turning Rx) or "direct" (turning D) longitude: Longitude at the station (planet "hovers" here) sign: Zodiac sign of the station """ julian_day: float datetime_utc: datetime object_name: str station_type: Literal["retrograde", "direct"] longitude: float sign: str @property def is_turning_retrograde(self) -> bool: """True if planet is turning retrograde (was direct, now Rx).""" return self.station_type == "retrograde" @property def is_turning_direct(self) -> bool: """True if planet is turning direct (was Rx, now direct).""" return self.station_type == "direct" @property def degree_in_sign(self) -> float: """Degree within the sign (0-30).""" return self.longitude % 30 def __str__(self) -> str: degree = int(self.degree_in_sign) minute = int((self.degree_in_sign - degree) * 60) return ( f"{self.object_name} stations {self.station_type} " f"at {degree}°{minute:02d}' {self.sign} " f"on {self.datetime_utc.strftime('%Y-%m-%d %H:%M')}" )
# Aspect angle to name mapping ASPECT_ANGLES = { 0.0: "conjunction", 60.0: "sextile", 90.0: "square", 120.0: "trine", 180.0: "opposition", }
[docs] @dataclass(frozen=True) class AspectExact: """Result of an aspect exactitude search. Represents the exact moment when two celestial objects form a precise aspect (conjunction, sextile, square, trine, or opposition). Attributes: julian_day: Julian day when aspect is exact datetime_utc: UTC datetime when aspect is exact object1_name: First object name object2_name: Second object name aspect_angle: The aspect angle (0=conjunction, 60=sextile, etc.) aspect_name: Human name ("conjunction", "trine", etc.) object1_longitude: First object's longitude at exact object2_longitude: Second object's longitude at exact is_applying_before: True if aspect was applying before exact """ julian_day: float datetime_utc: datetime object1_name: str object2_name: str aspect_angle: float aspect_name: str object1_longitude: float object2_longitude: float is_applying_before: bool @property def separation(self) -> float: """Angular separation between the two objects (should be ~aspect_angle).""" diff = abs(self.object2_longitude - self.object1_longitude) if diff > 180: diff = 360 - diff return diff def __str__(self) -> str: return ( f"{self.object1_name} {self.aspect_name} {self.object2_name} exact " f"on {self.datetime_utc.strftime('%Y-%m-%d %H:%M')}" )
def _normalize_angle_error(angle: float) -> float: """Normalize angle difference to range [-180, +180]. This handles the 360°/0° wraparound properly. For example: - 359° to 1° is a difference of +2°, not -358° - 1° to 359° is a difference of -2°, not +358° Args: angle: Angle difference in degrees Returns: Normalized difference in range [-180, +180] """ return ((angle + 180) % 360) - 180 def _get_position_and_speed(object_id: int, julian_day: float) -> tuple[float, float]: """Get longitude and speed for an object at a specific time. Args: object_id: Swiss Ephemeris object ID julian_day: Julian day number Returns: Tuple of (longitude, speed_longitude) in degrees and degrees/day """ flags = swe.FLG_SWIEPH | swe.FLG_SPEED result = swe.calc_ut(julian_day, object_id, flags) return result[0][0], result[0][3] def _julian_day_to_datetime(jd: float) -> datetime: """Convert Julian day to UTC datetime. Args: jd: Julian day number Returns: UTC datetime """ year, month, day, hour = swe.revjul(jd) # hour is a float, convert to hours, minutes, seconds hours = int(hour) minutes = int((hour - hours) * 60) seconds = int(((hour - hours) * 60 - minutes) * 60) microseconds = int((((hour - hours) * 60 - minutes) * 60 - seconds) * 1_000_000) return datetime(year, month, day, hours, minutes, seconds, microseconds) def _datetime_to_julian_day(dt: datetime) -> float: """Convert datetime to Julian day. Args: dt: Datetime (assumed UTC) Returns: Julian day number """ hour = dt.hour + dt.minute / 60 + dt.second / 3600 + dt.microsecond / 3_600_000_000 return swe.julday(dt.year, dt.month, dt.day, hour) def _bracket_crossing( object_id: int, target_longitude: float, start_jd: float, direction: Literal["forward", "backward"] = "forward", max_days: float = 366.0, step_days: float = 1.0, ) -> tuple[float, float] | None: """Find an interval containing a longitude crossing. Sweeps through time looking for when the object crosses the target degree. Uses careful handling to avoid false positives at target+180° due to the angle normalization. Args: object_id: Swiss Ephemeris object ID target_longitude: Target longitude in degrees (0-360) start_jd: Julian day to start search from direction: "forward" (future) or "backward" (past) max_days: Maximum days to search step_days: Step size for initial sweep Returns: Tuple (jd1, jd2) bracketing the crossing, or None if not found """ step = step_days if direction == "forward" else -step_days end_jd = start_jd + (max_days if direction == "forward" else -max_days) current_jd = start_jd current_lon, _ = _get_position_and_speed(object_id, current_jd) current_error = _normalize_angle_error(current_lon - target_longitude) while (direction == "forward" and current_jd < end_jd) or ( direction == "backward" and current_jd > end_jd ): next_jd = current_jd + step next_lon, _ = _get_position_and_speed(object_id, next_jd) next_error = _normalize_angle_error(next_lon - target_longitude) # Check for sign change (potential crossing) if current_error * next_error < 0: # Verify this is a real crossing at target, not at target+180° # A real crossing will have errors that are both small (< 90°) # A false crossing at +180° will have errors near ±180° if abs(current_error) < 90 and abs(next_error) < 90: # Real crossing - return interval in chronological order if direction == "forward": return (current_jd, next_jd) else: return (next_jd, current_jd) # Otherwise it's a false positive from crossing target+180°, skip it # Also check if we're very close (within tolerance) if abs(next_error) < 0.001: # Return a small bracket around this point return (next_jd - 0.01, next_jd + 0.01) current_jd = next_jd current_error = next_error return None
[docs] def find_longitude_crossing( object_name: str, target_longitude: float, start: datetime | float, direction: Literal["forward", "backward"] = "forward", max_days: float = 366.0, tolerance: float = 0.0001, max_iterations: int = 50, ) -> LongitudeCrossing | None: """Find when a celestial object crosses a specific longitude. Uses a hybrid Newton-Raphson / bisection algorithm: 1. First brackets the crossing with a coarse sweep 2. Then refines with Newton-Raphson (fast when speed is good) 3. Falls back to bisection near stations (speed ≈ 0) Args: object_name: Name of celestial object (e.g., "Sun", "Mars", "Moon") target_longitude: Target longitude in degrees (0-360) start: Starting datetime (UTC) or Julian day direction: "forward" to search future, "backward" to search past max_days: Maximum days to search (default 366 = just over a year) tolerance: Convergence tolerance in degrees (default 0.0001 ≈ 0.36 arcsec) max_iterations: Maximum refinement iterations Returns: LongitudeCrossing with exact time, or None if not found Example: >>> # When does the Sun reach 0° Aries (vernal equinox) after Jan 1, 2024? >>> result = find_longitude_crossing("Sun", 0.0, datetime(2024, 1, 1)) >>> print(result.datetime_utc) # ~March 20, 2024 """ # Ensure ephemeris path is set _set_ephemeris_path() # Get object ID if object_name not in SWISS_EPHEMERIS_IDS: raise ValueError(f"Unknown object: {object_name}") object_id = SWISS_EPHEMERIS_IDS[object_name] # Convert start to Julian day if needed if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start # Normalize target longitude target_longitude = target_longitude % 360 # Phase 1: Bracket the crossing bracket = _bracket_crossing( object_id, target_longitude, start_jd, direction, max_days ) if bracket is None: return None t1, t2 = bracket # Phase 2: Refine with Newton-Raphson + bisection fallback t = (t1 + t2) / 2 for _ in range(max_iterations): lon, speed = _get_position_and_speed(object_id, t) error = _normalize_angle_error(lon - target_longitude) # Check convergence if abs(error) < tolerance: return LongitudeCrossing( julian_day=t, datetime_utc=_julian_day_to_datetime(t), longitude=lon, speed=speed, is_retrograde=speed < 0, object_name=object_name, ) # Try Newton-Raphson step if speed is reasonable if abs(speed) > 0.01: newton_step = -error / speed # Clamp step to avoid huge jumps newton_step = max(-15, min(15, newton_step)) t_new = t + newton_step # Keep within bracket t_new = max(t1, min(t2, t_new)) else: # Bisection fallback when near station t_new = (t1 + t2) / 2 # Update bracket based on error sign if error > 0: t2 = t else: t1 = t t = t_new # Failed to converge - return best estimate lon, speed = _get_position_and_speed(object_id, t) return LongitudeCrossing( julian_day=t, datetime_utc=_julian_day_to_datetime(t), longitude=lon, speed=speed, is_retrograde=speed < 0, object_name=object_name, )
[docs] def find_all_longitude_crossings( object_name: str, target_longitude: float, start: datetime | float, end: datetime | float, max_results: int = 100, ) -> list[LongitudeCrossing]: """Find all times a celestial object crosses a specific longitude in a date range. Useful for: - Finding all Moon transits over a degree (roughly monthly) - Finding multiple Mercury crossings during retrograde (up to 3) - Building transit timelines Args: object_name: Name of celestial object (e.g., "Sun", "Mars", "Moon") target_longitude: Target longitude in degrees (0-360) start: Start datetime (UTC) or Julian day end: End datetime (UTC) or Julian day max_results: Safety limit on number of results (default 100) Returns: List of LongitudeCrossing objects, chronologically ordered Example: >>> # Find all times Moon crosses 15° Taurus in 2024 >>> results = find_all_longitude_crossings( ... "Moon", 45.0, # 15° Taurus ... datetime(2024, 1, 1), ... datetime(2024, 12, 31) ... ) >>> print(f"Moon crosses 15° Taurus {len(results)} times in 2024") """ # Convert to Julian days if needed if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start if isinstance(end, datetime): end_jd = _datetime_to_julian_day(end) else: end_jd = end results = [] current_jd = start_jd while current_jd < end_jd and len(results) < max_results: # Search forward from current position result = find_longitude_crossing( object_name, target_longitude, current_jd, direction="forward", max_days=end_jd - current_jd + 1, ) if result is None or result.julian_day > end_jd: break results.append(result) # Move past this crossing (small step to avoid finding same one) current_jd = result.julian_day + 0.1 return results
# ============================================================================= # Sign Ingress Search Functions # ============================================================================= def _get_sign_from_longitude(longitude: float) -> str: """Get the zodiac sign for a given longitude.""" sign_index = int(longitude // 30) % 12 return SIGN_ORDER[sign_index] def _get_previous_sign(sign: str) -> str: """Get the sign before the given sign.""" index = SIGN_ORDER.index(sign) return SIGN_ORDER[(index - 1) % 12]
[docs] def find_ingress( object_name: str, sign: str, start: datetime | float, direction: Literal["forward", "backward"] = "forward", max_days: float = 730.0, ) -> SignIngress | None: """Find when a celestial object next enters a specific zodiac sign. Args: object_name: Name of celestial object (e.g., "Sun", "Mars", "Moon") sign: Target zodiac sign (e.g., "Aries", "Taurus") start: Starting datetime (UTC) or Julian day direction: "forward" to search future, "backward" to search past max_days: Maximum days to search (default 730 = ~2 years) Returns: SignIngress with exact time of ingress, or None if not found Example: >>> # When does Mars next enter Aries? >>> result = find_ingress("Mars", "Aries", datetime(2024, 1, 1)) >>> print(result) # Mars enters Aries on 2024-04-30 12:34 """ if sign not in SIGN_BOUNDARIES: raise ValueError( f"Unknown sign: {sign}. Must be one of {list(SIGN_BOUNDARIES.keys())}" ) target_longitude = SIGN_BOUNDARIES[sign] crossing = find_longitude_crossing( object_name, target_longitude, start, direction=direction, max_days=max_days, ) if crossing is None: return None from_sign = _get_previous_sign(sign) return SignIngress( julian_day=crossing.julian_day, datetime_utc=crossing.datetime_utc, object_name=object_name, sign=sign, from_sign=from_sign, longitude=crossing.longitude, speed=crossing.speed, is_retrograde=crossing.is_retrograde, )
[docs] def find_all_ingresses( object_name: str, sign: str, start: datetime | float, end: datetime | float, max_results: int = 50, ) -> list[SignIngress]: """Find all times a celestial object enters a specific sign in a date range. Args: object_name: Name of celestial object (e.g., "Sun", "Mars", "Moon") sign: Target zodiac sign (e.g., "Aries", "Taurus") start: Start datetime (UTC) or Julian day end: End datetime (UTC) or Julian day max_results: Safety limit on number of results (default 50) Returns: List of SignIngress objects, chronologically ordered Example: >>> # Find all Mars ingresses to Aries in the next 10 years >>> results = find_all_ingresses( ... "Mars", "Aries", ... datetime(2024, 1, 1), ... datetime(2034, 1, 1) ... ) >>> print(f"Mars enters Aries {len(results)} times") """ if sign not in SIGN_BOUNDARIES: raise ValueError( f"Unknown sign: {sign}. Must be one of {list(SIGN_BOUNDARIES.keys())}" ) target_longitude = SIGN_BOUNDARIES[sign] from_sign = _get_previous_sign(sign) crossings = find_all_longitude_crossings( object_name, target_longitude, start, end, max_results=max_results, ) return [ SignIngress( julian_day=c.julian_day, datetime_utc=c.datetime_utc, object_name=object_name, sign=sign, from_sign=from_sign, longitude=c.longitude, speed=c.speed, is_retrograde=c.is_retrograde, ) for c in crossings ]
[docs] def find_next_sign_change( object_name: str, start: datetime | float, direction: Literal["forward", "backward"] = "forward", max_days: float = 60.0, ) -> SignIngress | None: """Find when a celestial object next changes signs (enters any sign). This is useful for questions like "when does this transit end?" where you don't care which sign is entered, just when the object leaves its current sign. Args: object_name: Name of celestial object (e.g., "Sun", "Mars", "Moon") start: Starting datetime (UTC) or Julian day direction: "forward" to search future, "backward" to search past max_days: Maximum days to search (default 60) Returns: SignIngress with exact time of sign change, or None if not found Example: >>> # When does Mars leave its current sign? >>> result = find_next_sign_change("Mars", datetime(2024, 1, 15)) >>> print(f"Mars enters {result.sign} on {result.datetime_utc}") """ _set_ephemeris_path() if object_name not in SWISS_EPHEMERIS_IDS: raise ValueError(f"Unknown object: {object_name}") # Get current position to find current sign if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start object_id = SWISS_EPHEMERIS_IDS[object_name] current_lon, _ = _get_position_and_speed(object_id, start_jd) current_sign = _get_sign_from_longitude(current_lon) # Determine which sign boundary to search for if direction == "forward": # Next sign next_sign_index = (SIGN_ORDER.index(current_sign) + 1) % 12 target_sign = SIGN_ORDER[next_sign_index] else: # Previous sign boundary (entering current sign from before) target_sign = current_sign return find_ingress( object_name, target_sign, start, direction=direction, max_days=max_days, )
[docs] def find_all_sign_changes( object_name: str, start: datetime | float, end: datetime | float, max_results: int = 100, ) -> list[SignIngress]: """Find all sign changes for a celestial object in a date range. Args: object_name: Name of celestial object (e.g., "Sun", "Mars", "Moon") start: Start datetime (UTC) or Julian day end: End datetime (UTC) or Julian day max_results: Safety limit on number of results (default 100) Returns: List of SignIngress objects, chronologically ordered Example: >>> # Find all Mercury sign changes in 2024 >>> results = find_all_sign_changes( ... "Mercury", ... datetime(2024, 1, 1), ... datetime(2024, 12, 31) ... ) >>> for r in results: ... print(f"{r.datetime_utc.date()}: Mercury enters {r.sign}") """ # Convert to Julian days if needed if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start if isinstance(end, datetime): end_jd = _datetime_to_julian_day(end) else: end_jd = end results = [] current_jd = start_jd while current_jd < end_jd and len(results) < max_results: # Find next sign change ingress = find_next_sign_change( object_name, current_jd, direction="forward", max_days=end_jd - current_jd + 1, ) if ingress is None or ingress.julian_day > end_jd: break results.append(ingress) # Move past this ingress current_jd = ingress.julian_day + 0.1 return results
# ============================================================================= # Planetary Station Search Functions # ============================================================================= def _bracket_station( object_id: int, start_jd: float, direction: Literal["forward", "backward"] = "forward", max_days: float = 400.0, step_days: float = 1.0, ) -> tuple[float, float, str] | None: """Find an interval containing a planetary station. Sweeps through time looking for when the planet's speed changes sign. Args: object_id: Swiss Ephemeris object ID start_jd: Julian day to start search from direction: "forward" (future) or "backward" (past) max_days: Maximum days to search step_days: Step size for initial sweep Returns: Tuple (jd1, jd2, station_type) bracketing the station, or None if not found. station_type is "retrograde" if speed goes + to -, "direct" if - to + """ step = step_days if direction == "forward" else -step_days end_jd = start_jd + (max_days if direction == "forward" else -max_days) current_jd = start_jd _, current_speed = _get_position_and_speed(object_id, current_jd) while (direction == "forward" and current_jd < end_jd) or ( direction == "backward" and current_jd > end_jd ): next_jd = current_jd + step _, next_speed = _get_position_and_speed(object_id, next_jd) # Check for sign change in speed if current_speed * next_speed < 0: # Determine station type based on direction of speed change if current_speed > 0 and next_speed < 0: station_type = "retrograde" # Turning Rx else: station_type = "direct" # Turning D # Return interval in chronological order if direction == "forward": return (current_jd, next_jd, station_type) else: return (next_jd, current_jd, station_type) current_jd = next_jd current_speed = next_speed return None def _refine_station( object_id: int, jd1: float, jd2: float, tolerance: float = 0.0001, max_iterations: int = 50, ) -> tuple[float, float]: """Refine a station time using bisection. Args: object_id: Swiss Ephemeris object ID jd1: Lower bound of bracket jd2: Upper bound of bracket tolerance: Convergence tolerance in degrees/day max_iterations: Maximum iterations Returns: Tuple of (julian_day, longitude) at the station """ for _ in range(max_iterations): mid_jd = (jd1 + jd2) / 2 lon, speed = _get_position_and_speed(object_id, mid_jd) # Check convergence (speed very close to zero) if abs(speed) < tolerance: return mid_jd, lon # Get speeds at boundaries to determine which half contains the zero _, speed1 = _get_position_and_speed(object_id, jd1) # If speed1 and mid_speed have same sign, zero is in upper half if speed1 * speed > 0: jd1 = mid_jd else: jd2 = mid_jd # Return best estimate mid_jd = (jd1 + jd2) / 2 lon, _ = _get_position_and_speed(object_id, mid_jd) return mid_jd, lon
[docs] def find_station( object_name: str, start: datetime | float, direction: Literal["forward", "backward"] = "forward", max_days: float = 400.0, ) -> Station | None: """Find the next planetary station (retrograde or direct). A station occurs when a planet appears to stop moving and reverse direction. This is an important timing point in astrology. Note: Sun and Moon do not have stations (they don't go retrograde). Args: object_name: Name of celestial object (e.g., "Mercury", "Mars") start: Starting datetime (UTC) or Julian day direction: "forward" to search future, "backward" to search past max_days: Maximum days to search (default 400, > 1 year for outer planets) Returns: Station with exact time and details, or None if not found Example: >>> # When does Mercury next station? >>> result = find_station("Mercury", datetime(2024, 1, 1)) >>> print(result) # Mercury stations retrograde at 4°51' Capricorn on 2024-04-01 """ _set_ephemeris_path() if object_name not in SWISS_EPHEMERIS_IDS: raise ValueError(f"Unknown object: {object_name}") # Sun and Moon don't station if object_name in ("Sun", "Moon"): raise ValueError( f"{object_name} does not have stations (never goes retrograde)" ) object_id = SWISS_EPHEMERIS_IDS[object_name] # Convert start to Julian day if needed if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start # Phase 1: Bracket the station bracket = _bracket_station(object_id, start_jd, direction, max_days) if bracket is None: return None jd1, jd2, station_type = bracket # Phase 2: Refine with bisection station_jd, longitude = _refine_station(object_id, jd1, jd2) sign = _get_sign_from_longitude(longitude) return Station( julian_day=station_jd, datetime_utc=_julian_day_to_datetime(station_jd), object_name=object_name, station_type=station_type, longitude=longitude, sign=sign, )
[docs] def find_all_stations( object_name: str, start: datetime | float, end: datetime | float, max_results: int = 50, ) -> list[Station]: """Find all planetary stations in a date range. Args: object_name: Name of celestial object (e.g., "Mercury", "Mars") start: Start datetime (UTC) or Julian day end: End datetime (UTC) or Julian day max_results: Safety limit on number of results (default 50) Returns: List of Station objects, chronologically ordered Example: >>> # Find all Mercury stations in 2024 >>> results = find_all_stations( ... "Mercury", ... datetime(2024, 1, 1), ... datetime(2024, 12, 31) ... ) >>> for r in results: ... print(r) """ # Convert to Julian days if needed if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start if isinstance(end, datetime): end_jd = _datetime_to_julian_day(end) else: end_jd = end results = [] current_jd = start_jd while current_jd < end_jd and len(results) < max_results: station = find_station( object_name, current_jd, direction="forward", max_days=end_jd - current_jd + 1, ) if station is None or station.julian_day > end_jd: break results.append(station) # Move past this station (use larger step since stations are months apart) current_jd = station.julian_day + 5.0 return results
# ============================================================================= # Aspect Exactitude Search Functions # ============================================================================= def _get_aspect_separation( obj1_id: int, obj2_id: int, julian_day: float ) -> tuple[float, float, float, float, float]: """Get angular separation and speeds for two objects. Args: obj1_id: Swiss Ephemeris ID for first object obj2_id: Swiss Ephemeris ID for second object julian_day: Julian day number Returns: Tuple of (separation, obj1_lon, obj2_lon, obj1_speed, obj2_speed) Separation is in range [0, 180] """ lon1, speed1 = _get_position_and_speed(obj1_id, julian_day) lon2, speed2 = _get_position_and_speed(obj2_id, julian_day) # Calculate separation (always positive, 0-180) diff = (lon2 - lon1) % 360 if diff > 180: diff = 360 - diff return diff, lon1, lon2, speed1, speed2 def _bracket_aspect( obj1_id: int, obj2_id: int, aspect_angle: float, start_jd: float, direction: Literal["forward", "backward"] = "forward", max_days: float = 366.0, step_days: float = 0.5, ) -> tuple[float, float] | None: """Find an interval containing an aspect exactitude. Sweeps through time looking for when the angular separation equals the aspect angle. Args: obj1_id: Swiss Ephemeris ID for first object obj2_id: Swiss Ephemeris ID for second object aspect_angle: Target aspect angle (0, 60, 90, 120, 180) start_jd: Julian day to start search from direction: "forward" (future) or "backward" (past) max_days: Maximum days to search step_days: Step size for initial sweep Returns: Tuple (jd1, jd2) bracketing the aspect exactitude, or None if not found """ step = step_days if direction == "forward" else -step_days end_jd = start_jd + (max_days if direction == "forward" else -max_days) current_jd = start_jd sep, _, _, _, _ = _get_aspect_separation(obj1_id, obj2_id, current_jd) # For aspects, we need to handle both directions of approach # e.g., for a trine (120°), separation could approach from below or above # Use signed error relative to target current_error = _normalize_angle_error(sep - aspect_angle) # Track previous values for local minimum detection (needed for conjunctions) prev_jd = None prev_error = None while (direction == "forward" and current_jd < end_jd) or ( direction == "backward" and current_jd > end_jd ): next_jd = current_jd + step sep, _, _, _, _ = _get_aspect_separation(obj1_id, obj2_id, next_jd) next_error = _normalize_angle_error(sep - aspect_angle) # Check for sign change (potential crossing) if current_error * next_error < 0: # Verify this is a real crossing, not a 180° wraparound artifact if abs(current_error) < 90 and abs(next_error) < 90: # Real crossing - return interval in chronological order if direction == "forward": return (current_jd, next_jd) else: return (next_jd, current_jd) # For conjunctions (aspect_angle ≈ 0), separation is always positive [0, 180] # so we need to detect local minima instead of sign changes # A local minimum occurs when: prev_error > current_error and current_error < next_error if aspect_angle < 1.0 and prev_error is not None: # Using abs() since for 0° aspect, error = separation which is non-negative if abs(prev_error) > abs(current_error) < abs(next_error): # Found local minimum at current_jd, bracket it if direction == "forward": return (prev_jd, next_jd) else: return (next_jd, prev_jd) # Also check if we're very close (within tolerance) if abs(next_error) < 0.01: return (next_jd - 0.1, next_jd + 0.1) prev_jd = current_jd prev_error = current_error current_jd = next_jd current_error = next_error return None
[docs] def find_aspect_exact( object1_name: str, object2_name: str, aspect_angle: float, start: datetime | float, direction: Literal["forward", "backward"] = "forward", max_days: float = 366.0, tolerance: float = 0.0001, max_iterations: int = 50, ) -> AspectExact | None: """Find when two objects form an exact aspect. Uses a hybrid Newton-Raphson / bisection algorithm to find the precise moment when two celestial objects reach a specific angular separation. Args: object1_name: First object (e.g., "Moon", "Sun", "Mars") object2_name: Second object (e.g., "Jupiter", "Venus") aspect_angle: Target angle in degrees (0=conjunction, 60=sextile, 90=square, 120=trine, 180=opposition) start: Starting datetime (UTC) or Julian day direction: "forward" to search future, "backward" to search past max_days: Maximum days to search (default 366 = just over a year) tolerance: Convergence tolerance in degrees (default 0.0001 ≈ 0.36 arcsec) max_iterations: Maximum refinement iterations Returns: AspectExact with exact timing, or None if not found Example: >>> # Find next exact Moon trine Jupiter >>> result = find_aspect_exact("Moon", "Jupiter", 120.0, datetime(2024, 1, 1)) >>> print(f"Exact trine at {result.datetime_utc}") >>> # Find next Mercury-Venus conjunction >>> result = find_aspect_exact("Mercury", "Venus", 0.0, datetime(2024, 1, 1)) >>> print(result) """ _set_ephemeris_path() # Get object IDs if object1_name not in SWISS_EPHEMERIS_IDS: raise ValueError(f"Unknown object: {object1_name}") if object2_name not in SWISS_EPHEMERIS_IDS: raise ValueError(f"Unknown object: {object2_name}") obj1_id = SWISS_EPHEMERIS_IDS[object1_name] obj2_id = SWISS_EPHEMERIS_IDS[object2_name] # Convert start to Julian day if needed if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start # Normalize aspect angle aspect_angle = aspect_angle % 180 # Aspects are symmetric # Get aspect name aspect_name = ASPECT_ANGLES.get(aspect_angle, f"{aspect_angle}°") # Phase 1: Bracket the aspect exactitude bracket = _bracket_aspect( obj1_id, obj2_id, aspect_angle, start_jd, direction, max_days ) if bracket is None: return None t1, t2 = bracket # Check if aspect was applying before exact (for the result) sep_before, _, _, speed1_before, speed2_before = _get_aspect_separation( obj1_id, obj2_id, t1 ) relative_speed_before = speed2_before - speed1_before # If obj2 is moving faster, separation is increasing (separating) if positive # For applying, we want separation decreasing toward exact error_before = _normalize_angle_error(sep_before - aspect_angle) is_applying_before = (error_before < 0 and relative_speed_before > 0) or ( error_before > 0 and relative_speed_before < 0 ) # Phase 2: Refine with Newton-Raphson + bisection/golden section fallback # For conjunctions (aspect_angle ≈ 0), we search for minimum separation # For other aspects, we search for zero crossing use_minimum_search = aspect_angle < 1.0 t = (t1 + t2) / 2 for _ in range(max_iterations): sep, lon1, lon2, speed1, speed2 = _get_aspect_separation(obj1_id, obj2_id, t) error = _normalize_angle_error(sep - aspect_angle) # Check convergence if abs(error) < tolerance: return AspectExact( julian_day=t, datetime_utc=_julian_day_to_datetime(t), object1_name=object1_name, object2_name=object2_name, aspect_angle=aspect_angle, aspect_name=aspect_name, object1_longitude=lon1, object2_longitude=lon2, is_applying_before=is_applying_before, ) # Calculate relative speed for Newton-Raphson # The derivative of separation w.r.t. time is approximately (speed2 - speed1) # but we need to account for the sign of the error relative_speed = speed2 - speed1 # Adjust sign based on how separation relates to error # When separation > aspect_angle, error > 0 # We want to move in direction that decreases error if abs(relative_speed) > 0.01: newton_step = -error / relative_speed # Clamp step to avoid huge jumps newton_step = max(-10, min(10, newton_step)) t_new = t + newton_step # Keep within bracket t_new = max(t1, min(t2, t_new)) else: # Bisection/golden section fallback when relative speed is very small t_new = (t1 + t2) / 2 # Update bracket if use_minimum_search: # For conjunctions, use golden section search for minimum # Compare errors at bracket endpoints and midpoint to narrow sep1, _, _, _, _ = _get_aspect_separation(obj1_id, obj2_id, t1) sep2, _, _, _, _ = _get_aspect_separation(obj1_id, obj2_id, t2) error1 = abs(sep1 - aspect_angle) error2 = abs(sep2 - aspect_angle) # Shrink bracket toward the side with smaller error if error1 < error2: t2 = t # Keep t1, shrink from right else: t1 = t # Keep t2, shrink from left else: # For other aspects, use bisection based on error sign if error > 0: t2 = t else: t1 = t t = t_new # Failed to converge - return best estimate sep, lon1, lon2, _, _ = _get_aspect_separation(obj1_id, obj2_id, t) return AspectExact( julian_day=t, datetime_utc=_julian_day_to_datetime(t), object1_name=object1_name, object2_name=object2_name, aspect_angle=aspect_angle, aspect_name=aspect_name, object1_longitude=lon1, object2_longitude=lon2, is_applying_before=is_applying_before, )
[docs] def find_all_aspect_exacts( object1_name: str, object2_name: str, aspect_angle: float, start: datetime | float, end: datetime | float, max_results: int = 100, ) -> list[AspectExact]: """Find all exact aspects between two objects in a date range. Useful for: - Finding all Moon-Jupiter trines in a year - Tracking Mercury-Venus aspects for relationship timing - Building aspect timelines Args: object1_name: First object (e.g., "Moon", "Sun", "Mars") object2_name: Second object (e.g., "Jupiter", "Venus") aspect_angle: Target angle in degrees (0, 60, 90, 120, 180) start: Start datetime (UTC) or Julian day end: End datetime (UTC) or Julian day max_results: Safety limit on number of results (default 100) Returns: List of AspectExact objects, chronologically ordered Example: >>> # Find all Moon trine Jupiter in 2024 >>> results = find_all_aspect_exacts( ... "Moon", "Jupiter", 120.0, ... datetime(2024, 1, 1), ... datetime(2024, 12, 31) ... ) >>> print(f"Found {len(results)} exact trines") >>> for r in results[:5]: ... print(r) """ # Convert to Julian days if needed if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start if isinstance(end, datetime): end_jd = _datetime_to_julian_day(end) else: end_jd = end results = [] current_jd = start_jd while current_jd < end_jd and len(results) < max_results: # Search forward from current position result = find_aspect_exact( object1_name, object2_name, aspect_angle, current_jd, direction="forward", max_days=end_jd - current_jd + 1, ) if result is None or result.julian_day > end_jd: break results.append(result) # Move past this aspect (small step to avoid finding same one) # For Moon aspects, they recur ~monthly, so 1 day is safe current_jd = result.julian_day + 1.0 return results
# ============================================================================= # Angle Crossing Search Functions # ============================================================================= # Angle name to index mapping for ascmc array from swe.houses_ex() ANGLE_INDICES = { "ASC": 0, "Ascendant": 0, "MC": 1, "Midheaven": 1, "DSC": 0, # DSC = ASC + 180° "Descendant": 0, "IC": 1, # IC = MC + 180° "Imum Coeli": 1, } # Angles that need 180° offset OPPOSITE_ANGLES = {"DSC", "Descendant", "IC", "Imum Coeli"} def _get_angle_longitude( julian_day: float, latitude: float, longitude: float, angle: str ) -> float: """Get the longitude of a chart angle at a given time. Args: julian_day: Julian day number latitude: Geographic latitude longitude: Geographic longitude (negative = West) angle: Angle name ("ASC", "MC", "DSC", "IC", or full names) Returns: Longitude of the angle in degrees (0-360) """ from stellium.data.paths import initialize_ephemeris initialize_ephemeris() _, ascmc = swe.houses_ex(julian_day, latitude, longitude, hsys=b"P") angle_upper = angle.upper() if angle in ("ASC", "MC", "DSC", "IC") else angle if angle_upper not in ANGLE_INDICES and angle not in ANGLE_INDICES: raise ValueError( f"Unknown angle: {angle}. Must be one of ASC, MC, DSC, IC " "(or Ascendant, Midheaven, Descendant, Imum Coeli)" ) idx = ANGLE_INDICES.get(angle_upper, ANGLE_INDICES.get(angle)) angle_lon = ascmc[idx] # Apply 180° offset for opposite angles if angle_upper in OPPOSITE_ANGLES or angle in OPPOSITE_ANGLES: angle_lon = (angle_lon + 180) % 360 return angle_lon
[docs] @dataclass(frozen=True) class AngleCrossing: """Result of an angle crossing search. Represents the moment when a chart angle (ASC, MC, DSC, IC) reaches a specific zodiac longitude. Attributes: julian_day: Julian day when angle reaches the longitude datetime_utc: UTC datetime of the crossing angle_name: Which angle ("ASC", "MC", "DSC", "IC") target_longitude: The target longitude that was crossed actual_longitude: The actual angle longitude at crossing latitude: Geographic latitude used longitude: Geographic longitude used """ julian_day: float datetime_utc: datetime angle_name: str target_longitude: float actual_longitude: float latitude: float longitude: float def __str__(self) -> str: sign_idx = int(self.target_longitude // 30) signs = [ "Aries", "Taurus", "Gemini", "Cancer", "Leo", "Virgo", "Libra", "Scorpio", "Sagittarius", "Capricorn", "Aquarius", "Pisces", ] degree = self.target_longitude % 30 return ( f"{self.angle_name} at {degree:.1f}° {signs[sign_idx]} " f"on {self.datetime_utc.strftime('%Y-%m-%d %H:%M')}" )
[docs] def find_angle_crossing( target_longitude: float, latitude: float, longitude: float, angle: str, start: datetime | float, direction: Literal["forward", "backward"] = "forward", max_days: float = 2.0, tolerance: float = 0.001, max_iterations: int = 50, ) -> AngleCrossing | None: """Find when a chart angle crosses a specific longitude. Since angles rotate with Earth's rotation (~1° every 4 minutes), any given longitude will be crossed roughly once per sidereal day (~23h 56m) by each angle. Args: target_longitude: Target longitude in degrees (0-360) latitude: Geographic latitude longitude: Geographic longitude (negative = West) angle: Which angle to track ("ASC", "MC", "DSC", "IC") start: Starting datetime (UTC) or Julian day direction: "forward" to search future, "backward" to search past max_days: Maximum days to search (default 2, since crossings are daily) tolerance: Convergence tolerance in degrees max_iterations: Maximum refinement iterations Returns: AngleCrossing with exact timing, or None if not found Example: >>> # Find when ASC reaches 0° Leo in NYC >>> result = find_angle_crossing(120.0, 40.7, -74.0, "ASC", datetime.now()) >>> print(f"ASC at 0° Leo: {result.datetime_utc}") """ from stellium.data.paths import initialize_ephemeris initialize_ephemeris() # Convert start to Julian day if needed if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start # Normalize target longitude target_longitude = target_longitude % 360 # Angles move ~360° per day, so we need small steps for bracketing step_hours = 0.5 # 30 minutes step_jd = step_hours / 24 step = step_jd if direction == "forward" else -step_jd end_jd = start_jd + (max_days if direction == "forward" else -max_days) current_jd = start_jd current_angle = _get_angle_longitude(current_jd, latitude, longitude, angle) current_error = _normalize_angle_error(current_angle - target_longitude) # Bracket the crossing bracket_start = None bracket_end = None while (direction == "forward" and current_jd < end_jd) or ( direction == "backward" and current_jd > end_jd ): next_jd = current_jd + step next_angle = _get_angle_longitude(next_jd, latitude, longitude, angle) next_error = _normalize_angle_error(next_angle - target_longitude) # Check for sign change (crossing) if current_error * next_error < 0: # Verify it's a real crossing (not a 180° wraparound artifact) if abs(current_error) < 90 and abs(next_error) < 90: bracket_start = current_jd bracket_end = next_jd break # Also check if we're very close if abs(next_error) < tolerance: return AngleCrossing( julian_day=next_jd, datetime_utc=_julian_day_to_datetime(next_jd), angle_name=angle.upper() if angle.upper() in ANGLE_INDICES else angle, target_longitude=target_longitude, actual_longitude=next_angle, latitude=latitude, longitude=longitude, ) current_jd = next_jd current_error = next_error if bracket_start is None: return None # Refine with bisection (Newton-Raphson is tricky here due to discontinuities) t1, t2 = bracket_start, bracket_end for _ in range(max_iterations): mid_jd = (t1 + t2) / 2 mid_angle = _get_angle_longitude(mid_jd, latitude, longitude, angle) mid_error = _normalize_angle_error(mid_angle - target_longitude) if abs(mid_error) < tolerance: return AngleCrossing( julian_day=mid_jd, datetime_utc=_julian_day_to_datetime(mid_jd), angle_name=angle.upper() if angle.upper() in ANGLE_INDICES else angle, target_longitude=target_longitude, actual_longitude=mid_angle, latitude=latitude, longitude=longitude, ) # Determine which half contains the crossing t1_angle = _get_angle_longitude(t1, latitude, longitude, angle) t1_error = _normalize_angle_error(t1_angle - target_longitude) if t1_error * mid_error < 0: t2 = mid_jd else: t1 = mid_jd # Return best estimate final_jd = (t1 + t2) / 2 final_angle = _get_angle_longitude(final_jd, latitude, longitude, angle) return AngleCrossing( julian_day=final_jd, datetime_utc=_julian_day_to_datetime(final_jd), angle_name=angle.upper() if angle.upper() in ANGLE_INDICES else angle, target_longitude=target_longitude, actual_longitude=final_angle, latitude=latitude, longitude=longitude, )
[docs] def find_all_angle_crossings( target_longitude: float, latitude: float, longitude: float, angle: str, start: datetime | float, end: datetime | float, max_results: int = 100, ) -> list[AngleCrossing]: """Find all times a chart angle crosses a specific longitude in a date range. Args: target_longitude: Target longitude in degrees (0-360) latitude: Geographic latitude longitude: Geographic longitude (negative = West) angle: Which angle to track ("ASC", "MC", "DSC", "IC") start: Start datetime (UTC) or Julian day end: End datetime (UTC) or Julian day max_results: Safety limit on number of results Returns: List of AngleCrossing objects, chronologically ordered Example: >>> # Find all times ASC crosses 0° Aries in January 2025 >>> results = find_all_angle_crossings( ... 0.0, 40.7, -74.0, "ASC", ... datetime(2025, 1, 1), datetime(2025, 2, 1) ... ) >>> print(f"Found {len(results)} crossings") # ~31 (once per day) """ # Convert to Julian days if needed if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start if isinstance(end, datetime): end_jd = _datetime_to_julian_day(end) else: end_jd = end results = [] current_jd = start_jd while current_jd < end_jd and len(results) < max_results: result = find_angle_crossing( target_longitude, latitude, longitude, angle, current_jd, direction="forward", max_days=end_jd - current_jd + 1, ) if result is None or result.julian_day > end_jd: break results.append(result) # Move past this crossing # Angles cross a given longitude roughly once per sidereal day (~23h 56m) # Use 20 hours to be safe current_jd = result.julian_day + 20 / 24 return results
# ============================================================================= # Eclipse Search Functions # ============================================================================= # Maximum orb from node for an eclipse to occur # Solar eclipses can occur up to ~18° from node, lunar up to ~12° SOLAR_ECLIPSE_MAX_ORB = 18.5 LUNAR_ECLIPSE_MAX_ORB = 12.5 # Approximate orb thresholds for eclipse types (rough guidelines) # These vary based on lunar distance, parallax, etc. - simplified here TOTAL_SOLAR_ORB = 10.0 # Likely total/annular if within this PARTIAL_SOLAR_ORB = 18.5 # Partial possible up to this TOTAL_LUNAR_ORB = 6.0 # Likely total lunar if within this PARTIAL_LUNAR_ORB = 12.5 # Partial lunar up to this
[docs] @dataclass(frozen=True) class Eclipse: """Result of an eclipse search. Attributes: julian_day: Julian day of the eclipse (exact Sun-Moon conjunction/opposition) datetime_utc: UTC datetime of the eclipse eclipse_type: "solar" or "lunar" sun_longitude: Sun's longitude at eclipse moon_longitude: Moon's longitude at eclipse node_longitude: True Node longitude at eclipse orb_to_node: Distance from Sun/Moon to nearest node (degrees) nearest_node: Which node is involved ("north" or "south") sign: Zodiac sign of the eclipse classification: "total", "annular", "partial", or "penumbral" """ julian_day: float datetime_utc: datetime eclipse_type: Literal["solar", "lunar"] sun_longitude: float moon_longitude: float node_longitude: float orb_to_node: float nearest_node: Literal["north", "south"] sign: str classification: str @property def is_solar(self) -> bool: """True if this is a solar eclipse.""" return self.eclipse_type == "solar" @property def is_lunar(self) -> bool: """True if this is a lunar eclipse.""" return self.eclipse_type == "lunar" @property def degree_in_sign(self) -> float: """Degree within the sign (0-30).""" return self.sun_longitude % 30 def __str__(self) -> str: degree = int(self.degree_in_sign) minute = int((self.degree_in_sign - degree) * 60) node_abbrev = "NN" if self.nearest_node == "north" else "SN" return ( f"{self.classification.capitalize()} {self.eclipse_type} eclipse " f"at {degree}°{minute:02d}' {self.sign} ({node_abbrev}) " f"on {self.datetime_utc.strftime('%Y-%m-%d %H:%M')}" )
def _get_sun_moon_separation(julian_day: float) -> tuple[float, float, float, float]: """Get Sun-Moon separation and positions at a given time. Args: julian_day: Julian day number Returns: Tuple of (separation, sun_longitude, moon_longitude, moon_speed) Separation is normalized to [-180, +180] """ sun_lon, _ = _get_position_and_speed(swe.SUN, julian_day) moon_lon, moon_speed = _get_position_and_speed(swe.MOON, julian_day) separation = _normalize_angle_error(moon_lon - sun_lon) return separation, sun_lon, moon_lon, moon_speed def _find_lunation( start_jd: float, lunation_type: Literal["new", "full"], max_days: float = 32.0, ) -> tuple[float, float, float] | None: """Find the next New Moon or Full Moon. Uses Newton-Raphson on Sun-Moon separation. Args: start_jd: Julian day to start search from lunation_type: "new" (conjunction) or "full" (opposition) max_days: Maximum days to search Returns: Tuple of (julian_day, sun_longitude, moon_longitude) or None """ # Start with a sweep to bracket the lunation current_jd = start_jd end_jd = start_jd + max_days step = 1.0 # Day step for initial sweep sep, _, _, _ = _get_sun_moon_separation(current_jd) # Adjust for target if lunation_type == "full": current_error = _normalize_angle_error(sep - 180.0) else: current_error = sep # Already normalized around 0 # Find bracket bracket_start = None bracket_end = None while current_jd < end_jd: next_jd = current_jd + step sep, _, _, _ = _get_sun_moon_separation(next_jd) if lunation_type == "full": next_error = _normalize_angle_error(sep - 180.0) else: next_error = sep # Check for sign change AND that both errors are reasonably small # This avoids false positives from wraparound (e.g., +174° to -171°) # A real zero crossing will have |error| < 90° on both sides if current_error * next_error < 0: if abs(current_error) < 90 and abs(next_error) < 90: bracket_start = current_jd bracket_end = next_jd break current_jd = next_jd current_error = next_error if bracket_start is None: return None # Refine with Newton-Raphson / bisection t = (bracket_start + bracket_end) / 2 for _ in range(50): sep, sun_lon, moon_lon, moon_speed = _get_sun_moon_separation(t) if lunation_type == "full": error = _normalize_angle_error(sep - 180.0) else: error = sep if abs(error) < 0.0001: # ~0.36 arcseconds return t, sun_lon, moon_lon # Moon moves ~13°/day relative to Sun (~12°/day faster than Sun) # Use this as derivative estimate relative_speed = moon_speed - 1.0 # Sun moves ~1°/day if abs(relative_speed) > 0.1: newton_step = -error / relative_speed newton_step = max(-2, min(2, newton_step)) # Clamp t_new = t + newton_step t_new = max(bracket_start, min(bracket_end, t_new)) else: t_new = (bracket_start + bracket_end) / 2 # Update bracket if error > 0: bracket_end = t else: bracket_start = t t = t_new # Return best estimate sep, sun_lon, moon_lon, _ = _get_sun_moon_separation(t) return t, sun_lon, moon_lon def _classify_eclipse( eclipse_type: Literal["solar", "lunar"], orb_to_node: float, ) -> str: """Classify an eclipse based on type and orb to node. Args: eclipse_type: "solar" or "lunar" orb_to_node: Distance from eclipse to nearest node Returns: Classification: "total", "annular", "partial", or "penumbral" """ if eclipse_type == "solar": if orb_to_node <= TOTAL_SOLAR_ORB: # Could be total or annular depending on Moon's distance # Simplified: just call it "total" for tight orbs return "total" else: return "partial" else: # lunar if orb_to_node <= TOTAL_LUNAR_ORB: return "total" elif orb_to_node <= PARTIAL_LUNAR_ORB * 0.7: return "partial" else: return "penumbral"
[docs] def find_eclipse( start: datetime | float, direction: Literal["forward", "backward"] = "forward", eclipse_types: Literal["both", "solar", "lunar"] = "both", max_days: float = 200.0, ) -> Eclipse | None: """Find the next eclipse from a starting date. Args: start: Starting datetime (UTC) or Julian day direction: "forward" to search future, "backward" to search past eclipse_types: Which types to find ("both", "solar", "lunar") max_days: Maximum days to search (default 200 = ~6 months) Returns: Eclipse with details, or None if not found Example: >>> # Find next eclipse from now >>> eclipse = find_eclipse(datetime(2024, 1, 1)) >>> print(eclipse) Partial lunar eclipse at 5°07' Leo (NN) on 2024-03-25 07:00 """ _set_ephemeris_path() if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start if direction == "backward": # For backward search, we'd need to modify the logic # For now, only support forward raise NotImplementedError("Backward eclipse search not yet implemented") current_jd = start_jd end_jd = start_jd + max_days while current_jd < end_jd: # Find BOTH next New Moon and next Full Moon # Then check which (if any) is an eclipse, returning the earlier one solar_eclipse = None lunar_eclipse = None new_moon_jd = None full_moon_jd = None # Find next New Moon (potential solar eclipse) if eclipse_types in ("both", "solar"): new_moon = _find_lunation(current_jd, "new", max_days=35) if new_moon and new_moon[0] <= end_jd: jd, sun_lon, moon_lon = new_moon new_moon_jd = jd # Get node position node_lon, _ = _get_position_and_speed( SWISS_EPHEMERIS_IDS["True Node"], jd ) south_node_lon = (node_lon + 180) % 360 # Check distance to nodes dist_to_nn = abs(_normalize_angle_error(sun_lon - node_lon)) dist_to_sn = abs(_normalize_angle_error(sun_lon - south_node_lon)) orb = min(dist_to_nn, dist_to_sn) nearest = "north" if dist_to_nn < dist_to_sn else "south" if orb <= SOLAR_ECLIPSE_MAX_ORB: solar_eclipse = Eclipse( julian_day=jd, datetime_utc=_julian_day_to_datetime(jd), eclipse_type="solar", sun_longitude=sun_lon, moon_longitude=moon_lon, node_longitude=node_lon, orb_to_node=orb, nearest_node=nearest, sign=_get_sign_from_longitude(sun_lon), classification=_classify_eclipse("solar", orb), ) # Find next Full Moon (potential lunar eclipse) if eclipse_types in ("both", "lunar"): full_moon = _find_lunation(current_jd, "full", max_days=35) if full_moon and full_moon[0] <= end_jd: jd, sun_lon, moon_lon = full_moon full_moon_jd = jd # Get node position node_lon, _ = _get_position_and_speed( SWISS_EPHEMERIS_IDS["True Node"], jd ) south_node_lon = (node_lon + 180) % 360 # For lunar eclipse, check Moon's distance to nodes dist_to_nn = abs(_normalize_angle_error(moon_lon - node_lon)) dist_to_sn = abs(_normalize_angle_error(moon_lon - south_node_lon)) orb = min(dist_to_nn, dist_to_sn) nearest = "north" if dist_to_nn < dist_to_sn else "south" if orb <= LUNAR_ECLIPSE_MAX_ORB: lunar_eclipse = Eclipse( julian_day=jd, datetime_utc=_julian_day_to_datetime(jd), eclipse_type="lunar", sun_longitude=sun_lon, moon_longitude=moon_lon, node_longitude=node_lon, orb_to_node=orb, nearest_node=nearest, sign=_get_sign_from_longitude(moon_lon), classification=_classify_eclipse("lunar", orb), ) # Return the earlier eclipse if we found any if solar_eclipse and lunar_eclipse: # Return whichever comes first if solar_eclipse.julian_day <= lunar_eclipse.julian_day: return solar_eclipse else: return lunar_eclipse elif solar_eclipse: return solar_eclipse elif lunar_eclipse: return lunar_eclipse # No eclipse found, move past whichever lunation we found if new_moon_jd and full_moon_jd: current_jd = min(new_moon_jd, full_moon_jd) + 1.0 elif new_moon_jd: current_jd = new_moon_jd + 1.0 elif full_moon_jd: current_jd = full_moon_jd + 1.0 else: current_jd += 15.0 # Half a lunar cycle return None
[docs] def find_all_eclipses( start: datetime | float, end: datetime | float, eclipse_types: Literal["both", "solar", "lunar"] = "both", max_results: int = 20, ) -> list[Eclipse]: """Find all eclipses in a date range. Args: start: Start datetime (UTC) or Julian day end: End datetime (UTC) or Julian day eclipse_types: Which types to find ("both", "solar", "lunar") max_results: Safety limit on number of results (default 20) Returns: List of Eclipse objects, chronologically ordered Example: >>> # Find all eclipses in 2024 >>> eclipses = find_all_eclipses( ... datetime(2024, 1, 1), ... datetime(2024, 12, 31) ... ) >>> for e in eclipses: ... print(e) """ if isinstance(start, datetime): start_jd = _datetime_to_julian_day(start) else: start_jd = start if isinstance(end, datetime): end_jd = _datetime_to_julian_day(end) else: end_jd = end results = [] current_jd = start_jd while current_jd < end_jd and len(results) < max_results: eclipse = find_eclipse( current_jd, direction="forward", eclipse_types=eclipse_types, max_days=end_jd - current_jd + 1, ) if eclipse is None or eclipse.julian_day > end_jd: break results.append(eclipse) # Move past this eclipse - but not too far! # Eclipse seasons have ~2 eclipses ~2 weeks apart # Step only 12 days to catch both solar and lunar in same season current_jd = eclipse.julian_day + 12.0 return results