diff --git a/source/pip/qsharp/estimator/__init__.py b/source/pip/qsharp/estimator/__init__.py index ef870f3dad..d79bb09fed 100644 --- a/source/pip/qsharp/estimator/__init__.py +++ b/source/pip/qsharp/estimator/__init__.py @@ -18,6 +18,10 @@ EstimatorParams, ) +from ._layout_psspc import PSSPCEstimator +from ._qec_surface_code import SurfaceCode +from ._factory_round_based import RoundBasedFactory + __all__ = [ "EstimatorError", "LogicalCounts", @@ -33,4 +37,7 @@ "EstimatorConstraints", "EstimatorInputParamsItem", "EstimatorParams", + "PSSPCEstimator", + "SurfaceCode", + "RoundBasedFactory", ] diff --git a/source/pip/qsharp/estimator/_factory_round_based.py b/source/pip/qsharp/estimator/_factory_round_based.py new file mode 100644 index 0000000000..e5e99c2eb2 --- /dev/null +++ b/source/pip/qsharp/estimator/_factory_round_based.py @@ -0,0 +1,188 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +from typing import Any, Dict +from ._utils import extract_qubit_metric + + +class RoundBasedFactory: + """ + Factory for generating magic states using round-based distillation protocols. + + This class implements magic state distillation using round-based protocols. + It generates distillation units that can operate at both physical and logical levels. + """ + + def __init__( + self, + *, + with_physical: bool = True, + gate_time: str = "gate_time", + gate_error: str = "gate_error", + clifford_error: str = "clifford_error", + use_max_qubits_per_round: bool = False, + max_rounds: int = 3, + max_extra_rounds: int = 5 + ): + """ + Initialize the round-based magic state factory. + + :param with_physical: Whether to include physical-level distillation units + (default: True) + :param gate_time: Key name (or list of key names) for extracting gate time + from qubit metrics. If a list is provided, the sum of all corresponding + times is used (default: "gate_time") + :param gate_error: Key name (or list of key names) for extracting gate + error rate from qubit metrics. If a list is provided, the maximum of + all corresponding error rates is used (default: "gate_error") + :param clifford_error: Key name (or list of key names) for extracting + Clifford gate error rate from qubit metrics. If a list is provided, + the maximum of all corresponding error rates is used + (default: "clifford_error") + :param use_max_qubits_per_round: Whether to maximize qubits used per round + (default: False) + :param max_rounds: Maximum number of distillation rounds (default: 3) + :param max_extra_rounds: Maximum number of additional rounds beyond + max_rounds (default: 5) + """ + self.with_physical = with_physical + self.gate_time = gate_time + self.gate_error = gate_error + self.clifford_error = clifford_error + self.use_max_qubits_per_round = use_max_qubits_per_round + self.max_rounds = max_rounds + self.max_extra_rounds = max_extra_rounds + + def distillation_units( + self, code: Any, qubit: Dict[str, Any], max_code_parameter: int + ): + """ + Generate a list of distillation units for magic state production. + + Creates distillation units using 15-to-1 protocols (RM prep and space + efficient variants) at both physical level (if enabled) and across + all valid code parameters up to the maximum. + + :param code: QEC code object that provides code parameters and metrics + :param qubit: Dictionary containing physical qubit characteristics + :param max_code_parameter: Maximum code parameter (distance) to consider + :return: List of distillation unit dictionaries, each containing + configuration and callable functions for resource calculations + """ + units = [] + + gate_time = extract_qubit_metric(qubit, self.gate_time) + clifford_error = extract_qubit_metric(qubit, self.clifford_error) + + if self.with_physical: + units.append( + _create_unit( + "15-to-1 RM prep", + 1, + 24, + gate_time, + 1, + 31, + clifford_error, + ) + ) + units.append( + _create_unit( + "15-to-1 space efficient", + 1, + 45, + gate_time, + 1, + 12, + clifford_error, + ) + ) + + for code_parameter in code.code_parameter_range(): + if code.code_parameter_cmp(qubit, code_parameter, max_code_parameter) == 1: + break + + units.append( + _create_unit( + "15-to-1 RM prep", + code_parameter, + 11, + code.logical_cycle_time(qubit, code_parameter), + code.physical_qubits(code_parameter), + 31, + code.logical_error_rate(qubit, code_parameter), + ) + ) + units.append( + _create_unit( + "15-to-1 space efficient", + code_parameter, + 13, + code.logical_cycle_time(qubit, code_parameter), + code.physical_qubits(code_parameter), + 20, + code.logical_error_rate(qubit, code_parameter), + ) + ) + + return units + + def trivial_distillation_unit( + self, code: Any, qubit: Dict[str, Any], code_parameter: Any + ): + """ + Creates this 1-to-1 distillation unit in the case where the target error + rate is already met by the physical qubit. + + :param code: QEC code object that provides code parameters and metrics + :param qubit: Dictionary containing physical qubit characteristics + :param code_parameter: Code parameter chosen to run the algorithm + """ + + return { + "name": "trivial 1-to-1", + "code_parameter": code_parameter, + "num_input_states": 1, + "num_output_states": 1, + "physical_qubits": lambda _: code.physical_qubits(code_parameter), + "duration": lambda _: code.logical_cycle_time(qubit, code_parameter), + "output_error_rate": lambda input_error_rate: input_error_rate, + "failure_probability": lambda _: 0.0, + } + + +def _create_unit( + name: str, + code_parameter: Any, + num_cycles: int, + cycle_time: int, + physical_qubits_factor: int, + physical_qubits: int, + clifford_error_rate: float, +): + """ + Create a distillation unit configuration dictionary. + + :param name: Name of the distillation protocol + :param code_parameter: Code parameter (distance) for this unit + :param num_cycles: Number of cycles required for distillation + :param cycle_time: Time per cycle + :param physical_qubits_factor: Multiplier for physical qubit count + :param physical_qubits: Base number of physical qubits + :param clifford_error_rate: Error rate for Clifford operations + :return: Dictionary containing unit configuration and callable functions + for calculating physical qubits, duration, output error rate, and + failure probability + """ + + return { + "name": name, + "code_parameter": code_parameter, + "num_input_states": 15, + "num_output_states": 1, + "physical_qubits": lambda _: physical_qubits * physical_qubits_factor, + "duration": lambda _: num_cycles * cycle_time, + "output_error_rate": lambda input_error_rate: 35 * input_error_rate**3 + + 7.1 * clifford_error_rate, + "failure_probability": lambda input_error_rate: 15 * input_error_rate + + 356 * clifford_error_rate, + } diff --git a/source/pip/qsharp/estimator/_layout_psspc.py b/source/pip/qsharp/estimator/_layout_psspc.py new file mode 100644 index 0000000000..0dbdec02d2 --- /dev/null +++ b/source/pip/qsharp/estimator/_layout_psspc.py @@ -0,0 +1,142 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +import math +from typing import Union, List, Optional + +from ._estimator import LogicalCounts + + +NUM_MEASUREMENTS_PER_R = 1 +NUM_MEASUREMENTS_PER_TOF = 3 + + +class PSSPCEstimator: + """ + Computes post-layout logical resources based on the Parallel Synthesis + Sequential Pauli Computation (PSSPC) layout method. + """ + + def __init__( + self, + source: Union[List[str], str, LogicalCounts], + expression: Optional[str] = None, + ): + """ + Constructor for PSSPC layout method. The source can be a list of Q# + source files (use a list even if there is only one file), a path to a Q# + project (directory with a qsharp.json file), or a LogicalCounts object. + + :param source: The Q# source files, project path, or LogicalCounts + object. + :param expression: An entry point expression that must only be used when + the source is a list of Q# files or a project path. + """ + + if isinstance(source, LogicalCounts): + if expression is not None: + raise ValueError( + "Cannot specify entry point expression when source is LogicalCounts" + ) + self._counts = source + else: + if expression is None: + raise ValueError( + "Must specify entry point expression when source is not LogicalCounts" + ) + self._counts = self._compute_counts(source, expression) + + def logical_qubits(self): + """ + Calculates the number of logical qubits required for the PSSPC layout + according to Eq. (D1) in [arXiv:2211.07629](https://arxiv.org/pdf/2211.07629) + """ + + num_qubits = self._counts["numQubits"] + + qubit_padding = math.ceil(math.sqrt(8 * num_qubits)) + 1 + return 2 * num_qubits + qubit_padding + + def logical_depth(self, budget): + """ + Calculates the number of multi-qubit Pauli measurements executed in + sequence according to Eq. (D3) in + [arXiv:2211.07629](https://arxiv.org/pdf/2211.07629) + """ + + budget_rotations = budget["rotations"] + tof_count = self._counts.get("cczCount", 0) + self._counts.get("ccixCount", 0) + num_ts_per_rotation = self._num_ts_per_rotation(budget_rotations) + + return ( + ( + self._counts.get("measurementCount", 0) + + self._counts.get("rotationCount", 0) + + self._counts.get("tCount", 0) + ) + * NUM_MEASUREMENTS_PER_R + + tof_count * NUM_MEASUREMENTS_PER_TOF + + ( + num_ts_per_rotation + * self._counts.get("rotationDepth", 0) + * NUM_MEASUREMENTS_PER_R + ) + ) + + def num_magic_states(self, budget, index): + """ + Calculates the number of T magic states that are consumbed by + multi-qubit Pauli measurements executed by PSSPC according to Eq. (D4) + in [arXiv:2211.07629](https://arxiv.org/pdf/2211.07629) + """ + + # Only works for one kind of magic states, which is assumed to be T + # magic states + assert index == 0 + + budget_rotations = budget["rotations"] + tof_count = self._counts.get("cczCount", 0) + self._counts.get("ccixCount", 0) + num_ts_per_rotation = self._num_ts_per_rotation(budget_rotations) + + return ( + 4 * tof_count + + self._counts.get("tCount", 0) + + num_ts_per_rotation * self._counts.get("rotationCount", 0) + ) + + def algorithm_overhead(self, budget): + """ + Returns the pre-layout logical resources as algorithm overhead, which + can be accessed from the estimation result. + """ + return self._counts + + def prune_error_budget(self, budget, strategy): + if self._counts.get("rotationCount", 0) == 0: + budget_rotations = budget.get("rotations", 0) + budget["rotations"] = 0 + budget["logical"] += budget_rotations / 2 + budget["magic_states"] += budget_rotations / 2 + + def _compute_counts(self, source: Union[List[str], str], expression): + # NOTE: Importing qsharp here to avoid circular dependency + import qsharp + + if isinstance(source, list): + qsharp.init() + for file in source: + qsharp.eval(qsharp._fs.read_file(file)[1]) + elif isinstance(source, str): + qsharp.init(project_root=source) + else: + raise ValueError("Invalid source type for PSSPCEstimator") + + return qsharp.logical_counts(expression) + + def _num_ts_per_rotation(self, rotation_budget): + rotation_count = self._counts.get("rotationCount", 0) + + if rotation_count > 0: + return math.ceil(0.53 * math.log2(rotation_count / rotation_budget) + 4.86) + + else: + return 0 diff --git a/source/pip/qsharp/estimator/_qec_surface_code.py b/source/pip/qsharp/estimator/_qec_surface_code.py new file mode 100644 index 0000000000..7f8b1036cd --- /dev/null +++ b/source/pip/qsharp/estimator/_qec_surface_code.py @@ -0,0 +1,179 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +import math +from typing import Any, Dict, List, Union +from ._utils import extract_qubit_metric + + +class SurfaceCode: + """ + Surface code quantum error correction implementation. + + This class implements the surface code QEC scheme, which is a widely-studied + topological quantum error correction code. It provides methods to calculate + physical resource requirements, logical error rates, and timing parameters + based on code distance and physical qubit characteristics. + """ + + def __init__( + self, + *, + crossing_prefactor=0.03, + error_correction_threshold=0.01, + one_qubit_gate_time: Union[str, List[str]] = "one_qubit_gate_time", + measurement_time: Union[str, List[str]] = "measurement_time", + two_qubit_gate_time: Union[str, List[str]] = "two_qubit_gate_time", + two_qubit_gate_error_rate: Union[str, List[str]] = "two_qubit_gate_error_rate", + physical_qubits_formula: str = "2*distance**2", + logical_cycle_time_formula: str = "(one_qubit_gate_time + measurement_time + 4 * two_qubit_gate_time) * distance", + max_distance: int = 149, + ): + """ + Initialize the surface code QEC scheme. + + :param crossing_prefactor: Prefactor used in logical error rate calculation + (default: 0.03) + :param error_correction_threshold: Error correction threshold below which + the code can effectively correct errors (default: 0.01) + :param one_qubit_gate_time: Key name (or list of key names) for extracting + single-qubit gate time from qubit metrics. If a list is provided, the + sum of all corresponding times is used (default: "one_qubit_gate_time") + :param measurement_time: Key name (or list of key names) for extracting + measurement time from qubit metrics. If a list is provided, the sum + of all corresponding times is used (default: "measurement_time") + :param two_qubit_gate_time: Key name (or list of key names) for extracting + two-qubit gate time from qubit metrics. If a list is provided, the sum + of all corresponding times is used (default: "two_qubit_gate_time") + :param two_qubit_gate_error_rate: Key name (or list of key names) for + extracting two-qubit gate error rate from qubit metrics. If a list is + provided, the maximum of all corresponding error rates is used + (default: "two_qubit_gate_error_rate") + :param physical_qubits_formula: Mathematical formula to calculate the + number of physical qubits as a function of distance. The formula is + evaluated with 'distance' and 'math' module available (default: + "2*distance**2") + :param logical_cycle_time_formula: Mathematical formula to calculate the + logical cycle time as a function of distance and gate times (default: + "(one_qubit_gate_time + measurement_time + 4 * two_qubit_gate_time) * distance") + :param max_distance: Maximum code distance to consider (default: 149) + """ + # Logical error rate coefficients + self._crossing_prefactor = crossing_prefactor + self._error_correction_threshold = error_correction_threshold + + # Keys to extract physical qubit metrics + self._one_qubit_gate_time = one_qubit_gate_time + self._measurement_time = measurement_time + self._two_qubit_gate_time = two_qubit_gate_time + self._two_qubit_gate_error_rate = two_qubit_gate_error_rate + + self._physical_qubits_formula = physical_qubits_formula + self._logical_cycle_time_formula = logical_cycle_time_formula + self._max_distance = max_distance + + def physical_qubits(self, distance: int): + """ + Calculate the number of physical qubits required for a given code distance. + + :param distance: The code distance + :return: Number of physical qubits required + """ + safe_context = { + "distance": distance, + "math": math, + "__builtins__": { # Prevent access to built-in functions + "abs": abs, + "min": min, + "max": max, + "pow": pow, + "round": round, + }, + } + + return eval(self._physical_qubits_formula, safe_context) + + def logical_qubits(self, distance: int): + """ + Calculate the number of logical qubits encoded by this code. + + For surface codes, this is always 1 logical qubit per code block. + + :param distance: The code distance (unused but kept for interface consistency) + :return: Number of logical qubits (always 1) + """ + return 1 + + def logical_cycle_time(self, qubit, distance): + """ + Calculate the time required for one logical cycle. + + A logical cycle includes the time for syndrome extraction and correction, + which depends on the physical gate times and the code distance. + + :param qubit: Dictionary containing physical qubit characteristics + :param distance: The code distance + :return: Logical cycle time in the same units as the physical gate times + """ + one_qubit_gate_time = extract_qubit_metric(qubit, self._one_qubit_gate_time) + measurement_time = extract_qubit_metric(qubit, self._measurement_time) + two_qubit_gate_time = extract_qubit_metric(qubit, self._two_qubit_gate_time) + + safe_context = { + "one_qubit_gate_time": one_qubit_gate_time, + "measurement_time": measurement_time, + "two_qubit_gate_time": two_qubit_gate_time, + "distance": distance, + "math": math, + "__builtins__": { # Prevent access to built-in functions + "abs": abs, + "min": min, + "max": max, + "pow": pow, + "round": round, + }, + } + + return eval(self._logical_cycle_time_formula, safe_context) + + def logical_error_rate(self, qubit: Dict[str, Any], distance: int): + """ + Calculate the logical error rate for a given code distance. + + The logical error rate is calculated using an exponential suppression + formula based on the ratio of physical error rate to the error correction + threshold. + + :param qubit: Dictionary containing physical qubit characteristics + :param distance: The code distance + :return: Logical error rate per cycle + """ + physical_error_rate = extract_qubit_metric( + qubit, self._two_qubit_gate_error_rate, combine=max + ) + + return self._crossing_prefactor * ( + (physical_error_rate / self._error_correction_threshold) + ** ((distance + 1) // 2) + ) + + def code_parameter_range(self): + """ + Get the range of valid code distances for this surface code. + + Returns odd integers from 3 to max_distance (inclusive), as surface + codes require odd distances. + + :return: List of valid code distances + """ + return list(range(3, self._max_distance + 1, 2)) + + def code_parameter_cmp(self, qubit: Dict[str, Any], p1: int, p2: int): + """ + Compare two code parameters (distances). + + :param qubit: Dictionary containing physical qubit characteristics (unused) + :param p1: First code parameter to compare + :param p2: Second code parameter to compare + :return: 1 if p1 > p2, -1 if p1 < p2, 0 if p1 == p2 + """ + return (p1 > p2) - (p1 < p2) diff --git a/source/pip/qsharp/estimator/_utils.py b/source/pip/qsharp/estimator/_utils.py new file mode 100644 index 0000000000..2843efb5f9 --- /dev/null +++ b/source/pip/qsharp/estimator/_utils.py @@ -0,0 +1,24 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +from typing import Any, Dict, List, Union + + +def extract_qubit_metric( + qubit: Dict[str, Any], keys: Union[str, List[str]], combine=sum +): + """ + Extracts a metric from a dictionary and combine multiple metrics if a list + of keys is provided. + + :param qubit: Dictionary containing qubit metrics + :param keys: Key name or list of key names to extract from the dictionary + :param combine: Function to combine multiple metrics (default: sum) + """ + + try: + if isinstance(keys, str): + return qubit[keys] + else: + return combine(qubit[key] for key in keys) + except KeyError as e: + raise KeyError(f"Missing qubit property: {e}") from e diff --git a/source/pip/tests/test_re_models.py b/source/pip/tests/test_re_models.py new file mode 100644 index 0000000000..2d469f33e9 --- /dev/null +++ b/source/pip/tests/test_re_models.py @@ -0,0 +1,165 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import pytest + +from qsharp.estimator import PSSPCEstimator, SurfaceCode, RoundBasedFactory, QubitParams + + +@pytest.fixture +def qsharp(): + import qsharp + import qsharp._fs + + qsharp._fs.read_file = read_file_memfs + qsharp._fs.list_directory = list_directory_memfs + qsharp._fs.exists = exists_memfs + qsharp._fs.join = join_memfs + qsharp._fs.resolve = resolve_memfs + + return qsharp + + +def test_estimation_from_project(qsharp): + layout = qsharp.estimator.PSSPCEstimator("/project", "Test.Test()") + + assert layout.logical_qubits() == 15 + + +def test_estimation_from_single_file(qsharp): + layout = qsharp.estimator.PSSPCEstimator(["/SingleFile.qs"], "Test()") + + assert layout.logical_qubits() == 42 + + +def test_estimation_comparison(qsharp): + qsharp.init() + + source_file = "/SingleFile.qs" + qsharp.eval(qsharp._fs.read_file(source_file)[1]) + + for qubit_name in [ + QubitParams.GATE_US_E3, + QubitParams.GATE_US_E4, + QubitParams.GATE_NS_E3, + QubitParams.GATE_NS_E4, + ]: + estimates = qsharp.estimate("Test()", {"qubitParams": {"name": qubit_name}}) + + qubit = estimates["jobParams"]["qubitParams"] + + # Remove 'ns' suffix from time metrics + for key, value in qubit.items(): + if isinstance(value, str) and value.endswith("ns"): + value = int(value[:-3]) + qubit[key] = value + + estimates2 = qsharp.estimate_custom( + PSSPCEstimator([source_file], "Test()"), + qubit, + SurfaceCode( + one_qubit_gate_time="oneQubitGateTime", + two_qubit_gate_time="twoQubitGateTime", + measurement_time="oneQubitMeasurementTime", + two_qubit_gate_error_rate="twoQubitGateErrorRate", + logical_cycle_time_formula="(2 * measurement_time + 4 * two_qubit_gate_time) * distance", + ), + [ + RoundBasedFactory( + gate_error="tGateErrorRate", + gate_time="tGateTime", + clifford_error="twoQubitGateErrorRate", + use_max_qubits_per_round=True, + ) + ], + error_budget=0.001, + ) + + assert ( + estimates["physicalCounts"]["physicalQubits"] + == estimates2["physicalQubits"] + ) + assert estimates["physicalCounts"]["runtime"] == estimates2["runtime"] + + +memfs = { + "": { + "project": { + "src": { + "Test.qs": "operation Test() : Unit { use qs = Qubit[4]; ApplyToEach(T, qs); ResetAll(qs); }", + }, + "qsharp.json": "{}", + }, + "SingleFile.qs": "import Std.TableLookup.*; operation Test() : Unit { use address = Qubit[6]; use target = Qubit[5]; let data = [[true, size = 5], size = 32]; Select(data, address, target); ResetAll(address + target); }", + } +} + + +def read_file_memfs(path): + global memfs + item = memfs + for part in path.split("/"): + if part in item: + if isinstance(item[part], OSError): + raise item[part] + else: + item = item[part] + else: + raise Exception("File not found: " + path) + + return (path, item) + + +def list_directory_memfs(dir_path): + global memfs + item = memfs + for part in dir_path.split("/"): + if part in item: + item = item[part] + else: + raise Exception("Directory not found: " + dir_path) + + contents = list( + map( + lambda x: { + "path": join_memfs(dir_path, x[0]), + "entry_name": x[0], + "type": "folder" if isinstance(x[1], dict) else "file", + }, + item.items(), + ) + ) + + return contents + + +def exists_memfs(path): + global memfs + parts = path.split("/") + item = memfs + for part in parts: + if part in item: + item = item[part] + else: + return False + + return True + + +# The below functions force the use of `/` separators in the unit tests +# so that they function on Windows consistently with other platforms. +def join_memfs(path, *paths): + return "/".join([path, *paths]) + + +def resolve_memfs(base, path): + parts = f"{base}/{path}".split("/") + new_parts = [] + for part in parts: + if part == ".": + continue + if part == "..": + new_parts.pop() + continue + new_parts.append(part) + return "/".join(new_parts)