# Orchestrator

The `orchestrator` package provides DAG-based task orchestration on top of the
OSAPI SDK client. Define tasks with dependencies and the library handles
execution order, parallelism, conditional logic, and reporting.
## Quick Start

```go
import (
	"context"

	"github.com/retr0h/osapi/pkg/sdk/client"
	"github.com/retr0h/osapi/pkg/sdk/orchestrator"
)

c := client.New("http://localhost:8080", "your-jwt-token")
plan := orchestrator.NewPlan(c)

health := plan.TaskFunc("check-health",
	func(
		ctx context.Context,
		c *client.Client,
	) (*orchestrator.Result, error) {
		_, err := c.Health.Liveness(ctx)
		return &orchestrator.Result{Changed: false}, err
	},
)

hostname := plan.TaskFunc("get-hostname",
	func(
		ctx context.Context,
		c *client.Client,
	) (*orchestrator.Result, error) {
		resp, err := c.Node.Hostname(ctx, "_any")
		if err != nil {
			return nil, err
		}
		return orchestrator.CollectionResult(
			resp.Data,
			func(r client.HostnameResult) orchestrator.HostResult {
				return orchestrator.HostResult{
					Hostname: r.Hostname,
					Changed:  r.Changed,
					Error:    r.Error,
				}
			},
		), nil
	},
)

hostname.DependsOn(health)

report, err := plan.Run(context.Background())
```
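The `DependsOn` edges above determine execution order: tasks whose dependencies are all satisfied run together as one level. As a conceptual sketch of that grouping (a self-contained illustration, not the library's actual algorithm):

```go
package main

import "fmt"

// levels groups tasks into execution waves: every task whose dependencies are
// already complete joins the current level; levels run one after another,
// and tasks within a level may run in parallel.
func levels(deps map[string][]string) [][]string {
	done := map[string]bool{}
	var out [][]string
	for len(done) < len(deps) {
		var level []string
		for task, reqs := range deps {
			if done[task] {
				continue
			}
			ready := true
			for _, r := range reqs {
				if !done[r] {
					ready = false
					break
				}
			}
			if ready {
				level = append(level, task)
			}
		}
		if len(level) == 0 {
			return nil // no task became ready: dependency cycle
		}
		for _, t := range level {
			done[t] = true
		}
		out = append(out, level)
	}
	return out
}

func main() {
	// Mirrors the Quick Start plan: get-hostname depends on check-health.
	lv := levels(map[string][]string{
		"check-health": {},
		"get-hostname": {"check-health"},
	})
	fmt.Println(lv) // [[check-health] [get-hostname]]
}
```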
## Operations

Operations are the building blocks of orchestration plans. Each operation maps
to an OSAPI job type that agents execute.

| Operation | Description | Idempotent | Category |
|---|---|---|---|
| `command.exec.execute` | Execute a command | No | Command |
| `command.shell.execute` | Execute a shell string | No | Command |
| `file.deploy.execute` | Deploy file to agent | Yes | File |
| `file.status.get` | Check file status | Read-only | File |
| `file.upload` | Upload to Object Store | Yes | File |
| `network.dns.get` | Get DNS configuration | Read-only | Network |
| `network.dns.update` | Update DNS servers | Yes | Network |
| `network.ping.do` | Ping a host | Read-only | Network |
| `node.hostname.get` | Get system hostname | Read-only | Node |
| `node.status.get` | Get node status | Read-only | Node |
| `node.disk.get` | Get disk usage | Read-only | Node |
| `node.memory.get` | Get memory stats | Read-only | Node |
| `node.uptime.get` | Get system uptime | Read-only | Node |
| `node.load.get` | Get load averages | Read-only | Node |
| `docker.create.execute` | Create a container | No | Docker |
| `docker.list.get` | List containers | Read-only | Docker |
| `docker.inspect.get` | Inspect a container | Read-only | Docker |
| `docker.start.execute` | Start a container | No | Docker |
| `docker.stop.execute` | Stop a container | No | Docker |
| `docker.remove.execute` | Remove a container | No | Docker |
| `docker.exec.execute` | Exec in a container | No | Docker |
| `docker.pull.execute` | Pull a container image | No | Docker |
## Idempotency

- Read-only operations never modify state and always return `Changed: false`.
- Idempotent write operations check current state before mutating and return
  `Changed: true` only if something actually changed.
- Non-idempotent operations (command exec/shell) always return `Changed: true`.
  Use guards (`When`, `OnlyIfChanged`) to control when they run.
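The idempotent-write contract can be shown with a self-contained sketch (no SDK calls; `setHostname` here is a stand-in for illustration, not a library function):

```go
package main

import "fmt"

// setHostname follows the idempotent-write pattern: read current state,
// mutate only when it differs from the desired state, and report whether
// anything actually changed.
func setHostname(current *string, desired string) bool {
	if *current == desired {
		return false // already in desired state: Changed=false
	}
	*current = desired
	return true // state was mutated: Changed=true
}

func main() {
	host := "old-name"
	fmt.Println(setHostname(&host, "new-name")) // true: state changed
	fmt.Println(setHostname(&host, "new-name")) // false: second run is a no-op
}
```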
## Hooks

Register callbacks to control logging and progress at every stage:

```go
hooks := orchestrator.Hooks{
	BeforePlan:  func(summary orchestrator.PlanSummary) { ... },
	AfterPlan:   func(report *orchestrator.Report) { ... },
	BeforeLevel: func(level int, tasks []*orchestrator.Task, parallel bool) { ... },
	AfterLevel:  func(level int, results []orchestrator.TaskResult) { ... },
	BeforeTask:  func(task *orchestrator.Task) { ... },
	AfterTask:   func(task *orchestrator.Task, result orchestrator.TaskResult) { ... },
	OnRetry:     func(task *orchestrator.Task, attempt int, err error) { ... },
	OnSkip:      func(task *orchestrator.Task, reason string) { ... },
}

plan := orchestrator.NewPlan(client, orchestrator.WithHooks(hooks))
```
The SDK performs no logging — hooks are the only output mechanism. Consumers bring their own formatting.
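The "consumers bring their own formatting" design can be illustrated with a minimal stand-in (the types below are deliberately simplified and are not the orchestrator's actual signatures):

```go
package main

import "fmt"

// hooks is a simplified stand-in: the runner emits events only through
// caller-supplied callbacks, so the consumer owns all output formatting.
type hooks struct {
	BeforeTask func(name string)
	AfterTask  func(name string, err error)
}

// runTask invokes the hooks around the task body; nil hooks are simply skipped,
// which is how a no-logging default falls out of the design.
func runTask(name string, fn func() error, h hooks) {
	if h.BeforeTask != nil {
		h.BeforeTask(name)
	}
	err := fn()
	if h.AfterTask != nil {
		h.AfterTask(name, err)
	}
}

func main() {
	h := hooks{
		BeforeTask: func(name string) { fmt.Println("start:", name) },
		AfterTask:  func(name string, err error) { fmt.Println("done:", name, err == nil) },
	}
	runTask("check-health", func() error { return nil }, h)
}
```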
## Error Strategies

| Strategy | Behavior |
|---|---|
| `StopAll` (default) | Fail fast, cancel everything |
| `Continue` | Skip dependents, keep running independent tasks |
| `Retry(n)` | Retry n times immediately before failing |
| `Retry(n, WithRetryBackoff(...))` | Retry n times with exponential backoff |

Strategies can be set at plan level or overridden per-task:

```go
plan := orchestrator.NewPlan(client, orchestrator.OnError(orchestrator.Continue))

task.OnError(orchestrator.Retry(3)) // immediate
task.OnError(orchestrator.Retry(3, orchestrator.WithRetryBackoff(1*time.Second, 30*time.Second))) // backoff
```
## Result Types

### Result

The `Result` struct returned by task functions:

| Field | Type | Description |
|---|---|---|
| `Changed` | `bool` | Whether the operation modified state |
| `Data` | `map[string]any` | Operation-specific response data |
| `Status` | `Status` | Terminal status (changed, unchanged) |
| `HostResults` | `[]HostResult` | Per-host results for broadcast operations |

### TaskResult

The `TaskResult` struct provided to `AfterTask` hooks and in `Report.Tasks`:

| Field | Type | Description |
|---|---|---|
| `Name` | `string` | Task name |
| `Status` | `Status` | Terminal status |
| `Changed` | `bool` | Whether the operation reported changes |
| `Duration` | `time.Duration` | Execution time |
| `Error` | `error` | Error if task failed; nil on success |
| `Data` | `map[string]any` | Operation response data for post-run access |

### HostResult

Per-host data for broadcast operations (targeting `_all` or label selectors):

| Field | Type | Description |
|---|---|---|
| `Hostname` | `string` | Agent hostname |
| `Changed` | `bool` | Whether this host reported changes |
| `Error` | `string` | Error message; empty on success |
| `Data` | `map[string]any` | Host-specific response data |
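A sketch of how a consumer might summarize per-host results after a broadcast run (`hostResult` below is a local stand-in mirroring the fields above, not the library's type):

```go
package main

import "fmt"

// hostResult mirrors the HostResult fields documented above.
type hostResult struct {
	Hostname string
	Changed  bool
	Error    string
}

// summarize reduces a broadcast's per-host results to an overall changed
// flag and the list of hosts that reported an error.
func summarize(results []hostResult) (changed bool, failed []string) {
	for _, r := range results {
		if r.Error != "" {
			failed = append(failed, r.Hostname)
		}
		if r.Changed {
			changed = true
		}
	}
	return changed, failed
}

func main() {
	changed, failed := summarize([]hostResult{
		{Hostname: "agent-a", Changed: true},
		{Hostname: "agent-b", Error: "timeout"},
	})
	fmt.Println(changed, failed) // true [agent-b]
}
```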
## Bridge Helpers

Two helpers simplify converting SDK client responses into orchestrator `Result`
values.

### CollectionResult

`CollectionResult` converts a collection response (any SDK call that returns
per-host results) into an orchestrator `Result` with populated `HostResults`.
Use it for most operations:

```go
return orchestrator.CollectionResult(
	resp.Data,
	func(r client.HostnameResult) orchestrator.HostResult {
		return orchestrator.HostResult{
			Hostname: r.Hostname,
			Changed:  r.Changed,
			Error:    r.Error,
		}
	},
), nil
```

The first argument is the SDK response data (which must have a `Results` field).
The second is a mapper function that converts each per-host result into an
`orchestrator.HostResult`.
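Conceptually, the mapping step is a generic slice transform. A self-contained sketch with illustrative types (not the library's real signature):

```go
package main

import "fmt"

// collect applies the caller-supplied mapper to each per-host entry,
// mirroring the shape of the mapper argument shown above.
func collect[T, U any](items []T, mapper func(T) U) []U {
	out := make([]U, 0, len(items))
	for _, it := range items {
		out = append(out, mapper(it))
	}
	return out
}

// raw and mapped are illustrative stand-ins for an SDK per-host result
// and an orchestrator HostResult.
type raw struct {
	Host string
	OK   bool
}

type mapped struct {
	Hostname string
	Changed  bool
}

func main() {
	hosts := collect([]raw{{"agent-a", true}, {"agent-b", false}}, func(r raw) mapped {
		return mapped{Hostname: r.Host, Changed: r.OK}
	})
	fmt.Println(hosts) // [{agent-a true} {agent-b false}]
}
```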
### StructToMap

`StructToMap` converts a Go struct into a `map[string]any` using JSON
round-tripping. Use it for non-collection responses where you want to store the
full response in `Result.Data`:

```go
return &orchestrator.Result{
	JobID:   resp.Data.JobID,
	Changed: resp.Data.Changed,
	Data:    orchestrator.StructToMap(resp.Data),
}, nil
```
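The JSON round-trip technique is straightforward to sketch on its own (the code below is illustrative, not the library's implementation; map keys follow the struct's JSON tags):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// structToMap performs the round-trip StructToMap describes: marshal the
// struct to JSON, then unmarshal into a map keyed by JSON field names.
func structToMap(v any) map[string]any {
	b, err := json.Marshal(v)
	if err != nil {
		return nil
	}
	var m map[string]any
	if err := json.Unmarshal(b, &m); err != nil {
		return nil
	}
	return m
}

// jobData is an illustrative response struct.
type jobData struct {
	JobID   string `json:"job_id"`
	Changed bool   `json:"changed"`
}

func main() {
	m := structToMap(jobData{JobID: "abc123", Changed: true})
	fmt.Println(m["job_id"], m["changed"]) // abc123 true
}
```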
## TaskFuncWithResults

Use `TaskFuncWithResults` when a task needs to read results from prior tasks:

```go
summarize := plan.TaskFuncWithResults(
	"summarize",
	func(
		ctx context.Context,
		client *client.Client,
		results orchestrator.Results,
	) (*orchestrator.Result, error) {
		r := results.Get("get-hostname")
		// comma-ok avoids a panic if the key is absent or not a string
		hostname, _ := r.Data["hostname"].(string)
		return &orchestrator.Result{
			Changed: true,
			Data:    map[string]any{"summary": hostname},
		}, nil
	},
)

summarize.DependsOn(getHostname)
```

Unlike `TaskFunc`, the function receives the `Results` map containing completed
dependency outputs.
## Features

| Feature | Description |
|---|---|
| Basic Plans | Tasks, dependencies, and execution |
| Task Functions | Custom Go logic with `TaskFunc` |
| Parallel Execution | Concurrent tasks at the same level |
| Guards | Conditional execution with `When` |
| Only If Changed | Skip unless dependencies changed |
| Lifecycle Hooks | Callbacks at every execution stage |
| Error Strategies | `StopAll`, `Continue`, and `Retry` |
| Failure Recovery | Recovery tasks on upstream failure |
| Retry | Automatic retry on failure |
| Broadcast | Multi-host targeting and `HostResults` |
| File Deployment | Upload, deploy, and verify workflow |
| Result Decode | Post-run and inter-task data access |
| Introspection | `Explain`, `Levels`, and `Validate` |
| Container Targeting | Run providers inside containers |
## Examples

See the orchestrator examples for runnable demonstrations of each feature.