diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index 105b7873..29dc675f 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -22,7 +22,7 @@ jobs: run: python -m pip install torch torchvision torchaudio - name: Install pyquafu - run: python -m pip install . + run: python setup.py develop - name: Run unit tests run: pytest tests/ diff --git a/.gitignore b/.gitignore index 56c3f404..3899e32e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ *.so *.svg *temp.* +temp/ .DS_Store .idea .vscode diff --git a/conftest.py b/conftest.py new file mode 100644 index 00000000..fa01be02 --- /dev/null +++ b/conftest.py @@ -0,0 +1,17 @@ +import pytest + + +def pytest_addoption(parser): + # Options for tests/quafu/algorithms/qnn_test.py + parser.addoption("--epoch", action="store", default=1, help="The number of epochs") + parser.addoption("--bsz", action="store", default=1, help="Batch size") + + +@pytest.fixture +def num_epochs(request): + return int(request.config.getoption("--epoch")) + + +@pytest.fixture +def batch_size(request): + return int(request.config.getoption("--bsz")) diff --git a/quafu/algorithms/ansatz.py b/quafu/algorithms/ansatz.py index da844f96..89217e7d 100644 --- a/quafu/algorithms/ansatz.py +++ b/quafu/algorithms/ansatz.py @@ -145,7 +145,9 @@ class QuantumNeuralNetwork(Ansatz): """A Wrapper of quantum circuit as QNN""" # TODO(zhaoyilun): docs - def __init__(self, num_qubits: int, layers: List[Any], interface="torch"): + def __init__( + self, num_qubits: int, layers: List[Any], interface="torch", backend="sim" + ): """""" # Get transformer according to specified interface self._transformer = InterfaceProvider.get(interface) @@ -153,8 +155,17 @@ def __init__(self, num_qubits: int, layers: List[Any], interface="torch"): # FIXME(zhaoyilun): don't use this default value self._weights = np.empty((1, 1)) + + self._backend = backend super().__init__(num_qubits) + def __call__(self, features): + """Compute outputs of QNN given input features""" + from .estimator import Estimator + + estimator = Estimator(self, backend=self._backend) + return self._transformer.execute(self, features, estimator=estimator) + def _build(self): """Essentially initialize weights using transformer""" for layer in self._layers: diff --git a/quafu/algorithms/gradients/vjp.py b/quafu/algorithms/gradients/vjp.py index 1096ff8a..cb60162d 100644 --- a/quafu/algorithms/gradients/vjp.py +++ b/quafu/algorithms/gradients/vjp.py @@ -31,15 +31,24 @@ def _generate_expval_z(num_qubits: int): # TODO(zhaoyilun): support more measurement types -def run_circ(circ: QuantumCircuit, params: Optional[List[float]] = None): +# FIXME(zhaoyilun): remove backend +def run_circ( + circ: QuantumCircuit, + params: Optional[List[float]] = None, + backend: str = "sim", + estimator: Optional[Estimator] = None, +): """Execute a circuit Args: circ (QuantumCircuit): circ params (Optional[List[float]]): params + backend (str): backend + estimator (Optional[Estimator]): estimator """ obs_list = _generate_expval_z(circ.num) - estimator = Estimator(circ) + if estimator is None: + estimator = Estimator(circ, backend=backend) if params is None: params = [g.paras for g in circ.parameterized_gates] output = [estimator.run(obs, params) for obs in obs_list] @@ -47,7 +56,11 @@ def run_circ(circ: QuantumCircuit, params: Optional[List[float]] = None): # TODO(zhaoyilun): support more gradient methods -def jacobian(circ: QuantumCircuit, params_input: np.ndarray): +def jacobian( + circ: QuantumCircuit, + params_input: np.ndarray, + estimator: Optional[Estimator] = None, +): """Calculate Jacobian matrix Args: @@ -57,7 +70,8 @@ def jacobian(circ: QuantumCircuit, params_input: np.ndarray): batch_size, num_params = params_input.shape obs_list = _generate_expval_z(circ.num) num_outputs = len(obs_list) - estimator = Estimator(circ) + if estimator is None: + estimator = Estimator(circ) calc_grad = ParamShift(estimator) output = np.zeros((batch_size, num_outputs, num_params)) for i in range(batch_size): @@ -69,19 +83,38 @@ def jacobian(circ: QuantumCircuit, params_input: np.ndarray): def compute_vjp(jac: np.ndarray, dy: np.ndarray): - """compute vector-jacobian product + r"""compute vector-jacobian product Args: jac (np.ndarray): jac with shape (batch_size, num_outputs, num_params) dy (np.ndarray): dy with shape (batch_size, num_outputs) + + Notes: + Suppose there are n inputs and m outputs in current node + Let x, y denote the inputs and outputs of current node, o denotes the final output + Essentially, jacobian is + + .. math:: + \begin{bmatrix} + \frac{\partial y_1}{\partial x_1} & \cdots & \frac{\partial y_1}{x_n} \\ + \vdots & \ddots & \vdots \\ + \frac{\partial y_m}{\partial x_1} & \cdots & \frac{\partial y_m}{x_n} + \end{bmatrix} + + `dy` is actually the vjp of dependent node + + .. math:: \[ \frac{partial o}{partial y_1} \dots \frac{partial o}{partial y_m} \] + + Therefore the vector jocobian product gets + + .. math:: \[ \frac{partial o}{partial x_1} \dots \frac{partial o}{partial x_n} \] """ batch_size, num_outputs, num_params = jac.shape assert dy.shape[0] == batch_size and dy.shape[1] == num_outputs - vjp = np.zeros((batch_size, num_params)) - - for i in range(batch_size): - vjp[i] = dy[i, :].T @ jac[i, :, :] + # Compute vector-Jacobian product using Einstein summation convention + # the scripts simply mean 'jac-dims,dy-dims->vjp-dims'; so num_outputs is summed over + vjp = np.einsum("ijk,ij->ik", jac, dy) return vjp diff --git a/quafu/algorithms/interface/torch.py b/quafu/algorithms/interface/torch.py index 50e2b5fe..e9146364 100644 --- a/quafu/algorithms/interface/torch.py +++ b/quafu/algorithms/interface/torch.py @@ -13,8 +13,11 @@ # limitations under the License. """quafu PyTorch quantum layer""" +from typing import Optional + import numpy as np import torch +from quafu.algorithms.estimator import Estimator from quafu import QuantumCircuit @@ -28,7 +31,7 @@ def init_weights(shape): """Return torch gradient tensor with specified shape""" return torch.randn(*shape, requires_grad=True, dtype=torch.double) - # TODO(zhaoyilun): doc + # TODO(zhaoyilun): docstrings @staticmethod def execute( circ: QuantumCircuit, @@ -36,6 +39,7 @@ def execute( run_fn=run_circ, grad_fn=None, method="internal", + estimator: Optional[Estimator] = None, ): """execute. @@ -45,11 +49,19 @@ def execute( grad_fn: """ - kwargs = {"circ": circ, "run_fn": run_fn, "grad_fn": grad_fn} + kwargs = { + "circ": circ, + "run_fn": run_fn, + "grad_fn": grad_fn, + "estimator": estimator, + } if method == "external": return ExecuteCircuits.apply(parameters, kwargs) if method == "internal": + from ..ansatz import QuantumNeuralNetwork + + assert isinstance(circ, QuantumNeuralNetwork) return ExecuteCircuits.apply(circ.weights, kwargs) raise NotImplementedError(f"Unsupported execution method: {method}") @@ -61,11 +73,12 @@ class ExecuteCircuits(torch.autograd.Function): def forward(ctx, parameters, kwargs): ctx.run_fn = kwargs["run_fn"] ctx.circ = kwargs["circ"] + ctx.estimator = kwargs["estimator"] ctx.save_for_backward(parameters) parameters = parameters.numpy().tolist() outputs = [] for para in parameters: - out = ctx.run_fn(ctx.circ, para) + out = ctx.run_fn(ctx.circ, para, estimator=ctx.estimator) outputs.append(out) outputs = np.stack(outputs) outputs = torch.from_numpy(outputs) @@ -74,7 +87,7 @@ def forward(ctx, parameters, kwargs): @staticmethod def backward(ctx, grad_out): (parameters,) = ctx.saved_tensors - jac = jacobian(ctx.circ, parameters.numpy()) + jac = jacobian(ctx.circ, parameters.numpy(), estimator=ctx.estimator) vjp = compute_vjp(jac, grad_out.numpy()) vjp = torch.from_numpy(vjp) return vjp, None diff --git a/tests/quafu/algorithms/qnn_test.py b/tests/quafu/algorithms/qnn_test.py index ac14de2c..394d59b9 100644 --- a/tests/quafu/algorithms/qnn_test.py +++ b/tests/quafu/algorithms/qnn_test.py @@ -12,19 +12,71 @@ # See the License for the specific language governing permissions and # limitations under the License. import numpy as np -import torch.nn +import pytest +import torch from quafu.algorithms.ansatz import QuantumNeuralNetwork from quafu.algorithms.gradients import compute_vjp, jacobian from quafu.algorithms.interface.torch import TorchTransformer from quafu.algorithms.templates.basic_entangle import BasicEntangleLayers from quafu.circuits.quantum_circuit import QuantumCircuit +from torch import nn +from torch.utils.data import DataLoader, TensorDataset -class ModelStandardCircuit(torch.nn.Module): +def _generate_random_dataset(num_inputs, num_samples): + """ + Generate random dataset + + Args: + num_inputs: dimension of input data + num_samples: number of samples in the dataset + """ + # Generate random input coordinates using PyTorch's rand function + x = 2 * torch.rand([num_samples, num_inputs], dtype=torch.double) - 1 + + # Calculate labels based on the sum of input coordinates + y01 = (torch.sum(x, dim=1) >= 0).to(torch.long) + + # Convert to one-hot vector + y = torch.zeros(num_samples, 2) # Two classes (0 and 1) + y[torch.arange(num_samples), y01] = 1 + + # Create a PyTorch dataset + dataset = TensorDataset(x, y) + + return dataset + + +class MLP(nn.Module): + """A simple Multi-Layer Perceptron for test""" + + def __init__(self, num_inputs, num_classes, hidden_size): + """ + Args: + num_inputs: input dimension + num_classes: number of classes + hidden_size: number of hidden neuron + """ + super().__init__() + + # Define the layers of the MLP + self.layers = nn.Sequential( + nn.Linear(num_inputs, hidden_size, dtype=torch.double), + nn.ReLU(), + nn.Linear(hidden_size, num_classes, dtype=torch.double), + ) + + def forward(self, x): + # Propagate the input through the layers + return self.layers(x) + + +class ModelStandardCircuit(nn.Module): def __init__(self, circ: QuantumCircuit): super().__init__() self.circ = circ - self.linear = torch.nn.Linear(3, 3, dtype=torch.double) + num_params = len(circ.parameterized_gates) + self.linear = nn.Linear(num_params, num_params, dtype=torch.double) def forward(self, features): out = self.linear(features) @@ -32,7 +84,7 @@ def forward(self, features): return out -class ModelQuantumNeuralNetwork(torch.nn.Module): +class ModelQuantumNeuralNetwork(nn.Module): def __init__(self, circ: QuantumNeuralNetwork): super().__init__() self.circ = circ @@ -42,6 +94,22 @@ def forward(self, features): return out +class ModelQuantumNeuralNetworkNative(nn.Module): + """Test execution of qnn()""" + + def __init__(self, qnn: QuantumNeuralNetwork): + super().__init__() + self.qnn = qnn + + def forward(self, features): + out = self.qnn(features) + return out + + # def parameters(self, recurse=True): + # for p in self.qnn.weights: + # yield nn.Parameter(p) + + class TestLayers: circ = QuantumCircuit(2) circ.x(0) @@ -49,6 +117,19 @@ class TestLayers: circ.ry(1, 0.5) circ.ry(0, 0.1) + def _model_grad(self, model, batch_size): + """Test one forward pass and gradient calculation of a model""" + + # TODO(zhaoyilun): Make out dimension configurable + features = torch.randn( + batch_size, 3, requires_grad=True, dtype=torch.double + ) # batch_size=4, num_params=3 + outputs = model(features) + targets = torch.randn(batch_size, 2, dtype=torch.double) + criterion = nn.MSELoss() + loss = criterion(outputs, targets) + loss.backward() + def test_compute_vjp(self): params_input = np.random.randn(4, 3) jac = jacobian(self.circ, params_input) @@ -67,7 +148,7 @@ def test_torch_layer_standard_circuit(self): ) # batch_size=4, num_params=3 outputs = model(features) targets = torch.randn(batch_size, 2, dtype=torch.double) - criterion = torch.nn.MSELoss() + criterion = nn.MSELoss() loss = criterion(outputs, targets) loss.backward() @@ -77,12 +158,93 @@ def test_torch_layer_qnn(self): entangle_layer = BasicEntangleLayers(weights, 2) qnn = QuantumNeuralNetwork(2, [entangle_layer]) batch_size = 1 + + # Legacy invokation style model = ModelQuantumNeuralNetwork(qnn) - features = torch.randn( - batch_size, 3, requires_grad=True, dtype=torch.double - ) # batch_size=4, num_params=3 - outputs = model(features) - targets = torch.randn(batch_size, 2, dtype=torch.double) - criterion = torch.nn.MSELoss() - loss = criterion(outputs, targets) - loss.backward() + self._model_grad(model, batch_size) + + # New invokation style + model = ModelQuantumNeuralNetworkNative(qnn) + self._model_grad(model, batch_size) + + @pytest.mark.skip(reason="github env doesn't have token") + def test_torch_layer_qnn_real_machine(self): + """Use QuantumNeuralNetwork ansatz""" + weights = np.random.randn(2, 2) + entangle_layer = BasicEntangleLayers(weights, 2) + qnn = QuantumNeuralNetwork(2, [entangle_layer], backend="ScQ-P10") + qnn.measure([0, 1], [0, 1]) + batch_size = 1 + + # New invokation style + model = ModelQuantumNeuralNetworkNative(qnn) + self._model_grad(model, batch_size) + + def test_classification_on_random_dataset(self, num_epochs, batch_size): + """Test e2e hybrid quantum-classical nn training using a synthetic dataset + + Args: + num_epochs: number of epoches for training + batch_size: batch size for training + + """ + # Define the hyperparameters + num_inputs = 2 + num_classes = 2 + learning_rate = 0.01 + + # Generate the dataset + dataset = _generate_random_dataset(num_inputs, 100) + + # Create QNN + num_qubits = num_classes + weights = np.random.randn(num_qubits, 2) + entangle_layer = BasicEntangleLayers(weights, 2) + qnn = QuantumNeuralNetwork(num_qubits, [entangle_layer]) + # qnn_model = ModelQuantumNeuralNetworkNative(qnn) + qnn_model = ModelStandardCircuit(qnn) + + # Create MLP + mlp = MLP(num_inputs, 4, 4) + + # Create hybrid model + model = nn.Sequential(mlp, qnn_model) + # model = mlp + + # Define the loss function and optimizer + criterion = nn.CrossEntropyLoss() + optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate) + + # Create data loader + data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True) + + # Train the model + for epoch in range(num_epochs): + for inputs, labels in data_loader: + # Forward pass + outputs = model(inputs) + + # Compute the loss + loss = criterion(outputs, labels) + + # Backward pass + optimizer.zero_grad() + loss.backward() + + # Update the parameters + optimizer.step() + + # Print the loss + print(f"Epoch {epoch + 1}/{num_epochs}: Loss = {loss.item()}") + + # Evaluate the model on the dataset + correct = 0 + total = 0 + with torch.no_grad(): + for inputs, labels in data_loader: + outputs = model(inputs) + _, predicted = torch.max(outputs.data, 1) + total += labels.size(0) + correct += (predicted == labels.argmax(dim=1)).sum().item() + + print(f"Accuracy: {100 * correct / total:.2f}%")