Skip to main content

Call tasks

The durable call tasks step creates one or more sub tasks and waits for those tasks to be completed. While waiting for the tasks, the command can either be suspended or keep waiting until all tasks are completed. When the command is suspended, the step reports progress each time a task is completed.

Tasks are created in a durable way: initialization of the tasks occurs only once. All tasks must be created successfully before any task can start. A sub task is completed when it has succeeded or, in case of failure, when all retries are exhausted.

API reference:

Required scopes:

  • Data/Buildby.Taskurai/steps/create

Optional scopes:

  • Data/Buildby.Taskurai/sensitive/read: Can return sensitive data.
  • Data/Buildby.Taskurai/secrets/read: Can read global secrets.

Prerequisites

Calling multiple tasks (Fan out/fan in)

In this sample, two tasks are called at the same time. Progress is reported each time a task is completed.

using System.Net;
using Azure;
using Taskurai.Models;
using Taskurai.Worker;
using System.Text.Json.Serialization;

/// <summary>
/// A person identified by full name and email address. Used in this sample as the
/// type of the <c>approver</c> and <c>cfo</c> command arguments.
/// </summary>
public class Recipient
{
/// <summary>Full name of the recipient.</summary>
public string FullName { get; set; }
/// <summary>Email address of the recipient.</summary>
public string Email { get; set; }
}

/// <summary>
/// Response shape for an email send operation.
/// NOTE(review): not referenced by the commands visible in this sample — presumably
/// returned by a send-email command defined elsewhere; confirm.
/// </summary>
public class SendEmailResponse
{
/// <summary>Success flag of the operation.</summary>
public bool Success { get; set; }

/// <summary>
/// Optional HTTP status code. The attribute makes System.Text.Json write the enum
/// member name (for example "BadRequest") instead of its numeric value.
/// </summary>
[JsonConverter(typeof(JsonStringEnumConverter))]
public HttpStatusCode? StatusCode { get; set; }

/// <summary>Textual description accompanying <see cref="StatusCode"/>.</summary>
public string StatusDescription { get; set; }
/// <summary>Error message; presumably only set when the operation failed — confirm.</summary>
public string ErrorMessage { get; set; }
}

/// <summary>
/// Sample worker controller demonstrating fan out/fan in with durable call tasks:
/// <see cref="StepsSample"/> creates the <c>collectUSSales</c> and <c>collectEUSales</c>
/// sub tasks in a single step and logs progress as tasks complete.
/// </summary>
public class DurableStepController : WorkerController
{
    /// <summary>
    /// Validates the approver and then runs the "collect-daily-sales-step" step, which
    /// creates two sub tasks at the same time.
    /// </summary>
    /// <param name="context">Task context used to call the sub tasks.</param>
    /// <param name="cancellationToken">Cancellation token of the command (not used by this sample).</param>
    /// <param name="approver">Approver; both full name and email address are required.</param>
    /// <param name="cfo">Not used by this sample.</param>
    /// <param name="attachment">Not used by this sample.</param>
    /// <returns>
    /// A failed result with <see cref="HttpStatusCode.BadRequest"/> when the approver's
    /// full name or email address is missing; otherwise a succeeded result.
    /// </returns>
    [Command]
    [Version(1)]
    public async Task<TaskCommandResult> StepsSample(
        TaskuraiTaskContext context,
        CancellationToken cancellationToken,
        Recipient approver,
        Recipient cfo,
        TaskArgument attachment)
    {
        // Guard clauses: reject the command before any sub task is created.
        if (string.IsNullOrWhiteSpace(approver?.FullName))
        {
            return Failed(
                HttpStatusCode.BadRequest,
                "Missing approver full name."
            );
        }

        if (string.IsNullOrWhiteSpace(approver?.Email))
        {
            return Failed(
                HttpStatusCode.BadRequest,
                "Missing approver email address."
            );
        }

        // Collect daily sales: both sub tasks are registered in the same step.
        // The step result is not used any further in this sample.
        var dailySales = await context.CallTasksAsync<double>(
            id: "collect-daily-sales-step",
            init: (response, taskConfigs) =>
            {
                taskConfigs.Add(new TaskConfig("collectUSSales")
                {
                    Arguments = {
                        new TaskArgument("state")
                        {
                            Value = "CA"
                        }
                    }
                });

                taskConfigs.Add(new TaskConfig("collectEUSales")
                {
                    Arguments = {
                        new TaskArgument("country")
                        {
                            Value = "FR"
                        }
                    }
                });
            },
            // NOTE(review): the unit of maxDuration is not visible here — confirm against the API reference.
            maxDuration: 120,
            progress: (response, progress) => {
                Logger.LogInformation($"Progress collect sales step: {progress.Percentage}%, message = '{progress.Message}'.");
            }
        );

        return Succeeded(
            "Successfully completed sample."
        );
    }

    /// <summary>
    /// Returns the sample daily sales figure for a US state code ("CA" or "WA",
    /// matched case-insensitively).
    /// </summary>
    /// <param name="context">Task context (not used).</param>
    /// <param name="cancellationToken">Cancellation token (not used).</param>
    /// <param name="state">US state code.</param>
    /// <returns>
    /// A completed task holding the sales figure, or a faulted task carrying an
    /// <see cref="Exception"/> when the state code is not recognized.
    /// </returns>
    [Command]
    public Task<double> CollectUSSales(
        TaskuraiTaskContext context,
        CancellationToken cancellationToken,
        string state)
    {
        // Nothing is awaited here, so the method is not marked async (avoids CS1998).
        // Failures are still surfaced through the returned task, exactly as an async
        // method would do. ToLowerInvariant keeps the match independent of the
        // worker's current culture.
        return state?.ToLowerInvariant() switch
        {
            "ca" => Task.FromResult(154221.36),
            "wa" => Task.FromResult(16545.01),
            _ => Task.FromException<double>(new Exception($"Invalid state '{state}'.")),
        };
    }

    /// <summary>
    /// Returns the sample daily sales figure for an EU country code ("FR" or "BE",
    /// matched case-insensitively) while holding the "EuSalesLock" distributed lock.
    /// </summary>
    /// <param name="context">Task context used to acquire the lock and check task expiry.</param>
    /// <param name="cancellationToken">Cancellation token (not used).</param>
    /// <param name="country">EU country code.</param>
    /// <returns>The sales figure for the country.</returns>
    /// <exception cref="Exception">The country code is not recognized.</exception>
    [Command]
    public async Task<double> CollectEUSales(
        TaskuraiTaskContext context,
        CancellationToken cancellationToken,
        string country)
    {
        // Acquire a distributed lock; it is released when the block is left.
        // NOTE(review): the meaning of the second positional argument is not visible here — confirm.
        await using (var distributedLock = await context.AcquireLockAsync("EuSalesLock", "DurableStepController.CollectEUSales", ttl: TimeSpan.FromMinutes(1), timeout: TimeSpan.FromSeconds(30)))
        {
            // Check if task may start after acquiring lock (waiting for the lock takes time).
            context.CheckTaskExpired();

            // ToLowerInvariant keeps the match independent of the worker's current culture.
            return country?.ToLowerInvariant() switch
            {
                "fr" => 65421.51,
                "be" => 5462.24,
                _ => throw new Exception($"Invalid country '{country}'."),
            };
        }
    }
}

Calling a single task (Command chaining)

It is possible to call one task at a time, waiting for the task to be completed before calling the next task.

using System.Net;
using Azure;
using Taskurai.Models;
using Taskurai.Worker;
using System.Text.Json.Serialization;

/// <summary>
/// Sample worker controller demonstrating command chaining: <see cref="StepsSample"/>
/// calls one task at a time and feeds the earlier results into the last task.
/// </summary>
public class DurableStepController : WorkerController
{
    /// <summary>
    /// Calls <c>collectUSSales</c>, then <c>collectEUSales</c>, then
    /// <c>calculateTotalSales</c> with both results, each as its own step.
    /// </summary>
    /// <param name="context">Task context used to call the tasks.</param>
    /// <param name="cancellationToken">Cancellation token of the command (not used by this sample).</param>
    /// <returns>A succeeded result carrying the total sales.</returns>
    [Command]
    [Version(1)]
    public async Task<TaskCommandResult> StepsSample(
        TaskuraiTaskContext context,
        CancellationToken cancellationToken)
    {
        // Collect daily US sales
        var dailyUsSales = await context.CallTaskAsync<double>(
            id: "collect-us-sales-step",
            init: (response) => new TaskConfig("collectUSSales")
            {
                Arguments = {
                    new TaskArgument("state")
                    {
                        Value = "CA"
                    }
                }
            }
        );

        // Collect daily EU sales
        var dailyEuSales = await context.CallTaskAsync<double>(
            id: "collect-eu-sales-step",
            init: (response) => new TaskConfig("collectEUSales")
            {
                Arguments = {
                    new TaskArgument("country")
                    {
                        Value = "FR"
                    }
                }
            }
        );

        // Sum the two results in a third task
        var sales = new List<double> { dailyUsSales, dailyEuSales };

        var totalSales = await context.CallTaskAsync<double>(
            id: "calculate-total-sales-step",
            init: (response) => new TaskConfig("calculateTotalSales")
            {
                Arguments = {
                    new TaskArgument("sales")
                    {
                        Value = sales
                    }
                }
            }
        );

        return Succeeded(totalSales);
    }

    /// <summary>
    /// Returns the sample daily sales figure for a US state code ("CA" or "WA",
    /// matched case-insensitively).
    /// </summary>
    /// <param name="context">Task context (not used).</param>
    /// <param name="cancellationToken">Cancellation token (not used).</param>
    /// <param name="state">US state code.</param>
    /// <returns>The sales figure for the state.</returns>
    /// <exception cref="Exception">The state code is not recognized.</exception>
    [Command]
    public double CollectUSSales(
        TaskuraiTaskContext context,
        CancellationToken cancellationToken,
        string state)
    {
        // ToLowerInvariant keeps the match independent of the worker's current culture.
        return state?.ToLowerInvariant() switch
        {
            "ca" => 154221.36,
            "wa" => 16545.01,
            _ => throw new Exception($"Invalid state '{state}'."),
        };
    }

    /// <summary>
    /// Returns the sample daily sales figure for an EU country code ("FR" or "BE",
    /// matched case-insensitively).
    /// </summary>
    /// <param name="context">Task context (not used).</param>
    /// <param name="cancellationToken">Cancellation token (not used).</param>
    /// <param name="country">EU country code.</param>
    /// <returns>The sales figure for the country.</returns>
    /// <exception cref="Exception">The country code is not recognized.</exception>
    [Command]
    public double CollectEUSales(
        TaskuraiTaskContext context,
        CancellationToken cancellationToken,
        string country)
    {
        // ToLowerInvariant keeps the match independent of the worker's current culture.
        return country?.ToLowerInvariant() switch
        {
            "fr" => 65421.51,
            "be" => 5462.24,
            _ => throw new Exception($"Invalid country '{country}'."),
        };
    }

    /// <summary>
    /// Sums the given sales figures.
    /// </summary>
    /// <param name="context">Task context (not used).</param>
    /// <param name="cancellationToken">Cancellation token (not used).</param>
    /// <param name="sales">Sales figures to add up; an empty list yields 0.</param>
    /// <returns>The sum of all values in <paramref name="sales"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="sales"/> is null.</exception>
    [Command]
    public double CalculateTotalSales(
        TaskuraiTaskContext context,
        CancellationToken cancellationToken,
        List<double> sales)
    {
        if (sales is null)
        {
            throw new ArgumentNullException(nameof(sales));
        }

        return sales.Sum();
    }
}