{ "cells": [ { "cell_type": "markdown", "id": "30f14c09", "metadata": {}, "source": [ "# Custom Serializers\n", "\n", "By default, Tierkreis serializes values to bytes if possible.\n", "In some cases it is preferable to use a specific format.\n", "\n", "Tierkreis supports this through Python `Annotated` types, which attach a custom serializer and deserializer to a type.\n", "A common example is `numpy.ndarray`.\n", "In this example we change how arrays are serialized in several ways.\n", "\n", "We use this in a simple graph." ] }, { "cell_type": "code", "execution_count": null, "id": "d24594b1", "metadata": {}, "outputs": [], "source": [ "%pip install tierkreis numpy scipy" ] }, { "cell_type": "code", "execution_count": null, "id": "c39f98e0", "metadata": {}, "outputs": [], "source": [ "from typing import NamedTuple\n", "\n", "from tierkreis.controller.data.models import TKR, OpaqueType\n", "\n", "NDArray = OpaqueType[\"numpy.ndarray\"]\n", "\n", "\n", "class ScipyOutputs(NamedTuple):\n", "    a: TKR[NDArray]\n", "    p: TKR[float]" ] }, { "cell_type": "markdown", "id": "306376b9", "metadata": {}, "source": [ "We use a worker to expose some elementary tasks:" ] }, { "cell_type": "code", "execution_count": null, "id": "cac4ce67", "metadata": {}, "outputs": [], "source": [ "from tierkreis.builder import GraphBuilder\n", "from tierkreis.controller.data.core import EmptyModel\n", "\n", "from scipy_worker import (\n", "    add_point,\n", "    eval_point,\n", "    linspace,\n", "    reshape,\n", "    transpose,\n", ")\n", "\n", "sample_graph = GraphBuilder(EmptyModel, ScipyOutputs)\n", "onedim = sample_graph.task(linspace(sample_graph.const(0), sample_graph.const(10)))\n", "\n", "pointed = sample_graph.task(add_point(onedim, sample_graph.const(0)))\n", "scalar = sample_graph.task(eval_point(pointed))\n", "\n", "twodim = sample_graph.task(reshape(onedim, sample_graph.const([5, 10])))\n", "a = sample_graph.task(transpose(twodim))\n", "sample_graph.outputs(ScipyOutputs(a, scalar))" 
] }, { "cell_type": "markdown", "id": "7ca9e829", "metadata": {}, "source": [ "The worker contains the following code, which selects between different serialization methods." ] }, { "cell_type": "code", "execution_count": null, "id": "74111639", "metadata": {}, "outputs": [], "source": [ "import io\n", "import os\n", "import pickle\n", "from typing import Annotated\n", "\n", "import numpy as np\n", "\n", "from tierkreis.controller.data.core import Deserializer, Serializer\n", "\n", "\n", "def save(a: np.ndarray) -> bytes:\n", "    with io.BytesIO() as bs:\n", "        np.save(bs, a)\n", "        return bs.getvalue()\n", "\n", "\n", "def load(bs: bytes) -> np.ndarray:\n", "    with io.BytesIO() as bi:\n", "        bi.write(bs)\n", "        bi.seek(0)\n", "        return np.load(bi, encoding=\"bytes\")\n", "\n", "\n", "SER_METHOD = os.environ.get(\"SER_METHOD\")\n", "if SER_METHOD == \"dumps\":\n", "    ser = Serializer(np.ndarray.dumps)\n", "    deser = Deserializer(pickle.loads)\n", "elif SER_METHOD == \"tolist\":\n", "    ser = Serializer(np.ndarray.tolist, \"json\")\n", "    deser = Deserializer(np.array, \"json\")\n", "elif SER_METHOD == \"save\":\n", "    ser = Serializer(save)\n", "    deser = Deserializer(load)\n", "else:\n", "    ser = None\n", "    deser = None\n", "\n", "NDArray = Annotated[np.ndarray, ser, deser]" ] }, { "cell_type": "markdown", "id": "75269308", "metadata": {}, "source": [ "It is crucial that the serializer and deserializer are inverses of each other.\n", "For the first run we use the Tierkreis defaults (`dumps`, `pickle.loads`)." ] }, { "cell_type": "code", "execution_count": null, "id": "9fe911c0", "metadata": {}, "outputs": [], "source": [ "from pathlib import Path\n", "from uuid import UUID\n", "\n", "from tierkreis import run_graph\n", "from tierkreis.executor import UvExecutor\n", "from tierkreis.storage import FileStorage, read_outputs\n", "\n", "storage = FileStorage(UUID(int=207), do_cleanup=True, name=\"scipy_graph\")\n", "executor = UvExecutor(Path().parent / \"example_workers\", storage.logs_path)\n", 
"run_graph(storage, executor, sample_graph, {})\n", "\n", "outputs = read_outputs(sample_graph, storage)" ] }, { "cell_type": "markdown", "id": "df0eb6e5", "metadata": {}, "source": [ "Inspecting the checkpoints shows the serialized binary data." ] }, { "cell_type": "code", "execution_count": null, "id": "44a370c4", "metadata": { "tags": [ "skip-execution" ], "vscode": { "languageId": "shellscript" } }, "outputs": [], "source": [ "cd ~/.tierkreis/checkpoints/00000000-0000-0000-0000-0000000000cf\n", "cat ./-.N2/outputs/value" ] }, { "cell_type": "markdown", "id": "a82d7fb3", "metadata": {}, "source": [ "To change the serialization format, we set the `SER_METHOD` environment variable and rerun the graph. Afterwards we can inspect the checkpoint files again." ] }, { "cell_type": "code", "execution_count": null, "id": "29d38c75", "metadata": {}, "outputs": [], "source": [ "os.environ[\"SER_METHOD\"] = \"tolist\"\n", "storage.clean_graph_files()\n", "run_graph(storage, executor, sample_graph, {})\n", "\n", "outputs = read_outputs(sample_graph, storage)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.11" } }, "nbformat": 4, "nbformat_minor": 5 }