Custom SerializersΒΆ

By default tierkreis will serialize values to bytes if possible. In some cases it might be preferable to use a specific format.

Tierkreis supports this by using python Annotations to indicate custom de- serializers. A common example is the numpy.ndarray. In this example we will change the serialization type of array is in multiple ways.

We use this in a simple graph simple graph.

%pip install tierkreis numpy scipy
/home/runner/work/tierkreis/tierkreis/.venv/bin/python3: No module named pip
Note: you may need to restart the kernel to use updated packages.
from typing import NamedTuple

from tierkreis.controller.data.models import TKR, OpaqueType

NDArray = OpaqueType["numpy.ndarray"]


class ScipyOutputs(NamedTuple):
    a: TKR[NDArray]
    p: TKR[float]

We use a worker to expose some elementary tasks:

from tierkreis.builder import GraphBuilder
from tierkreis.controller.data.core import EmptyModel

from scipy_worker import (
    add_point,
    eval_point,
    linspace,
    reshape,
    transpose,
)

sample_graph = GraphBuilder(EmptyModel, ScipyOutputs)
onedim = sample_graph.task(linspace(sample_graph.const(0), sample_graph.const(10)))

pointed = sample_graph.task(add_point(onedim, sample_graph.const(0)))
scalar = sample_graph.task(eval_point(pointed))

twodim = sample_graph.task(reshape(onedim, sample_graph.const([5, 10])))
a = sample_graph.task(transpose(twodim))
sample_graph.outputs(ScipyOutputs(a, scalar))

The worker contains the following code to use different serialization methods.

import io
import os
import pickle
from typing import Annotated

import numpy as np

from tierkreis.controller.data.core import Deserializer, Serializer


def save(a: np.ndarray) -> bytes:
    with io.BytesIO() as bs:
        np.save(bs, a)
        return bs.getvalue()


def load(bs: bytes) -> np.ndarray:
    with io.BytesIO() as bi:
        bi.write(bs)
        bi.seek(0)
        return np.load(bi, encoding="bytes")


SER_METHOD = os.environ.get("SER_METHOD")
if SER_METHOD == "dumps":
    ser = Serializer(np.ndarray.dumps)
    deser = Deserializer(pickle.loads)
elif SER_METHOD == "tolist":
    ser = Serializer(np.ndarray.tolist, "json")
    deser = Deserializer(np.array, "json")
elif SER_METHOD == "save":
    ser = Serializer(save)
    deser = Deserializer(load)
else:
    ser = None
    deser = None

NDArray = Annotated[np.ndarray, ser, deser]

It is crucial that the operations are the inverse of each other. For the first run we are going to use the tierkreis defaults (dumps, pickle.loads)

from pathlib import Path
from uuid import UUID

from tierkreis import run_graph
from tierkreis.executor import UvExecutor
from tierkreis.storage import FileStorage, read_outputs

storage = FileStorage(UUID(int=207), do_cleanup=True, name="scipy_graph")
executor = UvExecutor(Path().parent / "example_workers", storage.logs_path)
run_graph(storage, executor, sample_graph, {})

outputs = read_outputs(sample_graph, storage)

Investigating the checkpoints will show the serialized binary.

cd ~/.tierkreis/checkpoints/00000000-0000-0000-0000-0000000000cf`
cat ./-.N2/outputs/value

Now to change the serialization format we have to declare the $SER_METHOD environment variable and rerun the graph, after which we can investigate the file structure again.

os.environ["SER_METHOD"] = "tolist"
storage.clean_graph_files()
run_graph(storage, executor, sample_graph, {})

outputs = read_outputs(sample_graph, storage)