Leveraging parallelism through mapΒΆ

One major advantage in workflow systems is the ease of scaling computation horizontally. Data-parallel tasks can act independently; In tierkreis this can simply be achieved through the map function. Each map element will receive exactly one sets of inputs and can therefore be immediately dispatched. In this example we will observe the speedup by running multiple independent graphs in parallel.

First we define a simple graph that will run a circuit in two version:

  1. Using the qiskit aer simulator

  2. Using the qulacs simulator

%pip install tierkreis pytket qiskit-aer
/home/runner/work/tierkreis/tierkreis/.venv/bin/python3: No module named pip
Note: you may need to restart the kernel to use updated packages.
from typing import Literal, NamedTuple
from tierkreis.builder import GraphBuilder
from tierkreis.controller.data.models import TKR, OpaqueType
from tierkreis.builtins import untuple
from tierkreis.aer_worker import (
    get_compiled_circuit as aer_compile,
    run_circuit as aer_run,
)
from tierkreis.qulacs_worker import (
    get_compiled_circuit as qulacs_compile,
    run_circuit as qulacs_run,
)

type BackendResult = OpaqueType["pytket.backends.backendresult.BackendResult"]  # noqa: F821
type Circuit = OpaqueType["pytket._tket.circuit.Circuit"]  # noqa: F821


class SimulateJobInputsSingle(NamedTuple):
    simulator_name: TKR[Literal["aer", "qulacs"]]
    circuit_shots: TKR[tuple[Circuit, int]]
    compilation_optimisation_level: TKR[int]


def aer_simulate_single():
    g = GraphBuilder(SimulateJobInputsSingle, TKR[BackendResult])
    circuit_shots = g.task(untuple(g.inputs.circuit_shots))

    compiled_circuit = g.task(
        aer_compile(
            circuit=circuit_shots.a,
            optimisation_level=g.inputs.compilation_optimisation_level,
        )
    )
    res = g.task(aer_run(compiled_circuit, circuit_shots.b))
    g.outputs(res)
    return g


def qulacs_simulate_single():
    g = GraphBuilder(SimulateJobInputsSingle, TKR[BackendResult])
    circuit_shots = g.task(untuple(g.inputs.circuit_shots))

    compiled_circuit = g.task(
        qulacs_compile(
            circuit=circuit_shots.a,
            optimisation_level=g.inputs.compilation_optimisation_level,
        )
    )
    res = g.task(qulacs_run(compiled_circuit, circuit_shots.b))
    g.outputs(res)
    return g

So far these are regular graphs that compile and simulate a single circuit. We are going to combine them into a single graph taking a parameter to decide which simulator to run using ifelse Although we we will have two similar subgraphs in the evaluation, this is not a performance detriment as ifelse only evaluates lazily.

from tierkreis.builtins import str_eq


def compile_simulate_single():
    g = GraphBuilder(SimulateJobInputsSingle, TKR[BackendResult])

    aer_res = g.eval(aer_simulate_single(), g.inputs)
    qulacs_res = g.eval(qulacs_simulate_single(), g.inputs)
    res = g.ifelse(
        g.task(str_eq(g.inputs.simulator_name, g.const("aer"))), aer_res, qulacs_res
    )

    g.outputs(res)
    return g

To make this parallel over multiple circuits we are using the map feature in a new graph.

class SimulateJobInputs(NamedTuple):
    simulator_name: TKR[Literal["aer", "qulacs"]]
    circuits: TKR[list[Circuit]]
    n_shots: TKR[list[int]]
    compilation_optimisation_level: TKR[int]


g = GraphBuilder(SimulateJobInputs, TKR[list[BackendResult]])

Each of the SimulateJobInputsSingle expects a tuple (Circuit, n_shots) which we generate by zipping

from tierkreis.builtins import tkr_zip

circuits_shots = g.task(tkr_zip(g.inputs.circuits, g.inputs.n_shots))

A convenient way to aggregate the inputs is using a map over a lambda

job_inputs = g.map(
    lambda x: SimulateJobInputsSingle(
        simulator_name=g.inputs.simulator_name,
        circuit_shots=x,
        compilation_optimisation_level=g.inputs.compilation_optimisation_level,
    ),
    circuits_shots,
)

and finally we can map over the jobs

res = g.map(compile_simulate_single(), job_inputs)

g.outputs(res)

preparing the storage, executor and inputs

from pathlib import Path
from uuid import UUID

from pytket.qasm.qasm import circuit_from_qasm

from tierkreis.consts import PACKAGE_PATH
from tierkreis.storage import FileStorage
from tierkreis.executor import UvExecutor

circuit = circuit_from_qasm(Path().parent / "data" / "ghz_state_n23.qasm")
circuits = [circuit] * 3
n_shots = 1024

storage = FileStorage(UUID(int=107), do_cleanup=True)
executor = UvExecutor(PACKAGE_PATH / ".." / "tierkreis_workers", storage.logs_path)
inputs = {
    "circuits": circuits,
    "n_shots": [n_shots] * len(circuits),
    "compilation_optimisation_level": 2,
}

we can now benchmark aer by setting the simulator_name input

import time
from tierkreis.controller import run_graph

inputs["simulator_name"] = "aer"
print("Simulating using aer...")
start = time.time()
run_graph(storage, executor, g, inputs, polling_interval_seconds=0.1)
print(f"time taken: {time.time() - start}")
Simulating using aer...
time taken: 11.141263008117676

and

inputs["simulator_name"] = "qulacs"

print("Simulating using qulacs...")
storage.clean_graph_files()
start = time.time()
run_graph(storage, executor, g, inputs, polling_interval_seconds=0.1)
print(f"time taken: {time.time() - start}")
Simulating using qulacs...
time taken: 48.19670557975769

compared against running the same graph three times:

start = time.time()
for circuit in circuits:
    inputs = {
        "circuit_shots": (circuit, n_shots),
        "compilation_optimisation_level": 2,
        "simulator_name": "aer",
    }
    storage.clean_graph_files()
    run_graph(
        storage,
        executor,
        compile_simulate_single(),
        inputs,
        polling_interval_seconds=0.1,
    )
print(f"time taken: {time.time() - start}")
time taken: 18.41044569015503