Python SDK reference
Full reference for the gnosyslabs package (installed with
pip install gnosyslabs, imported as gnosys). The sync (GnosysClient)
and async (AsyncGnosysClient) facades expose identical methods.
Install
pip install gnosyslabs
Requires Python 3.10+. Dependencies: httpx, pydantic. You install
gnosyslabs but import gnosys (like scikit-learn → sklearn).
Authentication
Pass the API key as GnosysClient(api_key=...) or set the
$GNOSYS_API_KEY environment variable. Keys must start with gn_live_.
Bad keys raise AuthenticationError at construction time, not on the
first request, so a misconfigured client fails fast.
from gnosys import GnosysClient
client = GnosysClient(api_key="gn_live_...")
# or
client = GnosysClient() # picks up env
Optional: base_url (default https://gnosyslabs.com),
timeout (default 30s), max_retries (default 3 on 429/5xx).
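The key handling described in this section can be sketched without the SDK. This is a minimal stand-in, not the library's implementation: resolve_api_key is a hypothetical helper, ValueError stands in for AuthenticationError, and giving the explicit argument precedence over the environment variable is an assumption.

```python
from __future__ import annotations

import os

KEY_PREFIX = "gn_live_"     # prefix stated in the Authentication section
ENV_VAR = "GNOSYS_API_KEY"  # environment variable named in the Authentication section


def resolve_api_key(explicit: str | None = None, env: dict[str, str] | None = None) -> str:
    """Hypothetical helper: pick a key and reject malformed ones before any request."""
    source = os.environ if env is None else env
    # Assumption: an explicitly passed key wins over the environment variable.
    key = explicit or source.get(ENV_VAR)
    if not key:
        raise ValueError(f"no API key given and ${ENV_VAR} is not set")
    if not key.startswith(KEY_PREFIX):
        raise ValueError(f"API key must start with {KEY_PREFIX!r}")
    return key
```

Running a check like this in application start-up code mirrors the fail-fast behaviour: a malformed key surfaces before any run is submitted.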
client.datasets
upload(path_or_file, *, name, task, target_column) → Dataset
Multipart upload of a CSV or parquet file. Returns metadata
(including dataset_id) you pass into a run's spec_template.
ds = client.datasets.upload(
    "spaceship_titanic.csv",
    name="spaceship-titanic-train",
    task="classification",
    target_column="Transported",
)
print(ds.dataset_id, ds.n_rows, ds.n_columns)
list() → list[Dataset]
Active datasets for the tenant.
get(dataset_id) → Dataset
One dataset's metadata.
delete(dataset_id) → None
Soft-delete. Audit is preserved on disk; the dataset becomes unreachable from new runs but historical references still resolve.
client.runs
create(...) → RunCreateResponse
Submit a run. Returns immediately with status="queued".
created = client.runs.create(
    domain="tabular",  # "tabular" | "strategy"
    strategist={"kind": "agent", ...},  # fixed / hp_sweep / agent / llm
    spec_template={...},  # required for hp_sweep / agent / llm
    specs=[...],  # required for fixed
    max_iterations=5,
    tier_reached="paper",  # exit on first spec clearing the strict promotion gate
    no_progress_window=2,
    concurrency=4,
    llm={"provider": "anthropic", ...},
    llm_critic={"enabled": True},
    enable_code_exec=False,  # opt-in sandboxed code execution
    sandbox_config={"timeout_s": 600},  # only if enable_code_exec=True
)
print(created.run_id, created.status)
get(run_id) → Run
Fetch one run.
wait(run_id, *, timeout=600.0, poll_interval=2.0) → Run
Block until terminal. Raises TimeoutError after timeout
seconds (the run may still complete server-side; the client
just stopped waiting).
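The blocking behaviour of wait can be pictured as a plain polling loop. The sketch below is an illustration under assumptions, not the SDK's code: fetch_status is a hypothetical callable standing in for client.runs.get(run_id).status, the terminal statuses are assumed to be "completed" and "failed", and the built-in TimeoutError stands in for the SDK's own exception.

```python
from __future__ import annotations

import time
from typing import Callable

# Assumption: these are the terminal values of Run.status.
TERMINAL_STATUSES = frozenset({"completed", "failed"})


def wait_until_terminal(
    fetch_status: Callable[[], str],
    *,
    timeout: float = 600.0,
    poll_interval: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_status() until it reports a terminal status or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            # Only the local wait is abandoned; nothing is cancelled remotely.
            raise TimeoutError(f"still {status!r} after {timeout} seconds")
        sleep(poll_interval)
```

Because a timeout only ends the local loop, calling the function again later with the same fetch_status resumes waiting on the same run.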
list(*, status=None, limit=50) → list[Run]
Tenant-scoped, newest first.
iterations(run_id) → RunIterations
Per-round breakdown: iterations[*].iteration,
n_proposed, n_records, tier_counts,
best_tier_in_iter, stop_reason, duration_seconds.
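A common use of the per-round breakdown is to total tier_counts across rounds. The sketch below works on plain dictionaries shaped like the fields listed above rather than on real RunIterations objects; total_tier_counts is a hypothetical helper, and the tier name "rejected" is invented for the sample data ("paper" is the only tier name this reference mentions).

```python
from __future__ import annotations

from collections import Counter


def total_tier_counts(iterations: list[dict]) -> Counter:
    """Sum the tier_counts mapping of every iteration into one Counter."""
    totals: Counter = Counter()
    for outcome in iterations:
        # Counter.update adds counts instead of overwriting them.
        totals.update(outcome.get("tier_counts", {}))
    return totals


# Sample data with hypothetical values.
sample = [
    {"iteration": 0, "tier_counts": {"paper": 1, "rejected": 3}},
    {"iteration": 1, "tier_counts": {"paper": 2}},
]
print(total_tier_counts(sample))
```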
predict(run_id, file, *, top_n=None) → PredictResponse
Score new rows against the run's kept-spec ensemble. file may be
a path (str / Path) or a file-like object. Returns calibrated
probabilities (MCGrad-applied) and the argmax class_labels.
preds = client.runs.predict(run.run_id, "test.csv")
for label, probs in zip(preds.class_labels, preds.probabilities):
    print(label, probs)
model_card(run_id, *, format="html") → bytes
Returns the run's HTML model card as bytes. Save to disk as a compliance artefact.
html = client.runs.model_card(run.run_id)
with open(f"model_card_{run.run_id[:8]}.html", "wb") as f:
    f.write(html)
package_submission(run_id) → bytes
Returns either a main.py (tabular code-exec runs) or a tarball
(RL / strategy runs) packaging the top kept spec for handoff to a
Kaggle-style submission slot.
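Since the return value is either Python source or an archive, the caller has to decide how to store it. One possible approach, sketched here with the standard library only, is to ask tarfile whether the bytes parse as an archive. save_submission and both output file names are hypothetical, and the reference does not say whether the tarball is compressed.

```python
from __future__ import annotations

import io
import tarfile
from pathlib import Path


def save_submission(payload: bytes, out_dir: Path) -> Path:
    """Write payload as an archive if it parses as a tar file, else as main.py."""
    # is_tarfile accepts file-like objects and also recognises compressed archives.
    is_tar = tarfile.is_tarfile(io.BytesIO(payload))
    # Hypothetical file names; adjust the extension if the archive is compressed.
    target = out_dir / ("submission.tar" if is_tar else "main.py")
    target.write_bytes(payload)
    return target
```

With a real client, the payload would come from client.runs.package_submission(run_id).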
client.findings
list(*, run_id=None, pipeline_run_id=None, validator=None, severity=None, spec_id=None, limit=100) → list[ValidationFinding]
Filtered findings. Filters AND together. Pass run_id (the customer-
facing identifier) OR pipeline_run_id (engine-internal); the server
resolves the former to the latter.
client.findings.list(run_id=run.run_id, severity="blocker")
client.findings.list(validator="honest_eval.shuffled_label")
client.findings.list(spec_id="hp-0-C=1")
correlations(*, validators, severity="blocker", mode="and", pipeline_run_id=None, limit=50) → list[CorrelationMatch]
Specs flagged by multiple validators. mode="and" requires every
listed validator; mode="or" requires any.
matches = client.findings.correlations(
    validators=["llm_critic", "honest_eval.shuffled_label"],
    severity="blocker",
    mode="and",
)
for m in matches:
    print(m.spec_id, m.matched_validators)
    for f in m.findings:
        # detail may be None, so fall back to an empty string before slicing
        print(f"  {f.validator}: {(f.detail or '')[:80]}")
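The difference between mode="and" and mode="or" can be shown on local data. The sketch below reimplements the matching rule as described above over (spec_id, validator) pairs; it is not the server's implementation, correlate is a hypothetical function, and the spec id "hp-1-C=10" is invented for the example.

```python
from __future__ import annotations

from collections import defaultdict


def correlate(
    findings: list[tuple[str, str]],
    validators: list[str],
    mode: str = "and",
) -> dict[str, list[str]]:
    """Map each matching spec_id to the listed validators that flagged it."""
    if mode not in ("and", "or"):
        raise ValueError(f"unknown mode: {mode!r}")
    wanted = set(validators)
    hits: dict[str, set[str]] = defaultdict(set)
    for spec_id, validator in findings:
        if validator in wanted:
            hits[spec_id].add(validator)
    # "or" keeps any spec with at least one hit; "and" needs every listed validator.
    return {
        spec_id: sorted(matched)
        for spec_id, matched in hits.items()
        if mode == "or" or matched == wanted
    }


sample_findings = [
    ("hp-0-C=1", "llm_critic"),
    ("hp-0-C=1", "honest_eval.shuffled_label"),
    ("hp-1-C=10", "llm_critic"),
]
```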
client.api_keys
list() → list[ApiKey]
Active and revoked keys for the tenant. Plaintext is never returned;
only key_prefix and last_4 are exposed for display.
revoke(key_id) → None
Revoke. Subsequent auth attempts with the corresponding plaintext return 401 immediately.
Models
Run
@dataclass(frozen=True, slots=True)  # actually a pydantic model
class Run:
    run_id: str
    tenant_id: str
    domain: str
    strategist: str | None
    status: str  # "queued" | "running" | "completed" | "failed"
    error: str | None
    pipeline_run_id: str | None
    created_at: str
    started_at: str | None
    completed_at: str | None
    is_terminal: bool  # property
    is_completed: bool  # property
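The two properties are presumably derived from status. A minimal sketch of that relationship, assuming "completed" and "failed" are the only terminal statuses; RunStatusView is a hypothetical stand-in, not the SDK's Run model.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class RunStatusView:
    """Hypothetical stand-in that models only the status-derived properties."""

    status: str  # "queued" | "running" | "completed" | "failed"

    @property
    def is_terminal(self) -> bool:
        # Assumption: a run stops changing once it has completed or failed.
        return self.status in ("completed", "failed")

    @property
    def is_completed(self) -> bool:
        return self.status == "completed"
```

Under this reading, a failed run is terminal but not completed, so code that needs results should check is_completed rather than is_terminal.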
IterationOutcome
class IterationOutcome:
    iteration: int
    n_proposed: int
    n_records: int
    best_tier_in_iter: str | None
    tier_counts: dict[str, int]
    stop_reason: str | None
    duration_seconds: float | None
    strategist_name: str | None
ValidationFinding
class ValidationFinding:
    finding_id: str
    exec_id: str | None
    pipeline_run_id: str | None
    domain: str
    spec_id: str
    validator: str
    stage: str  # "pre_execute" | "post_execute" | "honest_eval"
    severity: str  # "info" | "low" | "medium" | "high" | "blocker"
    detail: str | None
    payload: dict
    created_at: str
    is_blocker: bool  # property
CorrelationMatch
class CorrelationMatch:
    spec_id: str
    domain: str
    pipeline_run_id: str | None
    matched_validators: list[str]
    findings: list[ValidationFinding]
ApiKey
class ApiKey:
    key_id: str
    name: str
    key_prefix: str  # e.g. "gn_live_a3"
    last_4: str  # e.g. "X9F2"
    scopes: list[str]
    created_at: str
    revoked_at: str | None
    is_active: bool
    def to_display(self) -> str: ...  # "gn_live_a3…X9F2"
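Judging by the example values in the comments, the display form joins key_prefix and last_4 with an ellipsis. A small sketch of that formatting; mask_key is a hypothetical free function, not the SDK's method.

```python
def mask_key(key_prefix: str, last_4: str) -> str:
    """Join the visible prefix and the last four characters with an ellipsis."""
    return f"{key_prefix}…{last_4}"


print(mask_key("gn_live_a3", "X9F2"))
```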
Exceptions
All raised exceptions inherit from GnosysError. Catch the base
class for coarse "did the API call work?" checks; catch the
narrower subclass to react specifically.
| Exception | HTTP status | Meaning |
|---|---|---|
| AuthenticationError | 401 | Missing / malformed / revoked key |
| ForbiddenError | 403 | Tenant suspended |
| NotFoundError | 404 | Wrong run_id, or another tenant's |
| ValidationError | 400 / 422 | Bad request payload |
| RateLimitError | 429 | Burst rate limit; check .retry_after |
| ServerError | 5xx | Persistent server-side fault after retries |
| TimeoutError | n/a | client.runs.wait exceeded timeout |
import time

from gnosys import RateLimitError

try:
    client.runs.create(...)
except RateLimitError as exc:
    # wait for the server-suggested delay, or 60 s if none was given
    time.sleep(exc.retry_after or 60)
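The same pattern extends to a bounded retry loop. The sketch below is self-contained so it runs without the SDK: FakeRateLimitError is a stand-in for gnosys.RateLimitError that models only .retry_after, and call_with_rate_limit_retry with its parameters is a hypothetical helper. How it interacts with the client's built-in max_retries is not specified in this reference.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


class FakeRateLimitError(Exception):
    """Stand-in for gnosys.RateLimitError; only .retry_after is modelled."""

    def __init__(self, retry_after: float | None = None) -> None:
        super().__init__("rate limited")
        self.retry_after = retry_after


def call_with_rate_limit_retry(
    call: Callable[[], T],
    *,
    attempts: int = 3,
    default_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run call(), sleeping for retry_after (or default_delay) between tries."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except FakeRateLimitError as exc:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            sleep(exc.retry_after or default_delay)
    raise AssertionError("unreachable")
```

In real code, the except clause would name gnosys.RateLimitError and call would wrap something like client.runs.create(...).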
Async
import asyncio

from gnosys import AsyncGnosysClient

async def main():
    async with AsyncGnosysClient(api_key="gn_live_...") as client:
        run = await client.runs.create(...)
        run = await client.runs.wait(run.run_id)
        findings = await client.findings.list(run_id=run.run_id)

asyncio.run(main())
The async client is the implementation; the sync GnosysClient
wraps it on a private event loop. They share the same retry,
auth, and error-mapping behaviour.
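The wrapping arrangement can be illustrated with a toy pair of classes. This is a sketch of the general pattern (a sync facade driving coroutines on an event loop it owns), not the SDK's source: AsyncEcho and SyncEcho are hypothetical names, and the real client may manage its loop differently.

```python
from __future__ import annotations

import asyncio


class AsyncEcho:
    """Toy async implementation standing in for the async client."""

    async def ping(self, value: str) -> str:
        await asyncio.sleep(0)  # yield to the loop, as a network call would
        return value


class SyncEcho:
    """Sync facade that drives AsyncEcho on a private event loop."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._inner = AsyncEcho()

    def ping(self, value: str) -> str:
        # Block the calling thread until the coroutine finishes.
        return self._loop.run_until_complete(self._inner.ping(value))

    def close(self) -> None:
        self._loop.close()

    def __enter__(self) -> "SyncEcho":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()


with SyncEcho() as echo:
    print(echo.ping("hello"))
```

A facade built this way cannot be driven from a thread whose own event loop is already running, which is one reason to use the async client directly inside async code.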
Versioning
The SDK follows semver: backwards-incompatible changes bump the major version. Server-side schema changes are forward-compatible. The pydantic models are configured to allow extra fields, so an old client keeps working against a new server; it simply cannot reach functionality added after its release.
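The effect of tolerating extra fields can be sketched with the standard library. This is an analogy, not the SDK's pydantic configuration: OldRunView and parse_tolerantly are hypothetical, the field "gpu_hours" is invented to represent a newer server, and this sketch drops unknown keys whereas a pydantic model that allows extras would keep them.

```python
from __future__ import annotations

from dataclasses import dataclass, fields


@dataclass(frozen=True)
class OldRunView:
    """What an older client knows about a run (hypothetical subset)."""

    run_id: str
    status: str


def parse_tolerantly(payload: dict) -> OldRunView:
    """Build the model from known keys and ignore anything newer."""
    known = {f.name for f in fields(OldRunView)}
    return OldRunView(**{k: v for k, v in payload.items() if k in known})


# A response from a newer server carrying a field this client predates.
newer_payload = {"run_id": "r-1", "status": "queued", "gpu_hours": 0.5}
print(parse_tolerantly(newer_payload))
```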
Changelog
See the changelog for release-by-release diffs.