Tierkreis and HPC

Tierkreis natively supports execution in HPC environments through executors. Before we take a look at the executor, let's define a problem to run.

For this example we use symbolic circuits, which are supported through the tkr_pytket_worker. As before, we define inputs and outputs:

%pip install tierkreis pytket
from typing import NamedTuple

from tierkreis.controller.data.models import TKR, OpaqueType


class SymbolicCircuitsInputs(NamedTuple):
    a: TKR[float]
    b: TKR[float]
    c: TKR[float]
    ansatz: TKR[OpaqueType["pytket._tket.circuit.Circuit"]]  # noqa: F821


class SymbolicCircuitsOutputs(NamedTuple):
    expectation: TKR[float]

and define a graph under the assumption that our circuit has three free symbols:

from tierkreis.builder import GraphBuilder
from substitution_worker import substitute
from tierkreis.pytket_worker import (
    add_measure_all,
    expectation,
    optimise_phase_gadgets,
)
from tierkreis.aer_worker import submit_single


def symbolic_execution() -> GraphBuilder:
    """A graph that substitutes 3 parameters into a circuit and gets an expectation value."""
    g = GraphBuilder(SymbolicCircuitsInputs, SymbolicCircuitsOutputs)
    a = g.inputs.a
    b = g.inputs.b
    c = g.inputs.c
    ansatz = g.inputs.ansatz
    n_shots = g.const(100)

    substituted_circuit = g.task(substitute(a=a, b=b, c=c, circuit=ansatz))
    measurement_circuit = g.task(add_measure_all(circuit=substituted_circuit))

    compiled_circuit = g.task(optimise_phase_gadgets(circuit=measurement_circuit))
    backend_result = g.task(submit_single(circuit=compiled_circuit, n_shots=n_shots))
    av = g.task(expectation(backend_result=backend_result))

    g.outputs(SymbolicCircuitsOutputs(expectation=av))
    return g

Next, we define the ansatz:

from pytket._tket.circuit import Circuit, fresh_symbol


def build_ansatz() -> Circuit:
    a = fresh_symbol("a")
    b = fresh_symbol("b")
    c = fresh_symbol("c")
    circ = Circuit(4)
    circ.CX(0, 1)
    circ.CX(1, 2)
    circ.CX(2, 3)
    circ.Rz(a, 3)
    circ.CX(2, 3)
    circ.CX(1, 2)
    circ.CX(0, 1)
    circ.Rz(b, 0)
    circ.CX(0, 1)
    circ.CX(1, 2)
    circ.CX(2, 3)
    circ.Rz(c, 3)
    circ.CX(2, 3)
    circ.CX(1, 2)
    circ.CX(0, 1)
    return circ

Most HPC systems provide a shared file system that can be used as storage. Make sure the checkpoints live there!

from pathlib import Path
from uuid import UUID

from tierkreis.controller.storage.filestorage import ControllerFileStorage

storage = ControllerFileStorage(
    UUID(int=101),
    name="symbolic_circuits",
    do_cleanup=True,
    tierkreis_directory=Path.home()
    / ".tierkreis"
    / "checkpoints",  # This is the default, change this to the shared fs
)
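On many clusters the shared file system is exposed through an environment variable; the name varies by site (`SCRATCH` is used here purely as an example, so check your system's documentation). A small sketch for deriving the checkpoint directory from such a variable, falling back to the home directory, might look like this:

```python
import os
from pathlib import Path

# Hypothetical: many sites export a scratch/work directory via an
# environment variable; fall back to the home directory otherwise.
shared_root = Path(os.environ.get("SCRATCH", str(Path.home())))
checkpoint_dir = shared_root / ".tierkreis" / "checkpoints"
```

The resulting `checkpoint_dir` can then be passed as `tierkreis_directory` to `ControllerFileStorage`.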

For everything that does not need HPC nodes, we use the regular uv-based executor, which we will later combine into a composed executor.

from tierkreis.consts import PACKAGE_PATH
from tierkreis.executor import UvExecutor

executor = UvExecutor(PACKAGE_PATH.parent / "tierkreis_workers", storage.logs_path)

If we want to use HPC resources, we need to provide a definition of our job. This includes common information such as the number of nodes, memory, walltime, etc. For this we define a job spec that runs uv on a compute node. Here we use a pre-built environment in case the compute nodes have a different architecture than the login node.

from tierkreis.controller.executor.hpc.job_spec import JobSpec, ResourceSpec

spec = JobSpec(
    job_name="tkr_symbolic_circuits",
    account="<group_name>",  # TODO replace with an actual account / group name
    command="env UV_PROJECT_ENVIRONMENT=compute_venv uv run main.py",  # Takes care of the architecture
    resource=ResourceSpec(nodes=1, memory_gb=None, gpus_per_node=None),
    walltime="00:15:00",
    output_path=Path(storage.logs_path),
    error_path=Path(storage.logs_path),
    include_no_check_directory_flag=True,
)
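The walltime is given as an `HH:MM:SS` string. A small convenience helper (our own sketch, not part of Tierkreis) can build that string from a duration in minutes:

```python
def walltime_from_minutes(minutes: int) -> str:
    """Format a duration in minutes as an HH:MM:SS walltime string."""
    hours, mins = divmod(minutes, 60)
    return f"{hours:02d}:{mins:02d}:00"
```

For example, `walltime_from_minutes(15)` produces `"00:15:00"`, matching the spec above.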

From this we can now construct an executor. There are different implementations for the following schedulers:

  • slurm

  • PBS

  • PJSUB

Here we use PJSUB:

from tierkreis.controller.executor.hpc.pjsub import PJSUBExecutor

custom_executor = PJSUBExecutor(
    spec=spec,
    registry_path=Path().parent / "example_workers",
    logs_path=storage.logs_path,
)

We combine everything into one common executor:

from tierkreis.controller.executor.multiple import MultipleExecutor

multi_executor = MultipleExecutor(
    executor,
    executors={"custom": custom_executor},
    assignments={"substitution_worker": "custom"},
)
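Conceptually, the composed executor routes each worker to the executor named in `assignments` and falls back to the default (uv) executor for everything else. A minimal sketch of that dispatch idea (an illustration, not the actual `MultipleExecutor` implementation) is:

```python
def pick_executor(
    worker_name: str, assignments: dict[str, str], default: str = "default"
) -> str:
    # Workers listed in `assignments` go to the named executor;
    # everything else runs on the default executor.
    return assignments.get(worker_name, default)
```

With the assignments above, `substitution_worker` would be routed to `"custom"` (the PJSUB executor) while all other workers stay on the uv executor.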

We can now run the example after defining the inputs. Make sure that the code runs on an HPC system and that you have selected the correct account and executor!

from tierkreis.controller import run_graph
from tierkreis.storage import read_outputs

run_graph(
    storage,
    multi_executor,
    symbolic_execution().data,
    {
        "ansatz": build_ansatz(),
        "a": 0.2,
        "b": 0.55,
        "c": 0.75,
    },
    polling_interval_seconds=0.1,
)
output = read_outputs(symbolic_execution().data, storage)