Source code for timedb.client

"""TimeDBClient — the ClickHouse-only public facade.

Pure time-series I/O. No metadata, no runs table, no shape dispatch. Callers
(energydb) supply ``series_id``, ``run_id``, and ``retention`` as context;
``timedb`` just stores and retrieves.
"""

from __future__ import annotations

import os
from collections.abc import Sequence
from datetime import datetime, timedelta
from datetime import time as dt_time
from importlib import resources

import clickhouse_connect
import pandas as pd
import polars as pl

from . import read as _read
from . import write as _write


def _get_ch_url() -> str:
    ch_url = os.environ.get("TIMEDB_CH_URL")
    if not ch_url:
        raise ValueError("ClickHouse connection not configured. Pass ch_url or set TIMEDB_CH_URL.")
    return ch_url


_DDL = resources.files("timedb").joinpath("sql", "ch_create_tables.sql").read_text(encoding="utf-8")

_CH_TABLES = ["series_values", "run_series"]


[docs] class TimeDBClient: # Auxiliary CH clients for parallel inserts. clickhouse-connect rejects # concurrent calls on a single client ("Attempt to execute concurrent # queries within the same session"), so we keep a small sidecar pool # for the write path. Two clients is the sweet spot: measured 556 ms → # 349 ms (1.59×) on the 1.7 M-row insert; 4-way is no better because the # CH-side write pipeline (parsing + merge tree insert) saturates first. _AUX_INSERT_CLIENTS = 1 # number of clients beyond ``self._ch``
[docs] def __init__(self, ch_url: str | None = None): self._ch_url = ch_url or _get_ch_url() self._ch = clickhouse_connect.get_client(dsn=self._ch_url) self._aux_clients: list = []
def _ensure_aux_clients(self) -> list: """Lazily build the sidecar insert clients (constructed on first big write).""" if not self._aux_clients: self._aux_clients = [ clickhouse_connect.get_client(dsn=self._ch_url) for _ in range(self._AUX_INSERT_CLIENTS) ] return self._aux_clients # ------------------------------------------------------------------ # Schema # ------------------------------------------------------------------
[docs] def create(self) -> None: """Create the series_values table and run_series mapping.""" for statement in _DDL.split(";"): s = statement.strip() if not s: continue non_comment = [ln for ln in s.splitlines() if ln.strip() and not ln.strip().startswith("--")] if not non_comment: continue self._ch.command(s)
[docs] def delete(self) -> None: """Drop both CH tables.""" for name in _CH_TABLES: self._ch.command(f"DROP TABLE IF EXISTS {name}")
# ------------------------------------------------------------------ # I/O # ------------------------------------------------------------------ def write( self, df: pd.DataFrame | pl.DataFrame, *, retention: str | None = None, knowledge_time: datetime | None = None, ) -> None: return _write.write( self._ch, df, retention=retention, knowledge_time=knowledge_time, aux_clients=self._ensure_aux_clients, ) def read( self, *, series_ids: Sequence[int], retention: str | Sequence[str] | None = None, start_valid: datetime | None = None, end_valid: datetime | None = None, start_known: datetime | None = None, end_known: datetime | None = None, include_updates: bool = False, include_knowledge_time: bool = False, ) -> pl.DataFrame: return _read.read( self._ch, series_ids=series_ids, retention=retention, start_valid=start_valid, end_valid=end_valid, start_known=start_known, end_known=end_known, include_updates=include_updates, include_knowledge_time=include_knowledge_time, ) def read_relative( self, *, series_ids: Sequence[int], retention: str | Sequence[str] | None = None, window_length: timedelta | None = None, issue_offset: timedelta | None = None, start_window: datetime | None = None, start_valid: datetime | None = None, end_valid: datetime | None = None, days_ahead: int | None = None, time_of_day: dt_time | None = None, ) -> pl.DataFrame: return _read.read_relative( self._ch, series_ids=series_ids, retention=retention, window_length=window_length, issue_offset=issue_offset, start_window=start_window, start_valid=start_valid, end_valid=end_valid, days_ahead=days_ahead, time_of_day=time_of_day, )
[docs] def read_run_series( self, *, series_id: int, ) -> list[int]: """Return run_ids that touched a given series_id, latest first. Data only — the ``energydb.runs`` PG table hydrates the metadata. """ sql = """ SELECT run_id FROM run_series FINAL WHERE series_id = {series_id:UInt64} ORDER BY first_seen DESC """ result = self._ch.query(sql, parameters={"series_id": series_id}) return [int(row[0]) for row in result.result_rows]