Run inline
A durable inline run is a step that runs code inline in a durable way. The step is executed inline, and the command is not suspended if the step succeeds. If a step fails, the code is retried according to the retry policy; whether the command is suspended after a failure is configurable.
Inline run steps can be considered as milestones in a long-running command or as a way to run code that is not idempotent.
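To make the behavior described above more concrete, the following is a minimal, self-contained sketch of how an inline durable step could behave. It is not Taskurai's implementation: the `InlineStepSketch` class, its in-memory dictionary, and the immediate-retry policy governed by `MaxAttempts` are hypothetical stand-ins, and the sketch assumes that a step that has already succeeded is not executed again.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical model of an inline durable step; NOT the Taskurai implementation.
public sealed class InlineStepSketch
{
    // Results of succeeded steps, keyed by step id (stands in for a durable step record).
    private readonly Dictionary<string, object?> _completed = new();

    // Assumed retry policy: at most this many attempts, with no delay between them.
    public int MaxAttempts { get; }

    public InlineStepSketch(int maxAttempts) => MaxAttempts = maxAttempts;

    public async Task<T> RunAsync<T>(string id, Func<Task<T>> run)
    {
        // Assumption: a step that already succeeded is not executed again.
        if (_completed.TryGetValue(id, out var stored))
        {
            return (T)stored!;
        }

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                // The delegate runs inline: the caller simply awaits the result.
                var result = await run();
                _completed[id] = result;
                return result;
            }
            catch (Exception) when (attempt < MaxAttempts)
            {
                // Swallow the failure and retry; the last failed attempt propagates.
            }
        }
    }
}
```

In this sketch, a delegate that fails twice and then succeeds is invoked three times by a runner created with `maxAttempts: 3`, and a later call with the same id returns the stored result without invoking the delegate again. This is why such a step can act as a milestone or wrap code that is not idempotent.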
API reference:
Required scopes:
Data/Buildby.Taskurai/steps/create
Optional scopes:
Data/Buildby.Taskurai/sensitive/read: Can return sensitive data.
Data/Buildby.Taskurai/secrets/read: Can read global secrets.
Prerequisites
- You have completed the setup of a worker in Taskurai, see Workers and Commands.
Initializing a step
A step is initialized with either init or initAsync. Initialization occurs only once, and the initialization data can be used as deterministic arguments for the step.
If the task already contains arguments that are passed through to the step as-is, they do not need to be repeated in the step initialization.
The initialization should only include arguments that are newly introduced or transformed for the step.
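The value of one-time initialization can be illustrated with a small, self-contained sketch. It is a hypothetical model, not Taskurai's implementation: the `StepInitSketch` class and its in-memory dictionary are assumptions that stand in for the step record. The point it shows is that a non-deterministic value captured during initialization stays the same on every later execution of the step.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of one-time step initialization; NOT the Taskurai implementation.
public sealed class StepInitSketch
{
    // Arguments captured at initialization, keyed by step id (stands in for the step record).
    private readonly Dictionary<string, Dictionary<string, object>> _arguments = new();

    public TResult Run<TResult>(
        string id,
        Action<Dictionary<string, object>> init,
        Func<IReadOnlyDictionary<string, object>, TResult> run)
    {
        // init is called only the first time this step id is seen.
        if (!_arguments.TryGetValue(id, out var arguments))
        {
            arguments = new Dictionary<string, object>();
            init(arguments);
            _arguments[id] = arguments;
        }

        // Every execution, including a repeated one, sees the same stored arguments.
        return run(arguments);
    }
}
```

If `init` stores `Guid.NewGuid()` under a key, two executions of the same step id read back the same GUID, because the second execution skips `init` and reuses the stored arguments.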
Run delegate
The run delegate can be either a synchronous or an asynchronous delegate.
Example
- C#
[Command]
public async Task<TaskCommandResult> ValidateIpAgainstRemoteList(
    TaskuraiTaskContext context,
    CancellationToken cancellationToken,
    string ipv4)
{
    Logger.LogInformation("Validating IP against remote list...");

    if (string.IsNullOrWhiteSpace(ipv4))
    {
        return Failed(HttpStatusCode.BadRequest, "Invalid IP address");
    }

    var blockedIps = await context.RunInlineAsync<List<string>>(
        id: "fetch-blocked-ip-list",
        run: async (TaskuraiStepContext stepContext, CancellationToken ct) =>
        {
            var httpClient = _httpClientFactory.CreateClient();
            var response = await httpClient.GetAsync(
                "https://example.com/blocked-ips.txt",
                ct);
            response.EnsureSuccessStatusCode();

            var content = await response.Content.ReadAsStringAsync(ct);

            return content
                .Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries)
                .Select(ip => ip.Trim())
                .Where(ip => !string.IsNullOrWhiteSpace(ip))
                .ToList();
        }
    );

    if (blockedIps.Contains(ipv4, StringComparer.OrdinalIgnoreCase))
    {
        return Failed(HttpStatusCode.Forbidden, $"IP '{ipv4}' is blocked.");
    }

    return Succeeded(
        result: new ResultResponse()
        {
            Messages =
            {
                new ResultMessage()
                {
                    Code = "200",
                    Message = $"IP '{ipv4}' is allowed."
                }
            }
        }
    );
}
Arguments mapping
Commands can receive arguments from the task payload. Arguments can be of any serializable type.
Standard arguments:
- TaskuraiStepContext: The step context.
- CancellationToken: The cancellation token.
Other arguments are mapped by name from the task payload and can have the following types:
- Serializable type: Must match the type of the task argument.
- StepArgument: The argument is returned as the StepArgument.
- StateReference: The StateReference is returned (if available on the argument).
To override the default argument mapping by name, use the [FromArgument] attribute.
Example:
- C#
var sumDailySales = await context.RunInlineAsync<double>(
    id: "calculate-daily-sales-step",
    init: (response, arguments) =>
    {
        arguments.Add(new StepArgument("dailySales")
        {
            Value = dailySales
        });
    },
    run: (TaskuraiStepContext context, CancellationToken cancellationToken, List<double> dailySales) =>
    {
        if (dailySales == null)
        {
            return 0;
        }

        return dailySales.Sum();
    }
);
Accessing step arguments
Step information can be accessed easily using the step context.
- C#
var sumDailySales = await context.RunInlineAsync<double>(
    id: "calculate-daily-sales-step",
    init: (response, arguments) =>
    {
        arguments.Add(new StepArgument("dailySales")
        {
            Value = dailySales
        });
    },
    run: async (TaskuraiStepContext context, CancellationToken cancellationToken) =>
    {
        await context.ProgressAsync(10, "Calculating daily sales...");

        // Read the argument from the step record instead of the captured outer variable.
        var step = context.Step;
        List<double> stepDailySales = step.Arguments.GetValue<List<double>>("dailySales");

        if (stepDailySales == null)
        {
            return 0;
        }

        await context.ProgressAsync(100, "Calculating daily sales done.");

        return stepDailySales.Sum();
    }
);
Reporting progress
Sending a heartbeat event
For long-running steps, it is recommended to send heartbeat events while the step runs. Other processes can check the progress in the step record.
- C#
await context.ProgressAsync();
Progress, heartbeat
While running a step, it is possible to report progress and send a heartbeat event at the same time.
- C#
await context.ProgressAsync(
    percentage: 10,
    message: $"Requesting ocr for invoice {upload.Id}...");
Completing a step
Each step must be completed in Taskurai with a final state. The final state can be a success, failure, or canceled state.
Completing with serializable return types
A step is completed either by returning a serializable type (Succeeded) or by throwing an exception (Failed).
Example:
- C#
var sumDailySales = await context.RunInlineAsync<double>(
    id: "calculate-daily-sales-step",
    init: (response, arguments) =>
    {
        arguments.Add(new StepArgument("dailySales")
        {
            Value = dailySales
        });
    },
    run: (TaskuraiStepContext context, CancellationToken cancellationToken, List<double> dailySales) =>
    {
        if (dailySales == null)
        {
            throw new ArgumentNullException(nameof(dailySales), "No daily sales data available.");
        }

        return dailySales.Sum();
    }
);
Completing with StepCommandResult
- C#
var sumDailySales2 = await context.RunInlineAsync<double>(
    id: "calculate-daily-sales-step",
    init: (response, arguments) =>
    {
        arguments.Add(new StepArgument("dailySales")
        {
            Value = dailySales
        });
    },
    run: async (TaskuraiStepContext context, CancellationToken cancellationToken, List<double> dailySales) =>
    {
        await context.ProgressAsync(10, "Calculating daily sales...");

        if (dailySales == null)
        {
            return context.Failed(HttpStatusCode.BadRequest, "No daily sales data available.");
        }

        return context.Succeeded(
            progress: new Progress()
            {
                Percentage = 100,
                Message = "Calculating daily sales done."
            },
            result: new ResultResponse()
            {
                Outputs = {
                    new ResultOutput()
                    {
                        Name = "sumDailySales",
                        Value = dailySales.Sum()
                    }
                }
            }
        );
    }
);