Tasks with multiple outputs
A single output of a Tierkreis task can contain a complex nested structure, which can be passed as a whole as an input to other tasks. Alternatively, a Tierkreis task can have multiple outputs, which are passed separately as inputs to other tasks.
On this page we show how to choose between these two options when writing a worker with the Tierkreis Python library.
Tasks returning a single output containing a nested structure
If a worker task returns a NamedTuple, there is a single output containing the nested structure that the NamedTuple describes. For example, we could create a worker task as follows:
from typing import NamedTuple
from tierkreis import Worker
worker = Worker("multiple_outputs_worker")
class OpaquePoint(NamedTuple):
    x: float
    y: float
    z: float
@worker.task()
def new_opaque_point(x: float, y: float, z: float) -> OpaquePoint:
    return OpaquePoint(x, y, z)
When we use the worker function in a graph, the return value is of type TKR[OpaquePoint] and the attributes x, y and z are not accessible to the graph builder. This can be useful when passing complex nested structures as a whole from one task to another.
from tierkreis.builder import Graph
from tierkreis.controller.data.models import TKR
from multiple_outputs_worker import new_opaque_point, OpaquePoint # noqa: F811
g = Graph(TKR[float], TKR[OpaquePoint])
diag = g.task(new_opaque_point(g.inputs, g.inputs, g.inputs))
# Uncommenting the following line will give an error.
# diag.x ## Cannot access attribute "x" for class "TKR[OpaquePoint]"
workflow = g.finish_with_outputs(diag)
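The reason the fields are out of reach can be pictured with a toy model. The sketch below does not use Tierkreis and is not how TKR is implemented; `Handle` and its `location` field are hypothetical names introduced only for illustration. It shows a generic reference that is typed by the value it points to but carries none of that value's attributes.

```python
from dataclasses import dataclass
from typing import Generic, NamedTuple, TypeVar

T = TypeVar("T")


class OpaquePoint(NamedTuple):
    x: float
    y: float
    z: float


@dataclass(frozen=True)
class Handle(Generic[T]):
    """Hypothetical stand-in for a build-time reference to a value of type T."""

    location: str  # illustrative label for where the value would live at runtime


# The handle is annotated as Handle[OpaquePoint], yet it has no x, y or z:
# only the reference itself exists while the graph is being built.
point_ref: Handle[OpaquePoint] = Handle("node-0/output")
has_x = hasattr(point_ref, "x")
```

In this toy model `has_x` is `False`, which mirrors the type checker's complaint about `diag.x` in the graph above.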
At runtime the output will be a single storage element containing a nested structure:
from pathlib import Path
from uuid import UUID
from tierkreis.storage import FileStorage, read_outputs
from tierkreis.executor import UvExecutor
from tierkreis import run_graph
storage = FileStorage(UUID(int=400), do_cleanup=True)
executor = UvExecutor(Path().parent / "example_workers", storage.logs_path)
run_graph(storage, executor, workflow, 4)
outputs = read_outputs(workflow, storage)
assert outputs == {"x": 4, "y": 4, "z": 4}
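The shape of the value asserted above can be reproduced with the standard library alone. This is only a sketch of the resulting structure, assuming the fields map to keys by name; it does not show how Tierkreis itself serializes the NamedTuple.

```python
from typing import NamedTuple


class OpaquePoint(NamedTuple):
    x: float
    y: float
    z: float


# Build the same point the workflow produces when run with the input 4.
point = OpaquePoint(4, 4, 4)

# _asdict() is the standard NamedTuple method that maps field names to values.
nested = point._asdict()
```

Here `nested` is one dictionary holding all three fields, which is the kind of value that travels as a whole from one task to the next.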
Tasks returning multiple outputs using portmapping
If we need to deconstruct the output of a task, we decorate the NamedTuple with the @portmapping decorator. For instance, this is necessary when we want to pass different parts of the output to different tasks. In this case we would create a worker task as follows:
from tierkreis.models import portmapping
@portmapping
class Point(NamedTuple):
    x: float
    y: float
    z: float
@worker.task()
def new_point(x: float, y: float, z: float) -> Point:
    return Point(x, y, z)
When we use new_point in a graph, the output has to be deconstructed in order to be passed into other tasks.
from tierkreis.builder import Graph
from tierkreis.controller.data.models import TKR
from tierkreis.builtins.stubs import add
from multiple_outputs_worker import new_point # noqa: F811
g = Graph(TKR[float], TKR[float])
diag = g.task(new_point(g.inputs, g.inputs, g.inputs))
# Named total rather than sum to avoid shadowing the Python built-in.
total = g.task(add(diag.x, g.task(add(diag.y, diag.z))))
workflow = g.finish_with_outputs(total)
Each of the floats diag.x, diag.y and diag.z is stored in a separate element of the storage, so each one can be passed directly into other tasks.
from pathlib import Path
from uuid import UUID
from tierkreis.storage import FileStorage, read_outputs
from tierkreis.executor import UvExecutor
from tierkreis import run_graph
storage = FileStorage(UUID(int=400), do_cleanup=True)
executor = UvExecutor(Path().parent / "example_workers", storage.logs_path)
run_graph(storage, executor, workflow, 4)
outputs = read_outputs(workflow, storage)
assert outputs == 12
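The difference between the two approaches can be sketched with a plain dictionary standing in for the storage. This is a toy model under stated assumptions: `store_whole`, `store_by_field` and the key names are hypothetical and do not reflect the real Tierkreis storage layout.

```python
from typing import NamedTuple


class Point(NamedTuple):
    x: float
    y: float
    z: float


def store_whole(store: dict, key: str, value: Point) -> None:
    """Hypothetical: keep the whole structure in a single element."""
    store[key] = value._asdict()


def store_by_field(store: dict, key: str, value: Point) -> None:
    """Hypothetical: keep each field in its own element."""
    for name, field_value in value._asdict().items():
        store[f"{key}.{name}"] = field_value


# Without portmapping: one element holding the nested structure.
opaque_store: dict = {}
store_whole(opaque_store, "diag", Point(4, 4, 4))

# With portmapping: three elements that can be consumed independently.
split_store: dict = {}
store_by_field(split_store, "diag", Point(4, 4, 4))

# Same nesting of additions as in the graph: x + (y + z).
total = split_store["diag.x"] + (split_store["diag.y"] + split_store["diag.z"])
```

With the input 4 the toy `total` is 12, matching the workflow output asserted above, while `opaque_store` has a single entry that a consumer would have to take as a whole.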