Skip to content

BenchmarkoorClient

BenchmarkoorClient

BenchmarkoorClient(token: str | None = None, *, cache_dir: Path | None = None, max_workers: int = DEFAULT_MAX_WORKERS, page_size: int = DEFAULT_PAGE_SIZE, retries: int = 3, backoff_factor: int | float = 2, retry_status: tuple[int, ...] = (502, 503, 524), base_url: str = BASE_URL, files_base_url: str = FILES_BASE_URL, cache_enabled: bool = True, verbose: bool = False, reporter: Reporter | None = None)

Read-only client for the Benchmarkoor API.

Token resolution: the token kwarg wins over the BENCHMARKOOR_TOKEN environment variable. Missing both raises RuntimeError at construction.

The client mirrors the four endpoints the package needs (/suites, /runs, /test_stats, /files/.../summary.json) plus a parse(...) wrapper that runs the title parser + opcount join in one call.

Source code in src/benchmarkoor_fetch/client/__init__.py
def __init__(
    self,
    token: str | None = None,
    *,
    cache_dir: Path | None = None,
    max_workers: int = DEFAULT_MAX_WORKERS,
    page_size: int = DEFAULT_PAGE_SIZE,
    retries: int = 3,
    backoff_factor: int | float = 2,
    retry_status: tuple[int, ...] = (502, 503, 524),
    base_url: str = BASE_URL,
    files_base_url: str = FILES_BASE_URL,
    cache_enabled: bool = True,
    verbose: bool = False,
    reporter: Reporter | None = None,
) -> None:
    resolved = token if token is not None else os.environ.get("BENCHMARKOOR_TOKEN")
    if not resolved:
        raise RuntimeError(
            "Benchmarkoor bearer token not found: pass `token=...` or set "
            "BENCHMARKOOR_TOKEN in the environment."
        )
    self._token = resolved
    self._max_workers = max_workers
    self._page_size = page_size
    self._base_url = base_url
    self._files_base_url = files_base_url
    if reporter is not None:
        self.reporter = reporter
    else:
        self.reporter = Reporter(level="verbose" if verbose else "info")
    self._fork: str | None = None

    http_cfg = _SimpleHttpConfig(
        retries=retries,
        backoff_factor=backoff_factor,
        retry_status=retry_status,
    )
    self._session = session_module.build_session(http_cfg)
    self._session.headers.update({"Authorization": f"Bearer {resolved}"})

    self._cache_dir = Path(cache_dir) if cache_dir is not None else None
    if self._cache_dir is not None:
        self._cache: DiskCache | None = DiskCache(
            root=self._cache_dir,
            enabled=cache_enabled,
            reporter=self.reporter,
        )
    else:
        self._cache = None

fetch_test_stats

fetch_test_stats(run_ids: list[str] | list[dict[str, Any]], *, page_size: int | None = None, suite_hash: str | None = None) -> pd.DataFrame

Fetch /test_stats for each run_id; cache per-run when suite_hash is given.

run_ids accepts either a list of run_id strings or a list of run record dicts (as returned by list_runs); dicts have their run_id key extracted.

Source code in src/benchmarkoor_fetch/client/__init__.py
def fetch_test_stats(
    self,
    run_ids: list[str] | list[dict[str, Any]],
    *,
    page_size: int | None = None,
    suite_hash: str | None = None,
) -> pd.DataFrame:
    """Fetch /test_stats for each run_id; cache per-run when `suite_hash` is given.

    `run_ids` accepts either a list of run_id strings or a list of run
    record dicts (as returned by `list_runs`); dicts have their `run_id`
    key extracted.
    """
    effective_page_size = page_size if page_size is not None else self._page_size
    run_ids = [r["run_id"] if isinstance(r, dict) else r for r in run_ids]

    if not run_ids:
        return test_stats_module.fetch_test_stats(
            self._session,
            run_ids=[],
            page_size=effective_page_size,
            max_workers=self._max_workers,
            base_url=self._base_url,
        )

    if suite_hash is not None:
        progress_desc = f"fetching test_stats (suite {suite_hash[:10]})"
    else:
        progress_desc = "fetching test_stats"

    parts: list[pd.DataFrame] = []
    for run_id in self.reporter.progress(
        run_ids, total=len(run_ids), desc=progress_desc
    ):
        self.reporter.detail(f"run {run_id}: fetching test_stats")
        cache_key = (
            self._cache.test_stats_key(suite_hash=suite_hash, run_id=run_id)
            if self._cache is not None and suite_hash is not None
            else None
        )

        def fetch_raw(rid: str = run_id) -> pd.DataFrame:
            return test_stats_module.fetch_test_stats_for_run_raw(
                self._session,
                run_id=rid,
                page_size=effective_page_size,
                max_workers=self._max_workers,
                base_url=self._base_url,
            )

        if cache_key is not None and self._cache is not None:
            raw = self._cache.get_or_fetch_parquet(cache_key, fetch_raw)
        else:
            raw = fetch_raw()
        parts.append(raw)

    combined = pd.concat(parts, ignore_index=True) if parts else pd.DataFrame()
    normalised = test_stats_module.normalise_columns(combined)
    for col in (
        "run_id",
        "client_name",
        "test_title",
        "test_runtime_ms",
        "ingestion_timestamp",
    ):
        if col not in normalised.columns:
            normalised[col] = pd.Series(dtype=object)
    return normalised

fetch_trace

fetch_trace(suite_hash: str) -> dict[str, Any]

Fetch the per-suite trace summary (cached when a cache_dir is set).

Returns a {test_title: {opcode_name: count}} mapping; the raw API payload is cached on disk and transformed on every call.

Source code in src/benchmarkoor_fetch/client/__init__.py
def fetch_trace(self, suite_hash: str) -> dict[str, Any]:
    """Fetch the per-suite trace summary (cached when a cache_dir is set).

    Returns a `{test_title: {opcode_name: count}}` mapping; the raw API
    payload is cached on disk and transformed on every call.
    """

    def fetcher() -> dict[str, Any]:
        return traces_module.fetch_trace_raw(
            self._session,
            suite_hash=suite_hash,
            files_base_url=self._files_base_url,
        )

    if self._cache is None:
        raw = fetcher()
    else:
        key = self._cache.summary_key(suite_hash=suite_hash)
        raw = self._cache.get_or_fetch_json(key, fetcher)
    return traces_module.transform_trace(raw)

list_runs

list_runs(suite_hash: str, *, start_date: str | None = None, end_date: str | None = None, run_id_pattern: str | None = None) -> list[dict[str, Any]]

List runs for the given suite, optionally narrowed by window.

start_date is applied server-side via the timestamp=gt.{unix_ts} filter; end_date and run_id_pattern are applied in-process. run_id_pattern is a regex matched against each run_id with re.fullmatch — the whole run_id must match. This endpoint is not cached: new runs accumulate over time so the response is not content-addressed (same reasoning as suite discovery).

Source code in src/benchmarkoor_fetch/client/__init__.py
def list_runs(
    self,
    suite_hash: str,
    *,
    start_date: str | None = None,
    end_date: str | None = None,
    run_id_pattern: str | None = None,
) -> list[dict[str, Any]]:
    """List runs for the given suite, optionally narrowed by window.

    `start_date` is applied server-side via the `timestamp=gt.{unix_ts}`
    filter; `end_date` and `run_id_pattern` are applied in-process.
    `run_id_pattern` is a regex matched against each `run_id` with
    `re.fullmatch` — the whole `run_id` must match. This endpoint is
    not cached: new runs accumulate over time so the response is not
    content-addressed (same reasoning as suite discovery).
    """

    def fetcher() -> list[dict[str, Any]]:
        try:
            return runs_module.list_runs(
                self._session,
                suite_hash=suite_hash,
                base_url=self._base_url,
                start_date=start_date,
                page_size=self._page_size,
            )
        except requests.HTTPError as exc:
            raise requests.HTTPError(
                f"{exc} (suite_hash={suite_hash})", response=exc.response
            ) from exc

    raw = fetcher()

    return runs_module.filter_runs(
        raw,
        end_date=str(end_date) if end_date is not None else None,
        run_id_pattern=run_id_pattern,
    )

list_suites

list_suites(*, network: str, fork: str, test_type: str) -> list[dict[str, Any]]

Return all suite entries matching the tuple.

Source code in src/benchmarkoor_fetch/client/__init__.py
def list_suites(
    self, *, network: str, fork: str, test_type: str
) -> list[dict[str, Any]]:
    """Return all suite entries matching the tuple."""
    return suites_module.list_suites(
        self._session,
        network=network,
        fork=fork,
        test_type=test_type,
        base_url=self._base_url,
        page_size=self._page_size,
    )

parse

parse(raw_df: DataFrame, trace_df: DataFrame | dict[str, Any], *, fork: str | None = None) -> tuple[pd.DataFrame, pd.DataFrame]

Run the title parser + opcount join over a raw test_stats frame.

Source code in src/benchmarkoor_fetch/client/__init__.py
def parse(
    self,
    raw_df: pd.DataFrame,
    trace_df: pd.DataFrame | dict[str, Any],
    *,
    fork: str | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Run the title parser + opcount join over a raw test_stats frame."""
    from benchmarkoor_fetch.parse.opcount import add_opcount
    from benchmarkoor_fetch.parse.titles import parse_test_titles

    effective_fork = fork or self._fork or "prague"
    parsed, _ = parse_test_titles(raw_df)

    trace_frame = _trace_to_dataframe(trace_df)
    bench_df = add_opcount(parsed, trace_frame, fork=effective_fork)
    return bench_df, trace_frame

resolve_suite

resolve_suite(*, network: str, fork: str, test_type: str) -> str

Return the suite_hash of the latest suite matching the tuple.

Source code in src/benchmarkoor_fetch/client/__init__.py
def resolve_suite(self, *, network: str, fork: str, test_type: str) -> str:
    """Return the `suite_hash` of the latest suite matching the tuple."""
    self._fork = fork
    return suites_module.resolve_suite(
        self._session,
        network=network,
        fork=fork,
        test_type=test_type,
        base_url=self._base_url,
        page_size=self._page_size,
    )

run

run(config: FetchConfig) -> FetchResult

Execute the full fetch + parse + write pipeline.

Source code in src/benchmarkoor_fetch/client/__init__.py
def run(self, config: FetchConfig) -> FetchResult:
    """Execute the full fetch + parse + write pipeline."""
    from benchmarkoor_fetch.pipeline import run_pipeline

    return run_pipeline(config, client=self)