# Workflows
Ready-to-use patterns for common workloads. These apply to project-based custom code jobs. For platform-provided compute primitives, see Sealed Recipes.
## By role
| You are… | Pattern | Example |
|---|---|---|
| ML engineer | GPU structured_runner with progress and artifacts | ML Training |
| Data scientist | sweep for grid search, benchmark for comparisons | Benchmarking |
| Backend engineer | structured_runner with fan-out | Agent Swarms |
| AI agent developer | structured_runner + fan-out by agent/prompt | Agent Swarms |
| Quant | map_reduce or chunked structured_runner | Monte Carlo |
| Optimizer | structured_runner + fan_out.items per generation | CMA Optimization |
## By intent
| I want to… | Use |
|---|---|
| Run one script once | structured_runner |
| Run code across a list of inputs | structured_runner + fan_out.by |
| Evaluate evolutionary candidates | structured_runner + fan_out.items |
| Train on a GPU | structured_runner + profile: "gpu" |
| Chain stages (train then evaluate) | depends_on |
| Search a parameter grid | sweep |
| Chunk simulations and reduce | map_reduce |
| Compare named strategies | benchmark |
| Submit many jobs at once | POST /api/v1/jobs/batch |
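For the last row in the table, here is a minimal sketch of a batch submission using only the standard library. The `POST /api/v1/jobs/batch` path comes from this page; the request body shape (`{"jobs": [...]}`), the host, and the lack of auth headers are assumptions, not confirmed API details:

```python
import json
import urllib.request

# Two independent job specs, same shape as the single-job examples on this page.
jobs = [
    {
        "type": "structured_runner",
        "runner_command": ["python", "evaluate.py"],
        "payload": {"items": ["a", "b", "c"]},
        "fan_out": {"by": "items"},
        "project": "my-project",
    },
    {
        "type": "structured_runner",
        "runner_command": ["python", "simulate.py"],
        "payload": {"total_seeds": 10000},
        "project": "my-project",
    },
]

# Assumed request shape: all specs wrapped in a "jobs" array, posted as JSON.
body = json.dumps({"jobs": jobs}).encode()
req = urllib.request.Request(
    "https://computalot.example.com/api/v1/jobs/batch",  # hypothetical host
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(req)  # uncomment with a real host and credentials
```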
## Fan-out by values

```json
{
  "type": "structured_runner",
  "runner_command": ["python", "evaluate.py"],
  "payload": {"items": ["a", "b", "c"], "config": {"n_trials": 80}},
  "fan_out": {"by": "items"},
  "merge_strategy": "keyed",
  "project": "my-project",
  "timeout_s": 600
}
```

## Fan-out by explicit items
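On the runner side, each listed item becomes one task. A minimal sketch of `evaluate.py` for the example below, assuming (based on the GPU example later on this page) that the task payload is read from the file named by `COMPUTALOT_TASK_PAYLOAD`, and assuming each item's fields are merged into the base payload:

```python
import base64
import json
import os


def run_task() -> dict:
    """Sketch of evaluate.py for the fan-out items example below.

    Assumption (not confirmed on this page): each task's payload is the base
    payload with one fan-out item's fields merged in, read from the file
    named by COMPUTALOT_TASK_PAYLOAD as in the GPU example further down.
    """
    with open(os.environ["COMPUTALOT_TASK_PAYLOAD"]) as f:
        payload = json.load(f)
    dataset = payload["dataset"]            # from the base payload
    candidate_id = payload["candidate_id"]  # from this task's fan-out item
    strategy = base64.b64decode(payload["strategy_b64"])
    # ... evaluate `strategy` against `dataset` here ...
    return {"candidate_id": candidate_id, "score": 0.0}
```

With `merge_strategy: "collect"`, each task's returned dict is gathered into the merged result.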
```json
{
  "type": "structured_runner",
  "runner_command": ["python3", "evaluate.py"],
  "payload": {"dataset": "smoke"},
  "fan_out": {
    "items": [
      {"candidate_id": 0, "strategy_b64": "AAA..."},
      {"candidate_id": 1, "strategy_b64": "BBB..."}
    ]
  },
  "merge_strategy": "collect",
  "project": "my-project"
}
```

## Chunked range
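The platform splits the range field across tasks; how the sub-range is surfaced in each task's payload is not shown on this page, so the sketch below only illustrates the arithmetic of the split used by the example that follows:

```python
def split_range(total: int, chunks: int) -> list[tuple[int, int]]:
    """Illustrative only: divide `total` units into `chunks` contiguous
    (start, count) spans, the way fan_out {"chunks": 20, "total": 10000}
    divides work across tasks. Spans differ by at most one unit."""
    base, rem = divmod(total, chunks)
    spans, start = [], 0
    for i in range(chunks):
        count = base + (1 if i < rem else 0)  # spread any remainder evenly
        spans.append((start, count))
        start += count
    return spans


# The example below yields 20 tasks of 500 seeds each.
spans = split_range(10000, 20)
```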
```json
{
  "type": "structured_runner",
  "runner_command": ["python", "simulate.py"],
  "payload": {"total_seeds": 10000, "strategy": "v3"},
  "fan_out": {"chunks": 20, "range_field": "total_seeds", "total": 10000},
  "merge_strategy": "collect",
  "project": "my-project"
}
```

## Parameter sweep (matrix)
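A sketch of how the matrix expands for the example below, assuming one task per (variant, chunk) pair with the range divided evenly; the field names in the generated specs are illustrative, not the platform's actual per-task payload:

```python
from itertools import product

# Values from the submission example below.
variants = [{"name": "baseline", "alpha": 0.1}, {"name": "aggressive", "alpha": 0.5}]
total, chunks_per = 1000, 5

# Assumed expansion: 2 variants x 5 chunks = 10 tasks, each covering
# total / chunks_per = 200 seeds of its variant's range.
tasks = [
    {"variant": v["name"], "alpha": v["alpha"], "seeds": total // chunks_per}
    for v, _chunk in product(variants, range(chunks_per))
]
```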
```json
{
  "type": "structured_runner",
  "payload": {
    "variants": [{"name": "baseline", "alpha": 0.1}, {"name": "aggressive", "alpha": 0.5}],
    "total_seeds": 1000
  },
  "fan_out": {"matrix": true, "variant_field": "variants", "chunk_field": "total_seeds", "total": 1000, "chunks_per": 5},
  "merge_strategy": "collect",
  "project": "my-project"
}
```

## Multi-stage pipeline (DAG)
```python
base_job = client.submit_structured(
    runner_command=["python", "train.py"],
    payload={"epochs": 100},
    project="my-project",
)
eval_job = client.submit_structured(
    runner_command=["python", "evaluate.py"],
    payload={"models": ["model_1h", "model_4h"]},
    fan_out={"by": "models"},
    depends_on=[base_job["id"]],
    project="my-project",
)
```

## GPU training with progress
```python
# In your runner script:
import json, os, sys

payload = json.load(open(os.environ["COMPUTALOT_TASK_PAYLOAD"]))
total_steps = payload["total_steps"]
resume_state = payload.get("_resume") or {}
start_step = resume_state.get("step", 0)

for step in range(start_step, total_steps):
    loss = train_step()  # your training step
    if step % 1000 == 0:
        print(f"COMPUTALOT_PROGRESS:{json.dumps({'step': step, 'loss': loss})}")
        sys.stdout.flush()
```

```json
{
  "type": "structured_runner",
  "runner_command": ["python", "train.py"],
  "payload": {"total_steps": 1000000},
  "project": "my-project",
  "timeout_s": 21600,
  "requirements": {"profile": "gpu", "gpu_count": 1, "gpu_memory_mb": 16384},
  "checkpointing": {"enabled": true, "resume_from_latest": true}
}
```

## External data with artifact cache
```python
data_meta = client.register_artifact(
    url="https://storage.example.com/dataset.parquet",
    sha256="abcdef...",
    filename="dataset.parquet",
)
result = client.run_structured(
    runner_command=["python", "process.py"],
    payload={
        "items": ["a", "b", "c"],
        "_artifacts": {"download": {"dataset": data_meta["id"]}},
    },
    fan_out={"by": "items"},
    project="my-project",
)
```