Source code for volkit.estimate.future_from_option_quotes

# volkit/implied/future.py

from __future__ import annotations
from typing import Optional, Tuple, List, Dict, Iterable, DefaultDict
from collections import defaultdict
import numpy as np

from volkit.estimate.future_res import ImpliedFutureResult
from volkit.estimate.future_from_option_quotes_plot import (
    estimate_future_from_option_quotes_plot,
)


# ======================================================================
# Public API
# ======================================================================


[docs] def estimate_future_from_option_quotes( K: np.ndarray, call_bid: np.ndarray, call_ask: np.ndarray, put_bid: np.ndarray, put_ask: np.ndarray, *, eps: float = 1e-12, plot: bool = False, ax=None, ) -> Tuple[Optional[ImpliedFutureResult], np.ndarray]: """ Robust single-expiry inference with minimal exclusions + max-width future band. Returns ------- (result, valid_mask) result : ImpliedFutureResult or None No feasible subset if fewer than 2 distinct strikes. valid_mask : ndarray of bool, shape (len(K),) In ORIGINAL input order. True for strikes used by the solution; False otherwise. Notes ----- - If result is None, valid_mask may still mark one "best" row *only* in the case of Ds == ∅ *and* there exist ≥2 distinct strikes among valid rows. If all valid rows share the same strike (e.g., duplicates-only situation), the mask is all False. - Duplicate-K subsets are rejected when selecting the final subset (they do not define a future band). - Selection across depths: we try the highest feasible depth first, then fall back to lower depths (≥2). Within a depth, we choose the subset/D that maximizes width. """ # ---------------------------- # 0) Basic filtering & ordering # ---------------------------- n0 = len(K) mask_out = np.zeros(n0, dtype=bool) K0, Cb0, Ca0, Pb0, Pa0, idx_sorted_to_orig = _filter_and_sort_finite( K, call_bid, call_ask, put_bid, put_ask ) if K0.size < 2: return None, mask_out # At least two *distinct* strikes present in finite slice? if np.unique(K0).size < 2: return None, mask_out L, U = _make_LU(Cb0, Ca0, Pb0, Pa0) # Quick validity flags at row-level (independent of D) row_valid = (U >= L + eps) & np.isfinite(L) & np.isfinite(U) # ------------------------------------------- # 1) Candidate discount grid (event locations) # ------------------------------------------- Ds = _critical_discounts(K0, L, U, eps=eps) # If NO positive candidate discounts: # Mark narrowest valid singleton (only if ≥2 distinct K among valid rows), # else leave mask empty. if Ds.size == 0: idx_single = _narrowest_valid_singleton_index(K0, L, U, row_valid, eps) if idx_single is not None: mask_out[idx_sorted_to_orig[idx_single]] = True return None, mask_out eval_Ds = _with_midpoints(Ds) # -------------------------------------------------------------------- # 2) Collect ALL max-overlap subsets grouped by DEPTH across all eval_Ds # -------------------------------------------------------------------- depths_to_subsets, subset_to_Ds = _collect_subsets_by_depth( K0, L, U, row_valid, eval_Ds, eps=eps ) if not depths_to_subsets: # No valid intervals anywhere return None, mask_out # ------------------------------------------------------------ # 3) Choose subset with fallback: highest depth -> lower depth # (reject duplicate-K subsets; require feasible D interval) # ------------------------------------------------------------ choice = _choose_subset_with_fallback( K0, L, U, depths_to_subsets, subset_to_Ds, eps=eps ) if choice is None: # Nothing feasible across any depth (≥2) return None, mask_out D_star, F_bid_star, F_ask_star, D_lo, D_hi, key_sorted = choice keep_orig = idx_sorted_to_orig[np.array(key_sorted, dtype=int)] mask_out[keep_orig] = True result = ImpliedFutureResult( F=float(0.5 * (F_bid_star + F_ask_star)), F_bid=float(F_bid_star), F_ask=float(F_ask_star), D=float((D_lo + D_hi) / 2), D_min=float(D_lo), D_max=float(D_hi), ) if plot: estimate_future_from_option_quotes_plot( K, call_bid, call_ask, put_bid, put_ask, result, mask_out, ax=ax ) return result, mask_out
# ====================================================================== # Helpers (private) # ====================================================================== def _filter_and_sort_finite( K: np.ndarray, Cb: np.ndarray, Ca: np.ndarray, Pb: np.ndarray, Pa: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Keep rows that are finite on all fields; return arrays sorted by K + back-map.""" K = np.asarray(K, float) Cb = np.asarray(Cb, float) Ca = np.asarray(Ca, float) Pb = np.asarray(Pb, float) Pa = np.asarray(Pa, float) finite = ( np.isfinite(K) & np.isfinite(Cb) & np.isfinite(Ca) & np.isfinite(Pb) & np.isfinite(Pa) ) idx_finite = np.nonzero(finite)[0] if idx_finite.size == 0: return K[:0], Cb[:0], Ca[:0], Pb[:0], Pa[:0], idx_finite Kf, Cbf, Caf, Pbf, Paf = ( K[idx_finite], Cb[idx_finite], Ca[idx_finite], Pb[idx_finite], Pa[idx_finite], ) order = np.argsort(Kf) idx_sorted_to_orig = idx_finite[order] return Kf[order], Cbf[order], Caf[order], Pbf[order], Paf[order], idx_sorted_to_orig def _make_LU( Cb: np.ndarray, Ca: np.ndarray, Pb: np.ndarray, Pa: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """L = C_bid - P_ask; U = C_ask - P_bid.""" L = Cb - Pa U = Ca - Pb return L, U def _critical_discounts( K: np.ndarray, L: np.ndarray, U: np.ndarray, eps: float ) -> np.ndarray: """ Event discounts where the sorted order of interval endpoints can change. Uses equalities among {a_i, b_i} with a_i = D*K_i + L_i, b_i = D*K_i + U_i. Only positive D are kept. """ n = len(K) cand: List[float] = [] for i in range(n): for j in range(n): if i == j: continue denom = K[i] - K[j] if abs(denom) < eps: continue # a_i = a_j ; b_i = b_j ; a_i = b_j D1 = (L[j] - L[i]) / denom D2 = (U[j] - U[i]) / denom D3 = (U[j] - L[i]) / denom for D in (D1, D2, D3): if np.isfinite(D) and (D > 0): cand.append(float(D)) if not cand: return np.array([], dtype=float) return np.array(sorted(set(cand)), dtype=float) def _with_midpoints(Ds: np.ndarray) -> np.ndarray: """Add midpoints between consecutive Ds; de-duplicate; keep ascending.""" if Ds.size == 0: return Ds mids = (Ds[:-1] + Ds[1:]) * 0.5 out = np.concatenate([Ds, mids]) out.sort() mask = np.ones_like(out, dtype=bool) mask[1:] = np.abs(np.diff(out)) > 1e-15 return out[mask] def _intervals_at_D( K: np.ndarray, L: np.ndarray, U: np.ndarray, D: float ) -> Tuple[np.ndarray, np.ndarray]: a = D * K + L b = D * K + U return a, b def _max_overlap_subset_at_D( a: np.ndarray, b: np.ndarray, row_valid: np.ndarray, eps: float ) -> Tuple[int, float, np.ndarray]: """ Sweep-line on [a_i, b_i] to find maximum overlap depth and an x* achieving it. Returns (depth, x_star, indices_active_at_x_star) while ignoring invalid rows. """ events: List[Tuple[float, int, int]] = [] for i, ok in enumerate(row_valid): if not ok: continue ai, bi = a[i], b[i] if not (np.isfinite(ai) and np.isfinite(bi) and ai <= bi + eps): continue events.append((ai, +1, i)) events.append((bi, -1, i)) if not events: return 0, np.nan, np.array([], dtype=int) events.sort(key=lambda t: (t[0], -t[1])) # starts before ends at same x best_depth = -1 x_star = events[0][0] active = 0 for x, typ, _ in events: if typ == +1: active += 1 if active > best_depth: best_depth = active x_star = x else: active -= 1 S = np.where(row_valid & (a <= x_star + eps) & (b + eps >= x_star))[0] return best_depth, x_star, S def _collect_subsets_by_depth( K: np.ndarray, L: np.ndarray, U: np.ndarray, row_valid: np.ndarray, eval_Ds: Iterable[float], eps: float, ) -> Tuple[Dict[int, List[Tuple[int, ...]]], Dict[Tuple[int, ...], List[float]]]: """ For each D in eval_Ds, take the *maximum-overlap* active set at x* and group by depth. Returns: depths_to_subsets: depth -> list of subset keys (tuples of sorted indices) subset_to_Ds : subset key -> list of D values where it appeared """ depths_to_subsets: DefaultDict[int, List[Tuple[int, ...]]] = defaultdict(list) subset_to_Ds: DefaultDict[Tuple[int, ...], List[float]] = defaultdict(list) for D in eval_Ds: a, b = _intervals_at_D(K, L, U, D) depth, _, keep_idx = _max_overlap_subset_at_D(a, b, row_valid, eps=eps) if depth <= 0: continue key = tuple(keep_idx.tolist()) # Deduplicate per depth if not depths_to_subsets[depth] or depths_to_subsets[depth][-1] != key: depths_to_subsets[depth].append(key) subset_to_Ds[key].append(D) return dict(depths_to_subsets), dict(subset_to_Ds) def _feasible_D_interval_for_subset( K: np.ndarray, L: np.ndarray, U: np.ndarray, S: np.ndarray, eps: float ) -> Optional[Tuple[float, float]]: """ Feasible D interval such that ⋂_{i∈S} [D*K_i+L_i, D*K_i+U_i] ≠ ∅ and D>0. Reject subsets with <2 distinct strikes. """ Ks, Ls, Us = K[S], L[S], U[S] if np.unique(Ks).size < 2: return None D_lo = 0.0 D_hi = np.inf m = len(S) for p in range(m): for q in range(m): if Ks[q] <= Ks[p] + eps: continue denom = Ks[q] - Ks[p] lo = (Ls[p] - Us[q]) / denom hi = (Us[p] - Ls[q]) / denom if np.isfinite(lo): D_lo = max(D_lo, lo) if np.isfinite(hi): D_hi = min(D_hi, hi) D_lo = max(D_lo, eps) if not np.isfinite(D_hi) or (D_lo > D_hi + 1e-15): return None return float(D_lo), float(D_hi) def _width_candidate_discounts_for_subset( K: np.ndarray, L: np.ndarray, U: np.ndarray, S: np.ndarray, D_lo: float, D_hi: float, eps: float, ) -> np.ndarray: """ Candidate Ds to check the future band width on a fixed subset S: endpoints of the feasible interval plus cross-equalities among a_i/b_i for i∈S within [D_lo, D_hi]. """ Ks, Ls, Us = K[S], L[S], U[S] cand: List[float] = [D_lo, D_hi] m = len(S) for p in range(m): for q in range(p + 1, m): denom = Ks[p] - Ks[q] if abs(denom) < eps: continue D1 = (Ls[q] - Ls[p]) / denom # a_p = a_q D2 = (Us[q] - Us[p]) / denom # b_p = b_q for D in (D1, D2): if np.isfinite(D) and (D > 0) and (D_lo - 1e-14 <= D <= D_hi + 1e-14): cand.append(float(D)) Ds = np.array(sorted(set(cand)), dtype=float) if Ds.size >= 2: mids = (Ds[:-1] + Ds[1:]) * 0.5 Ds = np.unique(np.concatenate([Ds, mids])) Ds.sort() return Ds def _future_band_for_subset_at_D( K: np.ndarray, L: np.ndarray, U: np.ndarray, S: np.ndarray, D: float ) -> Tuple[float, float]: """ For fixed subset S and discount D, return (F_bid, F_ask) s.t. F ∈ [F_bid, F_ask] iff there exists X with X ∈ ⋂_{i∈S} [D*K_i+L_i, D*K_i+U_i] and F = X / D. """ Ks, Ls, Us = K[S], L[S], U[S] a = D * Ks + Ls b = D * Ks + Us X_low = float(np.max(a)) X_high = float(np.min(b)) return X_low / D, X_high / D def _choose_subset_with_fallback( K: np.ndarray, L: np.ndarray, U: np.ndarray, depths_to_subsets: Dict[int, List[Tuple[int, ...]]], subset_to_Ds: Dict[Tuple[int, ...], List[float]], eps: float, ) -> Optional[Tuple[float, float, float, float, float, Tuple[int, ...]]]: """ Iterate depths from high to low (≥2). For each subset at that depth: - reject if it has <2 distinct Ks - compute feasible D interval; skip if None - build width candidate Ds within [D_lo, D_hi]; maximize width = F_ask - F_bid Return (D_star, F_bid_star, F_ask_star, D_lo, D_hi, subset_key) or None. """ if not depths_to_subsets: return None for depth in sorted(depths_to_subsets.keys(), reverse=True): if depth < 2: continue best: Optional[Tuple[float, float, float, float, float, Tuple[int, ...]]] = None best_w = -np.inf for key in depths_to_subsets[depth]: S_sorted = np.array(key, dtype=int) # Early reject duplicate-K subsets if np.unique(K[S_sorted]).size < 2: continue D_interval = _feasible_D_interval_for_subset(K, L, U, S_sorted, eps=eps) if D_interval is None: continue D_lo, D_hi = D_interval D_cands = _width_candidate_discounts_for_subset( K, L, U, S_sorted, D_lo, D_hi, eps=eps ) if D_cands.size == 0: continue for D in D_cands: F_bid, F_ask = _future_band_for_subset_at_D(K, L, U, S_sorted, D) w = F_ask - F_bid if (w > best_w + 1e-15) or ( abs(w - best_w) <= 1e-15 and (best is None or D < best[0]) ): best_w = w best = (D, F_bid, F_ask, D_lo, D_hi, key) if best is not None: return best return None def _narrowest_valid_singleton_index( K: np.ndarray, L: np.ndarray, U: np.ndarray, row_valid: np.ndarray, eps: float ) -> Optional[int]: """ Return index (in the *sorted* slice) of the narrowest valid singleton IF there are at least two distinct strikes among valid rows. Otherwise return None. """ valid_idx = np.where(row_valid)[0] if valid_idx.size == 0: return None # require ≥ 2 distinct strikes among valid rows; else return None if np.unique(K[valid_idx]).size < 2: return None widths = U[valid_idx] - L[valid_idx] arg = np.argmin(widths) # ties -> first return int(valid_idx[arg])