Orchestrator

The orchestrator package provides DAG-based task orchestration on top of the OSAPI SDK client. Define tasks with dependencies and the library handles execution order, parallelism, conditional logic, and reporting.

Quick Start

import (
	"context"

	"github.com/retr0h/osapi/pkg/sdk/client"
	"github.com/retr0h/osapi/pkg/sdk/orchestrator"
)

c := client.New("http://localhost:8080", "your-jwt-token")
plan := orchestrator.NewPlan(c)

health := plan.TaskFunc("check-health",
	func(
		ctx context.Context,
		c *client.Client,
	) (*orchestrator.Result, error) {
		_, err := c.Health.Liveness(ctx)
		return &orchestrator.Result{Changed: false}, err
	},
)

hostname := plan.TaskFunc("get-hostname",
	func(
		ctx context.Context,
		c *client.Client,
	) (*orchestrator.Result, error) {
		resp, err := c.Node.Hostname(ctx, "_any")
		if err != nil {
			return nil, err
		}

		return orchestrator.CollectionResult(
			resp.Data,
			func(r client.HostnameResult) orchestrator.HostResult {
				return orchestrator.HostResult{
					Hostname: r.Hostname,
					Changed:  r.Changed,
					Error:    r.Error,
				}
			},
		), nil
	},
)
hostname.DependsOn(health)

report, err := plan.Run(context.Background())

Operations

Operations are the building blocks of orchestration plans. Each operation maps to an OSAPI job type that agents execute.

Operation | Description | Idempotent | Category
--- | --- | --- | ---
command.exec.execute | Execute a command | No | Command
command.shell.execute | Execute a shell string | No | Command
file.deploy.execute | Deploy file to agent | Yes | File
file.status.get | Check file status | Read-only | File
file.upload | Upload to Object Store | Yes | File
network.dns.get | Get DNS configuration | Read-only | Network
network.dns.update | Update DNS servers | Yes | Network
network.ping.do | Ping a host | Read-only | Network
node.hostname.get | Get system hostname | Read-only | Node
node.status.get | Get node status | Read-only | Node
node.disk.get | Get disk usage | Read-only | Node
node.memory.get | Get memory stats | Read-only | Node
node.uptime.get | Get system uptime | Read-only | Node
node.load.get | Get load averages | Read-only | Node
docker.create.execute | Create a container | No | Docker
docker.list.get | List containers | Read-only | Docker
docker.inspect.get | Inspect a container | Read-only | Docker
docker.start.execute | Start a container | No | Docker
docker.stop.execute | Stop a container | No | Docker
docker.remove.execute | Remove a container | No | Docker
docker.exec.execute | Exec in a container | No | Docker
docker.pull.execute | Pull a container image | No | Docker

Idempotency

  • Read-only operations never modify state and always return Changed: false.
  • Idempotent write operations check current state before mutating and return Changed: true only if something actually changed.
  • Non-idempotent operations (command exec/shell) always return Changed: true. Use guards (When, OnlyIfChanged) to control when they run.

Hooks

Register callbacks to control logging and progress at every stage:

hooks := orchestrator.Hooks{
	BeforePlan:  func(summary orchestrator.PlanSummary) { ... },
	AfterPlan:   func(report *orchestrator.Report) { ... },
	BeforeLevel: func(level int, tasks []*orchestrator.Task, parallel bool) { ... },
	AfterLevel:  func(level int, results []orchestrator.TaskResult) { ... },
	BeforeTask:  func(task *orchestrator.Task) { ... },
	AfterTask:   func(task *orchestrator.Task, result orchestrator.TaskResult) { ... },
	OnRetry:     func(task *orchestrator.Task, attempt int, err error) { ... },
	OnSkip:      func(task *orchestrator.Task, reason string) { ... },
}

plan := orchestrator.NewPlan(client, orchestrator.WithHooks(hooks))

The SDK performs no logging — hooks are the only output mechanism. Consumers bring their own formatting.

Error Strategies

Strategy | Behavior
--- | ---
StopAll (default) | Fail fast, cancel everything
Continue | Skip dependents, keep running independent tasks
Retry(n) | Retry n times immediately before failing
Retry(n, WithRetryBackoff(...)) | Retry n times with exponential backoff

Strategies can be set at plan level or overridden per-task:

plan := orchestrator.NewPlan(client, orchestrator.OnError(orchestrator.Continue))
task.OnError(orchestrator.Retry(3)) // immediate
task.OnError(orchestrator.Retry(3, orchestrator.WithRetryBackoff(1*time.Second, 30*time.Second))) // backoff

Result Types

Result

The Result struct returned by task functions:

Field | Type | Description
--- | --- | ---
Changed | bool | Whether the operation modified state
Data | map[string]any | Operation-specific response data
Status | Status | Terminal status (changed, unchanged)
HostResults | []HostResult | Per-host results for broadcast operations

TaskResult

The TaskResult struct provided to AfterTask hooks and in Report.Tasks:

Field | Type | Description
--- | --- | ---
Name | string | Task name
Status | Status | Terminal status
Changed | bool | Whether the operation reported changes
Duration | time.Duration | Execution time
Error | error | Error if task failed; nil on success
Data | map[string]any | Operation response data for post-run access

HostResult

Per-host data for broadcast operations (targeting _all or label selectors):

Field | Type | Description
--- | --- | ---
Hostname | string | Agent hostname
Changed | bool | Whether this host reported changes
Error | string | Error message; empty on success
Data | map[string]any | Host-specific response data

Bridge Helpers

Two helpers simplify converting SDK client responses into orchestrator Result values.

CollectionResult

CollectionResult converts a collection response (any SDK call that returns per-host results) into an orchestrator Result with populated HostResults. Use it for most operations:

return orchestrator.CollectionResult(
	resp.Data,
	func(r client.HostnameResult) orchestrator.HostResult {
		return orchestrator.HostResult{
			Hostname: r.Hostname,
			Changed:  r.Changed,
			Error:    r.Error,
		}
	},
), nil

The first argument is the SDK response data (which must have a Results field). The second is a mapper function that converts each per-host result into an orchestrator.HostResult.

StructToMap

StructToMap converts a Go struct into a map[string]any using JSON round-tripping. Use it for non-collection responses where you want to store the full response in Result.Data:

return &orchestrator.Result{
	JobID:   resp.Data.JobID,
	Changed: resp.Data.Changed,
	Data:    orchestrator.StructToMap(resp.Data),
}, nil

TaskFuncWithResults

Use TaskFuncWithResults when a task needs to read results from prior tasks:

summarize := plan.TaskFuncWithResults(
	"summarize",
	func(
		ctx context.Context,
		c *client.Client,
		results orchestrator.Results,
	) (*orchestrator.Result, error) {
		r := results.Get("get-hostname")
		hostname, ok := r.Data["hostname"].(string)
		if !ok {
			return nil, fmt.Errorf("get-hostname did not return a hostname")
		}

		return &orchestrator.Result{
			Changed: true,
			Data:    map[string]any{"summary": hostname},
		}, nil
	},
)
summarize.DependsOn(getHostname)
summarize.DependsOn(getHostname)

Unlike TaskFunc, the function receives the Results map containing completed dependency outputs.

Features

Feature | Description
--- | ---
Basic Plans | Tasks, dependencies, and execution
Task Functions | Custom Go logic with TaskFunc
Parallel Execution | Concurrent tasks at the same level
Guards | Conditional execution with When
Only If Changed | Skip unless dependencies changed
Lifecycle Hooks | Callbacks at every execution stage
Error Strategies | StopAll, Continue, and Retry
Failure Recovery | Recovery tasks on upstream failure
Retry | Automatic retry on failure
Broadcast | Multi-host targeting and HostResults
File Deployment | Upload, deploy, and verify workflow
Result Decode | Post-run and inter-task data access
Introspection | Explain, Levels, and Validate
Container Targeting | Run providers inside containers

Examples

See the orchestrator examples for runnable demonstrations of each feature.