{ "cells": [ { "cell_type": "markdown", "id": "d22c78d1", "metadata": {}, "source": [ "# Tasks with multiple outputs\n", "\n", "A single output of a Tierkreis task can contain a complex nested structure, which is passed as a whole as an input to other tasks. Alternatively, a Tierkreis task can have multiple outputs, which are passed separately as inputs to other tasks.\n", "\n", "This page shows how to choose between these two options when writing a worker with the Tierkreis Python library.\n", "\n", "## Tasks returning a single output containing a nested structure\n", "\n", "If a worker task returns a plain `NamedTuple`, the task has a single output containing the whole nested structure. For example, we could create a worker task as follows:" ] }, { "cell_type": "code", "execution_count": null, "id": "2ace4687", "metadata": {}, "outputs": [], "source": [ "from typing import NamedTuple\n", "\n", "from tierkreis import Worker\n", "\n", "worker = Worker(\"multiple_outputs_worker\")\n", "\n", "\n", "class OpaquePoint(NamedTuple):\n", "    x: float\n", "    y: float\n", "    z: float\n", "\n", "\n", "@worker.task()\n", "def new_opaque_point(x: float, y: float, z: float) -> OpaquePoint:\n", "    return OpaquePoint(x, y, z)" ] }, { "cell_type": "markdown", "id": "614438f3", "metadata": {}, "source": [ "When we use this worker function in a graph, the return value has type `TKR[OpaquePoint]`, and the attributes `x`, `y` and `z` are not accessible to the graph builder. This is useful when passing complex nested structures as a whole from one task to another."
] }, { "cell_type": "code", "execution_count": null, "id": "6d2f9739", "metadata": {}, "outputs": [], "source": [ "from tierkreis.builder import Graph\n", "from tierkreis.controller.data.models import TKR\n", "from multiple_outputs_worker import new_opaque_point, OpaquePoint  # noqa: F811\n", "\n", "g = Graph(TKR[float], TKR[OpaquePoint])\n", "diag = g.task(new_opaque_point(g.inputs, g.inputs, g.inputs))\n", "\n", "# Uncommenting the following line will give an error.\n", "# diag.x  ## Cannot access attribute \"x\" for class \"TKR[OpaquePoint]\"\n", "\n", "workflow = g.finish_with_outputs(diag)" ] }, { "cell_type": "markdown", "id": "af74bc82", "metadata": {}, "source": [ "At runtime the output is a single storage element containing the nested structure:" ] }, { "cell_type": "code", "execution_count": null, "id": "0a6bc180", "metadata": {}, "outputs": [], "source": [ "from pathlib import Path\n", "from uuid import UUID\n", "from tierkreis.storage import FileStorage, read_outputs\n", "from tierkreis.executor import UvExecutor\n", "from tierkreis import run_graph\n", "\n", "storage = FileStorage(UUID(int=400), do_cleanup=True)\n", "executor = UvExecutor(Path().parent / \"example_workers\", storage.logs_path)\n", "run_graph(storage, executor, workflow, 4)\n", "outputs = read_outputs(workflow, storage)\n", "assert outputs == {\"x\": 4, \"y\": 4, \"z\": 4}" ] }, { "cell_type": "markdown", "id": "3ca77bfc", "metadata": {}, "source": [ "## Tasks returning multiple outputs using portmapping\n", "\n", "If we need to deconstruct the output of a task, we decorate the `NamedTuple` with the `@portmapping` decorator. This is necessary, for instance, when we want to pass different parts of the output to different tasks. 
In this case we would create a worker task as follows:" ] }, { "cell_type": "code", "execution_count": null, "id": "3a786435", "metadata": {}, "outputs": [], "source": [ "from tierkreis.models import portmapping\n", "\n", "\n", "@portmapping\n", "class Point(NamedTuple):\n", "    x: float\n", "    y: float\n", "    z: float\n", "\n", "\n", "@worker.task()\n", "def new_point(x: float, y: float, z: float) -> Point:\n", "    return Point(x, y, z)" ] }, { "cell_type": "markdown", "id": "13fd2361", "metadata": {}, "source": [ "When we use `new_point` in a graph, the output has to be deconstructed in order to be passed into other tasks." ] }, { "cell_type": "code", "execution_count": null, "id": "0fb8e201", "metadata": {}, "outputs": [], "source": [ "from tierkreis.builder import Graph\n", "from tierkreis.controller.data.models import TKR\n", "from tierkreis.builtins.stubs import add\n", "from multiple_outputs_worker import new_point  # noqa: F811\n", "\n", "g = Graph(TKR[float], TKR[float])\n", "diag = g.task(new_point(g.inputs, g.inputs, g.inputs))\n", "# Named `total` rather than `sum` to avoid shadowing the Python builtin.\n", "total = g.task(add(diag.x, g.task(add(diag.y, diag.z))))\n", "workflow = g.finish_with_outputs(total)" ] }, { "cell_type": "markdown", "id": "5d437303", "metadata": {}, "source": [ "Each of the floats `diag.x`, `diag.y` and `diag.z` is stored in a separate element of the storage, so each can be passed directly into other tasks."
] }, { "cell_type": "code", "execution_count": null, "id": "4fcdc0ce", "metadata": {}, "outputs": [], "source": [ "from pathlib import Path\n", "from uuid import UUID\n", "from tierkreis.storage import FileStorage, read_outputs\n", "from tierkreis.executor import UvExecutor\n", "from tierkreis import run_graph\n", "\n", "storage = FileStorage(UUID(int=400), do_cleanup=True)\n", "executor = UvExecutor(Path().parent / \"example_workers\", storage.logs_path)\n", "run_graph(storage, executor, workflow, 4)\n", "outputs = read_outputs(workflow, storage)\n", "assert outputs == 12" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.11" } }, "nbformat": 4, "nbformat_minor": 5 }