Source code for volkit.estimate.future_from_option_prices

# file: volkit/implied/future_from_prices.py
from __future__ import annotations

from typing import Optional, Tuple

import numpy as np

__all__ = [
    "ImpliedFutureResult",
    "estimate_future_from_option_prices",
    "_constrained_ols_line",
    "_mad",
    "_mad_threshold",
    "_feasible_D_band_from_pairwise_slopes",
    "_future_band_from_D_band",
    "_count_distinct",
    "_canon_D_band",
]

from .future_res import ImpliedFutureResult
from .future_from_option_prices_plot import estimate_future_from_option_prices_plot

# --------------------------- helpers (private) ---------------------------------


def _as_np1d(x, name: str) -> np.ndarray:
    arr = np.asarray(x, dtype=float)
    if arr.ndim != 1:
        raise ValueError(f"{name} must be 1-D, got shape={arr.shape}")
    return arr


def _finite_mask(*arrays: np.ndarray) -> np.ndarray:
    m = np.ones_like(arrays[0], dtype=bool)
    for a in arrays:
        m &= np.isfinite(a)
    return m


def _count_distinct(x: np.ndarray, eps: float) -> int:
    if x.size == 0:
        return 0
    xs = np.sort(x)
    jumps = np.diff(xs) > eps
    return int(1 + jumps.sum())


def _constrained_ols_line(
    K: np.ndarray, y: np.ndarray, *, eps: float
) -> Tuple[float, float]:
    """Return (a, b) minimizing ||y - (a + bK)||^2 with slope constraint b ∈ [-1, 0]."""
    if K.size != y.size or K.size < 2:
        raise ValueError("Need at least two points for OLS line")

    K_mean = K.mean()
    y_mean = y.mean()
    Kc = K - K_mean
    yc = y - y_mean
    varK = float(Kc.dot(Kc))
    if varK <= eps:
        b = 0.0  # flat; implies D≈0
        a = y_mean
        return a, b
    b_ols = float(Kc.dot(yc) / varK)
    b = float(np.clip(b_ols, -1.0, 0.0))
    a = float((y - b * K).mean())
    return a, b


def _mad(x: np.ndarray, eps: float) -> float:
    if x.size == 0:
        return eps
    med = np.median(x)
    mad = np.median(np.abs(x - med))
    return float(max(eps, 1.4826 * mad))


def _mad_threshold(resid: np.ndarray, mult: float, eps: float) -> float:
    return float(max(5 * eps, mult * _mad(resid, eps)))


def _feasible_D_band_from_pairwise_slopes(
    K: np.ndarray,
    y: np.ndarray,
    *,
    q_low: float,
    q_high: float,
    clip_D: Tuple[float, float],
    eps: float,
) -> Tuple[Optional[float], Optional[float]]:
    """
    Robust D band from pairwise slopes s_ij = (y_i - y_j)/(K_i - K_j) via quantiles.
    Returns (D_min, D_max) possibly as None if insufficient/degenerate data.
    """
    n = K.size
    if n < 2:
        return None, None

    slopes = []
    for i in range(n - 1):
        dK = K[i + 1 :] - K[i]
        m = np.abs(dK) > eps
        if not m.any():
            continue
        dy = y[i + 1 :] - y[i]
        slopes.append(dy[m] / dK[m])

    if not slopes:
        return None, None

    s_all = np.concatenate(slopes)
    s_all = s_all[np.isfinite(s_all)]
    if s_all.size == 0:
        return None, None

    lo_q, hi_q = np.quantile(s_all, (q_low, q_high))
    s_lo, s_hi = sorted((float(lo_q), float(hi_q)))  # s_lo <= s_hi

    # D = -s (order flips)
    D_lo_raw, D_hi_raw = -s_hi, -s_lo  # D_lo_raw <= D_hi_raw

    # Do NOT clamp here; canonicalization/containment happens in _canon_D_band
    return float(D_lo_raw), float(D_hi_raw)


def _future_band_from_D_band(
    K: np.ndarray,
    y: np.ndarray,
    D_min: Optional[float],
    D_max: Optional[float],
) -> Tuple[Optional[float], Optional[float]]:
    """
    Intersect per-strike future intervals induced by D ∈ [D_min, D_max].
    Vectorized, low-branch version:
      • Guards: shape/None/finite/positive
      • Computes per-strike [lo_i, hi_i] with a single min/max
      • Intersects across strikes; empty ⇒ (None, None)
    """
    # Basic shape/None guards
    if K.size == 0 or y.size == 0 or K.size != y.size or D_min is None or D_max is None:
        return None, None

    # Finite + ordered positive D-band
    d = np.array([D_min, D_max], dtype=float)
    if not np.all(np.isfinite(d)):
        return None, None
    d.sort()
    if d[0] <= 0.0:
        return None, None
    D_lo, D_hi = float(d[0]), float(d[1])

    # Vectorized per-strike intervals over D∈[D_lo,D_hi]
    f_at_lo = K + y / D_lo
    f_at_hi = K + y / D_hi
    lo_i = np.minimum(f_at_lo, f_at_hi)
    hi_i = np.maximum(f_at_lo, f_at_hi)

    # Intersection across strikes
    F_lo = float(np.max(lo_i)) if lo_i.size else -np.inf
    F_hi = float(np.min(hi_i)) if hi_i.size else np.inf

    # Empty or non-finite => no feasible intersection
    if not (np.isfinite(F_lo) and np.isfinite(F_hi)) or F_lo > F_hi:
        return None, None
    return F_lo, F_hi


def _scatter_back_mask(
    base_mask: np.ndarray, idx: np.ndarray, inlier_local: np.ndarray
) -> np.ndarray:
    out = np.zeros_like(base_mask, dtype=bool)
    out_idx = idx[inlier_local]
    out[out_idx] = True
    return out


def _canon_D_band(
    D_min: Optional[float],
    D_max: Optional[float],
    D_hat: float,
    c_lo: float,
    c_hi: float,
    tiny: float,
) -> Tuple[float, float]:
    """
    Canonicalize (order), clamp to [c_lo, c_hi], and softly contain D_hat±tiny.
    If the incoming band is missing/invalid or entirely outside the clip,
    seed with a soft band around D_hat. Always returns an ordered pair.
    """
    lo_c, hi_c = (c_lo, c_hi) if c_lo <= c_hi else (c_hi, c_lo)

    # Seed when band is missing or non-finite
    if (
        (D_min is None)
        or (D_max is None)
        or (not np.isfinite(D_min))
        or (not np.isfinite(D_max))
    ):
        xm = float(np.clip(D_hat, lo_c, hi_c))
        return max(lo_c, xm - tiny), min(hi_c, xm + tiny)

    # Order raw band
    d_lo, d_hi = (D_min, D_max) if D_min <= D_max else (D_max, D_min)

    # Entirely outside clip ⇒ collapse near D_hat
    if (d_hi < lo_c) or (d_lo > hi_c):
        xm = float(np.clip(D_hat, lo_c, hi_c))
        return max(lo_c, xm - tiny), min(hi_c, xm + tiny)

    # Clamp and keep order
    d_lo = max(lo_c, d_lo)
    d_hi = min(hi_c, d_hi)

    # Softly contain D_hat ± tiny (these ops cannot invert order for tiny ≥ 0)
    xm = float(np.clip(D_hat, lo_c, hi_c))
    d_lo = min(d_lo, xm + tiny)
    d_hi = max(d_hi, xm - tiny)

    # Order is guaranteed here; returning directly removes an unreachable branch
    return float(d_lo), float(d_hi)


# ------------------------------ public API -------------------------------------


[docs] def estimate_future_from_option_prices( K: np.ndarray, call: np.ndarray, put: np.ndarray, *, eps: float = 1e-12, plot: bool = False, ax=None, slope_quantiles: Tuple[float, float] = (0.25, 0.75), clip_D: Tuple[float, float] = (1e-6, 1.0), trim_mad_mult: float = 3.5, max_trim_iters: int = 5, # optional, robust thresholds tol_sigma: Optional[float] = None, rel_tol: Optional[float] = None, abs_tol: Optional[float] = None, day: float = None, t: float = None, ) -> Tuple[Optional[ImpliedFutureResult], np.ndarray]: """Infer future F and discount factor D from *single* option prices. Returns ------- (result, valid_mask) result : ImpliedFutureResult or None None if fewer than 2 distinct strikes (after filtering) or no feasible band could be established. valid_mask : ndarray of bool, shape (len(K),) In ORIGINAL input order. True if a strike was used in the solution. """ K = _as_np1d(K, "K") C = _as_np1d(call, "call") P = _as_np1d(put, "put") if not (len(K) == len(C) == len(P)): raise ValueError("K, call, put must have the same length") n = len(K) finite_mask = _finite_mask(K, C, P) if finite_mask.sum() < 2: return None, np.zeros(n, dtype=bool) c_lo, c_hi = clip_D if clip_D[0] <= clip_D[1] else (clip_D[1], clip_D[0]) idx = np.nonzero(finite_mask)[0] Kv = K[idx] yv = (C - P)[idx] if _count_distinct(Kv, eps) < 2: return None, np.zeros(n, dtype=bool) # Robust trim loop around constrained OLS fit of y = a + bK, b∈[-1,0] inlier = np.ones_like(Kv, dtype=bool) def fit(Kx, yx): a, b = _constrained_ols_line(Kx, yx, eps=eps) return a, b a_hat, b_hat = fit(Kv[inlier], yv[inlier]) for _ in range(max_trim_iters): resid = yv - (a_hat + b_hat * Kv) if (tol_sigma is not None) or (rel_tol is not None) or (abs_tol is not None): # Sigma/relative thresholding scale = _mad(resid[inlier], eps) if not np.isfinite(scale) or scale <= eps: scale = max(np.median(np.abs(resid[inlier])), eps) sig = tol_sigma if tol_sigma is not None else trim_mad_mult a_tol = abs_tol if abs_tol is not None else 0.0 r_tol = rel_tol if rel_tol is not None else 0.0 thresh = sig * scale allow = np.abs(resid) <= ( thresh + a_tol + r_tol * np.maximum(np.abs(yv), eps) ) else: # Legacy MAD trimming thresh = _mad_threshold(resid[inlier], mult=trim_mad_mult, eps=eps) allow = np.abs(resid) <= thresh new_inlier = allow if new_inlier.sum() < 2: break if np.array_equal(new_inlier, inlier): break inlier = new_inlier a_hat, b_hat = fit(Kv[inlier], yv[inlier]) if inlier.sum() < 2 or _count_distinct(Kv[inlier], eps) < 2: return None, np.zeros(n, dtype=bool) # Final fit a_hat, b_hat = fit(Kv[inlier], yv[inlier]) D_hat = -b_hat D_hat = float(np.clip(D_hat, c_lo, c_hi)) if not np.isfinite(D_hat) or D_hat <= 0.0: return None, _scatter_back_mask(finite_mask, idx, inlier) F_hat = a_hat / D_hat # D interval from pairwise slopes on inliers q_low, q_high = slope_quantiles raw_D_min, raw_D_max = _feasible_D_band_from_pairwise_slopes( Kv[inlier], yv[inlier], q_low=q_low, q_high=q_high, clip_D=(c_lo, c_hi), eps=eps ) tiny = max(1e-12, 5 * eps) D_min, D_max = _canon_D_band(raw_D_min, raw_D_max, D_hat, c_lo, c_hi, tiny) # Map D band to F band F_lo, F_hi = _future_band_from_D_band(Kv[inlier], yv[inlier], D_min, D_max) # Fallback: contain the point estimate tightly when mapping failed/empty if (F_lo is None) or (F_hi is None) or (F_lo > F_hi): F_lo, F_hi = (F_hat - tiny, F_hat + tiny) else: F_lo = float(min(F_lo, F_hat + tiny)) F_hi = float(max(F_hi, F_hat - tiny)) # Guarantee F-band contains F_hat F_lo = min(F_lo, F_hat) F_hi = max(F_hi, F_hat) result = ImpliedFutureResult( F=float(F_hat), F_bid=float(F_lo), F_ask=float(F_hi), D=float(D_hat), D_min=float(D_min), D_max=float(D_max), ) out_mask = _scatter_back_mask(finite_mask, idx, inlier) # Optional plot: fully guarded in try/except to never leak plot errors if plot: estimate_future_from_option_prices_plot( K=Kv, C=C[idx], P=P[idx], inlier=inlier, F_hat=F_hat, F_bid=F_lo, F_ask=F_hi, D_min=D_min, D_max=D_max, D_display=(D_min + D_max) / 2, ax=ax, ) return result, out_mask