#!/usr/bin/env python3 """Standalone SEC EDGAR fetcher for claude-code-stock-analysis-skill. Public functions: - get_cik(ticker) - get_company_facts(ticker) - get_financials(ticker) - get_filings_metadata(ticker) Examples: python tools/edgar.py AAPL python tools/edgar.py NVDA --mode filings """ from __future__ import annotations import argparse import json from collections import Counter, defaultdict from datetime import datetime, timezone import time from typing import Any, Optional import requests _SEC_CIK_LOOKUP = "https://www.sec.gov/files/company_tickers.json" _SEC_COMPANY_FACTS = "https://data.sec.gov/api/xbrl/companyfacts/CIK{cik}.json" _SEC_SUBMISSIONS = "https://data.sec.gov/submissions/CIK{cik}.json" _TIMEOUT = 25 _MAX_RETRIES = 3 _INITIAL_BACKOFF_SECONDS = 1.0 _RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504} _ACCEPTED_FORMS = {"10-K", "10-Q", "20-F", "6-K"} _ANNUAL_FORMS = {"10-K", "20-F"} _QUARTERLY_FORMS = {"10-Q", "6-K"} _HEADERS = { "User-Agent": "claude-code-stock-analysis-skill/1.0 (research@xvary.com)", "Accept": "application/json", "Accept-Encoding": "gzip, deflate", } # statement -> field -> accepted concept labels (US-GAAP + IFRS aliases) _FIELD_CONCEPTS: dict[str, dict[str, tuple[str, ...]]] = { "income_statement": { "revenue": ( "Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "Revenue", "RevenueFromContractsWithCustomers", "RevenueFromRenderingOfServices", ), "gross_profit": ("GrossProfit",), "operating_income": ("OperatingIncomeLoss", "ProfitLossFromOperatingActivities"), "net_income": ( "NetIncomeLoss", "ProfitLoss", "ProfitLossAttributableToOwnersOfParent", ), "eps_diluted": ("EarningsPerShareDiluted", "DilutedEarningsLossPerShare"), "eps_basic": ( "EarningsPerShareBasic", "BasicEarningsLossPerShare", "BasicAndDilutedEarningsLossPerShare", ), "r_and_d": ("ResearchAndDevelopmentExpense",), "sga": ( "SellingGeneralAndAdministrativeExpense", "GeneralAndAdministrativeExpense", ), "interest_expense": ( "InterestExpense", "FinanceCosts", "BorrowingCostsRecognisedAsExpense", ), "income_tax_expense": ("IncomeTaxExpenseBenefit",), }, "balance_sheet": { "total_assets": ("Assets",), "current_assets": ("AssetsCurrent", "CurrentAssets"), "current_liabilities": ("LiabilitiesCurrent", "CurrentLiabilities"), "total_liabilities": ("Liabilities",), "stockholders_equity": ("StockholdersEquity", "Equity"), "cash_and_equivalents": ( "CashAndCashEquivalentsAtCarryingValue", "CashAndCashEquivalents", ), "long_term_debt": ("LongTermDebt", "LongTermDebtNoncurrent", "LongtermBorrowings"), "short_term_borrowings": ( "ShortTermBorrowings", "CurrentPortionOfLongtermBorrowings", ), "shares_outstanding": ( "CommonStockSharesOutstanding", "EntityCommonStockSharesOutstanding", "NumberOfSharesIssued", "ShareIssued", "OrdinarySharesNumber", ), }, "cash_flow": { "operating_cash_flow": ( "NetCashProvidedByOperatingActivities", "OperatingCashFlow", "CashFlowsFromUsedInOperatingActivities", "NetCashProvidedByUsedInOperatingActivities", ), "capex": ( "PaymentsToAcquirePropertyPlantAndEquipment", "PurchaseOfPropertyPlantAndEquipmentClassifiedAsInvestingActivities", ), "depreciation_amortization": ( "DepreciationDepletionAndAmortization", "Depreciation", "DepreciationAndAmortization", "DepreciationExpense", ), "stock_based_compensation": ( "StockBasedCompensation", "ShareBasedCompensation", "AdjustmentsForSharebasedPayments", ), "dividends_paid": ( "DividendsCommonStockCash", "DividendsPaid", "DividendsPaidOrdinarySharesPerShare", ), }, } def _concept_map() -> dict[str, tuple[str, str]]: out: dict[str, tuple[str, str]] = {} for statement, fields in _FIELD_CONCEPTS.items(): for field, concepts in fields.items(): for concept in concepts: out[concept] = (statement, field) return out _CONCEPT_MAP = _concept_map() def _field_concept_priority() -> dict[tuple[str, str], dict[str, int]]: priorities: dict[tuple[str, str], dict[str, int]] = {} for statement, fields in _FIELD_CONCEPTS.items(): for field, concepts in fields.items(): priorities[(statement, field)] = { concept: idx for idx, concept in enumerate(concepts) } return priorities _FIELD_CONCEPT_PRIORITY = _field_concept_priority() def _session() -> requests.Session: s = requests.Session() s.headers.update(_HEADERS) return s def _request_json(url: str, session: requests.Session) -> dict[str, Any]: last_error: Optional[Exception] = None for attempt in range(1, _MAX_RETRIES + 1): try: response = session.get(url, timeout=_TIMEOUT) if response.status_code in _RETRYABLE_STATUS_CODES: raise requests.HTTPError( f"Retryable status {response.status_code}", response=response, ) response.raise_for_status() return response.json() except (requests.RequestException, ValueError) as exc: last_error = exc if attempt >= _MAX_RETRIES: break backoff = _INITIAL_BACKOFF_SECONDS * (2 ** (attempt - 1)) time.sleep(backoff) assert last_error is not None raise last_error def _variants(ticker: str) -> list[str]: t = ticker.strip().upper() candidates = [ t, t.replace(".", "-"), t.replace("-", "."), t.replace(".", ""), t.split(".")[0], t.split("-")[0], ] out: list[str] = [] for c in candidates: if c and c not in out: out.append(c) return out def _parse_period_months(start: Optional[str], end: Optional[str]) -> Optional[int]: if not end: return None if not start: return 0 try: s = datetime.strptime(start, "%Y-%m-%d") e = datetime.strptime(end, "%Y-%m-%d") except ValueError: return None days = (e - s).days if days <= 0: return 0 if days <= 120: return 3 if days <= 210: return 6 if days <= 310: return 9 return 12 def _is_quarterly(form: str, period_months: Optional[int]) -> bool: if form in _QUARTERLY_FORMS: return True return period_months is not None and 1 <= period_months <= 4 def _to_float(value: Any) -> Optional[float]: try: if value is None: return None return float(value) except (TypeError, ValueError): return None def get_cik(ticker: str) -> Optional[str]: """Resolve ticker to zero-padded SEC CIK.""" with _session() as s: data = _request_json(_SEC_CIK_LOOKUP, s) lookup: dict[str, str] = {} for entry in data.values(): if not isinstance(entry, dict): continue symbol = str(entry.get("ticker", "")).strip().upper() cik_raw = entry.get("cik_str") if symbol and cik_raw is not None: lookup[symbol] = str(cik_raw).zfill(10) for candidate in _variants(ticker): if candidate in lookup: return lookup[candidate] return None def get_company_facts(ticker: str) -> dict[str, Any]: """Fetch raw EDGAR companyfacts payload for a ticker.""" normalized = ticker.strip().upper() cik = get_cik(normalized) if not cik: raise ValueError(f"CIK not found for ticker: {normalized}") with _session() as s: facts = _request_json(_SEC_COMPANY_FACTS.format(cik=cik), s) return { "ticker": normalized, "cik": cik, "entity_name": facts.get("entityName", normalized), "facts": facts.get("facts", {}), "raw": facts, "retrieved_utc": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), } def get_filings_metadata(ticker: str, limit: int = 10) -> list[dict[str, Any]]: """Return recent SEC filing metadata for common report forms.""" normalized = ticker.strip().upper() cik = get_cik(normalized) if not cik: raise ValueError(f"CIK not found for ticker: {normalized}") with _session() as s: payload = _request_json(_SEC_SUBMISSIONS.format(cik=cik), s) recent = payload.get("filings", {}).get("recent", {}) forms = recent.get("form", []) filing_dates = recent.get("filingDate", []) report_dates = recent.get("reportDate", []) accessions = recent.get("accessionNumber", []) docs = recent.get("primaryDocument", []) rows: list[dict[str, Any]] = [] for index, form in enumerate(forms): if form not in _ACCEPTED_FORMS: continue rows.append( { "form": form, "filing_date": filing_dates[index] if index < len(filing_dates) else None, "report_date": report_dates[index] if index < len(report_dates) else None, "accession_number": accessions[index] if index < len(accessions) else None, "primary_document": docs[index] if index < len(docs) else None, } ) if len(rows) >= limit: break return rows def _extract_line_items(company_facts: dict[str, Any]) -> dict[tuple[str, str], list[dict[str, Any]]]: root = company_facts.get("facts", {}) items: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list) for namespace in ("us-gaap", "ifrs-full"): ns = root.get(namespace, {}) if not isinstance(ns, dict): continue for concept, concept_payload in ns.items(): mapped = _CONCEPT_MAP.get(concept) if not mapped: continue statement, field = mapped units = concept_payload.get("units", {}) if not isinstance(units, dict): continue for unit, entries in units.items(): for entry in entries: form = entry.get("form", "") if form not in _ACCEPTED_FORMS: continue value = _to_float(entry.get("val")) if value is None: continue end = entry.get("end") if not end: continue start = entry.get("start") items[(statement, field)].append( { "value": value, "unit": unit, "form": form, "period_end": end, "period_start": start, "period_months": _parse_period_months(start, end), "filed": entry.get("filed"), "concept": concept, "namespace": namespace, } ) return items def _best_entry( records: list[dict[str, Any]], quarterly: bool, statement: str, field: str, ) -> Optional[dict[str, Any]]: if not records: return None scoped: list[dict[str, Any]] = [] for record in records: is_q = _is_quarterly(record.get("form", ""), record.get("period_months")) if quarterly and is_q: scoped.append(record) elif not quarterly and not is_q and record.get("form") in _ANNUAL_FORMS: scoped.append(record) if not scoped: return None concept_priority = _FIELD_CONCEPT_PRIORITY.get((statement, field), {}) if concept_priority: default_rank = len(concept_priority) + 100 best_rank = min(concept_priority.get(r.get("concept", ""), default_rank) for r in scoped) scoped = [ r for r in scoped if concept_priority.get(r.get("concept", ""), default_rank) == best_rank ] unit_counts = Counter(r.get("unit") for r in scoped) preferred_unit = unit_counts.most_common(1)[0][0] scoped = [r for r in scoped if r.get("unit") == preferred_unit] scoped.sort(key=lambda r: (r.get("period_end", ""), r.get("filed", "")), reverse=True) return scoped[0] def _build_snapshot( line_items: dict[tuple[str, str], list[dict[str, Any]]], quarterly: bool, ) -> tuple[dict[str, dict[str, float]], dict[str, dict[str, Any]], Optional[str]]: snapshot: dict[str, dict[str, float]] = { "income_statement": {}, "balance_sheet": {}, "cash_flow": {}, } sources: dict[str, dict[str, Any]] = {} period_end: Optional[str] = None for (statement, field), records in line_items.items(): best = _best_entry( records, quarterly=quarterly, statement=statement, field=field, ) if not best: continue snapshot[statement][field] = best["value"] key = f"{statement}.{field}" sources[key] = { "form": best.get("form"), "filed": best.get("filed"), "period_end": best.get("period_end"), "unit": best.get("unit"), "concept": best.get("concept"), "namespace": best.get("namespace"), } if best.get("period_end") and (not period_end or best["period_end"] > period_end): period_end = best["period_end"] return snapshot, sources, period_end def get_financials(ticker: str) -> dict[str, Any]: """Return normalized annual + quarterly financial snapshots.""" company = get_company_facts(ticker) line_items = _extract_line_items(company) annual_snapshot, annual_sources, annual_period = _build_snapshot( line_items, quarterly=False ) quarterly_snapshot, quarterly_sources, quarterly_period = _build_snapshot( line_items, quarterly=True ) return { "ticker": company["ticker"], "cik": company["cik"], "entity_name": company["entity_name"], "annual": { "period_end": annual_period, "statements": annual_snapshot, "sources": annual_sources, }, "quarterly": { "period_end": quarterly_period, "statements": quarterly_snapshot, "sources": quarterly_sources, }, "retrieved_utc": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), } def _main() -> None: parser = argparse.ArgumentParser(description="Standalone EDGAR fetcher") parser.add_argument("ticker", help="Ticker symbol, e.g. AAPL") parser.add_argument( "--mode", default="financials", choices=("financials", "facts", "filings"), help="Output mode", ) parser.add_argument( "--indent", type=int, default=2, help="JSON indent", ) args = parser.parse_args() if args.mode == "financials": payload = get_financials(args.ticker) elif args.mode == "facts": payload = get_company_facts(args.ticker) payload = { "ticker": payload["ticker"], "cik": payload["cik"], "entity_name": payload["entity_name"], "namespaces": list(payload.get("facts", {}).keys()), "retrieved_utc": payload.get("retrieved_utc"), } else: payload = { "ticker": args.ticker.strip().upper(), "filings": get_filings_metadata(args.ticker), "retrieved_utc": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), } print(json.dumps(payload, indent=args.indent, sort_keys=False)) if __name__ == "__main__": _main()