diff --git a/quafu/algorithms/layers/qnode.py b/quafu/algorithms/layers/qnode.py index a1cdf61..60192af 100644 --- a/quafu/algorithms/layers/qnode.py +++ b/quafu/algorithms/layers/qnode.py @@ -13,6 +13,7 @@ # limitations under the License. +from typing import List, Optional import numpy as np from quafu import QuantumCircuit from quafu.algorithms import Hamiltonian @@ -30,15 +31,17 @@ def _generate_expval_z(num_qubits: int): # TODO(zhaoyilun): support more measurement types -def execute(circ: QuantumCircuit): - """execute. +def run_circ(circ: QuantumCircuit, params: Optional[List[float]] = None): + """Execute a circuit Args: circ (QuantumCircuit): circ + params (Optional[List[float]]): params """ obs_list = _generate_expval_z(circ.num) estimator = Estimator(circ) - params = [g.paras for g in circ.parameterized_gates] + if params is None: + params = [g.paras for g in circ.parameterized_gates] output = [estimator.run(obs, params) for obs in obs_list] return np.array(output) @@ -53,10 +56,10 @@ def jacobian(circ: QuantumCircuit, params_input: np.ndarray): """ batch_size, num_params = params_input.shape obs_list = _generate_expval_z(circ.num) - num_expvals = len(obs_list) + num_outputs = len(obs_list) estimator = Estimator(circ) calc_grad = ParamShift(estimator) - output = np.zeros((batch_size, num_expvals, num_params)) + output = np.zeros((batch_size, num_outputs, num_params)) for i in range(batch_size): grad_list = [ np.array(calc_grad(obs, params_input[i, :].tolist())) for obs in obs_list @@ -69,16 +72,15 @@ def compute_vjp(jac: np.ndarray, dy: np.ndarray): """compute vector-jacobian product Args: - jac (np.ndarray): jac with shape (batch_size, num_expvals, num_params) - dy (np.ndarray): dy + jac (np.ndarray): jac with shape (batch_size, num_outputs, num_params) + dy (np.ndarray): dy with shape (batch_size, num_outputs) """ - batch_size, num_expvals, num_params = jac.shape - assert dy.shape[0] == batch_size and dy.shape[1] == num_expvals + batch_size, num_outputs, num_params = jac.shape + assert dy.shape[0] == batch_size and dy.shape[1] == num_outputs vjp = np.zeros((batch_size, num_params)) for i in range(batch_size): vjp[i] = dy[i, :].T @ jac[i, :, :] - vjp = vjp.sum(0) return vjp diff --git a/quafu/algorithms/layers/torch.py b/quafu/algorithms/layers/torch.py index c886de5..6ddf95b 100644 --- a/quafu/algorithms/layers/torch.py +++ b/quafu/algorithms/layers/torch.py @@ -14,28 +14,36 @@ """quafu PyTorch quantum layer""" -from typing import Any import torch +import numpy as np from quafu import QuantumCircuit -from quafu import QuantumCircuit, simulate -from quafu.elements.element_gates.matrices import ZMatrix +from quafu.algorithms.layers.qnode import compute_vjp, jacobian class ExecuteCircuits(torch.autograd.Function): """TODO(zhaoyilun): document""" @staticmethod - def forward(ctx, parameters, **kwargs) -> Any: + def forward(ctx, parameters, kwargs): ctx.run_fn = kwargs["run_fn"] ctx.circ = kwargs["circ"] - out = ctx.run_fn(ctx.circ, parameters) - return out + ctx.save_for_backward(parameters) + parameters = parameters.numpy().tolist() + outputs = [] + for para in parameters: + out = ctx.run_fn(ctx.circ, para) + outputs.append(out) + outputs = np.stack(outputs) + outputs = torch.from_numpy(outputs) + return outputs @staticmethod - def backward(ctx: Any, grad_out) -> Any: - circ, grad_fn = ctx.saved_tensors - grad = grad_fn(circ, grad_out) - return grad + def backward(ctx, grad_out): + (parameters,) = ctx.saved_tensors + jac = jacobian(ctx.circ, parameters.numpy()) + vjp = compute_vjp(jac, grad_out.numpy()) + vjp = torch.from_numpy(vjp) + return vjp, None # TODO(zhaoyilun): doc @@ -50,4 +58,4 @@ def execute(circ: QuantumCircuit, run_fn, grad_fn, parameters: torch.Tensor): kwargs = {"circ": circ, "run_fn": run_fn, "grad_fn": grad_fn} - return ExecuteCircuits.apply(parameters, **kwargs) + return ExecuteCircuits.apply(parameters, kwargs) diff --git a/tests/quafu/algorithms/layers_test.py b/tests/quafu/algorithms/layers_test.py index 6a9c5bf..6f0b9e9 100644 --- a/tests/quafu/algorithms/layers_test.py +++ b/tests/quafu/algorithms/layers_test.py @@ -13,22 +13,50 @@ # limitations under the License. import numpy as np +import torch.nn from quafu.algorithms.layers import jacobian, compute_vjp +from quafu.algorithms.layers.torch import execute +from quafu.algorithms.layers.qnode import run_circ from quafu.circuits.quantum_circuit import QuantumCircuit -def test_compute_vjp(): +class MyModel(torch.nn.Module): + def __init__(self, circ: QuantumCircuit): + super().__init__() + self.circ = circ + self.linear = torch.nn.Linear(3, 3, dtype=torch.double) + + def forward(self, features): + out = self.linear(features) + out = execute(self.circ, run_circ, None, out) + return out + + +class TestLayers: circ = QuantumCircuit(2) circ.x(0) circ.rx(0, 0.1) circ.ry(1, 0.5) circ.ry(0, 0.1) - params_input = np.random.randn(4, 3) - jac = jacobian(circ, params_input) + def test_compute_vjp(self): + params_input = np.random.randn(4, 3) + jac = jacobian(self.circ, params_input) + + dy = np.random.randn(4, 2) + vjp = compute_vjp(jac, dy) - dy = np.random.randn(4, 2) - vjp = compute_vjp(jac, dy) + assert len(vjp.shape) == 2 + assert vjp.shape[0] == 4 - assert len(vjp.shape) == 1 - assert vjp.shape[0] == 3 + def test_torch_layer(self): + batch_size = 1 + model = MyModel(self.circ) + features = torch.randn( + batch_size, 3, requires_grad=True, dtype=torch.double + ) # batch_size=4, num_params=3 + outputs = model(features) + targets = torch.randn(batch_size, 2, dtype=torch.double) + criterion = torch.nn.MSELoss() + loss = criterion(outputs, targets) + loss.backward()