Tierkreis and HPC¶
Tierkreis supports execution in HPC environments through executors natively. Before we take at a look at the executor lets define a problem to run.
For this example we are using symbolic circuits, which are supported through the tkr_pytket_worker.
As before were defining inputs and outputs:
%pip install tierkreis pytket
from typing import NamedTuple
from tierkreis.controller.data.models import TKR, OpaqueType
class SymbolicCircuitsInputs(NamedTuple):
a: TKR[float]
b: TKR[float]
c: TKR[float]
ansatz: TKR[OpaqueType["pytket._tket.circuit.Circuit"]] # noqa: F821
class SymbolicCircuitsOutputs(NamedTuple):
expectation: TKR[float]
and defining a graph under the assumption that our circuit will have three free symbols
from tierkreis.builder import GraphBuilder
from substitution_worker import substitute
from tierkreis.pytket_worker import (
add_measure_all,
expectation,
optimise_phase_gadgets,
)
from tierkreis.aer_worker import submit_single
def symbolic_execution() -> GraphBuilder:
"""A graph that substitutes 3 parameters into a circuit and gets an expectation value."""
g = GraphBuilder(SymbolicCircuitsInputs, SymbolicCircuitsOutputs)
a = g.inputs.a
b = g.inputs.b
c = g.inputs.c
ansatz = g.inputs.ansatz
n_shots = g.const(100)
substituted_circuit = g.task(substitute(a=a, b=b, c=c, circuit=ansatz))
measurement_circuit = g.task(add_measure_all(circuit=substituted_circuit))
compiled_circuit = g.task(optimise_phase_gadgets(circuit=measurement_circuit))
backend_result = g.task(submit_single(circuit=compiled_circuit, n_shots=n_shots))
av = g.task(expectation(backend_result=backend_result))
g.outputs(SymbolicCircuitsOutputs(expectation=av))
return g
defining the ansatz:
from pytket._tket.circuit import Circuit, fresh_symbol
def build_ansatz() -> Circuit:
a = fresh_symbol("a")
b = fresh_symbol("b")
c = fresh_symbol("c")
circ = Circuit(4)
circ.CX(0, 1)
circ.CX(1, 2)
circ.CX(2, 3)
circ.Rz(a, 3)
circ.CX(2, 3)
circ.CX(1, 2)
circ.CX(0, 1)
circ.Rz(b, 0)
circ.CX(0, 1)
circ.CX(1, 2)
circ.CX(2, 3)
circ.Rz(c, 3)
circ.CX(2, 3)
circ.CX(1, 2)
circ.CX(0, 1)
return circ
Most HPC systems use a shared file system which can be used as a storage. Make sure the the checkpoints live on there!
from pathlib import Path
from uuid import UUID
from tierkreis.controller.storage.filestorage import ControllerFileStorage
storage = ControllerFileStorage(
UUID(int=101),
name="symbolic_circuits",
do_cleanup=True,
tierkreis_directory=Path.home()
/ ".tierkreis"
/ "checkpoints", # This is the default, change this to the shared fs
)
For everything that doesn’t need HPC nodes we are going to use the regular executor based on uv which we will use for a composed executor.
from tierkreis.consts import PACKAGE_PATH
from tierkreis.executor import UvExecutor
executor = UvExecutor(PACKAGE_PATH.parent / "tierkreis_workers", storage.logs_path)
If we wan to use HPC resources we need to provide a definition of our job.
This includes common information like number of nodes, memory, walltime etc.
For this we are defining a job spec which runs uv on a compute node.
In this case we use a pre-build environment in case we have different architectures as the login node
from tierkreis.controller.executor.hpc.job_spec import JobSpec, ResourceSpec
spec = JobSpec(
job_name="tkr_symbolic_circuits",
account="<group_name>", # TODO replace with an actual account / group name
command="env UV_PROJECT_ENVIRONMENT=compute_venv uv run main.py", # Takes care of the architecture
resource=ResourceSpec(nodes=1, memory_gb=None, gpus_per_node=None),
walltime="00:15:00",
output_path=Path(storage.logs_path),
error_path=Path(storage.logs_path),
include_no_check_directory_flag=True,
)
From this we can now construct an executor. There are different implementations for the following schedulers:
slurm
PBS
PJSUB
We are using PJSUB
from tierkreis.controller.executor.hpc.pjsub import PJSUBExecutor
custom_executor = PJSUBExecutor(
spec=spec,
registry_path=Path().parent / "example_workers",
logs_path=storage.logs_path,
)
Combining this into one common executor
from tierkreis.controller.executor.multiple import MultipleExecutor
multi_executor = MultipleExecutor(
executor,
executors={"custom": custom_executor},
assignments={"substitution_worker": "custom"},
)
We can now run the example after defining the inputs. Make sure that the code is running on an HPC system and you have selected the correct group and executor!
from tierkreis.controller import run_graph
from tierkreis.storage import read_outputs
run_graph(
storage,
multi_executor,
symbolic_execution().data,
{
"ansatz": build_ansatz(),
"a": 0.2,
"b": 0.55,
"c": 0.75,
},
polling_interval_seconds=0.1,
)
output = read_outputs(symbolic_execution().data, storage)