Custom Serializers
By default tierkreis will serialize values to bytes if possible. In some cases it might be preferable to use a specific format.
Tierkreis supports this by using Python's `Annotated` type hints to attach custom serializers and deserializers to a type.
A common example is the numpy.ndarray.
In this example we will change the serialization format of the array in multiple ways.
We use this in a simple graph.
%pip install tierkreis numpy scipy
/home/runner/work/tierkreis/tierkreis/.venv/bin/python3: No module named pip
Note: you may need to restart the kernel to use updated packages.
from typing import NamedTuple
from tierkreis.controller.data.models import TKR, OpaqueType
# Graph-side stand-in for numpy.ndarray, referenced by its qualified name.
# NOTE(review): presumably this lets the graph definition avoid importing
# numpy itself -- confirm against the OpaqueType documentation.
NDArray = OpaqueType["numpy.ndarray"]


class ScipyOutputs(NamedTuple):
    """Output model of the example graph: one array and one scalar."""

    a: TKR[NDArray]  # array-valued graph output
    p: TKR[float]  # float-valued graph output
We use a worker to expose some elementary tasks:
from tierkreis.builder import GraphBuilder
from tierkreis.controller.data.core import EmptyModel
from scipy_worker import (
add_point,
eval_point,
linspace,
reshape,
transpose,
)
# Build the example graph. EmptyModel and ScipyOutputs are presumably the
# graph's input and output models (assumed argument order -- confirm against
# the GraphBuilder API).
sample_graph = GraphBuilder(EmptyModel, ScipyOutputs)
# Each step wraps a scipy_worker function call in a task node; literal
# arguments are passed into the graph via sample_graph.const(...).
onedim = sample_graph.task(linspace(sample_graph.const(0), sample_graph.const(10)))
pointed = sample_graph.task(add_point(onedim, sample_graph.const(0)))
# `scalar` feeds the `p` output below.
scalar = sample_graph.task(eval_point(pointed))
# Reshape the output of the linspace task using the constant [5, 10], then
# transpose the result; `a` feeds the `a` output below.
twodim = sample_graph.task(reshape(onedim, sample_graph.const([5, 10])))
a = sample_graph.task(transpose(twodim))
# Declare the graph outputs, positionally matching ScipyOutputs(a, p).
sample_graph.outputs(ScipyOutputs(a, scalar))
The worker contains the following code to use different serialization methods.
import io
import os
import pickle
from typing import Annotated
import numpy as np
from tierkreis.controller.data.core import Deserializer, Serializer
def save(a: np.ndarray) -> bytes:
    """Serialize an array to bytes in NumPy's ``.npy`` format.

    Args:
        a: The array to serialize.

    Returns:
        The complete ``.npy`` payload that ``np.save`` produced.
    """
    # np.save writes to a file-like object, so use an in-memory buffer.
    with io.BytesIO() as buffer:
        np.save(buffer, a)
        # Read the contents before the ``with`` block closes the buffer.
        return buffer.getvalue()
def load(bs: bytes) -> np.ndarray:
    """Deserialize an array from ``.npy``-formatted bytes.

    Args:
        bs: A ``.npy`` payload, e.g. as written by ``np.save``.

    Returns:
        The array stored in ``bs``.

    Raises:
        ValueError: If the payload holds a pickled object array; ``np.load``
            is called with its default ``allow_pickle=False``.
    """
    # Seeding BytesIO with the payload leaves the read position at 0, so no
    # separate write/seek is needed. The ``encoding`` argument of np.load is
    # omitted: it only affects Python 2 pickles, which are rejected anyway.
    with io.BytesIO(bs) as buffer:
        return np.load(buffer)
# The serialization method is chosen through the SER_METHOD environment
# variable, read once when this module is imported.
SER_METHOD = os.environ.get("SER_METHOD")

if SER_METHOD == "dumps":
    # Pickle round trip: ndarray.dumps() returns the pickle of the array.
    # NOTE: pickle.loads can execute arbitrary code; only use this branch
    # with data from a trusted source.
    ser = Serializer(np.ndarray.dumps)
    deser = Deserializer(pickle.loads)
elif SER_METHOD == "tolist":
    # Nested-list round trip. np.array re-infers the dtype from the list, so
    # the original dtype is not guaranteed to survive.
    # NOTE(review): the "json" argument presumably selects a JSON encoding
    # of the list -- confirm against the Serializer/Deserializer API.
    ser = Serializer(np.ndarray.tolist, "json")
    deser = Deserializer(np.array, "json")
elif SER_METHOD == "save":
    # .npy round trip through the save/load helpers defined in this module.
    ser = Serializer(save)
    deser = Deserializer(load)
else:
    # Unset or unrecognized value: attach no custom (de)serializer.
    ser = None
    deser = None

# Array type carrying the selected serializer/deserializer as metadata.
NDArray = Annotated[np.ndarray, ser, deser]
It is crucial that the operations are the inverse of each other.
For the first run we leave SER_METHOD unset, so the Tierkreis defaults (dumps, pickle.loads) are used.
from pathlib import Path
from uuid import UUID
from tierkreis import run_graph
from tierkreis.executor import UvExecutor
from tierkreis.storage import FileStorage, read_outputs
# UUID(int=207) is 00000000-0000-0000-0000-0000000000cf, which matches the
# checkpoint directory inspected further down.
storage = FileStorage(UUID(int=207), do_cleanup=True, name="scipy_graph")
# Path().parent is still ".", so the worker directory is "example_workers"
# relative to the current working directory.
executor = UvExecutor(Path().parent / "example_workers", storage.logs_path)
# Run the graph with an empty inputs mapping, then read its outputs back
# from storage.
run_graph(storage, executor, sample_graph, {})
outputs = read_outputs(sample_graph, storage)
Investigating the checkpoints will show the serialized binary.
cd ~/.tierkreis/checkpoints/00000000-0000-0000-0000-0000000000cf
cat ./-.N2/outputs/value
Now, to change the serialization format, we set the SER_METHOD environment variable and rerun the graph. Afterwards we can investigate the file structure again.
# Select the "tolist" branch of the worker's SER_METHOD switch.
# NOTE(review): assumes the worker process inherits this environment -- confirm.
os.environ["SER_METHOD"] = "tolist"
# Presumably removes the files written by the previous run so the graph is
# executed from scratch -- confirm against the FileStorage API.
storage.clean_graph_files()
# Rerun the same graph and read the outputs again.
run_graph(storage, executor, sample_graph, {})
outputs = read_outputs(sample_graph, storage)