Python SDK
Submit jobs and chain them together programmatically from Python.
Prerequisites
Before using the SDK, make sure you have:
- Python >= 3.10
- anycloud API running: start it with anycloud api start
- Cloud credentials configured: create them with anycloud credentials new
See Getting Started for full setup instructions.
Install
anycloud has two release channels: stable (promoted, production-ready) and latest (every release, matching the CLI and API latest tags).
# Stable (default)
pip install anycloud-sdk
# Latest (matches the latest CLI and API release)
pip install --pre anycloud-sdk
Bucket operations stream through the local anycloud api server using a credential you've already registered with anycloud credentials new — no extra installs needed.
Quick Start
If you have credentials configured via anycloud credentials new, the SDK picks them up automatically:
import anycloud
ac = anycloud.Client()
job = ac.submit("my-training:latest", gpu="h100:8")
job.wait()
print(job.logs())
Multiple credential sets
When you have more than one credential set, select one by name:
ac = anycloud.Client(credentials="aws-prod")
Explicit cloud config
You can still pass credentials directly — this takes precedence over the credentials file:
from anycloud.types import CloudConfig, AWSCredentials
cc = CloudConfig(
    cloudProvider="AWS",
    credentials=AWSCredentials(
        accessKeyId="AKIA...",
        secretAccessKey="...",
    ),
)
ac = anycloud.Client(cloud_config=cc)
The client connects to your local anycloud API at http://localhost:8080 by default.
Function Decorator
The @anycloud.function() decorator lets you run any Python function remotely. You provide a Docker image, and your repo is cloned at the current commit on the remote VM via git-based code sync. See Deployment Workflows for when to use the decorator vs build+submit.
import anycloud
from anycloud.types import CloudConfig
@anycloud.function(
    image="ghcr.io/acme/csp-prod:latest",
    gpu="t4:1",
    cloud_config=CloudConfig(
        credentials="azure2",
        vm_type="Standard_NC16as_T4_v3",
        spot=True,
        disk_size_gb=400,
        input_bucket="eval-data",
        output_bucket="eval-results",
    ),
)
def relax(structure_index: str, fmax: float = 0.01):
    import os, shutil
    from ase.io import read
    from ase.optimize import BFGS
    from fairchem.core import OCPCalculator

    checkpoint_dir = f"/mnt/checkpoint/relax/{structure_index}"
    output_dir = f"/mnt/output/relax/{structure_index}"
    os.makedirs(checkpoint_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)

    atoms = read("/mnt/input/structures.cif", index=structure_index)
    atoms.calc = OCPCalculator(model_name="uma-s-1p1:omc")
    opt = BFGS(atoms, trajectory=f"{checkpoint_dir}/relax.traj")
    opt.run(fmax=fmax)
    shutil.copytree(checkpoint_dir, output_dir, dirs_exist_ok=True)
Submitting jobs
submit() sends the function to run remotely and returns a Job handle immediately. Pass function arguments as positional/keyword args, plus optional id and env overrides:
# Single job
job = relax.submit("0:10", fmax=0.005, id="relax-0to10")
job.wait()
print(job.logs())
# Fan out — submit 27 jobs with custom IDs
args = [f"{i}:{i+10}" for i in range(0, 270, 10)]
ids = [f"relax-{i}to{i+10}" for i in range(0, 270, 10)]
jobs = relax.map(args, ids=ids)
jobs.wait()
Function.map issues all submissions in parallel.
Fanning out with map()
Function.map() submits the decorated function once per item in parallel:
jobs = relax.map(range(0, 270, 10)) # auto-generated IDs
jobs.wait()
# With custom IDs (parallel list)
args = [f"{i}:{i+10}" for i in range(0, 270, 10)]
ids = [f"relax-{i}to{i+10}" for i in range(0, 270, 10)]
jobs = relax.map(args, ids=ids)
Each item in args becomes a single positional argument. For multi-argument calls, pass tuples and unpack inside the function, or use submit() in a loop.
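The tuple pattern can be tried locally without the SDK. The sketch below is only an illustration: relax_pair is a hypothetical function, and the JSON round trip imitates the serialization requirement described under submit() Parameters, so the transport detail is an assumption. It shows that a tuple arrives as a list after a JSON round trip and that unpacking works the same either way.

```python
import json


def relax_pair(item):
    """Hypothetical worker: takes one positional item and unpacks it."""
    # Unpacking works for tuples and lists alike, which matters because
    # JSON has no tuple type: a tuple is encoded as an array.
    structure_index, fmax = item
    return f"index={structure_index} fmax={fmax}"


# One tuple per job: (structure_index, fmax).
items = [(f"{i}:{i + 10}", 0.005) for i in range(0, 30, 10)]

# Imitate arguments crossing a JSON boundary (an assumption about transport).
received = [json.loads(json.dumps(item)) for item in items]

for item in received:
    print(relax_pair(item))
```

With a decorated function, the same items list would presumably be passed as relax_pair.map(items), one job per tuple.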
Use Function.map when you're fanning out the same decorated function across varying arguments. For heterogeneous configs (different images, clouds, GPUs), use client.submit_many with a list of Submission objects.
Partial failures follow the same rules as submit_many: by default BatchSubmitError is raised with the successful jobs attached; pass return_exceptions=True to receive a mixed-entry JobGroup instead. See submit_many — Partial failures.
Per-submit environment variables
Pass extra env vars per-submit that merge on top of decorator-level env:
job = relax.submit("0:10", env={"WANDB_RUN_ID": "experiment-42"})
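The merge rule can be pictured as a plain dictionary update in which per-submit values win. This is a minimal sketch of that rule, not the SDK's code; merge_env and the WANDB_PROJECT value are hypothetical names used for illustration.

```python
def merge_env(decorator_env, submit_env=None):
    """Return a new dict where per-submit values override decorator values."""
    merged = dict(decorator_env or {})   # copy, so the decorator env is untouched
    merged.update(submit_env or {})      # later keys win
    return merged


decorator_env = {"WANDB_PROJECT": "relax", "WANDB_RUN_ID": "default"}
submit_env = {"WANDB_RUN_ID": "experiment-42"}

effective = merge_env(decorator_env, submit_env)
print(effective)
```

Keys set only at the decorator level survive the merge; keys present in both take the per-submit value.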
Requirements:
- Your code must be in a GitHub repo, committed and pushed
- The Docker image must have git installed
Decorator Parameters
| Parameter | Type | Description |
|---|---|---|
image | str | Docker image reference (required). Must have git installed. |
gpu | str | GPU type shorthand (e.g. "h100:8"). |
env | dict | Extra environment variables passed to the container. |
secrets | list[str] | Names of saved secrets to inject as env vars. See Secrets. |
cloud_config | CloudConfig | Cloud config for this function's submissions. |
docker_options | dict | Docker runtime options (shmSize, gpus, ipc, etc.). |
target_path | str | Absolute path the repo is cloned into on the remote container. Default "/app". Set this when your image already populates /app with build artifacts. |
submit() Parameters
| Parameter | Type | Description |
|---|---|---|
*args | | Positional arguments for the wrapped function. |
id | str | Custom deployment ID (auto-generated if omitted). Maps to --id. |
env | dict | Per-submit env vars, merged on top of decorator-level env. |
secrets | list[str] | Per-submit secret names, merged on top of decorator-level secrets. |
**kwargs | | Keyword arguments for the wrapped function. |
Arguments must be JSON-serializable: only str, int, float, bool, None, list, and dict are supported. For complex or large data, use input_bucket instead.
No return values: the decorated function runs remotely and its return value is discarded. Write results to /mnt/output via output_bucket.
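One way to catch unsupported arguments before submitting is to try encoding them with the standard json module. The helper below is a hypothetical pre-flight check, not part of the SDK, and the SDK may validate arguments differently.

```python
import json


def check_json_args(*args, **kwargs):
    """Raise TypeError early if any argument cannot be JSON-encoded."""
    try:
        json.dumps({"args": args, "kwargs": kwargs})
    except (TypeError, ValueError) as exc:
        raise TypeError(f"argument is not JSON-serializable: {exc}") from exc


check_json_args("0:10", fmax=0.005)   # plain values pass silently

try:
    check_json_args({"a", "b"})       # a set cannot be JSON-encoded
except TypeError as exc:
    print(exc)
```

Calling the check right before submit() turns a remote failure into an immediate local error.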
CloudConfig Parameters
CloudConfig accepts credentials as either a string name (e.g. "azure2", resolved lazily from saved credentials) or a credential object. See CloudConfig Internals for GPU vs VM type, credential resolution, and config variants.
| Parameter | Type | Description |
|---|---|---|
credentials | str \| CloudCredentials | Credential name or object. |
cloud_provider | CloudType | Cloud provider (inferred from credentials if omitted). |
vm_type | str | VM type (e.g. "Standard_NC16as_T4_v3"). |
spot | bool | Use spot/preemptible instances. |
region | str | Cloud region. |
availability_zone | str | Availability zone. |
disk_size_gb | int | Disk size in GB. |
input_bucket | str | Input bucket name (mounted at /mnt/input). |
output_bucket | str | Output bucket name (mounted at /mnt/output). |
input_storage_credentials | CloudCredentials | Credentials for input bucket (cross-cloud). |
input_storage_region | str | Region for input bucket storage. |
output_storage_credentials | CloudCredentials | Credentials for output bucket (cross-cloud). |
output_storage_region | str | Region for output bucket storage. |
Runtime Environment
Inside the remote container, the following environment variable is available:
| Variable | Description |
|---|---|
DEPLOYMENT_ID | Unique job ID — use to namespace output paths. |
For mount paths (/mnt/input, /mnt/output, /mnt/checkpoint), see Bucket Sync.
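A small sketch of namespacing output paths by job ID follows. It only computes the path; job_output_dir and the local-dev fallback are hypothetical, chosen so the same code also runs outside a deployment where DEPLOYMENT_ID is unset.

```python
import os
from pathlib import PurePosixPath


def job_output_dir(base="/mnt/output", fallback_id="local-dev"):
    """Build a per-job output directory path from DEPLOYMENT_ID."""
    # fallback_id is a hypothetical default for runs outside anycloud.
    deployment_id = os.environ.get("DEPLOYMENT_ID", fallback_id)
    return PurePosixPath(base) / deployment_id


print(job_output_dir())
```

Writing under a per-job directory keeps parallel jobs that share one output bucket from overwriting each other's files.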
Custom Images
Define Docker images programmatically instead of writing Dockerfiles. Each method returns a new immutable Image, so you can chain freely.
image = (
    anycloud.Image("python:3.11", repository="ghcr.io/user/train")
    .pip_install("torch", "transformers")
    .apt_install("curl")
    .env({"HF_HOME": "/root/models"})
    .workdir("/app")
)
ac.build(image).wait()
job = ac.submit(image.ref, gpu="h100:8")
job.wait()
image.ref is a deterministic tag derived from the Dockerfile content — same definition always produces the same ref, so unchanged images skip rebuilding.
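The two properties described above, immutable chaining and a content-derived ref, can be illustrated with a toy class. MiniImage is a hypothetical stand-in, not the SDK's Image; in particular the hash choice (SHA-256 truncated to 12 hex characters) is an assumption, and the real tag scheme may differ.

```python
import hashlib
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class MiniImage:
    """Toy stand-in for an immutable image definition (not the SDK class)."""
    base: str = "python:3.11"
    repository: str = ""
    steps: tuple = ()

    def _with(self, step):
        # Return a new object; the original is never modified.
        return replace(self, steps=self.steps + (step,))

    def pip_install(self, *packages):
        return self._with("RUN pip install --no-cache-dir " + " ".join(packages))

    def workdir(self, path):
        return self._with(f"WORKDIR {path}")

    def to_dockerfile(self):
        return "\n".join((f"FROM {self.base}",) + self.steps) + "\n"

    @property
    def ref(self):
        # Assumption: SHA-256 truncated to 12 hex chars; the real scheme may differ.
        digest = hashlib.sha256(self.to_dockerfile().encode()).hexdigest()[:12]
        return f"{self.repository}:{digest}"


base = MiniImage(repository="ghcr.io/user/train")
a = base.pip_install("torch").workdir("/app")
b = base.pip_install("torch").workdir("/app")
c = base.pip_install("torch", "transformers").workdir("/app")

print(a.ref == b.ref)   # same definition, same ref
print(a.ref == c.ref)   # different definition, different ref
print(base.steps)       # the base object is unchanged by chaining
```

Because the ref depends only on the rendered Dockerfile text, a build step can check whether that tag already exists and skip the rebuild.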
Creating Images
| Factory | Description |
|---|---|
Image(base, *, repository=None) | Start from any base image (default: "python:3.11"). |
Image.debian_slim(python_version="3.11", *, repository=None) | Start from the official Python slim Debian base. |
Image.from_dockerfile(path, *, repository=None) | Load a Dockerfile from disk. The FROM line becomes the base; remaining lines become steps you can extend with builder methods. |
# Quick default
image = anycloud.Image.debian_slim().pip_install("torch")
# Extend an existing Dockerfile
image = anycloud.Image.from_dockerfile("./Dockerfile").pip_install("extra-dep")
Image Methods
| Method | Dockerfile instruction |
|---|---|
.pip_install(*packages) | RUN pip install --no-cache-dir pkg1 pkg2 ... |
.apt_install(*packages) | RUN apt-get update && apt-get install -y ... && cleanup |
.run_commands(*commands) | RUN cmd1 && cmd2 && ... |
.env({"K": "V"}) | ENV K="V" |
.workdir("/path") | WORKDIR /path |
| Property | Description |
|---|---|
image.ref | Full image reference (repository:content_hash). Requires repository. |
image.to_dockerfile() | Render the full Dockerfile string. |
build() Parameters
| Parameter | Type | Description |
|---|---|---|
image | Image | An Image object defining the Dockerfile to build. |
target_image | str | Registry path to push the built image. Defaults to image.ref when the Image has a repository. |
cloud_config | CloudConfig | Cloud config for the build VM. Falls back to client default. |
deployment_id | str | Custom deployment ID. |
Buckets
Use Bucket objects to manage cloud buckets and wire data between jobs. See Bucket Sync for bucket types and usage patterns. See Deploying Jobs for data chaining and fan-in patterns.
Bucket Handles
ac.bucket() returns a lazy handle — no cloud calls are made until you use it:
data = ac.bucket("training-data")
results = ac.bucket("results")
Upload and Download
data = ac.bucket("training-data")
# Upload local files to the bucket (auto-creates bucket if needed)
data.upload("~/datasets/imagenet")
# Upload to a specific subdirectory
data.upload("~/labels.csv", remote_path="metadata")
# Download bucket contents locally
data.download("~/local-copy")
Job Methods
Client.submit() returns a Job — a future-like handle to a deployment.
| Method | Description |
|---|---|
job.wait(timeout=None, poll_interval=2.0) | Block until terminal state. Returns self on success, raises JobFailedError on failure. |
job.state() | Fetch current DeploymentState from the API. |
job.status() | Full StatusResponse (events, VM health, SSH key). |
job.logs() | Container stdout/stderr. |
job.exec(command, vm=False) | Run a command in the container (or on the VM). Returns stdout. |
job.terminate() | Terminate the deployment. |
job.resubmit() | Resubmit a terminal deployment. Returns a new Job. |
Properties:
| Property | Description |
|---|---|
job.id | Deployment ID. |
job.url | HTTPS URL for server deployments (https://<id>.anycloud.sh). |
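The timeout and poll_interval parameters of job.wait() suggest a polling loop. The sketch below shows that general pattern against a fake job; FakeJob, wait_for, and the state names are hypothetical, and unlike the SDK method it returns the final state instead of raising JobFailedError.

```python
import time


class FakeJob:
    """Hypothetical job whose state becomes terminal after a few polls."""

    def __init__(self, states):
        self._states = iter(states)
        self._last = None

    def state(self):
        # Advance through the scripted states, then keep repeating the last one.
        self._last = next(self._states, self._last)
        return self._last


def wait_for(job, timeout=None, poll_interval=2.0,
             terminal=("succeeded", "failed")):
    """Poll job.state() until it is terminal or the timeout elapses."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = job.state()
        if state in terminal:
            return state
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"job still {state!r} after {timeout}s")
        time.sleep(poll_interval)


job = FakeJob(["pending", "running", "succeeded"])
print(wait_for(job, timeout=5, poll_interval=0.01))
```

A larger poll_interval means fewer API calls at the cost of noticing completion later.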
JobGroup
JobGroup is returned by client.submit_many(...) and fn.map(...). It cannot be constructed directly.
| Method / Property | Description |
|---|---|
group.wait(timeout=None, poll_interval=2.0) | Block until all jobs finish. Raises JobGroupError if any failed. |
group.terminate() | Terminate all jobs in parallel. |
group.ids | List of deployment IDs (successful submissions only). |
group.errors | [(input_index, exception), ...] — only populated with return_exceptions=True. |
len(group), group[i], for job in group | Sequence protocol. With return_exceptions=True, entries may be Job or Exception. |
Client Methods
| Method | Description |
|---|---|
ac.submit(image, ...) | Submit a job. See parameters below. |
ac.submit_many(submissions, ...) | Submit N jobs in parallel from a list of Submissions. Returns a JobGroup. |
ac.serve(image, ...) | Deploy a long-running server. See parameters below. |
ac.build(image, ...) | Build a Docker image remotely from an Image object. |
ac.bucket(name) | Get a lazy Bucket handle. No cloud calls until use. |
ac.list(limit=20) | List recent deployments. |
ac.get(deployment_id) | Get a Job handle for an existing deployment. |
ac.close() | Close the HTTP connection. |
Client() Parameters
| Parameter | Type | Description |
|---|---|---|
api_url | str | API server URL. Falls back to API_URL env var. Default: http://localhost:8080. |
cloud_config | CloudConfig | Default cloud config for all submits. Takes precedence over credentials file. |
credentials | str | Name of a credential set saved in the local anycloud API database. Auto-selects if only one exists. |
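The auto-select rule for credentials can be sketched as a small resolver. This is an illustration only: pick_credentials is hypothetical, and raising an error when zero or several sets exist and no name is given is an assumption, since the table above only states what happens when exactly one set exists.

```python
def pick_credentials(saved, name=None):
    """Choose a credential set by name, or auto-select the only one."""
    if name is not None:
        if name not in saved:
            raise KeyError(f"no credential set named {name!r}")
        return name
    if len(saved) == 1:
        return next(iter(saved))
    # Assumption: zero or several sets without a name is treated as an error.
    raise ValueError(
        f"expected exactly one credential set, found {len(saved)}; "
        "pass credentials=<name>"
    )


saved = {"aws-prod": {}}
print(pick_credentials(saved))                  # only one set, so it is chosen

saved["azure2"] = {}
print(pick_credentials(saved, name="azure2"))   # several sets, name required
```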
submit() Parameters
| Parameter | Type | Description |
|---|---|---|
image | str | Docker image reference (required). |
cloud_config | CloudConfig | Cloud config (provider, credentials, region, etc.). Falls back to client default. |
gpu | str | GPU type (e.g. "h100:8", "a100:4"). |
env | dict | Environment variables for the container. |
secrets | list[str] | Names of saved secrets to inject as env vars. See Secrets. |
docker_options | dict | Docker runtime options (shmSize, gpus, ipc, etc.). |
command | list[str] | Override container CMD. |
persist | bool | Keep VM alive after job completion. |
deployment_id | str | Custom deployment ID. |
input | Bucket | Bucket to mount at /mnt/input (read-only). |
output | Bucket | Bucket to mount at /mnt/output (write, synced periodically). |
submit() also accepts a Submission object directly when you've built one programmatically:
from anycloud import Submission
s = Submission(image="train:latest", gpu="h100:8")
job = ac.submit(s)
submit_many() Parameters
| Parameter | Type | Description |
|---|---|---|
submissions | list[Submission] | List of submissions to submit in parallel. |
max_workers | int | Thread pool size (defaults to len(submissions)). |
return_exceptions | bool | If True, return a JobGroup whose entries are Job or Exception instead of raising. Default False. |
Submission accepts the same fields as submit() (image, cloud_config, gpu, env, command, input, output, etc.).
from anycloud import Submission
shared = ac.bucket("shared")  # hypothetical bucket name; both jobs write here
jobs = ac.submit_many([
    Submission(image="features:latest", output=shared),
    Submission(image="labels:latest", output=shared),
])
jobs.wait()
Use Submission.replace(**kwargs) to vary one field off a shared base — useful when most of the config is constant across a fan-out:
base = Submission(image="worker:latest", gpu="h100:8", cloud_config=cc)
jobs = ac.submit_many([
    base.replace(env={"SHARD": str(i)}) for i in range(8)
])
For fanning out a decorated function across varying arguments, use fn.map() instead.
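The replace pattern mirrors what dataclasses.replace does for frozen dataclasses: copy the record and swap only the named fields. The sketch below uses a hypothetical MiniSubmission to show the idea; it is not the SDK's Submission class and says nothing about how that class is implemented.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class MiniSubmission:
    """Toy stand-in for a submission record (not the SDK class)."""
    image: str
    gpu: str = ""
    env: dict = field(default_factory=dict)

    def replace(self, **changes):
        # dataclasses.replace builds a new instance with the given fields swapped.
        return replace(self, **changes)


base = MiniSubmission(image="worker:latest", gpu="h100:8")
shards = [base.replace(env={"SHARD": str(i)}) for i in range(8)]

print(len(shards), shards[3].env, base.env)
```

Each derived record shares the image and GPU of the base, and the base itself stays unchanged, so it can be reused for later fan-outs.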
Partial failures
By default, if any submission fails, submit_many raises BatchSubmitError. Successful submissions are attached to the error so you can still wait on or terminate them:
from anycloud import BatchSubmitError
try:
    jobs = ac.submit_many(subs)
except BatchSubmitError as e:
    print(f"{len(e.errors)} failed, {len(e.submitted)} running")
    for i, exc in e.errors:
        print(f"  submission {i}: {exc}")
    # Optionally keep the successful ones running, or clean up:
    for job in e.submitted:
        job.terminate()
Pass return_exceptions=True to never raise. You get a JobGroup whose entries are Job or Exception in input order; group.wait(), group.terminate(), and group.ids silently skip the exception entries, and group.errors lists them:
jobs = ac.submit_many(subs, return_exceptions=True)
jobs.wait() # only awaits the successful entries
for idx, exc in jobs.errors:
    print(f"submission {idx} failed: {exc}")
Function.map() accepts the same return_exceptions flag and raises the same BatchSubmitError on partial failure.
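The two failure modes can be mimicked with a thread pool from the standard library. This is a rough sketch of the pattern under stated assumptions: submit_all and flaky_submit are hypothetical, and RuntimeError stands in for BatchSubmitError, which in the SDK also carries the successful jobs.

```python
from concurrent.futures import ThreadPoolExecutor


def submit_all(submit_one, items, max_workers=None, return_exceptions=False):
    """Run submit_one over items in parallel, keeping input order."""
    items = list(items)
    if not items:
        return []

    def attempt(item):
        try:
            return submit_one(item)
        except Exception as exc:   # capture instead of aborting the batch
            return exc

    with ThreadPoolExecutor(max_workers=max_workers or len(items)) as pool:
        results = list(pool.map(attempt, items))   # map preserves input order

    errors = [(i, r) for i, r in enumerate(results) if isinstance(r, Exception)]
    if errors and not return_exceptions:
        # Stand-in for BatchSubmitError in this sketch.
        raise RuntimeError(f"{len(errors)} of {len(items)} submissions failed")
    return results


def flaky_submit(shard):
    """Hypothetical submit call that rejects odd shard numbers."""
    if shard % 2:
        raise ValueError(f"shard {shard} rejected")
    return f"job-{shard}"


mixed = submit_all(flaky_submit, range(4), return_exceptions=True)
for index, entry in enumerate(mixed):
    print(index, "error" if isinstance(entry, Exception) else "ok", entry)
```

Keeping results in input order is what lets an error entry be traced back to the submission that produced it.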
serve() Parameters
| Parameter | Type | Description |
|---|---|---|
image | str | Docker image reference (required). Container must listen on port 8088. |
cloud_config | CloudConfig | Cloud config (provider, credentials, region, etc.). Falls back to client default. |
gpu | str | GPU type (e.g. "h100:8", "a100:4"). |
env | dict | Environment variables for the container. |
secrets | list[str] | Names of saved secrets to inject as env vars. See Secrets. |
docker_options | dict | Docker runtime options (shmSize, gpus, ipc, etc.). |
command | list[str] | Override container CMD. |
deployment_id | str | Custom deployment ID. |
The returned Job has a job.url property with the HTTPS URL (e.g. https://<id>.anycloud.sh).
Catalog browsing (regions, VM types, pricing, GPUs) is available via the
anycloud CLI: anycloud regions, anycloud vm-types, anycloud pricing,
anycloud gpus.
Credentials Methods
Manage cloud provider credentials stored on the API server.
| Method | Description |
|---|---|
ac.list_credentials() | List saved credentials (secrets redacted). |
ac.get_credential(name) | Get a single credential by name (secrets redacted). |
ac.save_credential(name, credential) | Save a cloud credential set. credential must include cloudProvider and provider-specific fields. |
ac.delete_credential(name) | Delete a saved credential set. |
creds = ac.list_credentials()
ac.save_credential("aws-prod", {
    "cloudProvider": "AWS",
    "accessKeyId": "AKIA...",
    "secretAccessKey": "...",
})
ac.delete_credential("aws-prod")
Secrets Methods
Manage named secrets. See the Secrets guide.
| Method | Description |
|---|---|
ac.list_secrets() | List saved secrets (names and timestamps only — values are never returned). |
ac.create_secret(name, values) | Create or update a secret. values is a dict[str, str] of env-var names to values. |
ac.delete_secret(name, *, force=False) | Delete a secret. Raises ConflictError when non-terminal deployments reference it; pass force=True to override. |
ac.create_secret("hf", {"HF_TOKEN": "hf_xxxx"})
job = ac.submit("train:latest", gpu="h100:8", secrets=["hf"])
ac.delete_secret("hf", force=True)
Configuration
Environment Variables
| Variable | Description | Default |
|---|---|---|
GITHUB_TOKEN | GitHub token for authentication (CI fallback). | — |
API_URL | API server URL. | http://localhost:8080 |
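The lookup order for the API server URL, as described for the api_url parameter and the API_URL variable, can be sketched as follows. resolve_api_url is a hypothetical helper written for illustration; the SDK's own resolution code is not shown in this document.

```python
import os

DEFAULT_API_URL = "http://localhost:8080"


def resolve_api_url(api_url=None, environ=None):
    """Pick the API URL: explicit argument, then API_URL, then the default."""
    environ = os.environ if environ is None else environ
    return api_url or environ.get("API_URL") or DEFAULT_API_URL


print(resolve_api_url(environ={}))
print(resolve_api_url(environ={"API_URL": "http://10.0.0.5:8080"}))
print(resolve_api_url("http://localhost:9000", environ={"API_URL": "ignored"}))
```

Passing the environment as a parameter keeps the helper easy to test without modifying the real process environment.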
Context Manager
The client can be used as a context manager to automatically close the HTTP connection:
with anycloud.Client() as ac:
    job = ac.submit("task:latest")
    job.wait()
Error Handling
| Exception | Description |
|---|---|
AnyCloudError | Base exception for all SDK errors. |
APIError | HTTP error from the conductor API. |
ConflictError | Resource conflict (409), for example a deployment ID that already exists or a secret still referenced by a non-terminal deployment. |
NotFoundError | Deployment not found (404). |
JobFailedError | Job reached a terminal failure state. .logs has last output. |
JobGroupError | One or more jobs in a group failed during wait(). .errors has all failures. |
BatchSubmitError | One or more submissions in submit_many / fn.map failed. .submitted and .errors have the split. |
TimeoutError | Operation timed out. |
from anycloud import AnyCloudError, JobFailedError
try:
    job.wait(timeout=3600)
except JobFailedError as e:
    print(f"Job {e.job_id} failed with state: {e.state}")
    if e.logs:
        print(e.logs)
except AnyCloudError as e:
    print(f"SDK error: {e}")