Python SDK reference

Full reference for the gnosyslabs package (installed with pip install gnosyslabs, imported as gnosys). The sync (GnosysClient) and async (AsyncGnosysClient) clients expose identical methods.

Install

pip install gnosyslabs

Requires Python 3.10+. Dependencies: httpx, pydantic. You install gnosyslabs but import gnosys, much as scikit-learn is imported as sklearn.

Authentication

Pass the API key as GnosysClient(api_key=...) or set the $GNOSYS_API_KEY environment variable. Keys must start with gn_live_. Bad keys raise AuthenticationError at construction time, not on the first request, so a misconfigured client fails fast.

from gnosys import GnosysClient

client = GnosysClient(api_key="gn_live_...")
# or
client = GnosysClient()  # picks up env

Optional: base_url (default https://gnosyslabs.com), timeout (default 30s), max_retries (default 3 on 429/5xx).
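The key-resolution and prefix rules described in this section can be sketched without the SDK. This is a minimal illustration, not the library's implementation: resolve_api_key is a hypothetical helper, the local AuthenticationError class is a stand-in for the SDK's exception, and giving the explicit argument precedence over the environment is an assumption.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


class AuthenticationError(Exception):
    """Stand-in for the SDK's AuthenticationError (illustration only)."""


def resolve_api_key(api_key: str | None = None, env: Mapping[str, str] | None = None) -> str:
    """Hypothetical helper: explicit key first (assumed), then GNOSYS_API_KEY."""
    env = os.environ if env is None else env
    key = api_key or env.get("GNOSYS_API_KEY")
    if not key:
        raise AuthenticationError("no api_key given and GNOSYS_API_KEY is not set")
    if not key.startswith("gn_live_"):
        # Reject the key before any request is sent, mirroring the fail-fast rule.
        raise AuthenticationError("API key must start with 'gn_live_'")
    return key
```

Checking the prefix locally is what makes a construction-time error possible: no network round trip is needed to reject a key of the wrong shape.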

client.datasets

upload(path_or_file, *, name, task, target_column) → Dataset

Multipart upload of a CSV or parquet file. Returns metadata (including dataset_id) you pass into a run's spec_template.

ds = client.datasets.upload(
    "spaceship_titanic.csv",
    name="spaceship-titanic-train",
    task="classification",
    target_column="Transported",
)
print(ds.dataset_id, ds.n_rows, ds.n_columns)

list() → list[Dataset]

Active datasets for the tenant.

get(dataset_id) → Dataset

One dataset's metadata.

delete(dataset_id) → None

Soft-delete. Audit is preserved on disk; the dataset becomes unreachable from new runs but historical references still resolve.

client.runs

create(...) → RunCreateResponse

Submit a run. Returns immediately with status="queued".

created = client.runs.create(
    domain="tabular",                       # "tabular" | "strategy"
    strategist={"kind": "agent", ...},      # fixed / hp_sweep / agent / llm
    spec_template={...},                    # required for sweep / agent / llm
    specs=[...],                            # required for fixed
    max_iterations=5,
    tier_reached="paper",                   # exit on first spec clearing the strict promotion gate
    no_progress_window=2,
    concurrency=4,
    llm={"provider": "anthropic", ...},
    llm_critic={"enabled": True},
    enable_code_exec=False,                 # opt-in sandboxed code execution
    sandbox_config={"timeout_s": 600},      # only if enable_code_exec=True
)
print(created.run_id, created.status)

get(run_id) → Run

Fetch one run.

wait(run_id, *, timeout=600.0, poll_interval=2.0) → Run

Block until terminal. Raises TimeoutError after timeout seconds (the run may still complete server-side; the client just stopped waiting).
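The polling behaviour of wait can be pictured as a loop with a deadline. The sketch below is an assumption about the general shape, not the SDK's code: wait_for_status, fetch_status, sleep, and clock are hypothetical names, the terminal statuses are taken from the Run.status values listed under Models, and the built-in TimeoutError stands in for the SDK's exception.

```python
from __future__ import annotations

import time
from typing import Callable

# Assumption: "completed" and "failed" are the terminal states (see Run.status).
TERMINAL_STATUSES = {"completed", "failed"}


def wait_for_status(
    fetch_status: Callable[[], str],
    *,
    timeout: float = 600.0,
    poll_interval: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll fetch_status() until it returns a terminal status or the deadline passes."""
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            # Only the client gives up here; nothing is cancelled server-side.
            raise TimeoutError(f"run still {status!r} after {timeout}s")
        sleep(poll_interval)
```

Because the timeout only stops the local loop, calling get(run_id) later is a reasonable way to pick up a run that finished after the client stopped waiting.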

list(*, status=None, limit=50) → list[Run]

Tenant-scoped, newest first.

iterations(run_id) → RunIterations

Per-round breakdown: iterations[*].iteration, n_proposed, n_records, tier_counts, best_tier_in_iter, stop_reason, duration_seconds.
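As an illustration of working with the per-iteration fields, the sketch below sums tier_counts across rounds. It uses plain dicts as stand-ins for the SDK's objects (with the real client you would presumably read attributes instead of keys), and the sample values, including the tier name "other", are made up.

```python
from collections import Counter
from typing import Iterable, Mapping


def total_tier_counts(iterations: Iterable[Mapping]) -> Counter:
    """Sum the tier_counts mapping over every iteration of one run."""
    totals: Counter = Counter()
    for it in iterations:
        totals.update(it["tier_counts"])  # Counter.update adds counts per key
    return totals


# Made-up sample shaped like the documented fields; "other" is a placeholder tier.
sample = [
    {"iteration": 0, "n_proposed": 4, "tier_counts": {"other": 3, "paper": 1}},
    {"iteration": 1, "n_proposed": 4, "tier_counts": {"other": 2, "paper": 2}},
]
print(total_tier_counts(sample))
```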

predict(run_id, file, *, top_n=None) → PredictResponse

Score new rows against the run's kept-spec ensemble. file may be a path (str / Path) or a file-like object. Returns calibrated probabilities (MCGrad-applied) and the argmax class_labels.

preds = client.runs.predict(run.run_id, "test.csv")
for label, probs in zip(preds.class_labels, preds.probabilities):
    print(label, probs)

model_card(run_id, *, format="html") → bytes

Returns the run's HTML model card as bytes. Save to disk as a compliance artefact.

html = client.runs.model_card(run.run_id)
with open(f"model_card_{run.run_id[:8]}.html", "wb") as f:
    f.write(html)

package_submission(run_id) → bytes

Returns either a main.py (tabular code-exec runs) or a tarball (RL / strategy runs) packaging the top kept spec for handoff to a Kaggle-style submission slot.

client.findings

list(*, run_id=None, pipeline_run_id=None, validator=None, severity=None, spec_id=None, limit=100) → list[ValidationFinding]

Filtered findings. Filters AND together. Pass run_id (the customer-facing identifier) OR pipeline_run_id (engine-internal); the server resolves the former to the latter.

client.findings.list(run_id=run.run_id, severity="blocker")
client.findings.list(validator="honest_eval.shuffled_label")
client.findings.list(spec_id="hp-0-C=1")

correlations(*, validators, severity="blocker", mode="and", pipeline_run_id=None, limit=50) → list[CorrelationMatch]

Specs flagged by multiple validators. mode="and" requires every listed validator; mode="or" requires any.

matches = client.findings.correlations(
    validators=["llm_critic", "honest_eval.shuffled_label"],
    severity="blocker",
    mode="and",
)
for m in matches:
    print(m.spec_id, m.matched_validators)
    for f in m.findings:
        print(f"  {f.validator}: {f.detail[:80]}")
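The difference between mode="and" and mode="or" comes down to set containment versus set intersection. The sketch below shows that reading of the semantics on the client side; spec_matches is a hypothetical helper, and the matching itself happens on the server.

```python
from __future__ import annotations


def spec_matches(flagged_by: set[str], validators: list[str], mode: str = "and") -> bool:
    """Return True if a spec flagged by `flagged_by` satisfies the requested validators."""
    wanted = set(validators)
    if mode == "and":
        return wanted <= flagged_by        # every listed validator flagged the spec
    if mode == "or":
        return bool(wanted & flagged_by)   # at least one listed validator flagged it
    raise ValueError(f"mode must be 'and' or 'or', got {mode!r}")
```

A spec flagged only by llm_critic would therefore appear under mode="or" but not under mode="and" for the two-validator query shown earlier.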

client.api_keys

list() → list[ApiKey]

Active and revoked keys for the tenant. The plaintext key is never returned, only key_prefix and last_4 for display.

revoke(key_id) → None

Revoke. Subsequent auth attempts with the corresponding plaintext return 401 immediately.

Models

Run

# Shown in dataclass notation for brevity; the real class is a pydantic model.
@dataclass(frozen=True, slots=True)
class Run:
    run_id: str
    tenant_id: str
    domain: str
    strategist: str | None
    status: str          # "queued" | "running" | "completed" | "failed"
    error: str | None
    pipeline_run_id: str | None
    created_at: str
    started_at: str | None
    completed_at: str | None
    is_terminal: bool    # property
    is_completed: bool   # property

IterationOutcome

class IterationOutcome:
    iteration: int
    n_proposed: int
    n_records: int
    best_tier_in_iter: str | None
    tier_counts: dict[str, int]
    stop_reason: str | None
    duration_seconds: float | None
    strategist_name: str | None

ValidationFinding

class ValidationFinding:
    finding_id: str
    exec_id: str | None
    pipeline_run_id: str | None
    domain: str
    spec_id: str
    validator: str
    stage: str       # "pre_execute" | "post_execute" | "honest_eval"
    severity: str    # "info" | "low" | "medium" | "high" | "blocker"
    detail: str | None
    payload: dict
    created_at: str
    is_blocker: bool  # property

CorrelationMatch

class CorrelationMatch:
    spec_id: str
    domain: str
    pipeline_run_id: str | None
    matched_validators: list[str]
    findings: list[ValidationFinding]

ApiKey

class ApiKey:
    key_id: str
    name: str
    key_prefix: str   # e.g. "gn_live_a3"
    last_4: str       # e.g. "X9F2"
    scopes: list[str]
    created_at: str
    revoked_at: str | None
    is_active: bool

    def to_display(self) -> str: ...   # "gn_live_a3…X9F2"

Exceptions

All raised exceptions inherit from GnosysError. Catch the base class for coarse "did the API call work?" checks; catch the narrower subclass to react specifically.

Exception            HTTP status  Meaning
AuthenticationError  401          Missing / malformed / revoked key
ForbiddenError       403          Tenant suspended
NotFoundError        404          Wrong run_id, or another tenant's
ValidationError      400 / 422    Bad request payload
RateLimitError       429          Burst rate limit; check .retry_after
ServerError          5xx          Persistent server-side fault after retries
TimeoutError         (none)       client.runs.wait exceeded timeout

import time

from gnosys import RateLimitError

try:
    client.runs.create(...)
except RateLimitError as exc:
    # Wait for the server-suggested interval, or 60 s if none was given.
    time.sleep(exc.retry_after or 60)
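To retry the call rather than only sleep, the same pattern can be wrapped in a small loop. This is a sketch under assumptions: call_with_rate_limit_retry is a hypothetical helper, and the local RateLimitError class is a stand-in that models only the documented .retry_after attribute. Since the client already retries 429 responses up to max_retries, a wrapper like this would only cover whatever still surfaces as an exception.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Stand-in for the SDK's RateLimitError; models only .retry_after."""

    def __init__(self, retry_after: float | None = None) -> None:
        super().__init__("rate limited")
        self.retry_after = retry_after


def call_with_rate_limit_retry(
    fn: Callable[[], T],
    *,
    attempts: int = 3,
    default_wait: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); on RateLimitError, wait and try again, up to `attempts` calls in total."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except RateLimitError as exc:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            sleep(exc.retry_after or default_wait)
    raise AssertionError("unreachable")
```

With the real client, fn would be something like lambda: client.runs.create(...), and the except clause would catch the exception imported from gnosys.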

Async

import asyncio

from gnosys import AsyncGnosysClient

async def main():
    async with AsyncGnosysClient(api_key="gn_live_...") as client:
        created = await client.runs.create(...)
        run = await client.runs.wait(created.run_id)
        findings = await client.findings.list(run_id=run.run_id)

asyncio.run(main())

The async client is the implementation; the sync GnosysClient wraps it on a private event loop. They share the same retry, auth, and error-mapping behaviour.
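The "sync wraps async on a private event loop" arrangement can be sketched with the standard library alone. The classes below are hypothetical stand-ins (AsyncEcho and SyncEcho are not SDK names) and show one common way to build such a facade, not necessarily how GnosysClient does it.

```python
import asyncio


class AsyncEcho:
    """Hypothetical async client with a single coroutine method."""

    async def get(self, run_id: str) -> dict:
        await asyncio.sleep(0)  # placeholder for real network I/O
        return {"run_id": run_id, "status": "queued"}


class SyncEcho:
    """Sync facade: owns a private event loop and drives the async client on it."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._inner = AsyncEcho()

    def get(self, run_id: str) -> dict:
        # Block the calling thread until the coroutine finishes on the private loop.
        return self._loop.run_until_complete(self._inner.get(run_id))

    def close(self) -> None:
        self._loop.close()
```

Because every sync method delegates to the matching coroutine, behaviour such as retries and error mapping only has to live in the async class. One caveat of this particular approach: run_until_complete raises RuntimeError if called from a thread that already has a running event loop, so async code should use the async class directly.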

Versioning

The SDK follows semver. Backwards-incompatible changes bump the major version. Server-side schema changes are forward-compatible: pydantic models are configured to allow extra fields, so an old client keeps working against a new server, although functionality added after that client's release is not reachable from it.

Changelog

See the changelog for release-by-release diffs.

