Skip to main content

Run inline

Durable run inline is a step that can be used run code inline in a durable way. The step is executed inline, the command is not suspended if the step succeeds. If a step fails, the code is retried according to the retry policy, whether or not the command is suspended after failure can be configured.

Inline run steps can be considered as milestones in a long-running command or as a way to run code that is not idempotent.

API reference:

Required scopes:

  • Data/Buildby.Taskurai/steps/create

Optional scopes:

  • Data/Buildby.Taskurai/sensitive/read: Can return sensitive data.
  • Data/Buildby.Taskurai/secrets/read: Can read global secrets.

Prerequisites

Initializing a step

To initialize a step, either init or initAsync can be used to initialize a step. The initialization of the step only occurs once and the initialization data can be used as deterministic arguments for the step.

info

If the task already contains arguments that are passed through to the step as-is, they do not need to be repeated in the step initialization.

The initialization should only include arguments that are newly introduced or transformed for the step.

Run delegate

The run delegate can either be a sync or async delegate.

Example

Example:

[Command]
public async Task<TaskCommandResult> ValidateIpAgainstRemoteList(
TaskuraiTaskContext context,
CancellationToken cancellationToken,
string ipv4)
{
Logger.LogInformation("Validating IP against remote list...");

if (string.IsNullOrWhiteSpace(ipv4))
{
return Failed(HttpStatusCode.BadRequest, "Invalid IP address");
}

var blockedIps = await context.RunInlineAsync<List<string>>(
id: "fetch-blocked-ip-list",
run: async (TaskuraiStepContext stepContext, CancellationToken ct) =>
{
var httpClient = _httpClientFactory.CreateClient();

var response = await httpClient.GetAsync(
"https://example.com/blocked-ips.txt",
ct);

response.EnsureSuccessStatusCode();

var content = await response.Content.ReadAsStringAsync(ct);

return content
.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries)
.Select(ip => ip.Trim())
.Where(ip => !string.IsNullOrWhiteSpace(ip))
.ToList();
}
);

if (blockedIps.Contains(ipv4, StringComparer.OrdinalIgnoreCase))
{
return Failed(HttpStatusCode.Forbidden, $"IP '{ipv4}' is blocked.");
}

return Succeeded(
result: new ResultResponse()
{
Messages =
{
new ResultMessage()
{
Code = "200",
Message = $"IP '{ipv4}' is allowed."
}
}
}
);
}

Arguments mapping

Commands can receive arguments from the task payload. Arguments can be of any serializable type.

Standard arguments:

Other arguments are mapped by name from the task payload and are can have the following types:

  • Serializable type: Must match the type of the task argument.
  • StepArgument: Argument is returned as the StepArgument.
  • StateReference: The StateReference is returned (if available on the argument).

To override the default argument mapping by name, use the [FromArgument] attribute.

Example:

var sumDailySales = await context.RunInlineAsync<double>(
id: "calculate-daily-sales-step",
init: (response, arguments) =>
{
arguments.Add(new StepArgument("dailySales")
{
Value = dailySales
});
},
run: (TaskuraiStepContext context, CancellationToken cancellationToken, List<double> dailySales) =>
{
if (dailySales == null)
{
return 0;
}
return dailySales.Sum();
}
);

Accessing step arguments

Step information can be accessed easily using the step context.

var sumDailySales = await context.RunInlineAsync<double>(
id: "calculate-daily-sales-step",
init: (response, arguments) =>
{
arguments.Add(new StepArgument("dailySales")
{
Value = dailySales
});
},
run: async (TaskuraiStepContext context, CancellationToken cancellationToken) =>
{
await context.ProgressAsync(10, "Calculating daily sales...");

var step = context.Step;

List<double> dailySales2 = step.Arguments.GetValue<List<double>>("dailySales");

if (dailySales == null)
{
return 0;
}

await context.ProgressAsync(100, "Calculating daily sales done.");

return dailySales.Sum();
}
);

Reporting progress

Sending a heartbeat event

While running a step, it is recommended to send heartbeat events for long running steps. Other processes can check the progress in the step record.

await context.ProgressAsync();

Progress, heartbeat

While running a step, it is possible to report on progress and send a heartbeat event and the same time.

await context.ProgressAsync(
percentage: 10,
message: $"Requesting ocr for invoice {upload.Id}...");

Completing a step

Each must be completed in Taskurai with a final state. The final state can be a success, failure, or canceled state.

Completing with serializable return types

Commands are completed by either returning a serializable type (Succeeded) or by throwing an exception (Failed).

Example:

var sumDailySales = await context.RunInlineAsync<double>(
id: "calculate-daily-sales-step",
init: (response, arguments) =>
{
arguments.Add(new StepArgument("dailySales")
{
Value = dailySales
});
},
run: (TaskuraiStepContext context, CancellationToken cancellationToken, List<double> dailySales) =>
{
if (dailySales == null)
{
throw new ArgumentNullException(nameof(dailySales), "No daily sales data available.");
}
return dailySales.Sum();
}
);

Completing with StepCommandResult

var sumDailySales2 = await context.RunInlineAsync<double>(
id: "calculate-daily-sales-step",
init: (response, arguments) =>
{
arguments.Add(new StepArgument("dailySales")
{
Value = dailySales
});
},
run: async (TaskuraiStepContext context, CancellationToken cancellationToken, List<double> dailySales) =>
{
await context.ProgressAsync(10, "Calculating daily sales...");

if (dailySales == null)
{
return context.Failed(HttpStatusCode.BadRequest, "No daily sales data available.");
}

return context.Succeeded(
progress: new Progress()
{
Percentage = 100,
Message = "Calculating daily sales done."
},
result: new ResultResponse()
{
Outputs = {
new ResultOutput()
{
Name = "sumDailySales",
Value = dailySales.Sum()
}
}
}
);
}
);