-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlstm.py
107 lines (90 loc) · 3.29 KB
/
lstm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.autograd import Variable
# Load the monthly passenger-count column from the CSV.
data_csv=pd.read_csv('./international-airline-passengers.csv',usecols=[1])
plt.plot(data_csv)
# Data preprocessing
data_csv = data_csv.dropna()  # drop rows with missing values
dataset = data_csv.values  # extract the underlying numpy array
# print("dataset1",dataset)
dataset = dataset.astype('float32')
max_value = np.max(dataset)  # series maximum
min_value = np.min(dataset)  # series minimum
scalar = max_value - min_value  # value range (max - min)
# NOTE(review): this divides by the range only; true min-max scaling would be
# (x - min_value) / scalar. The inverse transform later in this script
# multiplies by scalar only, so the two steps are at least mutually consistent.
dataset = list(map(lambda x: x / scalar, dataset))  # normalize into ~[min/scalar, max/scalar]
# Turn the univariate series into supervised-learning pairs: each input is a
# sliding window of `look_back` consecutive values, and its target is the value
# that immediately follows the window.
def create_dataset(dataset, look_back=2):
    """Return (X, Y) numpy arrays where X[i] = dataset[i:i+look_back] and
    Y[i] = dataset[i+look_back]."""
    n_samples = len(dataset) - look_back
    windows = [dataset[i:i + look_back] for i in range(n_samples)]
    targets = [dataset[i + look_back] for i in range(n_samples)]
    return np.array(windows), np.array(targets)
# Build the supervised (input, target) arrays from the normalized series.
data_X, data_Y = create_dataset(dataset)
# Train/test split: first 70% of samples for training, remainder for testing.
train_size = int(len(data_X) * 0.7)
test_size = len(data_X) - train_size
train_X = data_X[:train_size]
train_Y = data_Y[:train_size]
test_X = data_X[train_size:]
test_Y = data_Y[train_size:]
# Reshape for nn.LSTM (which defaults to seq-first layout) and convert to
# torch tensors. NOTE(review): samples are laid out along the sequence axis
# with batch=1 and 2 features per step (look_back=2) — confirm this is the
# intended framing rather than batch-of-windows.
train_X = train_X.reshape(-1, 1, 2)
train_Y = train_Y.reshape(-1, 1, 1)
test_X = test_X.reshape(-1, 1, 2)
train_x = torch.from_numpy(train_X)
train_y = torch.from_numpy(train_Y)
test_x = torch.from_numpy(test_X)
class lstm(nn.Module):
    """A small sequence regressor: a stacked LSTM followed by a linear
    projection of each timestep's hidden state to the output size.

    Input:  tensor of shape (seq_len, batch, input_size)
    Output: tensor of shape (seq_len, batch, output_size)
    """

    def __init__(self, input_size=2, hidden_size=4, output_size=1, num_layer=2):
        super(lstm, self).__init__()
        # Recurrent feature extractor (seq-first layout, PyTorch default).
        self.layer1 = nn.LSTM(input_size, hidden_size, num_layer)
        # Per-timestep projection from hidden state to prediction.
        self.layer2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        features, _ = self.layer1(x)          # (seq_len, batch, hidden)
        seq_len, batch, hidden = features.shape
        # Flatten time and batch so the linear layer sees one row per step,
        # then restore the (seq_len, batch, out) layout.
        flat = features.reshape(seq_len * batch, hidden)
        projected = self.layer2(flat)
        return projected.reshape(seq_len, batch, -1)
model = lstm(2, 4, 1, 2)
# MSE loss with the Adam optimizer. (An earlier comment here claimed
# cross-entropy, but the code clearly uses nn.MSELoss — appropriate for
# this regression task.)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
# Training loop: full-batch gradient descent for 1000 epochs.
for e in range(1000):
    # NOTE(review): Variable is deprecated since PyTorch 0.4; plain tensors
    # would work directly here.
    var_x = Variable(train_x)
    var_y = Variable(train_y)
    # forward pass
    out = model(var_x)
    loss = criterion(out, var_y)
    # backward pass and parameter update
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    if (e + 1) % 100 == 0:  # report progress every 100 epochs
        print('Epoch: {}, Loss: {:.5f}'.format(e + 1, loss.item()))
# Model inference
model = model.eval()  # switch to evaluation mode
# NOTE(review): prediction below runs on the FULL dataset (training + test
# samples), not only on test_x, despite the variable name `pred_test`.
data_X = data_X.reshape(-1, 1, 2)
data_X = torch.from_numpy(data_X)
var_data = Variable(data_X)
pred_test = model(var_data)  # predictions for every sample in the series
# Flatten to a 1-D numpy array for plotting.
pred_test = pred_test.view(-1).data.numpy()
# Undo the normalization by multiplying back by the range (mirrors the
# divide-by-scalar step during preprocessing).
pred_test=list(map(lambda x: x*scalar, pred_test))
dataset=list(map(lambda x: x *scalar, dataset))
print("predict",pred_test)
print(dataset)
# Plot the predicted series against the actual series.
plt.plot(pred_test, 'r', label='prediction')
# plt.plot(train_Y,'g',label='train_Y')
plt.plot(dataset, 'b', label='real')
plt.legend(loc='best')
plt.show()