diff --git "a/test/mindspore.mint\346\216\245\345\217\243\346\265\213\350\257\225\344\273\273\345\212\24136 \346\265\213\350\257\225\346\226\207\346\241\243.pdf" "b/test/mindspore.mint\346\216\245\345\217\243\346\265\213\350\257\225\344\273\273\345\212\24136 \346\265\213\350\257\225\346\226\207\346\241\243.pdf" new file mode 100644 index 0000000..fbbcf1b Binary files /dev/null and "b/test/mindspore.mint\346\216\245\345\217\243\346\265\213\350\257\225\344\273\273\345\212\24136 \346\265\213\350\257\225\346\226\207\346\241\243.pdf" differ diff --git a/test/test_binary_cross_entropy.py b/test/test_binary_cross_entropy.py new file mode 100644 index 0000000..26d6fef --- /dev/null +++ b/test/test_binary_cross_entropy.py @@ -0,0 +1,478 @@ +''' +dtype: + pytorch - float16, float32, float64 + mindspore - float16, float32 (和文档一致) +''' + + + + + +import numpy as np +import torch +import torch.nn.functional as F +import mindspore as ms +from mindspore import Tensor +import mindspore.mint.nn.functional as mint_F +import traceback + +# 设置全局精度容差 +TOLERANCE = 1e-3 + +def print_header(title): + print(f"\n{'='*80}\n{title}\n{'='*80}") + +def compare_outputs(pytorch_out, mindspore_out, name="输出"): + """比较两个框架的输出是否在容差范围内""" + pytorch_np = pytorch_out.detach().cpu().numpy() + mindspore_np = mindspore_out.asnumpy() + + max_diff = np.max(np.abs(pytorch_np - mindspore_np)) + mean_diff = np.mean(np.abs(pytorch_np - mindspore_np)) + + print(f"{name} 最大差异: {max_diff}") + print(f"{name} 平均差异: {mean_diff}") + + if max_diff < TOLERANCE: + print(f"✓ {name}在容差范围内一致 (< {TOLERANCE})") + return True + else: + print(f"✗ {name}超出容差范围 (> {TOLERANCE})") + return False + +def test_dtype_support(): + """测试不同数据类型的支持度""" + print_header("1.a) 测试不同数据类型(dtype)的支持度") + + shape = (3, 4) + dtypes_pytorch = [torch.float16, torch.float32, torch.float64, torch.bfloat16] + dtypes_mindspore = [ms.float16, ms.float32, ms.float64, ms.bfloat16] + + for pt_dtype, ms_dtype in zip(dtypes_pytorch, dtypes_mindspore): + print(f"\n测试数据类型: PyTorch {pt_dtype}, MindSpore {ms_dtype}") + + # 生成随机输入 (0-1之间的值用于BCE) + np_input = np.random.random(shape).astype(np.float32) + np_target = np.random.random(shape).astype(np.float32) + + try: + # PyTorch + pt_input = torch.tensor(np_input, dtype=pt_dtype) + pt_target = torch.tensor(np_target, dtype=pt_dtype) + pt_output = F.binary_cross_entropy(pt_input, pt_target, reduction='mean') + print(f"PyTorch 输出: {pt_output.item()}, shape: {pt_output.shape}") + pt_support = "支持" + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_support = "不支持" + + try: + # MindSpore + ms_input = Tensor(np_input, dtype=ms_dtype) + ms_target = Tensor(np_target, dtype=ms_dtype) + ms_output = mint_F.binary_cross_entropy(ms_input, ms_target, reduction='mean') + print(f"MindSpore 输出: {ms_output.asnumpy().item()}, shape: {ms_output.shape}") + ms_support = "支持" + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_support = "不支持" + + print(f"PyTorch: {pt_support}, MindSpore: {ms_support}") + +def test_random_inputs(): + """测试随机输入值的输出一致性""" + print_header("1.b) 测试随机输入值的输出一致性") + + shapes = [(2, 3), (3, 4, 5), (2, 3, 4, 5)] + + for shape in shapes: + print(f"\n测试shape: {shape}") + + # 生成随机输入 (0-1之间的值用于BCE) + np_input = np.random.random(shape).astype(np.float32) + np_target = np.random.random(shape).astype(np.float32) + + # PyTorch + pt_input = torch.tensor(np_input, requires_grad=True) + pt_target = torch.tensor(np_target) + pt_output = F.binary_cross_entropy(pt_input, pt_target, reduction='mean') + print(f"PyTorch 输出: {pt_output.item()}") + + # MindSpore + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + ms_output = mint_F.binary_cross_entropy(ms_input, ms_target, reduction='mean') + print(f"MindSpore 输出: {ms_output.asnumpy().item()}") + + compare_outputs(pt_output, ms_output) + + # 测试不同的reduction + for reduction in ['none', 'sum', 'mean']: + print(f"\n测试reduction: {reduction}") + + pt_output = F.binary_cross_entropy(pt_input, pt_target, reduction=reduction) + ms_output = mint_F.binary_cross_entropy(ms_input, ms_target, reduction=reduction) + + compare_outputs(pt_output, ms_output, f"reduction={reduction}") + +def test_param_support(): + """测试不同参数类型的支持度""" + print_header("1.c) 测试不同参数类型的支持度") + + shape = (3, 4) + np_input = np.random.random(shape).astype(np.float32) + np_target = np.random.random(shape).astype(np.float32) + np_weight = np.random.random(shape).astype(np.float32) + + # 基本输入 + pt_input = torch.tensor(np_input) + pt_target = torch.tensor(np_target) + pt_weight = torch.tensor(np_weight) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + ms_weight = Tensor(np_weight, dtype=ms.float32) + + # 测试不同reduction参数 + reductions = ['none', 'sum', 'mean', 'INVALID'] + + for reduction in reductions: + print(f"\n测试reduction参数: '{reduction}'") + + try: + pt_output = F.binary_cross_entropy(pt_input, pt_target, weight=pt_weight, reduction=reduction) + print(f"PyTorch: 支持 reduction='{reduction}'") + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy(ms_input, ms_target, weight=ms_weight, reduction=reduction) + print(f"MindSpore: 支持 reduction='{reduction}'") + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + + # 测试权重参数 + print("\n测试weight参数:") + + # 有权重 + try: + pt_output = F.binary_cross_entropy(pt_input, pt_target, weight=pt_weight) + print("PyTorch: 支持带权重") + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy(ms_input, ms_target, weight=ms_weight) + print("MindSpore: 支持带权重") + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + + # 无权重 + try: + pt_output = F.binary_cross_entropy(pt_input, pt_target, weight=None) + print("PyTorch: 支持无权重") + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy(ms_input, ms_target, weight=None) + print("MindSpore: 支持无权重") + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + +def test_error_handling(): + """测试错误处理的准确性""" + print_header("1.d) 测试错误处理的准确性") + + # 测试输入和目标形状不匹配 + print("\n测试输入和目标形状不匹配:") + + pt_input = torch.rand(2, 3) + pt_target = torch.rand(3, 2) + + ms_input = Tensor(np.random.random((2, 3)), dtype=ms.float32) + ms_target = Tensor(np.random.random((3, 2)), dtype=ms.float32) + + try: + pt_output = F.binary_cross_entropy(pt_input, pt_target) + print("PyTorch结果:", pt_output.item()) + except Exception as e: + print(f"PyTorch错误: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy(ms_input, ms_target) + print("MindSpore结果:", ms_output.asnumpy().item()) + except Exception as e: + print(f"MindSpore错误: {str(e)}") + + # 测试错误的输入类型 + print("\n测试错误的输入类型:") + + # 字符串输入 + try: + pt_output = F.binary_cross_entropy(pt_input, "wrong_target") + print("PyTorch支持字符串输入") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy(ms_input, "wrong_target") + print("MindSpore支持字符串输入") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + + # 测试超出范围的值 + print("\n测试超出[0,1]范围的值:") + + pt_bad_input = torch.tensor([[-1.0, 0.5], [0.8, 2.0]]) + pt_bad_target = torch.tensor([[0.1, 0.2], [0.3, 0.4]]) + + ms_bad_input = Tensor(np.array([[-1.0, 0.5], [0.8, 2.0]]), dtype=ms.float32) + ms_bad_target = Tensor(np.array([[0.1, 0.2], [0.3, 0.4]]), dtype=ms.float32) + + try: + pt_output = F.binary_cross_entropy(pt_bad_input, pt_bad_target) + print(f"PyTorch结果: {pt_output.item()}") + except Exception as e: + print(f"PyTorch错误: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy(ms_bad_input, ms_bad_target) + print(f"MindSpore结果: {ms_output.asnumpy().item()}") + except Exception as e: + print(f"MindSpore错误: {str(e)}") + +def test_nn_implementation(): + """测试神经网络实现""" + print_header("2.a/b) 测试神经网络实现和推理结果") + + # 简单的二分类网络 + class PTBinaryClassifier(torch.nn.Module): + def __init__(self, input_dim): + super(PTBinaryClassifier, self).__init__() + self.linear = torch.nn.Linear(input_dim, 1) + self.sigmoid = torch.nn.Sigmoid() + + def forward(self, x, target=None): + output = self.sigmoid(self.linear(x)) + if target is not None: + loss = F.binary_cross_entropy(output, target) + return loss + return output + + class MSBinaryClassifier(ms.nn.Cell): + def __init__(self, input_dim): + super(MSBinaryClassifier, self).__init__() + self.linear = ms.nn.Dense(input_dim, 1) + self.sigmoid = ms.nn.Sigmoid() + + def construct(self, x, target=None): + output = self.sigmoid(self.linear(x)) + if target is not None: + loss = mint_F.binary_cross_entropy(output, target) + return loss + return output + + # 固定输入和权重 + input_dim = 5 + batch_size = 3 + + np_input = np.random.random((batch_size, input_dim)).astype(np.float32) + np_target = np.random.random((batch_size, 1)).astype(np.float32) + + # 创建模型 + pt_model = PTBinaryClassifier(input_dim) + ms_model = MSBinaryClassifier(input_dim) + + # 固定权重 + np_weight = np.random.random((input_dim, 1)).astype(np.float32) + np_bias = np.random.random(1).astype(np.float32) + + pt_model.linear.weight.data = torch.tensor(np_weight).t() + pt_model.linear.bias.data = torch.tensor(np_bias) + + ms_model.linear.weight.set_data(Tensor(np_weight.T, dtype=ms.float32)) + ms_model.linear.bias.set_data(Tensor(np_bias, dtype=ms.float32)) + + # 前向传播测试 + pt_input = torch.tensor(np_input) + pt_target = torch.tensor(np_target) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + + # 输出测试 + pt_output = pt_model(pt_input) + ms_output = ms_model(ms_input) + + print("测试模型输出:") + compare_outputs(pt_output, ms_output, "模型输出") + + # 损失测试 + pt_loss = pt_model(pt_input, pt_target) + ms_loss = ms_model(ms_input, ms_target) + + print("\n测试模型损失:") + compare_outputs(pt_loss, ms_loss, "损失值") + +def test_gradient(): + """测试反向传播和梯度计算""" + print_header("2.c) 测试反向传播和梯度计算") + + # 函数的梯度测试 + shape = (3, 4) + np_input = np.random.random(shape).astype(np.float32) + np_target = np.random.random(shape).astype(np.float32) + + # PyTorch + pt_input = torch.tensor(np_input, requires_grad=True) + pt_target = torch.tensor(np_target) + pt_output = F.binary_cross_entropy(pt_input, pt_target) + pt_output.backward() + pt_grad = pt_input.grad + + print("PyTorch梯度:") + print(f"形状: {pt_grad.shape}") + print(f"平均值: {pt_grad.mean().item()}") + + # MindSpore - 创建一个计算图用于计算梯度 + ms_input = ms.Parameter(Tensor(np_input, dtype=ms.float32)) + ms_target = Tensor(np_target, dtype=ms.float32) + + def forward_fn(x, target): + return mint_F.binary_cross_entropy(x, target) + + grad_fn = ms.grad(forward_fn) + ms_grad = grad_fn(ms_input, ms_target) + + print("\nMindSpore梯度:") + print(f"形状: {ms_grad.shape}") + print(f"平均值: {ms_grad.asnumpy().mean()}") + + # 比较梯度 + compare_outputs(pt_grad, ms_grad, "梯度") + + # 神经网络的参数梯度测试 + print("\n测试神经网络参数梯度:") + + input_dim = 5 + hidden_dim = 3 + batch_size = 4 + + class PTSimpleNet(torch.nn.Module): + def __init__(self): + super(PTSimpleNet, self).__init__() + self.fc1 = torch.nn.Linear(input_dim, hidden_dim) + self.fc2 = torch.nn.Linear(hidden_dim, 1) + self.sigmoid = torch.nn.Sigmoid() + + def forward(self, x, target=None): + h = torch.nn.functional.relu(self.fc1(x)) + output = self.sigmoid(self.fc2(h)) + if target is not None: + loss = F.binary_cross_entropy(output, target) + return loss + return output + + class MSSimpleNet(ms.nn.Cell): + def __init__(self): + super(MSSimpleNet, self).__init__() + self.fc1 = ms.nn.Dense(input_dim, hidden_dim) + self.fc2 = ms.nn.Dense(hidden_dim, 1) + self.relu = ms.nn.ReLU() + self.sigmoid = ms.nn.Sigmoid() + + def construct(self, x, target=None): + h = self.relu(self.fc1(x)) + output = self.sigmoid(self.fc2(h)) + if target is not None: + loss = mint_F.binary_cross_entropy(output, target) + return loss + return output + + # 创建模型 + pt_net = PTSimpleNet() + ms_net = MSSimpleNet() + + # 固定权重 + np_fc1_weight = np.random.random((hidden_dim, input_dim)).astype(np.float32) + np_fc1_bias = np.random.random(hidden_dim).astype(np.float32) + np_fc2_weight = np.random.random((1, hidden_dim)).astype(np.float32) + np_fc2_bias = np.random.random(1).astype(np.float32) + + pt_net.fc1.weight.data = torch.tensor(np_fc1_weight) + pt_net.fc1.bias.data = torch.tensor(np_fc1_bias) + pt_net.fc2.weight.data = torch.tensor(np_fc2_weight) + pt_net.fc2.bias.data = torch.tensor(np_fc2_bias) + + ms_net.fc1.weight.set_data(Tensor(np_fc1_weight, dtype=ms.float32)) + ms_net.fc1.bias.set_data(Tensor(np_fc1_bias, dtype=ms.float32)) + ms_net.fc2.weight.set_data(Tensor(np_fc2_weight, dtype=ms.float32)) + ms_net.fc2.bias.set_data(Tensor(np_fc2_bias, dtype=ms.float32)) + + # 准备输入和目标 + np_net_input = np.random.random((batch_size, input_dim)).astype(np.float32) + np_net_target = np.random.random((batch_size, 1)).astype(np.float32) + + pt_net_input = torch.tensor(np_net_input) + pt_net_target = torch.tensor(np_net_target) + + ms_net_input = Tensor(np_net_input, dtype=ms.float32) + ms_net_target = Tensor(np_net_target, dtype=ms.float32) + + # PyTorch计算梯度 + pt_optimizer = torch.optim.SGD(pt_net.parameters(), lr=0.1) + pt_optimizer.zero_grad() + pt_loss = pt_net(pt_net_input, pt_net_target) + pt_loss.backward() + + # 获取PyTorch梯度 + pt_fc1_weight_grad = pt_net.fc1.weight.grad.numpy() + pt_fc1_bias_grad = pt_net.fc1.bias.grad.numpy() + pt_fc2_weight_grad = pt_net.fc2.weight.grad.numpy() + pt_fc2_bias_grad = pt_net.fc2.bias.grad.numpy() + + print("PyTorch网络梯度:") + print(f"fc1.weight梯度平均值: {pt_fc1_weight_grad.mean()}") + print(f"fc1.bias梯度平均值: {pt_fc1_bias_grad.mean()}") + print(f"fc2.weight梯度平均值: {pt_fc2_weight_grad.mean()}") + print(f"fc2.bias梯度平均值: {pt_fc2_bias_grad.mean()}") + + # MindSpore计算梯度 + def ms_forward_fn(inputs, targets): + return ms_net(inputs, targets) + + ms_grad_fn = ms.value_and_grad(ms_forward_fn, None, ms_net.trainable_params()) + ms_loss, ms_grads = ms_grad_fn(ms_net_input, ms_net_target) + + print("\nMindSpore网络梯度:") + for i, param in enumerate(ms_net.trainable_params()): + print(f"{param.name} 梯度平均值: {ms_grads[i].asnumpy().mean()}") + + # 比较具体参数的梯度 (注意可能需要转置比较) + print("\n比较fc2.weight梯度:") + ms_fc2_weight_grad = None + for i, param in enumerate(ms_net.trainable_params()): + if 'fc2.weight' in param.name: + ms_fc2_weight_grad = ms_grads[i] + break + + if ms_fc2_weight_grad is not None: + # 注意MindSpore和PyTorch的权重矩阵可能需要转置后比较 + ms_fc2_weight_grad_np = ms_fc2_weight_grad.asnumpy() + max_diff = np.max(np.abs(ms_fc2_weight_grad_np - pt_fc2_weight_grad)) + print(f"fc2.weight梯度最大差异: {max_diff}") + if max_diff < TOLERANCE: + print(f"✓ fc2.weight梯度在容差范围内一致 (< {TOLERANCE})") + else: + print(f"✗ fc2.weight梯度超出容差范围 (> {TOLERANCE})") + +if __name__ == "__main__": + # 运行所有测试 + test_dtype_support() + test_random_inputs() + test_param_support() + # test_error_handling() + test_nn_implementation() + test_gradient() + test_error_handling() diff --git a/test/test_binary_cross_entropy_with_logits.py b/test/test_binary_cross_entropy_with_logits.py new file mode 100644 index 0000000..eab86ff --- /dev/null +++ b/test/test_binary_cross_entropy_with_logits.py @@ -0,0 +1,616 @@ +''' +dtype: + pytorch - float16, float32, float64, bfloat16 + mindspore - float16, float32, bfloat16 + +pos_weight: + pos_weight shape与target shape不一致时 mindspore直到使用输出值时才报错 + 代码: + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target, pos_weight=ms_wrong_pos_weight) + print("MindSpore支持不匹配的pos_weight尺寸") + print(f"结果为{ms_output}") # 在这一行才报错 +''' + + +import numpy as np +import torch +import torch.nn.functional as F +import mindspore as ms +from mindspore import Tensor +import mindspore.mint.nn.functional as mint_F +import traceback + +# 设置全局精度容差 +TOLERANCE = 1e-3 + +def print_header(title): + print(f"\n{'='*80}\n{title}\n{'='*80}") + +def compare_outputs(pytorch_out, mindspore_out, name="输出"): + """比较两个框架的输出是否在容差范围内""" + pytorch_np = pytorch_out.detach().cpu().numpy() + mindspore_np = mindspore_out.asnumpy() + + max_diff = np.max(np.abs(pytorch_np - mindspore_np)) + mean_diff = np.mean(np.abs(pytorch_np - mindspore_np)) + + print(f"{name} 最大差异: {max_diff}") + print(f"{name} 平均差异: {mean_diff}") + + if max_diff < TOLERANCE: + print(f"✓ {name}在容差范围内一致 (< {TOLERANCE})") + return True + else: + print(f"✗ {name}超出容差范围 (> {TOLERANCE})") + return False + +def test_dtype_support(): + """测试不同数据类型的支持度""" + print_header("1.a) 测试不同数据类型(dtype)的支持度") + + shape = (3, 4) + # 添加bf16数据类型 + dtypes_pytorch = [torch.float16, torch.float32, torch.float64, torch.bfloat16] + dtypes_mindspore = [ms.float16, ms.float32, ms.float64, ms.bfloat16] + dtype_names = ["float16", "float32", "float64", "bfloat16"] + + for pt_dtype, ms_dtype, dtype_name in zip(dtypes_pytorch, dtypes_mindspore, dtype_names): + print(f"\n测试数据类型: PyTorch {dtype_name}, MindSpore {dtype_name}") + + # 生成随机输入 (对于logits可以是任何实数) + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.random(shape).astype(np.float32) # 目标在[0,1]范围内 + + try: + # PyTorch + pt_input = torch.tensor(np_input, dtype=pt_dtype) + pt_target = torch.tensor(np_target, dtype=pt_dtype) + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, reduction='mean') + print(f"PyTorch 输出: {pt_output.item()}, shape: {pt_output.shape}") + pt_support = "支持" + except Exception as e: + print(f"PyTorch 错误: {type(e).__name__}: {str(e)}") + pt_support = "不支持" + + try: + # MindSpore + ms_input = Tensor(np_input, dtype=ms_dtype) + ms_target = Tensor(np_target, dtype=ms_dtype) + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target, reduction='mean') + print(f"MindSpore 输出: {ms_output.asnumpy().item()}, shape: {ms_output.shape}") + ms_support = "支持" + except Exception as e: + print(f"MindSpore 错误: {type(e).__name__}: {str(e)}") + ms_support = "不支持" + + print(f"PyTorch {dtype_name}: {pt_support}, MindSpore {dtype_name}: {ms_support}") + + +def test_random_inputs(): + """测试随机输入值的输出一致性""" + print_header("1.b) 测试随机输入值的输出一致性") + + shapes = [(2, 3), (3, 4, 5), (2, 3, 4, 5)] + + for shape in shapes: + print(f"\n测试shape: {shape}") + + # 生成随机输入 + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.random(shape).astype(np.float32) + + # PyTorch + pt_input = torch.tensor(np_input, requires_grad=True) + pt_target = torch.tensor(np_target) + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, reduction='mean') + print(f"PyTorch 输出: {pt_output.item()}") + + # MindSpore + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target, reduction='mean') + print(f"MindSpore 输出: {ms_output.asnumpy().item()}") + + compare_outputs(pt_output, ms_output) + + # 测试不同的reduction + for reduction in ['none', 'sum', 'mean']: + print(f"\n测试reduction: {reduction}") + + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, reduction=reduction) + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target, reduction=reduction) + + compare_outputs(pt_output, ms_output, f"reduction={reduction}") + +def test_param_support(): + """测试不同参数类型的支持度""" + print_header("1.c) 测试不同参数类型的支持度") + + shape = (3, 4) + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.random(shape).astype(np.float32) + np_weight = np.random.random(shape).astype(np.float32) + np_pos_weight = np.random.random(shape[1:]).astype(np.float32) * 2 # pos_weight通常在最后一维匹配 + + # 基本输入 + pt_input = torch.tensor(np_input) + pt_target = torch.tensor(np_target) + pt_weight = torch.tensor(np_weight) + pt_pos_weight = torch.tensor(np_pos_weight) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + ms_weight = Tensor(np_weight, dtype=ms.float32) + ms_pos_weight = Tensor(np_pos_weight, dtype=ms.float32) + + # 测试不同reduction参数 + reductions = ['none', 'sum', 'mean', 'INVALID'] + + for reduction in reductions: + print(f"\n测试reduction参数: '{reduction}'") + + try: + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, weight=pt_weight, reduction=reduction) + print(f"PyTorch: 支持 reduction='{reduction}'") + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target, weight=ms_weight, reduction=reduction) + print(f"MindSpore: 支持 reduction='{reduction}'") + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + + # 测试权重参数 + print("\n测试weight参数:") + + # 使用随机权重进行多次测试 + num_tests = 3 # 测试3组不同的随机权重 + for i in range(num_tests): + # 生成新的随机权重 + np_random_weight = np.random.random(shape).astype(np.float32) + np_random_pos_weight = np.random.random(shape[1:]).astype(np.float32) * 5.0 # 0~5范围的随机值 + + pt_random_weight = torch.tensor(np_random_weight) + ms_random_weight = Tensor(np_random_weight, dtype=ms.float32) + pt_random_pos_weight = torch.tensor(np_random_pos_weight) + ms_random_pos_weight = Tensor(np_random_pos_weight, dtype=ms.float32) + + print(f"\n测试随机weight组 #{i+1}:") + print(f"随机weight平均值: {np_random_weight.mean():.4f}") + + try: + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, weight=pt_random_weight) + print(f"PyTorch weight输出: {pt_output.item():.6f}") + pt_weight_ok = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_weight_ok = False + + try: + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target, weight=ms_random_weight) + print(f"MindSpore weight输出: {ms_output.asnumpy().item():.6f}") + ms_weight_ok = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_weight_ok = False + + if pt_weight_ok and ms_weight_ok: + compare_outputs(pt_output, ms_output, f"随机weight #{i+1}输出") + + # 测试pos_weight + print(f"\n测试随机pos_weight组 #{i+1}:") + print(f"随机pos_weight平均值: {np_random_pos_weight.mean():.4f}") + + try: + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, pos_weight=pt_random_pos_weight) + print(f"PyTorch pos_weight输出: {pt_output.item():.6f}") + pt_pos_weight_ok = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_pos_weight_ok = False + + try: + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target, pos_weight=ms_random_pos_weight) + print(f"MindSpore pos_weight输出: {ms_output.asnumpy().item():.6f}") + ms_pos_weight_ok = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_pos_weight_ok = False + + if pt_pos_weight_ok and ms_pos_weight_ok: + compare_outputs(pt_output, ms_output, f"随机pos_weight #{i+1}输出") + + # 同时测试weight和pos_weight + print(f"\n同时测试随机weight和pos_weight组 #{i+1}:") + + try: + pt_output = F.binary_cross_entropy_with_logits( + pt_input, pt_target, weight=pt_random_weight, pos_weight=pt_random_pos_weight) + print(f"PyTorch weight+pos_weight输出: {pt_output.item():.6f}") + pt_both_ok = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_both_ok = False + + try: + ms_output = mint_F.binary_cross_entropy_with_logits( + ms_input, ms_target, weight=ms_random_weight, pos_weight=ms_random_pos_weight) + print(f"MindSpore weight+pos_weight输出: {ms_output.asnumpy().item():.6f}") + ms_both_ok = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_both_ok = False + + if pt_both_ok and ms_both_ok: + compare_outputs(pt_output, ms_output, f"随机weight+pos_weight #{i+1}输出") + + # 基本的支持测试 + print("\n基本权重支持测试:") + + # 有权重 + try: + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, weight=pt_weight) + print("PyTorch: 支持带权重") + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target, weight=ms_weight) + print("MindSpore: 支持带权重") + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + + # 无权重 + try: + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, weight=None) + print("PyTorch: 支持无权重") + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target, weight=None) + print("MindSpore: 支持无权重") + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + +def test_error_handling(): + """测试错误处理的准确性""" + print_header("1.d) 测试错误处理的准确性") + + # 测试输入和目标形状不匹配 + print("\n测试输入和目标形状不匹配:") + + pt_input = torch.randn(2, 3) + pt_target = torch.randn(3, 2) + + ms_input = Tensor(np.random.randn(2, 3), dtype=ms.float32) + ms_target = Tensor(np.random.randn(3, 2), dtype=ms.float32) + + try: + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target) + print("PyTorch结果:", pt_output.item()) + except Exception as e: + print(f"PyTorch错误: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target) + print("MindSpore结果:", ms_output.asnumpy().item()) + except Exception as e: + print(f"MindSpore错误: {str(e)}") + + # 测试错误的输入类型 + print("\n测试错误的输入类型:") + + # 字符串输入 + try: + pt_output = F.binary_cross_entropy_with_logits(pt_input, "wrong_target") + print("PyTorch支持字符串输入") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, "wrong_target") + print("MindSpore支持字符串输入") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + + # 测试不正确尺寸的pos_weight (binary_cross_entropy_with_logits特有) + print("\n测试不正确尺寸的pos_weight:") + + pt_input = torch.randn(2, 3) + pt_target = torch.rand(2, 3) + pt_wrong_pos_weight = torch.rand(2, 4) # 错误的尺寸 + + ms_input = Tensor(np.random.randn(2, 3), dtype=ms.float32) + ms_target = Tensor(np.random.rand(2, 3), dtype=ms.float32) + ms_wrong_pos_weight = Tensor(np.random.rand(2, 4), dtype=ms.float32) # 错误的尺寸 + + try: + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, pos_weight=pt_wrong_pos_weight) + print("PyTorch支持不匹配的pos_weight尺寸") + except Exception as e: + print(f"PyTorch错误: {str(e)}") + + try: + ms_output = mint_F.binary_cross_entropy_with_logits(ms_input, ms_target, pos_weight=ms_wrong_pos_weight) + print("MindSpore支持不匹配的pos_weight尺寸") + print(f"结果为{ms_output}") + except Exception as e: + print(f"MindSpore错误: {str(e)}") + +def test_nn_implementation(): + """测试神经网络实现""" + print_header("2.a/b) 测试神经网络实现和推理结果") + + # 简单的二分类网络 + class PTBinaryClassifier(torch.nn.Module): + def __init__(self, input_dim): + super(PTBinaryClassifier, self).__init__() + self.linear = torch.nn.Linear(input_dim, 1) + + def forward(self, x, target=None): + logits = self.linear(x) + if target is not None: + loss = F.binary_cross_entropy_with_logits(logits, target) + return loss + return logits + + class MSBinaryClassifier(ms.nn.Cell): + def __init__(self, input_dim): + super(MSBinaryClassifier, self).__init__() + self.linear = ms.nn.Dense(input_dim, 1) + + def construct(self, x, target=None): + logits = self.linear(x) + if target is not None: + loss = mint_F.binary_cross_entropy_with_logits(logits, target) + return loss + return logits + + # 固定输入和权重 + input_dim = 5 + batch_size = 3 + + np_input = np.random.randn(batch_size, input_dim).astype(np.float32) + np_target = np.random.random((batch_size, 1)).astype(np.float32) + + # 创建模型 + pt_model = PTBinaryClassifier(input_dim) + ms_model = MSBinaryClassifier(input_dim) + + # 固定权重 + np_weight = np.random.randn(input_dim, 1).astype(np.float32) + np_bias = np.random.randn(1).astype(np.float32) + + pt_model.linear.weight.data = torch.tensor(np_weight).t() + pt_model.linear.bias.data = torch.tensor(np_bias) + + ms_model.linear.weight.set_data(Tensor(np_weight.T, dtype=ms.float32)) + ms_model.linear.bias.set_data(Tensor(np_bias, dtype=ms.float32)) + + # 前向传播测试 + pt_input = torch.tensor(np_input) + pt_target = torch.tensor(np_target) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + + # 输出测试 + pt_output = pt_model(pt_input) + ms_output = ms_model(ms_input) + + print("测试模型输出 (logits):") + compare_outputs(pt_output, ms_output, "模型输出") + + # 损失测试 + pt_loss = pt_model(pt_input, pt_target) + ms_loss = ms_model(ms_input, ms_target) + + print("\n测试模型损失:") + compare_outputs(pt_loss, ms_loss, "损失值") + +def test_gradient(): + """测试反向传播和梯度计算""" + print_header("2.c) 测试反向传播和梯度计算") + + # 函数的梯度测试 + shape = (3, 4) + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.random(shape).astype(np.float32) # 使用浮点值作为目标 + + # PyTorch + pt_input = torch.tensor(np_input, requires_grad=True) + pt_target = torch.tensor(np_target) + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target) + pt_output.backward() + pt_grad = pt_input.grad + + print("PyTorch梯度:") + print(f"形状: {pt_grad.shape}") + print(f"平均值: {pt_grad.mean().item()}") + + # MindSpore - 创建一个计算图用于计算梯度 + ms_input = ms.Parameter(Tensor(np_input, dtype=ms.float32)) + ms_target = Tensor(np_target, dtype=ms.float32) + + def forward_fn(x, target): + return mint_F.binary_cross_entropy_with_logits(x, target) + + grad_fn = ms.grad(forward_fn) + ms_grad = grad_fn(ms_input, ms_target) + + print("\nMindSpore梯度:") + print(f"形状: {ms_grad.shape}") + print(f"平均值: {ms_grad.asnumpy().mean()}") + + # 比较梯度 + compare_outputs(pt_grad, ms_grad, "梯度") + + # 测试带权重的梯度 + print("\n测试带权重的梯度:") + np_weight = np.random.random(shape).astype(np.float32) + + pt_input = torch.tensor(np_input, requires_grad=True) + pt_target = torch.tensor(np_target) + pt_weight = torch.tensor(np_weight) + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, weight=pt_weight) + pt_output.backward() + pt_grad_weighted = pt_input.grad + + ms_input = ms.Parameter(Tensor(np_input, dtype=ms.float32)) + ms_target = Tensor(np_target, dtype=ms.float32) + ms_weight = Tensor(np_weight, dtype=ms.float32) + + def forward_fn_weighted(x, target, weight): + return mint_F.binary_cross_entropy_with_logits(x, target, weight=weight) + + grad_fn_weighted = ms.grad(forward_fn_weighted, 0) + ms_grad_weighted = grad_fn_weighted(ms_input, ms_target, ms_weight) + + compare_outputs(pt_grad_weighted, ms_grad_weighted, "带权重的梯度") + + # 测试带pos_weight的梯度 (binary_cross_entropy_with_logits特有) + print("\n测试带pos_weight的梯度:") + np_pos_weight = np.random.random(shape[1:]).astype(np.float32) * 2 + + pt_input = torch.tensor(np_input, requires_grad=True) + pt_target = torch.tensor(np_target) + pt_pos_weight = torch.tensor(np_pos_weight) + pt_output = F.binary_cross_entropy_with_logits(pt_input, pt_target, pos_weight=pt_pos_weight) + pt_output.backward() + pt_grad_pos_weighted = pt_input.grad + + ms_input = ms.Parameter(Tensor(np_input, dtype=ms.float32)) + ms_target = Tensor(np_target, dtype=ms.float32) + ms_pos_weight = Tensor(np_pos_weight, dtype=ms.float32) + + def forward_fn_pos_weighted(x, target, pos_weight): + return mint_F.binary_cross_entropy_with_logits(x, target, pos_weight=pos_weight) + + grad_fn_pos_weighted = ms.grad(forward_fn_pos_weighted, 0) + ms_grad_pos_weighted = grad_fn_pos_weighted(ms_input, ms_target, ms_pos_weight) + + compare_outputs(pt_grad_pos_weighted, ms_grad_pos_weighted, "带pos_weight的梯度") + + # 神经网络的参数梯度测试 + print("\n测试神经网络参数梯度:") + + input_dim = 5 + hidden_dim = 3 + batch_size = 4 + + class PTSimpleNet(torch.nn.Module): + def __init__(self): + super(PTSimpleNet, self).__init__() + self.fc1 = torch.nn.Linear(input_dim, hidden_dim) + self.fc2 = torch.nn.Linear(hidden_dim, 1) + + def forward(self, x, target=None): + h = torch.nn.functional.relu(self.fc1(x)) + logits = self.fc2(h) + if target is not None: + loss = F.binary_cross_entropy_with_logits(logits, target) + return loss + return logits + + class MSSimpleNet(ms.nn.Cell): + def __init__(self): + super(MSSimpleNet, self).__init__() + self.fc1 = ms.nn.Dense(input_dim, hidden_dim) + self.fc2 = ms.nn.Dense(hidden_dim, 1) + self.relu = ms.nn.ReLU() + + def construct(self, x, target=None): + h = self.relu(self.fc1(x)) + logits = self.fc2(h) + if target is not None: + loss = mint_F.binary_cross_entropy_with_logits(logits, target) + return loss + return logits + + # 创建模型 + pt_net = PTSimpleNet() + ms_net = MSSimpleNet() + + # 固定权重 + np_fc1_weight = np.random.randn(hidden_dim, input_dim).astype(np.float32) + np_fc1_bias = np.random.randn(hidden_dim).astype(np.float32) + np_fc2_weight = np.random.randn(1, hidden_dim).astype(np.float32) + np_fc2_bias = np.random.randn(1).astype(np.float32) + + pt_net.fc1.weight.data = torch.tensor(np_fc1_weight) + pt_net.fc1.bias.data = torch.tensor(np_fc1_bias) + pt_net.fc2.weight.data = torch.tensor(np_fc2_weight) + pt_net.fc2.bias.data = torch.tensor(np_fc2_bias) + + ms_net.fc1.weight.set_data(Tensor(np_fc1_weight, dtype=ms.float32)) + ms_net.fc1.bias.set_data(Tensor(np_fc1_bias, dtype=ms.float32)) + ms_net.fc2.weight.set_data(Tensor(np_fc2_weight, dtype=ms.float32)) + ms_net.fc2.bias.set_data(Tensor(np_fc2_bias, dtype=ms.float32)) + + # 准备输入和目标 + np_net_input = np.random.randn(batch_size, input_dim).astype(np.float32) + np_net_target = np.random.random((batch_size, 1)).astype(np.float32) + + pt_net_input = torch.tensor(np_net_input) + pt_net_target = torch.tensor(np_net_target) + + ms_net_input = Tensor(np_net_input, dtype=ms.float32) + ms_net_target = Tensor(np_net_target, dtype=ms.float32) + + # PyTorch计算梯度 + pt_optimizer = torch.optim.SGD(pt_net.parameters(), lr=0.1) + pt_optimizer.zero_grad() + pt_loss = pt_net(pt_net_input, pt_net_target) + pt_loss.backward() + + # 获取PyTorch梯度 + pt_fc1_weight_grad = pt_net.fc1.weight.grad.numpy() + pt_fc1_bias_grad = pt_net.fc1.bias.grad.numpy() + pt_fc2_weight_grad = pt_net.fc2.weight.grad.numpy() + pt_fc2_bias_grad = pt_net.fc2.bias.grad.numpy() + + print("PyTorch网络梯度:") + print(f"fc1.weight梯度平均值: {pt_fc1_weight_grad.mean()}") + print(f"fc1.bias梯度平均值: {pt_fc1_bias_grad.mean()}") + print(f"fc2.weight梯度平均值: {pt_fc2_weight_grad.mean()}") + print(f"fc2.bias梯度平均值: {pt_fc2_bias_grad.mean()}") + + # MindSpore计算梯度 + def ms_forward_fn(inputs, targets): + return ms_net(inputs, targets) + + ms_grad_fn = ms.value_and_grad(ms_forward_fn, None, ms_net.trainable_params()) + ms_loss, ms_grads = ms_grad_fn(ms_net_input, ms_net_target) + + print("\nMindSpore网络梯度:") + for i, param in enumerate(ms_net.trainable_params()): + print(f"{param.name} 梯度平均值: {ms_grads[i].asnumpy().mean()}") + + # 比较具体参数的梯度 + print("\n比较fc2.weight梯度:") + ms_fc2_weight_grad = None + for i, param in enumerate(ms_net.trainable_params()): + if 'fc2.weight' in param.name: + ms_fc2_weight_grad = ms_grads[i] + break + + if ms_fc2_weight_grad is not None: + # 注意MindSpore和PyTorch的权重矩阵可能需要转置后比较 + ms_fc2_weight_grad_np = ms_fc2_weight_grad.asnumpy() + max_diff = np.max(np.abs(ms_fc2_weight_grad_np - pt_fc2_weight_grad)) + print(f"fc2.weight梯度最大差异: {max_diff}") + if max_diff < TOLERANCE: + print(f"✓ fc2.weight梯度在容差范围内一致 (< {TOLERANCE})") + else: + print(f"✗ fc2.weight梯度超出容差范围 (> {TOLERANCE})") + +if __name__ == "__main__": + # 运行所有测试 + test_dtype_support() + test_random_inputs() + test_param_support() + test_error_handling() + test_nn_implementation() + test_gradient() diff --git a/test/test_grid_sample.py b/test/test_grid_sample.py new file mode 100644 index 0000000..29f81e1 --- /dev/null +++ b/test/test_grid_sample.py @@ -0,0 +1,913 @@ +''' +dtype: + pytorch - float32, float64 + mindspore - float32, float64 + +mode: + pytorch - bilinear, nearest, bicubic + mindspore - bilinear, nearest + +grid: + grid参数最后一维不为2时,mindspore直到使用输出值时才爆错 + 代码: + ms_output = mint_F.grid_sample(ms_input, ms_wrong_grid_dim) + print("MindSpore支持网格维度不是2") + print(ms_output) + +报错信息: + PyTorch错误: ValueError: nn.functional.grid_sample(): expected padding_mode to be 'zeros', 'border', or 'reflection', but got: 'invalid_padding' + MindSpore错误: ValueError: Failed to convert the value "invalid_padding" of input 'padding_mode' of 'GridSampler2D' to enum. + + PyTorch错误: ValueError: nn.functional.grid_sample(): expected mode to be 'bilinear', 'nearest' or 'bicubic', but got: 'invalid_mode' + MindSpore错误: ValueError: Failed to convert the value "invalid_mode" of input 'interpolation_mode' of 'GridSampler2D' to enum. + +''' + +import numpy as np +import torch +import torch.nn.functional as F +import mindspore as ms +from mindspore import Tensor +import mindspore.mint.nn.functional as mint_F +import traceback + +# 设置全局精度容差 +TOLERANCE = 1e-3 + +def print_header(title): + print(f"\n{'='*80}\n{title}\n{'='*80}") + +def compare_outputs(pytorch_out, mindspore_out, name="输出"): + """比较两个框架的输出是否在容差范围内""" + pytorch_np = pytorch_out.detach().cpu().numpy() + mindspore_np = mindspore_out.asnumpy() + + max_diff = np.max(np.abs(pytorch_np - mindspore_np)) + mean_diff = np.mean(np.abs(pytorch_np - mindspore_np)) + + print(f"{name} 最大差异: {max_diff}") + print(f"{name} 平均差异: {mean_diff}") + + if max_diff < TOLERANCE: + print(f"✓ {name}在容差范围内一致 (< {TOLERANCE})") + return True + else: + print(f"✗ {name}超出容差范围 (> {TOLERANCE})") + return False + +# 定义 MindSpore 的 grid_sample 替代函数,以防原始函数不可用 +def mindspore_grid_sample(input, grid, mode='bilinear', padding_mode='zeros', align_corners=False): + raise NotImplementedError("手动实现grid_sample非常复杂,本测试使用内置函数") + +def test_dtype_support(): + """测试不同数据类型的支持度""" + print_header("1.a) 测试不同数据类型(dtype)的支持度") + + batch_size = 2 + channels = 3 + height = 8 + width = 8 + + # 创建网格大小 + grid_height = 6 + grid_width = 6 + + # dtypes_pytorch = [torch.bfloat16, torch.float16, torch.float32, torch.float64] + # dtypes_mindspore = [ms.bfloat16, ms.float16, ms.float32, ms.float64] + # dtype_names = ["bfloat16", "float16", "float32", "float64"] + dtypes_pytorch = [torch.float16, torch.float32, torch.float64] + dtypes_mindspore = [ms.float16, ms.float32, ms.float64] + dtype_names = ["float16", "float32", "float64"] + + for pt_dtype, ms_dtype, dtype_name in zip(dtypes_pytorch, dtypes_mindspore, dtype_names): + print(f"\n测试数据类型: PyTorch {dtype_name}, MindSpore {dtype_name}") + + # 生成随机输入 + np_input = np.random.randn(batch_size, channels, height, width).astype(np.float32) + + # 创建采样网格:范围在[-1,1]之间的随机值 + np_grid = np.random.uniform(-1, 1, (batch_size, grid_height, grid_width, 2)).astype(np.float32) + + try: + # PyTorch + pt_input = torch.tensor(np_input, dtype=pt_dtype) + pt_grid = torch.tensor(np_grid, dtype=pt_dtype) + pt_output = F.grid_sample(pt_input, pt_grid, mode='bilinear', padding_mode='zeros', align_corners=False) + print(f"PyTorch 输出: shape={pt_output.shape}") + pt_support = "支持" + except Exception as e: + print(f"PyTorch 错误: {type(e).__name__}: {str(e)}") + pt_support = "不支持" + + try: + # 首先尝试 mint.nn.functional.grid_sample + ms_input = Tensor(np_input, dtype=ms_dtype) + ms_grid = Tensor(np_grid, dtype=ms_dtype) + ms_using_mint = True + + try: + ms_output = mint_F.grid_sample(ms_input, ms_grid, mode='bilinear', padding_mode='zeros', align_corners=False) + except (AttributeError, RuntimeError) as e: + print(f"mint.nn.functional.grid_sample 不可用: {type(e).__name__}: {str(e)}") + print("使用替代实现...") + ms_output = mindspore_grid_sample(ms_input, ms_grid, mode='bilinear', padding_mode='zeros', align_corners=False) + ms_using_mint = False + + print(f"MindSpore 输出 ({'mint API' if ms_using_mint else '替代实现'}): shape={ms_output.shape}") + ms_support = "支持" + except Exception as e: + print(f"MindSpore 错误: {type(e).__name__}: {str(e)}") + ms_support = "不支持" + + print(f"PyTorch {dtype_name}: {pt_support}, MindSpore {dtype_name}: {ms_support}") + +def test_random_inputs(): + """测试随机输入值的输出一致性""" + print_header("1.b) 测试随机输入值的输出一致性") + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.grid_sample + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.grid_sample 不可用,无法进行测试...") + return # 退出测试函数 + + # 不同的输入尺寸 + input_shapes = [ + (2, 1, 8, 8), # 单通道 + (2, 3, 16, 16), # 三通道,较大尺寸 + (1, 3, 32, 32) # 批次为1 + ] + + grid_shapes = [ + (2, 6, 6, 2), # 较小采样网格 + (2, 10, 10, 2), # 较大采样网格 + (1, 16, 16, 2) # 批次为1 + ] + + for i, (input_shape, grid_shape) in enumerate(zip(input_shapes, grid_shapes)): + print(f"\n测试输入尺寸 #{i+1}: 输入={input_shape}, 网格={grid_shape}") + + # 生成随机输入 + np_input = np.random.randn(*input_shape).astype(np.float32) + np_grid = np.random.uniform(-1, 1, grid_shape).astype(np.float32) + + # PyTorch + pt_input = torch.tensor(np_input) + pt_grid = torch.tensor(np_grid) + pt_output = F.grid_sample(pt_input, pt_grid, mode='bilinear', padding_mode='zeros', align_corners=False) + print(f"PyTorch 输出: shape={pt_output.shape}") + + # MindSpore + ms_input = Tensor(np_input, dtype=ms.float32) + ms_grid = Tensor(np_grid, dtype=ms.float32) + ms_output = mint_F.grid_sample(ms_input, ms_grid, mode='bilinear', padding_mode='zeros', align_corners=False) + print(f"MindSpore 输出: shape={ms_output.shape}") + + compare_outputs(pt_output, ms_output, f"输入尺寸 #{i+1}") + +def test_param_support(): + """测试不同参数的支持度""" + print_header("1.c) 测试不同参数的支持度") + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.grid_sample + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.grid_sample 不可用,无法进行测试...") + return # 退出测试函数 + + # 固定输入形状 + batch_size = 2 + channels = 3 + height = 16 + width = 16 + grid_height = 10 + grid_width = 10 + + np_input = np.random.randn(batch_size, channels, height, width).astype(np.float32) + np_grid = np.random.uniform(-1, 1, (batch_size, grid_height, grid_width, 2)).astype(np.float32) + + pt_input = torch.tensor(np_input) + pt_grid = torch.tensor(np_grid) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_grid = Tensor(np_grid, dtype=ms.float32) + + # 测试不同的插值模式 (mode) + modes = ['bilinear', 'nearest', 'bicubic'] + + for mode in modes: + print(f"\n测试mode='{mode}':") + + try: + pt_output = F.grid_sample(pt_input, pt_grid, mode=mode, padding_mode='zeros', align_corners=False) + print(f"PyTorch mode='{mode}': 支持") + pt_mode_supported = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_mode_supported = False + + try: + ms_output = mint_F.grid_sample(ms_input, ms_grid, mode=mode, padding_mode='zeros', align_corners=False) + print(f"MindSpore mode='{mode}': 支持") + ms_mode_supported = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_mode_supported = False + + if pt_mode_supported and ms_mode_supported: + compare_outputs(pt_output, ms_output, f"mode='{mode}'") + + # 测试不同的填充模式 (padding_mode) + padding_modes = ['zeros', 'border', 'reflection'] + + for padding_mode in padding_modes: + print(f"\n测试padding_mode='{padding_mode}':") + + try: + pt_output = F.grid_sample(pt_input, pt_grid, mode='bilinear', padding_mode=padding_mode, align_corners=False) + print(f"PyTorch padding_mode='{padding_mode}': 支持") + pt_padding_mode_supported = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_padding_mode_supported = False + + try: + ms_output = mint_F.grid_sample(ms_input, ms_grid, mode='bilinear', padding_mode=padding_mode, align_corners=False) + print(f"MindSpore padding_mode='{padding_mode}': 支持") + ms_padding_mode_supported = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_padding_mode_supported = False + + if pt_padding_mode_supported and ms_padding_mode_supported: + compare_outputs(pt_output, ms_output, f"padding_mode='{padding_mode}'") + + # 测试align_corners参数 + for align_corners in [True, False]: + print(f"\n测试align_corners={align_corners}:") + + try: + pt_output = F.grid_sample(pt_input, pt_grid, mode='bilinear', padding_mode='zeros', align_corners=align_corners) + print(f"PyTorch align_corners={align_corners}: 支持") + pt_align_corners_supported = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_align_corners_supported = False + + try: + ms_output = mint_F.grid_sample(ms_input, ms_grid, mode='bilinear', padding_mode='zeros', align_corners=align_corners) + print(f"MindSpore align_corners={align_corners}: 支持") + ms_align_corners_supported = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_align_corners_supported = False + + if pt_align_corners_supported and ms_align_corners_supported: + compare_outputs(pt_output, ms_output, f"align_corners={align_corners}") + + # 测试各种参数组合 + param_combinations = [ + {'mode': 'bilinear', 'padding_mode': 'zeros', 'align_corners': False}, + {'mode': 'nearest', 'padding_mode': 'border', 'align_corners': True}, + {'mode': 'bicubic', 'padding_mode': 'reflection', 'align_corners': True} + ] + + for i, params in enumerate(param_combinations): + print(f"\n测试参数组合 #{i+1}: {params}") + + try: + pt_output = F.grid_sample(pt_input, pt_grid, **params) + print(f"PyTorch 参数组合 #{i+1}: 支持") + pt_supported = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_supported = False + + try: + ms_output = mint_F.grid_sample(ms_input, ms_grid, **params) + print(f"MindSpore 参数组合 #{i+1}: 支持") + ms_supported = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_supported = False + + if pt_supported and ms_supported: + compare_outputs(pt_output, ms_output, f"参数组合 #{i+1}") + +def test_error_handling(): + """测试错误处理的准确性""" + print_header("1.d) 测试错误处理的准确性") + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.grid_sample + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.grid_sample 不可用,无法进行测试...") + return # 退出测试函数 + + # 创建基本有效输入 + batch_size = 2 + channels = 3 + height = 8 + width = 8 + grid_height = 6 + grid_width = 6 + + np_input = np.random.randn(batch_size, channels, height, width).astype(np.float32) + np_grid = np.random.uniform(-1, 1, (batch_size, grid_height, grid_width, 2)).astype(np.float32) + + pt_input = torch.tensor(np_input) + pt_grid = torch.tensor(np_grid) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_grid = Tensor(np_grid, dtype=ms.float32) + + # 测试批次大小不匹配 + print("\n测试批次大小不匹配:") + + np_wrong_grid = np.random.uniform(-1, 1, (batch_size+1, grid_height, grid_width, 2)).astype(np.float32) + pt_wrong_grid = torch.tensor(np_wrong_grid) + ms_wrong_grid = Tensor(np_wrong_grid, dtype=ms.float32) + + try: + pt_output = F.grid_sample(pt_input, pt_wrong_grid) + print(f"PyTorch支持批次大小不匹配: 输出shape={pt_output.shape}") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mint_F.grid_sample(ms_input, ms_wrong_grid) + print(f"MindSpore支持批次大小不匹配: 输出shape={ms_output.shape}") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + + # 测试网格最后一维不是2 + print("\n测试网格最后一维不是2:") + + np_wrong_grid_dim = np.random.uniform(-1, 1, (batch_size, grid_height, grid_width, 3)).astype(np.float32) + pt_wrong_grid_dim = torch.tensor(np_wrong_grid_dim) + ms_wrong_grid_dim = Tensor(np_wrong_grid_dim, dtype=ms.float32) + + try: + pt_output = F.grid_sample(pt_input, pt_wrong_grid_dim) + print("PyTorch支持网格维度不是2") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mint_F.grid_sample(ms_input, ms_wrong_grid_dim) + print("MindSpore支持网格维度不是2") + print(ms_output) + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + + # 测试无效的插值模式 + print("\n测试无效的插值模式:") + + try: + pt_output = F.grid_sample(pt_input, pt_grid, mode='invalid_mode') + print("PyTorch支持无效的插值模式") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mint_F.grid_sample(ms_input, ms_grid, mode='invalid_mode') + print("MindSpore支持无效的插值模式") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + + # 测试无效的填充模式 + print("\n测试无效的填充模式:") + + try: + pt_output = F.grid_sample(pt_input, pt_grid, padding_mode='invalid_padding') + print("PyTorch支持无效的填充模式") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mint_F.grid_sample(ms_input, ms_grid, padding_mode='invalid_padding') + print("MindSpore支持无效的填充模式") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + +def test_specific_cases(): + """测试特定的边界情况""" + print_header("1.e) 测试特定的边界情况") + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.grid_sample + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.grid_sample 不可用,无法进行测试...") + return # 退出测试函数 + + batch_size = 2 + channels = 3 + height = 8 + width = 8 + grid_height = 6 + grid_width = 6 + + # 创建基本有效输入 + np_input = np.random.randn(batch_size, channels, height, width).astype(np.float32) + + # 测试极限值的网格 + print("\n测试网格坐标为极限值 (-1, -1) 和 (1, 1):") + + # 创建坐标为极限值的网格 + np_extreme_grid = np.zeros((batch_size, grid_height, grid_width, 2), dtype=np.float32) + # 左上角为 (-1, -1) + np_extreme_grid[0, 0, 0] = [-1, -1] + # 右下角为 (1, 1) + np_extreme_grid[0, -1, -1] = [1, 1] + + pt_input = torch.tensor(np_input) + pt_extreme_grid = torch.tensor(np_extreme_grid) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_extreme_grid = Tensor(np_extreme_grid, dtype=ms.float32) + + try: + pt_output = F.grid_sample(pt_input, pt_extreme_grid) + print(f"PyTorch 支持极限值网格, 输出shape={pt_output.shape}") + pt_supported = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_supported = False + + try: + ms_output = mint_F.grid_sample(ms_input, ms_extreme_grid) + print(f"MindSpore 支持极限值网格, 输出shape={ms_output.shape}") + ms_supported = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_supported = False + + if pt_supported and ms_supported: + compare_outputs(pt_output, ms_output, "极限值网格") + + # 测试超出范围的网格坐标 + print("\n测试超出范围的网格坐标:") + + # 创建超出[-1,1]范围的网格 + np_out_of_range_grid = np.random.uniform(-2, 2, (batch_size, grid_height, grid_width, 2)).astype(np.float32) + + pt_out_of_range_grid = torch.tensor(np_out_of_range_grid) + ms_out_of_range_grid = Tensor(np_out_of_range_grid, dtype=ms.float32) + + try: + pt_output = F.grid_sample(pt_input, pt_out_of_range_grid) + print(f"PyTorch 支持超出范围的网格坐标, 输出shape={pt_output.shape}") + pt_supported = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_supported = False + + try: + ms_output = mint_F.grid_sample(ms_input, ms_out_of_range_grid) + print(f"MindSpore 支持超出范围的网格坐标, 输出shape={ms_output.shape}") + ms_supported = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_supported = False + + if pt_supported and ms_supported: + compare_outputs(pt_output, ms_output, "超出范围的网格坐标") + + # 测试网格中包含NaN值 + print("\n测试网格中包含NaN值:") + + # 创建包含NaN的网格 + np_nan_grid = np.random.uniform(-1, 1, (batch_size, grid_height, grid_width, 2)).astype(np.float32) + np_nan_grid[0, 0, 0, 0] = np.nan # 设置一个NaN值 + + pt_nan_grid = torch.tensor(np_nan_grid) + ms_nan_grid = Tensor(np_nan_grid, dtype=ms.float32) + + try: + pt_output = F.grid_sample(pt_input, pt_nan_grid) + print(f"PyTorch 支持网格中包含NaN值, 输出shape={pt_output.shape}") + pt_supported = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_supported = False + + try: + ms_output = mint_F.grid_sample(ms_input, ms_nan_grid) + print(f"MindSpore 支持网格中包含NaN值, 输出shape={ms_output.shape}") + ms_supported = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_supported = False + + if pt_supported and ms_supported: + compare_outputs(pt_output, ms_output, "网格中包含NaN值") + +def test_nn_implementation(): + """测试神经网络实现""" + print_header("2.a/b) 测试神经网络实现和推理结果") + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.grid_sample + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.grid_sample 不可用,无法进行测试...") + return # 退出测试函数 + + # 定义使用grid_sample的简单网络 + class PTSpatialTransformerNet(torch.nn.Module): + def __init__(self): + super(PTSpatialTransformerNet, self).__init__() + self.conv = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1) + + def forward(self, x, grid): + x = self.conv(x) + return F.grid_sample(x, grid, mode='bilinear', padding_mode='zeros', align_corners=False) + + class MSSpatialTransformerNet(ms.nn.Cell): + def __init__(self): + super(MSSpatialTransformerNet, self).__init__() + self.conv = ms.nn.Conv2d(3, 3, kernel_size=3, padding=1, pad_mode='pad', has_bias=True) + + def construct(self, x, grid): + x = self.conv(x) + return mint_F.grid_sample(x, grid, mode='bilinear', padding_mode='zeros', align_corners=False) + + # 创建模型 + pt_model = PTSpatialTransformerNet() + ms_model = MSSpatialTransformerNet() + + # 固定权重 + np_weight = np.random.randn(3, 3, 3, 3).astype(np.float32) * 0.1 + np_bias = np.random.randn(3).astype(np.float32) * 0.1 + + pt_model.conv.weight.data = torch.tensor(np_weight) + pt_model.conv.bias.data = torch.tensor(np_bias) + + ms_model.conv.weight.set_data(Tensor(np_weight, dtype=ms.float32)) + ms_model.conv.bias.set_data(Tensor(np_bias, dtype=ms.float32)) + + # 创建输入 + batch_size = 2 + height = 16 + width = 16 + grid_height = 12 + grid_width = 12 + + np_input = np.random.randn(batch_size, 3, height, width).astype(np.float32) + np_grid = np.random.uniform(-1, 1, (batch_size, grid_height, grid_width, 2)).astype(np.float32) + + pt_input = torch.tensor(np_input) + pt_grid = torch.tensor(np_grid) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_grid = Tensor(np_grid, dtype=ms.float32) + + # 前向传播 + pt_output = pt_model(pt_input, pt_grid) + ms_output = ms_model(ms_input, ms_grid) + + print(f"PyTorch模型输出shape: {pt_output.shape}") + print(f"MindSpore模型输出shape: {ms_output.shape}") + + compare_outputs(pt_output, ms_output, "模型输出") + +def test_gradient(): + """测试反向传播和梯度计算""" + print_header("2.c) 测试反向传播和梯度计算") + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.grid_sample + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.grid_sample 不可用,无法进行测试...") + return # 退出测试函数 + + # 输入数据 + batch_size = 2 + channels = 3 + height = 8 + width = 8 + grid_height = 6 + grid_width = 6 + + np_input = np.random.randn(batch_size, channels, height, width).astype(np.float32) + np_grid = np.random.uniform(-0.9, 0.9, (batch_size, grid_height, grid_width, 2)).astype(np.float32) + + # PyTorch计算输入的梯度 + pt_input = torch.tensor(np_input, requires_grad=True) + pt_grid = torch.tensor(np_grid, requires_grad=True) + pt_output = F.grid_sample(pt_input, pt_grid, mode='bilinear', padding_mode='zeros', align_corners=False) + + # 设置输出梯度为全1 + pt_grad_output = torch.ones_like(pt_output) + pt_output.backward(pt_grad_output) + + pt_input_grad = pt_input.grad + pt_grid_grad = pt_grid.grad + + print("PyTorch梯度:") + print(f"输入梯度shape: {pt_input_grad.shape}, 平均值: {pt_input_grad.mean().item()}") + print(f"网格梯度shape: {pt_grid_grad.shape}, 平均值: {pt_grid_grad.mean().item()}") + + # MindSpore计算输入的梯度 + ms_input = ms.Parameter(Tensor(np_input, dtype=ms.float32)) + ms_grid = ms.Parameter(Tensor(np_grid, dtype=ms.float32)) + + # 使用MindSpore计算输入图像的梯度 + def forward_fn_input(x, grid): + return mint_F.grid_sample(x, grid, mode='bilinear', padding_mode='zeros', align_corners=False) + + grad_fn_input = ms.grad(forward_fn_input, 0) # 对输入参数求导 + ms_input_grad = grad_fn_input(ms_input, ms_grid) + + print("\nMindSpore输入梯度:") + print(f"输入梯度shape: {ms_input_grad.shape}, 平均值: {ms_input_grad.asnumpy().mean()}") + + # 比较输入梯度 + compare_outputs(pt_input_grad, ms_input_grad, "输入梯度") + + # 使用MindSpore计算网格的梯度 + def forward_fn_grid(x, grid): + return mint_F.grid_sample(x, grid, mode='bilinear', padding_mode='zeros', align_corners=False) + + grad_fn_grid = ms.grad(forward_fn_grid, 1) # 对网格参数求导 + ms_grid_grad = grad_fn_grid(ms_input, ms_grid) + + print("\nMindSpore网格梯度:") + print(f"网格梯度shape: {ms_grid_grad.shape}, 平均值: {ms_grid_grad.asnumpy().mean()}") + + # 比较网格梯度 + compare_outputs(pt_grid_grad, ms_grid_grad, "网格梯度") + + # 测试网络模型中的梯度 + print("\n测试神经网络中的梯度:") + + class PTSpatialTransformerNet(torch.nn.Module): + def __init__(self): + super(PTSpatialTransformerNet, self).__init__() + self.conv = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1) + + def forward(self, x, grid): + x = self.conv(x) + return F.grid_sample(x, grid, mode='bilinear', padding_mode='zeros', align_corners=False) + + class MSSpatialTransformerNet(ms.nn.Cell): + def __init__(self): + super(MSSpatialTransformerNet, self).__init__() + self.conv = ms.nn.Conv2d(3, 3, kernel_size=3, padding=1, pad_mode='pad', has_bias=True) + + def construct(self, x, grid): + x = self.conv(x) + return mint_F.grid_sample(x, grid, mode='bilinear', padding_mode='zeros', align_corners=False) + + # 创建模型 + pt_model = PTSpatialTransformerNet() + ms_model = MSSpatialTransformerNet() + + # 固定权重 + np_weight = np.random.randn(3, 3, 3, 3).astype(np.float32) * 0.1 + np_bias = np.random.randn(3).astype(np.float32) * 0.1 + + pt_model.conv.weight.data = torch.tensor(np_weight) + pt_model.conv.bias.data = torch.tensor(np_bias) + + ms_model.conv.weight.set_data(Tensor(np_weight, dtype=ms.float32)) + ms_model.conv.bias.set_data(Tensor(np_bias, dtype=ms.float32)) + + # 创建输入和目标 + batch_size = 2 + height = 16 + width = 16 + grid_height = 12 + grid_width = 12 + + np_input = np.random.randn(batch_size, 3, height, width).astype(np.float32) + np_grid = np.random.uniform(-1, 1, (batch_size, grid_height, grid_width, 2)).astype(np.float32) + + # 生成伪标签/目标 + np_target = np.random.randn(batch_size, 3, grid_height, grid_width).astype(np.float32) + + # PyTorch设置 + pt_input = torch.tensor(np_input, requires_grad=True) + pt_grid = torch.tensor(np_grid, requires_grad=True) + pt_target = torch.tensor(np_target) + + # 前向传播 + pt_output = pt_model(pt_input, pt_grid) + pt_loss = torch.nn.functional.mse_loss(pt_output, pt_target) + + # 反向传播 + pt_loss.backward() + + # 获取权重梯度 + pt_conv_weight_grad = pt_model.conv.weight.grad + pt_conv_bias_grad = pt_model.conv.bias.grad + + print("\nPyTorch网络梯度:") + print(f"conv.weight梯度平均值: {pt_conv_weight_grad.mean().item()}") + print(f"conv.bias梯度平均值: {pt_conv_bias_grad.mean().item()}") + + # MindSpore设置 + ms_input = Tensor(np_input, dtype=ms.float32) + ms_grid = Tensor(np_grid, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + + # 定义损失函数和前向计算 + def forward_fn(model, x, grid, target): + output = model(x, grid) + loss = ms.nn.MSELoss()(output, target) + return loss + + # 计算梯度 + grad_fn = ms.value_and_grad(forward_fn, None, ms_model.trainable_params(), has_aux=False) + loss, grads = grad_fn(ms_model, ms_input, ms_grid, ms_target) + + print("\nMindSpore网络梯度:") + for i, param in enumerate(ms_model.trainable_params()): + print(f"{param.name}梯度平均值: {grads[i].asnumpy().mean()}") + + # 比较卷积权重梯度 + ms_conv_weight_grad = None + for i, param in enumerate(ms_model.trainable_params()): + if 'conv.weight' in param.name: + ms_conv_weight_grad = grads[i] + break + + if ms_conv_weight_grad is not None: + print("\n比较conv.weight梯度:") + compare_outputs(pt_conv_weight_grad, ms_conv_weight_grad, "conv.weight梯度") + +def test_real_world_usage(): + """测试实际应用场景""" + print_header("2.d) 测试实际应用场景") + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.grid_sample + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.grid_sample 不可用,无法进行测试...") + return # 退出测试函数 + + # 实际应用场景 1: 仿射变换 + print("\n测试场景1: 仿射变换") + + batch_size = 1 + channels = 3 + height = 32 + width = 32 + + # 创建输入图像 + np_input = np.random.randn(batch_size, channels, height, width).astype(np.float32) + + # 创建仿射变换矩阵 (旋转45度) + angle = 45.0 * np.pi / 180.0 + cos_val = np.cos(angle).astype(np.float32) # 确保是float32 + sin_val = np.sin(angle).astype(np.float32) # 确保是float32 + + # 旋转矩阵 + affine_matrix = np.array([ + [cos_val, -sin_val, 0], + [sin_val, cos_val, 0] + ], dtype=np.float32) + + # 生成采样网格 + grid_x, grid_y = np.meshgrid( + np.linspace(-1, 1, width, dtype=np.float32), # 指定dtype=np.float32 + np.linspace(-1, 1, height, dtype=np.float32) # 指定dtype=np.float32 + ) + grid = np.stack([grid_x, grid_y], axis=2) + grid = np.broadcast_to(grid, (batch_size, height, width, 2)) + + # 应用仿射变换到网格 + grid_x = grid[:, :, :, 0].reshape(batch_size, -1) + grid_y = grid[:, :, :, 1].reshape(batch_size, -1) + ones = np.ones_like(grid_x) + points = np.stack([grid_x, grid_y, ones], axis=2) + + # 应用变换 + transformed_points = np.matmul(points, affine_matrix.T) + transformed_grid = transformed_points.reshape(batch_size, height, width, 2) + + # 确保变换网格是float32类型 + transformed_grid = transformed_grid.astype(np.float32) + + # PyTorch + pt_input = torch.tensor(np_input, dtype=torch.float32) # 明确指定dtype + pt_grid = torch.tensor(transformed_grid, dtype=torch.float32) # 明确指定dtype + + # MindSpore + ms_input = Tensor(np_input, dtype=ms.float32) + ms_grid = Tensor(transformed_grid, dtype=ms.float32) + + # 执行采样 + pt_output = F.grid_sample(pt_input, pt_grid, mode='bilinear', padding_mode='zeros', align_corners=True) + ms_output = mint_F.grid_sample(ms_input, ms_grid, mode='bilinear', padding_mode='zeros', align_corners=True) + + print("仿射变换结果:") + compare_outputs(pt_output, ms_output, "旋转45度") + + # 实际应用场景 2: 图像缩放 + print("\n测试场景2: 图像缩放") + + # 缩小一半 + scale = 0.5 + + # 创建缩放网格 + grid_x, grid_y = np.meshgrid( + np.linspace(-scale, scale, width, dtype=np.float32), # 指定dtype=np.float32 + np.linspace(-scale, scale, height, dtype=np.float32) # 指定dtype=np.float32 + ) + scaled_grid = np.stack([grid_x, grid_y], axis=2) + scaled_grid = np.broadcast_to(scaled_grid, (batch_size, height, width, 2)) + scaled_grid = scaled_grid.astype(np.float32) # 确保是float32 + + # PyTorch + pt_scaled_grid = torch.tensor(scaled_grid, dtype=torch.float32) # 明确指定dtype + + # MindSpore + ms_scaled_grid = Tensor(scaled_grid, dtype=ms.float32) + + # 执行采样 + pt_scaled_output = F.grid_sample(pt_input, pt_scaled_grid, mode='bilinear', padding_mode='zeros', align_corners=True) + ms_scaled_output = mint_F.grid_sample(ms_input, ms_scaled_grid, mode='bilinear', padding_mode='zeros', align_corners=True) + + print("缩放图像结果:") + compare_outputs(pt_scaled_output, ms_scaled_output, "缩小一半") + + +def test_bfloat16(): + batch_size = 2 + channels = 3 + height = 8 + width = 8 + grid_height = 6 + grid_width = 6 + np_input = np.random.randn(batch_size, channels, height, width).astype(np.float32) + np_grid = np.random.uniform(-1, 1, (batch_size, grid_height, grid_width, 2)).astype(np.float32) + ms_dtype=ms.bfloat16 + try: + # 首先尝试 mint.nn.functional.grid_sample + ms_input = Tensor(np_input, dtype=ms_dtype) + ms_grid = Tensor(np_grid, dtype=ms_dtype) + ms_output = mint_F.grid_sample(ms_input, ms_grid, mode='bilinear', padding_mode='zeros', align_corners=False) + print(f"MindSpore 输出 ({'mint API' if ms_using_mint else '替代实现'}): shape={ms_output.shape}") + ms_support = "支持" + except Exception as e: + print(f"MindSpore 错误: {type(e).__name__}: {str(e)}") + ms_support = "不支持" + + input_shape = (2, 1, 8, 8) # 单通道 + + grid_shape = (2, 6, 6, 2) + + print(f"\n测试输入尺寸: 输入={input_shape}, 网格={grid_shape}") + + # 生成随机输入 + np_input = np.random.randn(*input_shape).astype(np.float32) + np_grid = np.random.uniform(-1, 1, grid_shape).astype(np.float32) + + # MindSpore + ms_input = Tensor(np_input, dtype=ms.float32) + ms_grid = Tensor(np_grid, dtype=ms.float32) + ms_output = mint_F.grid_sample(ms_input, ms_grid, mode='bilinear', padding_mode='zeros', align_corners=False) + print(f"MindSpore 输出: shape={ms_output.shape}") + + mindspore_np = mindspore_out.asnumpy() + + +if __name__ == "__main__": + + mint_available = True + + + if mint_available: + # 运行所有测试 + test_dtype_support() + test_random_inputs() + test_param_support() + test_error_handling() + test_specific_cases() + test_nn_implementation() + test_gradient() + test_real_world_usage() + test_bfloat16() + else: + print("由于API不可用,测试已跳过") diff --git a/test/test_l1_loss.py b/test/test_l1_loss.py new file mode 100644 index 0000000..22c4b65 --- /dev/null +++ b/test/test_l1_loss.py @@ -0,0 +1,488 @@ +''' +dtype: + pytorch - float16, float32, float64, bfloat16 + mindspore - float16, float32, bfloat16 +''' + +import numpy as np +import torch +import torch.nn.functional as F +import mindspore as ms +from mindspore import Tensor +import mindspore.mint.nn.functional as mint_F +import traceback + +# 设置全局精度容差 +TOLERANCE = 1e-3 + +def print_header(title): + print(f"\n{'='*80}\n{title}\n{'='*80}") + +def compare_outputs(pytorch_out, mindspore_out, name="输出"): + """比较两个框架的输出是否在容差范围内""" + pytorch_np = pytorch_out.detach().cpu().numpy() + mindspore_np = mindspore_out.asnumpy() + + max_diff = np.max(np.abs(pytorch_np - mindspore_np)) + mean_diff = np.mean(np.abs(pytorch_np - mindspore_np)) + + print(f"{name} 最大差异: {max_diff}") + print(f"{name} 平均差异: {mean_diff}") + + if max_diff < TOLERANCE: + print(f"✓ {name}在容差范围内一致 (< {TOLERANCE})") + return True + else: + print(f"✗ {name}超出容差范围 (> {TOLERANCE})") + return False + +def test_dtype_support(): + """测试不同数据类型的支持度""" + print_header("1.a) 测试不同数据类型(dtype)的支持度") + + shape = (3, 4) + dtypes_pytorch = [torch.float16, torch.float32, torch.float64, torch.bfloat16] + dtypes_mindspore = [ms.float16, ms.float32, ms.float64, ms.bfloat16] + dtype_names = ["float16", "float32", "float64", "bfloat16"] + + for pt_dtype, ms_dtype, dtype_name in zip(dtypes_pytorch, dtypes_mindspore, dtype_names): + print(f"\n测试数据类型: PyTorch {dtype_name}, MindSpore {dtype_name}") + + # 生成随机输入 + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.randn(*shape).astype(np.float32) + + try: + # PyTorch + pt_input = torch.tensor(np_input, dtype=pt_dtype) + pt_target = torch.tensor(np_target, dtype=pt_dtype) + pt_output = F.l1_loss(pt_input, pt_target, reduction='mean') + print(f"PyTorch 输出: {pt_output.item()}, shape: {pt_output.shape}") + pt_support = "支持" + except Exception as e: + print(f"PyTorch 错误: {type(e).__name__}: {str(e)}") + pt_support = "不支持" + + try: + # MindSpore + ms_input = Tensor(np_input, dtype=ms_dtype) + ms_target = Tensor(np_target, dtype=ms_dtype) + ms_output = mint_F.l1_loss(ms_input, ms_target, reduction='mean') + print(f"MindSpore 输出: {ms_output.asnumpy().item()}, shape: {ms_output.shape}") + ms_support = "支持" + except Exception as e: + print(f"MindSpore 错误: {type(e).__name__}: {str(e)}") + ms_support = "不支持" + + print(f"PyTorch {dtype_name}: {pt_support}, MindSpore {dtype_name}: {ms_support}") + +def test_random_inputs(): + """测试随机输入值的输出一致性""" + print_header("1.b) 测试随机输入值的输出一致性") + + shapes = [(2, 3), (3, 4, 5), (2, 3, 4, 5)] + + for shape in shapes: + print(f"\n测试shape: {shape}") + + # 生成随机输入 + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.randn(*shape).astype(np.float32) + + # PyTorch + pt_input = torch.tensor(np_input, requires_grad=True) + pt_target = torch.tensor(np_target) + pt_output = F.l1_loss(pt_input, pt_target, reduction='mean') + print(f"PyTorch 输出: {pt_output.item()}") + + # MindSpore + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + ms_output = mint_F.l1_loss(ms_input, ms_target, reduction='mean') + print(f"MindSpore 输出: {ms_output.asnumpy().item()}") + + compare_outputs(pt_output, ms_output) + + # 测试不同的reduction + for reduction in ['none', 'sum', 'mean']: + print(f"\n测试reduction: {reduction}") + + pt_output = F.l1_loss(pt_input, pt_target, reduction=reduction) + ms_output = mint_F.l1_loss(ms_input, ms_target, reduction=reduction) + + compare_outputs(pt_output, ms_output, f"reduction={reduction}") + +def test_param_support(): + """测试不同参数类型的支持度""" + print_header("1.c) 测试不同参数类型的支持度") + + shape = (3, 4) + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.randn(*shape).astype(np.float32) + + # 基本输入 + pt_input = torch.tensor(np_input) + pt_target = torch.tensor(np_target) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + + # 测试不同reduction参数 + reductions = ['none', 'sum', 'mean', 'INVALID'] + + for reduction in reductions: + print(f"\n测试reduction参数: '{reduction}'") + + try: + pt_output = F.l1_loss(pt_input, pt_target, reduction=reduction) + print(f"PyTorch: 支持 reduction='{reduction}'") + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + + try: + ms_output = mint_F.l1_loss(ms_input, ms_target, reduction=reduction) + print(f"MindSpore: 支持 reduction='{reduction}'") + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + + # 使用随机权重进行多次测试 + num_tests = 3 # 测试3组不同的输入 + for i in range(num_tests): + # 生成新的随机输入 + np_random_input = np.random.randn(*shape).astype(np.float32) + np_random_target = np.random.randn(*shape).astype(np.float32) + + pt_random_input = torch.tensor(np_random_input) + ms_random_input = Tensor(np_random_input, dtype=ms.float32) + pt_random_target = torch.tensor(np_random_target) + ms_random_target = Tensor(np_random_target, dtype=ms.float32) + + print(f"\n测试随机输入组 #{i+1}:") + print(f"随机输入平均值: {np_random_input.mean():.4f}, 随机目标平均值: {np_random_target.mean():.4f}") + + for reduction in ['none', 'sum', 'mean']: + try: + pt_output = F.l1_loss(pt_random_input, pt_random_target, reduction=reduction) + if reduction != 'none': + print(f"PyTorch {reduction}输出: {pt_output.item():.6f}") + else: + print(f"PyTorch {reduction}输出形状: {pt_output.shape}") + pt_ok = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_ok = False + + try: + ms_output = mint_F.l1_loss(ms_random_input, ms_random_target, reduction=reduction) + if reduction != 'none': + print(f"MindSpore {reduction}输出: {ms_output.asnumpy().item():.6f}") + else: + print(f"MindSpore {reduction}输出形状: {ms_output.shape}") + ms_ok = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_ok = False + + if pt_ok and ms_ok: + compare_outputs(pt_output, ms_output, f"随机输入 #{i+1}, reduction={reduction}") + +def test_error_handling(): + """测试错误处理的准确性""" + print_header("1.d) 测试错误处理的准确性") + + # 测试输入和目标形状不匹配 + print("\n测试输入和目标形状不匹配:") + + pt_input = torch.randn(2, 3) + pt_target = torch.randn(3, 2) + + ms_input = Tensor(np.random.randn(2, 3), dtype=ms.float32) + ms_target = Tensor(np.random.randn(3, 2), dtype=ms.float32) + + try: + pt_output = F.l1_loss(pt_input, pt_target) + print("PyTorch结果:", pt_output.item()) + except Exception as e: + print(f"PyTorch错误: {str(e)}") + + try: + ms_output = mint_F.l1_loss(ms_input, ms_target) + print("MindSpore结果:", ms_output.asnumpy().item()) + except Exception as e: + print(f"MindSpore错误: {str(e)}") + + # 测试错误的输入类型 + print("\n测试错误的输入类型:") + + # 字符串输入 + try: + pt_output = F.l1_loss(pt_input, "wrong_target") + print("PyTorch支持字符串输入") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mint_F.l1_loss(ms_input, "wrong_target") + print("MindSpore支持字符串输入") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + + # 测试各种奇怪输入 + print("\n测试各种奇怪输入:") + + # 测试None输入 + try: + pt_output = F.l1_loss(None, pt_target) + print("PyTorch支持None输入") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mint_F.l1_loss(None, ms_target) + print("MindSpore支持None输入") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + + # 测试0维张量 + try: + pt_output = F.l1_loss(torch.tensor(5.0), torch.tensor(3.0)) + print(f"PyTorch支持0维张量, 输出: {pt_output.item()}") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mint_F.l1_loss(Tensor(5.0, ms.float32), Tensor(3.0, ms.float32)) + print(f"MindSpore支持0维张量, 输出: {ms_output.asnumpy().item()}") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + +def test_nn_implementation(): + """测试神经网络实现""" + print_header("2.a/b) 测试神经网络实现和推理结果") + + # 简单的回归网络 + class PTRegressionNet(torch.nn.Module): + def __init__(self, input_dim, output_dim): + super(PTRegressionNet, self).__init__() + self.linear = torch.nn.Linear(input_dim, output_dim) + + def forward(self, x, target=None): + output = self.linear(x) + if target is not None: + loss = F.l1_loss(output, target) + return loss + return output + + class MSRegressionNet(ms.nn.Cell): + def __init__(self, input_dim, output_dim): + super(MSRegressionNet, self).__init__() + self.linear = ms.nn.Dense(input_dim, output_dim) + + def construct(self, x, target=None): + output = self.linear(x) + if target is not None: + loss = mint_F.l1_loss(output, target) + return loss + return output + + # 固定输入和权重 + input_dim = 5 + output_dim = 2 + batch_size = 3 + + np_input = np.random.randn(batch_size, input_dim).astype(np.float32) + np_target = np.random.randn(batch_size, output_dim).astype(np.float32) + + # 创建模型 + pt_model = PTRegressionNet(input_dim, output_dim) + ms_model = MSRegressionNet(input_dim, output_dim) + + # 固定权重 - 注意这里不需要转置 + np_weight = np.random.randn(output_dim, input_dim).astype(np.float32) + np_bias = np.random.randn(output_dim).astype(np.float32) + + pt_model.linear.weight.data = torch.tensor(np_weight) + pt_model.linear.bias.data = torch.tensor(np_bias) + + ms_model.linear.weight.set_data(Tensor(np_weight, dtype=ms.float32)) + ms_model.linear.bias.set_data(Tensor(np_bias, dtype=ms.float32)) + + # 前向传播测试 + pt_input = torch.tensor(np_input) + pt_target = torch.tensor(np_target) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + + # 输出测试 + pt_output = pt_model(pt_input) + ms_output = ms_model(ms_input) + + print("测试模型输出:") + compare_outputs(pt_output, ms_output, "模型输出") + + # 损失测试 + pt_loss = pt_model(pt_input, pt_target) + ms_loss = ms_model(ms_input, ms_target) + + print("\n测试模型损失:") + compare_outputs(pt_loss, ms_loss, "损失值") + +def test_gradient(): + """测试反向传播和梯度计算""" + print_header("2.c) 测试反向传播和梯度计算") + + # 函数的梯度测试 + shape = (3, 4) + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.randn(*shape).astype(np.float32) + + # PyTorch + pt_input = torch.tensor(np_input, requires_grad=True) + pt_target = torch.tensor(np_target) + pt_output = F.l1_loss(pt_input, pt_target) + pt_output.backward() + pt_grad = pt_input.grad + + print("PyTorch梯度:") + print(f"形状: {pt_grad.shape}") + print(f"平均值: {pt_grad.mean().item()}") + + # MindSpore - 创建一个计算图用于计算梯度 + ms_input = ms.Parameter(Tensor(np_input, dtype=ms.float32)) + ms_target = Tensor(np_target, dtype=ms.float32) + + def forward_fn(x, target): + return mint_F.l1_loss(x, target) + + grad_fn = ms.grad(forward_fn) + ms_grad = grad_fn(ms_input, ms_target) + + print("\nMindSpore梯度:") + print(f"形状: {ms_grad.shape}") + print(f"平均值: {ms_grad.asnumpy().mean()}") + + # 比较梯度 + compare_outputs(pt_grad, ms_grad, "梯度") + + # 神经网络的参数梯度测试 + print("\n测试神经网络参数梯度:") + + input_dim = 5 + hidden_dim = 3 + output_dim = 2 + batch_size = 4 + + class PTSimpleNet(torch.nn.Module): + def __init__(self): + super(PTSimpleNet, self).__init__() + self.fc1 = torch.nn.Linear(input_dim, hidden_dim) + self.fc2 = torch.nn.Linear(hidden_dim, output_dim) + + def forward(self, x, target=None): + h = torch.nn.functional.relu(self.fc1(x)) + output = self.fc2(h) + if target is not None: + loss = F.l1_loss(output, target) + return loss + return output + + class MSSimpleNet(ms.nn.Cell): + def __init__(self): + super(MSSimpleNet, self).__init__() + self.fc1 = ms.nn.Dense(input_dim, hidden_dim) + self.fc2 = ms.nn.Dense(hidden_dim, output_dim) + self.relu = ms.nn.ReLU() + + def construct(self, x, target=None): + h = self.relu(self.fc1(x)) + output = self.fc2(h) + if target is not None: + loss = mint_F.l1_loss(output, target) + return loss + return output + + # 创建模型 + pt_net = PTSimpleNet() + ms_net = MSSimpleNet() + + # 固定权重 - 注意这里不需要转置 + np_fc1_weight = np.random.randn(hidden_dim, input_dim).astype(np.float32) + np_fc1_bias = np.random.randn(hidden_dim).astype(np.float32) + np_fc2_weight = np.random.randn(output_dim, hidden_dim).astype(np.float32) + np_fc2_bias = np.random.randn(output_dim).astype(np.float32) + + pt_net.fc1.weight.data = torch.tensor(np_fc1_weight) + pt_net.fc1.bias.data = torch.tensor(np_fc1_bias) + pt_net.fc2.weight.data = torch.tensor(np_fc2_weight) + pt_net.fc2.bias.data = torch.tensor(np_fc2_bias) + + ms_net.fc1.weight.set_data(Tensor(np_fc1_weight, dtype=ms.float32)) + ms_net.fc1.bias.set_data(Tensor(np_fc1_bias, dtype=ms.float32)) + ms_net.fc2.weight.set_data(Tensor(np_fc2_weight, dtype=ms.float32)) + ms_net.fc2.bias.set_data(Tensor(np_fc2_bias, dtype=ms.float32)) + + # 准备输入和目标 + np_net_input = np.random.randn(batch_size, input_dim).astype(np.float32) + np_net_target = np.random.randn(batch_size, output_dim).astype(np.float32) + + pt_net_input = torch.tensor(np_net_input) + pt_net_target = torch.tensor(np_net_target) + + ms_net_input = Tensor(np_net_input, dtype=ms.float32) + ms_net_target = Tensor(np_net_target, dtype=ms.float32) + + # PyTorch计算梯度 + pt_optimizer = torch.optim.SGD(pt_net.parameters(), lr=0.1) + pt_optimizer.zero_grad() + pt_loss = pt_net(pt_net_input, pt_net_target) + pt_loss.backward() + + # 获取PyTorch梯度 + pt_fc1_weight_grad = pt_net.fc1.weight.grad.numpy() + pt_fc1_bias_grad = pt_net.fc1.bias.grad.numpy() + pt_fc2_weight_grad = pt_net.fc2.weight.grad.numpy() + pt_fc2_bias_grad = pt_net.fc2.bias.grad.numpy() + + print("PyTorch网络梯度:") + print(f"fc1.weight梯度平均值: {pt_fc1_weight_grad.mean()}") + print(f"fc1.bias梯度平均值: {pt_fc1_bias_grad.mean()}") + print(f"fc2.weight梯度平均值: {pt_fc2_weight_grad.mean()}") + print(f"fc2.bias梯度平均值: {pt_fc2_bias_grad.mean()}") + + # MindSpore计算梯度 + def ms_forward_fn(inputs, targets): + return ms_net(inputs, targets) + + ms_grad_fn = ms.value_and_grad(ms_forward_fn, None, ms_net.trainable_params()) + ms_loss, ms_grads = ms_grad_fn(ms_net_input, ms_net_target) + + print("\nMindSpore网络梯度:") + for i, param in enumerate(ms_net.trainable_params()): + print(f"{param.name} 梯度平均值: {ms_grads[i].asnumpy().mean()}") + + # 比较具体参数的梯度 (不需要转置) + print("\n比较fc2.weight梯度:") + ms_fc2_weight_grad = None + for i, param in enumerate(ms_net.trainable_params()): + if 'fc2.weight' in param.name: + ms_fc2_weight_grad = ms_grads[i] + break + + if ms_fc2_weight_grad is not None: + ms_fc2_weight_grad_np = ms_fc2_weight_grad.asnumpy() + max_diff = np.max(np.abs(ms_fc2_weight_grad_np - pt_fc2_weight_grad)) + print(f"fc2.weight梯度最大差异: {max_diff}") + if max_diff < TOLERANCE: + print(f"✓ fc2.weight梯度在容差范围内一致 (< {TOLERANCE})") + else: + print(f"✗ fc2.weight梯度超出容差范围 (> {TOLERANCE})") + +if __name__ == "__main__": + # 运行所有测试 + test_dtype_support() + test_random_inputs() + test_param_support() + test_error_handling() + test_nn_implementation() + test_gradient() diff --git a/test/test_mse_loss.py b/test/test_mse_loss.py new file mode 100644 index 0000000..694eb2e --- /dev/null +++ b/test/test_mse_loss.py @@ -0,0 +1,575 @@ +''' +dtype: + pytorch - float16, float32, float64 + mindspore - float16, float32, bfloat16 +''' + +import numpy as np +import torch +import torch.nn.functional as F +import mindspore as ms +from mindspore import Tensor +import mindspore.mint.nn.functional as mint_F +import traceback + +# 设置全局精度容差 +TOLERANCE = 1e-3 + +def print_header(title): + print(f"\n{'='*80}\n{title}\n{'='*80}") + +def compare_outputs(pytorch_out, mindspore_out, name="输出"): + """比较两个框架的输出是否在容差范围内""" + pytorch_np = pytorch_out.detach().cpu().numpy() + mindspore_np = mindspore_out.asnumpy() + + max_diff = np.max(np.abs(pytorch_np - mindspore_np)) + mean_diff = np.mean(np.abs(pytorch_np - mindspore_np)) + + print(f"{name} 最大差异: {max_diff}") + print(f"{name} 平均差异: {mean_diff}") + + if max_diff < TOLERANCE: + print(f"✓ {name}在容差范围内一致 (< {TOLERANCE})") + return True + else: + print(f"✗ {name}超出容差范围 (> {TOLERANCE})") + return False + +# 定义 MindSpore 的 mse_loss 替代函数,以防原始函数不可用 +def mindspore_mse_loss(inputs, targets, reduction='mean'): + squared_diff = ms.ops.square(inputs - targets) + if reduction == 'none': + return squared_diff + elif reduction == 'mean': + return ms.ops.mean(squared_diff) + elif reduction == 'sum': + return ms.ops.sum(squared_diff) + else: + raise ValueError(f"Unsupported reduction mode: {reduction}") + +def test_dtype_support(): + """测试不同数据类型的支持度""" + print_header("1.a) 测试不同数据类型(dtype)的支持度") + + shape = (3, 4) + dtypes_pytorch = [torch.float16, torch.float32, torch.float64, torch.bfloat16] + dtypes_mindspore = [ms.float16, ms.float32, ms.float64, ms.bfloat16] + dtype_names = ["float16", "float32", "float64", "bfloat16"] + + for pt_dtype, ms_dtype, dtype_name in zip(dtypes_pytorch, dtypes_mindspore, dtype_names): + print(f"\n测试数据类型: PyTorch {dtype_name}, MindSpore {dtype_name}") + + # 生成随机输入 + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.randn(*shape).astype(np.float32) + + try: + # PyTorch + pt_input = torch.tensor(np_input, dtype=pt_dtype) + pt_target = torch.tensor(np_target, dtype=pt_dtype) + pt_output = F.mse_loss(pt_input, pt_target, reduction='mean') + print(f"PyTorch 输出: {pt_output.item()}, shape: {pt_output.shape}") + pt_support = "支持" + except Exception as e: + print(f"PyTorch 错误: {type(e).__name__}: {str(e)}") + pt_support = "不支持" + + try: + # 首先尝试 mint.nn.functional.mse_loss + ms_input = Tensor(np_input, dtype=ms_dtype) + ms_target = Tensor(np_target, dtype=ms_dtype) + ms_using_mint = True + + try: + ms_output = mint_F.mse_loss(ms_input, ms_target, reduction='mean') + except (AttributeError, RuntimeError) as e: + print(f"mint.nn.functional.mse_loss 不可用: {type(e).__name__}: {str(e)}") + print("使用替代实现...") + ms_output = mindspore_mse_loss(ms_input, ms_target, reduction='mean') + ms_using_mint = False + + print(f"MindSpore 输出 ({'mint API' if ms_using_mint else '替代实现'}): {ms_output.asnumpy().item()}, shape: {ms_output.shape}") + ms_support = "支持" + except Exception as e: + print(f"MindSpore 错误: {type(e).__name__}: {str(e)}") + ms_support = "不支持" + + print(f"PyTorch {dtype_name}: {pt_support}, MindSpore {dtype_name}: {ms_support}") + +def test_random_inputs(): + """测试随机输入值的输出一致性""" + print_header("1.b) 测试随机输入值的输出一致性") + + shapes = [(2, 3), (3, 4, 5), (2, 3, 4, 5)] + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.mse_loss + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.mse_loss 不可用,使用替代实现...") + + mse_loss_fn = mint_F.mse_loss if ms_using_mint else mindspore_mse_loss + + for shape in shapes: + print(f"\n测试shape: {shape}") + + # 生成随机输入 + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.randn(*shape).astype(np.float32) + + # PyTorch + pt_input = torch.tensor(np_input, requires_grad=True) + pt_target = torch.tensor(np_target) + pt_output = F.mse_loss(pt_input, pt_target, reduction='mean') + print(f"PyTorch 输出: {pt_output.item()}") + + # MindSpore + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + ms_output = mse_loss_fn(ms_input, ms_target, reduction='mean') + print(f"MindSpore 输出 ({'mint API' if ms_using_mint else '替代实现'}): {ms_output.asnumpy().item()}") + + compare_outputs(pt_output, ms_output) + + # 测试不同的reduction + for reduction in ['none', 'sum', 'mean']: + print(f"\n测试reduction: {reduction}") + + pt_output = F.mse_loss(pt_input, pt_target, reduction=reduction) + ms_output = mse_loss_fn(ms_input, ms_target, reduction=reduction) + + compare_outputs(pt_output, ms_output, f"reduction={reduction}") + +def test_param_support(): + """测试不同参数类型的支持度""" + print_header("1.c) 测试不同参数类型的支持度") + + shape = (3, 4) + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.randn(*shape).astype(np.float32) + + # 基本输入 + pt_input = torch.tensor(np_input) + pt_target = torch.tensor(np_target) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.mse_loss + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.mse_loss 不可用,使用替代实现...") + + mse_loss_fn = mint_F.mse_loss if ms_using_mint else mindspore_mse_loss + + # 测试不同reduction参数 + reductions = ['none', 'sum', 'mean', 'INVALID'] + + for reduction in reductions: + print(f"\n测试reduction参数: '{reduction}'") + + try: + pt_output = F.mse_loss(pt_input, pt_target, reduction=reduction) + print(f"PyTorch: 支持 reduction='{reduction}'") + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + + try: + ms_output = mse_loss_fn(ms_input, ms_target, reduction=reduction) + print(f"MindSpore ({'mint API' if ms_using_mint else '替代实现'}): 支持 reduction='{reduction}'") + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + + # 使用随机输入进行多次测试 + num_tests = 3 # 测试3组不同的输入 + for i in range(num_tests): + # 生成新的随机输入 + np_random_input = np.random.randn(*shape).astype(np.float32) + np_random_target = np.random.randn(*shape).astype(np.float32) + + pt_random_input = torch.tensor(np_random_input) + ms_random_input = Tensor(np_random_input, dtype=ms.float32) + pt_random_target = torch.tensor(np_random_target) + ms_random_target = Tensor(np_random_target, dtype=ms.float32) + + print(f"\n测试随机输入组 #{i+1}:") + print(f"随机输入平均值: {np_random_input.mean():.4f}, 随机目标平均值: {np_random_target.mean():.4f}") + + for reduction in ['none', 'sum', 'mean']: + try: + pt_output = F.mse_loss(pt_random_input, pt_random_target, reduction=reduction) + if reduction != 'none': + print(f"PyTorch {reduction}输出: {pt_output.item():.6f}") + else: + print(f"PyTorch {reduction}输出形状: {pt_output.shape}") + pt_ok = True + except Exception as e: + print(f"PyTorch 错误: {str(e)}") + pt_ok = False + + try: + ms_output = mse_loss_fn(ms_random_input, ms_random_target, reduction=reduction) + if reduction != 'none': + print(f"MindSpore {reduction}输出: {ms_output.asnumpy().item():.6f}") + else: + print(f"MindSpore {reduction}输出形状: {ms_output.shape}") + ms_ok = True + except Exception as e: + print(f"MindSpore 错误: {str(e)}") + ms_ok = False + + if pt_ok and ms_ok: + compare_outputs(pt_output, ms_output, f"随机输入 #{i+1}, reduction={reduction}") + +def test_error_handling(): + """测试错误处理的准确性""" + print_header("1.d) 测试错误处理的准确性") + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.mse_loss + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.mse_loss 不可用,使用替代实现...") + + mse_loss_fn = mint_F.mse_loss if ms_using_mint else mindspore_mse_loss + + # 测试输入和目标形状不匹配 + print("\n测试输入和目标形状不匹配:") + + pt_input = torch.randn(2, 3) + pt_target = torch.randn(3, 2) + + ms_input = Tensor(np.random.randn(2, 3), dtype=ms.float32) + ms_target = Tensor(np.random.randn(3, 2), dtype=ms.float32) + + try: + pt_output = F.mse_loss(pt_input, pt_target) + print("PyTorch结果:", pt_output.item()) + except Exception as e: + print(f"PyTorch错误: {str(e)}") + + try: + ms_output = mse_loss_fn(ms_input, ms_target) + print("MindSpore结果:", ms_output.asnumpy().item()) + except Exception as e: + print(f"MindSpore错误: {str(e)}") + + # 测试错误的输入类型 + print("\n测试错误的输入类型:") + + # 字符串输入 + try: + pt_output = F.mse_loss(pt_input, "wrong_target") + print("PyTorch支持字符串输入") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mse_loss_fn(ms_input, "wrong_target") + print("MindSpore支持字符串输入") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + + # 测试各种奇怪输入 + print("\n测试各种奇怪输入:") + + # 测试None输入 + try: + pt_output = F.mse_loss(None, pt_target) + print("PyTorch支持None输入") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mse_loss_fn(None, ms_target) + print("MindSpore支持None输入") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + + # 测试0维张量 + try: + pt_output = F.mse_loss(torch.tensor(5.0), torch.tensor(3.0)) + print(f"PyTorch支持0维张量, 输出: {pt_output.item()}") + except Exception as e: + print(f"PyTorch错误: {type(e).__name__}: {str(e)}") + + try: + ms_output = mse_loss_fn(Tensor(5.0, ms.float32), Tensor(3.0, ms.float32)) + print(f"MindSpore支持0维张量, 输出: {ms_output.asnumpy().item()}") + except Exception as e: + print(f"MindSpore错误: {type(e).__name__}: {str(e)}") + +def test_nn_implementation(): + """测试神经网络实现""" + print_header("2.a/b) 测试神经网络实现和推理结果") + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.mse_loss + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.mse_loss 不可用,使用替代实现...") + + mse_loss_fn = mint_F.mse_loss if ms_using_mint else mindspore_mse_loss + + # 简单的回归网络 + class PTRegressionNet(torch.nn.Module): + def __init__(self, input_dim, output_dim): + super(PTRegressionNet, self).__init__() + self.linear = torch.nn.Linear(input_dim, output_dim) + + def forward(self, x, target=None): + output = self.linear(x) + if target is not None: + loss = F.mse_loss(output, target) + return loss + return output + + class MSRegressionNet(ms.nn.Cell): + def __init__(self, input_dim, output_dim, use_mint=True): + super(MSRegressionNet, self).__init__() + self.linear = ms.nn.Dense(input_dim, output_dim) + self.use_mint = use_mint + + def construct(self, x, target=None): + output = self.linear(x) + if target is not None: + if self.use_mint: + loss = mint_F.mse_loss(output, target) + else: + loss = mindspore_mse_loss(output, target) + return loss + return output + + # 固定输入和权重 + input_dim = 5 + output_dim = 2 + batch_size = 3 + + np_input = np.random.randn(batch_size, input_dim).astype(np.float32) + np_target = np.random.randn(batch_size, output_dim).astype(np.float32) + + # 创建模型 + pt_model = PTRegressionNet(input_dim, output_dim) + ms_model = MSRegressionNet(input_dim, output_dim, use_mint=ms_using_mint) + + # 固定权重 - 不需要转置 + np_weight = np.random.randn(output_dim, input_dim).astype(np.float32) + np_bias = np.random.randn(output_dim).astype(np.float32) + + pt_model.linear.weight.data = torch.tensor(np_weight) + pt_model.linear.bias.data = torch.tensor(np_bias) + + ms_model.linear.weight.set_data(Tensor(np_weight, dtype=ms.float32)) + ms_model.linear.bias.set_data(Tensor(np_bias, dtype=ms.float32)) + + # 前向传播测试 + pt_input = torch.tensor(np_input) + pt_target = torch.tensor(np_target) + + ms_input = Tensor(np_input, dtype=ms.float32) + ms_target = Tensor(np_target, dtype=ms.float32) + + # 输出测试 + pt_output = pt_model(pt_input) + ms_output = ms_model(ms_input) + + print("测试模型输出:") + compare_outputs(pt_output, ms_output, "模型输出") + + # 损失测试 + pt_loss = pt_model(pt_input, pt_target) + ms_loss = ms_model(ms_input, ms_target) + + print("\n测试模型损失:") + compare_outputs(pt_loss, ms_loss, "损失值") + +def test_gradient(): + """测试反向传播和梯度计算""" + print_header("2.c) 测试反向传播和梯度计算") + + # 检查mint API是否可用 + ms_using_mint = True + try: + _ = mint_F.mse_loss + except (AttributeError, RuntimeError): + ms_using_mint = False + print("mint.nn.functional.mse_loss 不可用,使用替代实现...") + + mse_loss_fn = mint_F.mse_loss if ms_using_mint else mindspore_mse_loss + + # 函数的梯度测试 + shape = (3, 4) + np_input = np.random.randn(*shape).astype(np.float32) + np_target = np.random.randn(*shape).astype(np.float32) + + # PyTorch + pt_input = torch.tensor(np_input, requires_grad=True) + pt_target = torch.tensor(np_target) + pt_output = F.mse_loss(pt_input, pt_target) + pt_output.backward() + pt_grad = pt_input.grad + + print("PyTorch梯度:") + print(f"形状: {pt_grad.shape}") + print(f"平均值: {pt_grad.mean().item()}") + + # MindSpore - 创建一个计算图用于计算梯度 + ms_input = ms.Parameter(Tensor(np_input, dtype=ms.float32)) + ms_target = Tensor(np_target, dtype=ms.float32) + + def forward_fn(x, target): + return mse_loss_fn(x, target) + + grad_fn = ms.grad(forward_fn) + ms_grad = grad_fn(ms_input, ms_target) + + print("\nMindSpore梯度:") + print(f"形状: {ms_grad.shape}") + print(f"平均值: {ms_grad.asnumpy().mean()}") + + # 比较梯度 + compare_outputs(pt_grad, ms_grad, "梯度") + + # 神经网络的参数梯度测试 + print("\n测试神经网络参数梯度:") + + input_dim = 5 + hidden_dim = 3 + output_dim = 2 + batch_size = 4 + + class PTSimpleNet(torch.nn.Module): + def __init__(self): + super(PTSimpleNet, self).__init__() + self.fc1 = torch.nn.Linear(input_dim, hidden_dim) + self.fc2 = torch.nn.Linear(hidden_dim, output_dim) + + def forward(self, x, target=None): + h = torch.nn.functional.relu(self.fc1(x)) + output = self.fc2(h) + if target is not None: + loss = F.mse_loss(output, target) + return loss + return output + + class MSSimpleNet(ms.nn.Cell): + def __init__(self, use_mint=True): + super(MSSimpleNet, self).__init__() + self.fc1 = ms.nn.Dense(input_dim, hidden_dim) + self.fc2 = ms.nn.Dense(hidden_dim, output_dim) + self.relu = ms.nn.ReLU() + self.use_mint = use_mint + + def construct(self, x, target=None): + h = self.relu(self.fc1(x)) + output = self.fc2(h) + if target is not None: + if self.use_mint: + loss = mint_F.mse_loss(output, target) + else: + loss = mindspore_mse_loss(output, target) + return loss + return output + + # 创建模型 + pt_net = PTSimpleNet() + ms_net = MSSimpleNet(use_mint=ms_using_mint) + + # 固定权重 - 不需要转置 + np_fc1_weight = np.random.randn(hidden_dim, input_dim).astype(np.float32) + np_fc1_bias = np.random.randn(hidden_dim).astype(np.float32) + np_fc2_weight = np.random.randn(output_dim, hidden_dim).astype(np.float32) + np_fc2_bias = np.random.randn(output_dim).astype(np.float32) + + pt_net.fc1.weight.data = torch.tensor(np_fc1_weight) + pt_net.fc1.bias.data = torch.tensor(np_fc1_bias) + pt_net.fc2.weight.data = torch.tensor(np_fc2_weight) + pt_net.fc2.bias.data = torch.tensor(np_fc2_bias) + + ms_net.fc1.weight.set_data(Tensor(np_fc1_weight, dtype=ms.float32)) + ms_net.fc1.bias.set_data(Tensor(np_fc1_bias, dtype=ms.float32)) + ms_net.fc2.weight.set_data(Tensor(np_fc2_weight, dtype=ms.float32)) + ms_net.fc2.bias.set_data(Tensor(np_fc2_bias, dtype=ms.float32)) + + # 准备输入和目标 + np_net_input = np.random.randn(batch_size, input_dim).astype(np.float32) + np_net_target = np.random.randn(batch_size, output_dim).astype(np.float32) + + pt_net_input = torch.tensor(np_net_input) + pt_net_target = torch.tensor(np_net_target) + + ms_net_input = Tensor(np_net_input, dtype=ms.float32) + ms_net_target = Tensor(np_net_target, dtype=ms.float32) + + # PyTorch计算梯度 + pt_optimizer = torch.optim.SGD(pt_net.parameters(), lr=0.1) + pt_optimizer.zero_grad() + pt_loss = pt_net(pt_net_input, pt_net_target) + pt_loss.backward() + + # 获取PyTorch梯度 + pt_fc1_weight_grad = pt_net.fc1.weight.grad.numpy() + pt_fc1_bias_grad = pt_net.fc1.bias.grad.numpy() + pt_fc2_weight_grad = pt_net.fc2.weight.grad.numpy() + pt_fc2_bias_grad = pt_net.fc2.bias.grad.numpy() + + print("PyTorch网络梯度:") + print(f"fc1.weight梯度平均值: {pt_fc1_weight_grad.mean()}") + print(f"fc1.bias梯度平均值: {pt_fc1_bias_grad.mean()}") + print(f"fc2.weight梯度平均值: {pt_fc2_weight_grad.mean()}") + print(f"fc2.bias梯度平均值: {pt_fc2_bias_grad.mean()}") + + # MindSpore计算梯度 + def ms_forward_fn(inputs, targets): + return ms_net(inputs, targets) + + ms_grad_fn = ms.value_and_grad(ms_forward_fn, None, ms_net.trainable_params()) + ms_loss, ms_grads = ms_grad_fn(ms_net_input, ms_net_target) + + print("\nMindSpore网络梯度:") + for i, param in enumerate(ms_net.trainable_params()): + print(f"{param.name} 梯度平均值: {ms_grads[i].asnumpy().mean()}") + + # 比较具体参数的梯度 (不需要转置) + print("\n比较fc2.weight梯度:") + ms_fc2_weight_grad = None + for i, param in enumerate(ms_net.trainable_params()): + if 'fc2.weight' in param.name: + ms_fc2_weight_grad = ms_grads[i] + break + + if ms_fc2_weight_grad is not None: + ms_fc2_weight_grad_np = ms_fc2_weight_grad.asnumpy() + max_diff = np.max(np.abs(ms_fc2_weight_grad_np - pt_fc2_weight_grad)) + print(f"fc2.weight梯度最大差异: {max_diff}") + if max_diff < TOLERANCE: + print(f"✓ fc2.weight梯度在容差范围内一致 (< {TOLERANCE})") + else: + print(f"✗ fc2.weight梯度超出容差范围 (> {TOLERANCE})") + +if __name__ == "__main__": + # 检查mint.nn.functional.mse_loss是否可用 + try: + _ = mint_F.mse_loss + print("mint.nn.functional.mse_loss 可用,使用官方API进行测试") + except (AttributeError, RuntimeError) as e: + print(f"mint.nn.functional.mse_loss 不可用: {e}") + print("将使用自定义实现进行测试...") + + # 运行所有测试 + test_dtype_support() + test_random_inputs() + test_param_support() + test_error_handling() + test_nn_implementation() + test_gradient()