参考:https://colah.github.io/posts/2015-08-Understanding-LSTMs/ ,写得非常详细,有精致的图例和清楚的公式,建议先阅读该篇文章

LSTM,长短期记忆神经网络,它克服了RNN难以记录长期信息的特点。

举个例子,“天上红色的是__”,RNN可以一定程度上利用“天上”、“红色”等信息来预测当前应填的单词,可能是“太阳”。

但是,如果有效信息相隔非常远,例如:“我是一个中国人,……,我会说___,英语和日语”,这些久远的信息很难留存在RNN的hidden state中。

LSTM在结构上做出了改进,如下图所示。它包含两种hidden state,一个是上方的$C_t$,它代表着模型的记忆,在训练过程中改动较少,一个是下方的$h_t$,代表着输入内容的抽象信息(?也许,其实没有这么明确的物理意义,只是我期望可以这么认为)。它们通过一些方式互相影响并更新,下面一一介绍。

遗忘门

LSTM的第一步是决定要让长期记忆$C_{t-1}$去遗忘什么。它拼接当前输入$x_t$和上一时刻的状态$h_{t-1}$,并送入sigmoid层,得到一个介于0与1之间的输出向量。随后,和$C_{t-1}$做Hadamard积。如果sigmoid层对应输出越接近0,则对应信息遗忘得越多,反之则继续记忆。

形象化的理解可能就是,用当前状态去判断哪些记忆不再适用了,比如说之前还一直在谈论A,现在突然转到谈论B了,一些有关A的信息就不再需要了。

输入门

第二步是决定要让$C_{t-1}$记住什么。当前输入首先会经过一个sigmoid层得到$i_t$,这决定了接下来哪些信息是有必要被记住的。实际上的更新向量$\tilde{C_t}$是通过tanh激活函数得到的,至于为什么选tanh,我找到了一些说法:(来自:https://stackoverflow.com/questions/40761185/what-is-the-intuition-of-using-tanh-in-lstm)

  • 为了防止梯度消失问题,我们需要一个二次导数在大范围内不为0的函数,而tanh函数可以满足这一点

  • 为了便于凸优化,我们需要一个单调函数

  • tanh函数一般收敛的更快

  • tanh函数的求导占用系统的资源更少

随后,$i_t$与$\tilde{C_t}$相乘,得到正式的update内容。

输出门

我们要决定输出什么内容。总之就是输入经过一个sigmoid层,加上记忆$C_{t}$的影响(经过tanh),

自己实现

先不用PyTorch定义好的LSTM,直接根据网络结构一步步从头搭建一个

注意到原来公式中有一步拼接$h_{t-1}$和$x_t$的操作,然后和权重矩阵$W$相乘。由线性代数知识,我们完全可以把$W \cdot [h_{t-1},x_t]$ 变成 $U \cdot x_t + V \cdot h_{t-1}$

这样,原来LSTM中的三个sigmoid层,一个tanh层的公式就应该是:

现在需要理清楚一下维度:

输入 $x$ 应该是 [batch_size, sequence_len(时间序列长度), feature_size]

这样 $x_t$ 就是 [feature_size], 同理 $h_t$ 是 [hidden_size]

所以需要训练的参数为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class myLstm(nn.Module):
def __init__(self, input_sz, hidden_sz):
super().__init__()
self.input_size = input_sz
self.hidden_size = hidden_sz

#f_t
self.U_f = nn.Parameter(torch.Tensor(input_sz, hidden_sz))
self.V_f = nn.Parameter(torch.Tensor(hidden_sz, hidden_sz))
self.b_f = nn.Parameter(torch.Tensor(hidden_sz))

#i_t
self.U_i = nn.Parameter(torch.Tensor(input_sz,hidden_sz))
self.V_i = nn.Parameter(torch.Tensor(hidden_sz,hidden_sz))
self.b_i = nn.parameter(torch.Tensor(hidden_sz))

#c_t
self.U_c = nn.Parameter(torch.Tensor(input_sz, hidden_sz))
self.V_c = nn.Parameter(torch.Tensor(hidden_sz, hidden_sz))
self.b_c = nn.Parameter(torch.Tensor(hidden_sz))

#o_t
self.U_o = nn.Parameter(torch.Tensor(input_sz, hidden_sz))
self.V_o = nn.Parameter(torch.Tensor(hidden_sz, hidden_sz))
self.b_o = nn.Parameter(torch.Tensor(hidden_sz))

前向传播定义如下:

在训练时,某些样本在时间上可能是连续的,而有些是不相干的,有时我们需要权重进行预测。我们可以通过参数init_states来决定是否要继承上次的最终输出$h_t$和$c_t$,还是直接重新初始化。

随后,遍历时间步,在整个时间序列遍历完后再更新权重。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def forward(self, x, init_states=None):
batch_size,seq_sz,_=x.size()

if init_states is None:
h_t,c_t=(
torch.zeros(batch_size, self.hidden_size).to(x.device),
torch.zeros(batch_size, self.hidden_size).to(x.device)
)
else:
h_t, c_t = init_states

for t in range(seq_sz):
x_t = x[:, t, :]

# 和nn.parameter计算时,自动处理x的batch这一维度
i_t = torch.sigmoid(x_t @ self.U_i + h_t @ self.V_i + self.b_i)
f_t = torch.sigmoid(x_t @ self.U_f + h_t @ self.V_f + self.b_f)
g_t = torch.tanh(x_t @ self.U_c + h_t @ self.V_c + self.b_c)
o_t = torch.sigmoid(x_t @ self.U_o + h_t @ self.V_o + self.b_o)
c_t = f_t * c_t + i_t * g_t
h_t = o_t * torch.tanh(c_t)

return h_t, c_t

这样,我们能得到最后一次输出的权重。比如说要预测接下来16个时间步的内容,我们就可以利用这些数据,进一步在模型中forward,得到新的输出 $y_{pred}$,和真实的 $y$ 计算误差。

进一步优化

注意到四个门的计算都是互相并行的,我们可以用一次矩阵运算来加速操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class CustomLSTM(nn.Module):
def __init__(self, input_sz, hidden_sz):
super().__init__()
self.input_sz = input_sz
self.hidden_size = hidden_sz
self.W = nn.Parameter(torch.Tensor(input_sz, hidden_sz * 4))
self.U = nn.Parameter(torch.Tensor(hidden_sz, hidden_sz * 4))
self.bias = nn.Parameter(torch.Tensor(hidden_sz * 4))
self.init_weights()

# 初始化模型权重
def init_weights(self):
stdv = 1.0 / math.sqrt(self.hidden_size)
for weight in self.parameters():
weight.data.uniform_(-stdv, stdv)

def forward(self, x,
init_states=None):
"""Assumes x is of shape (batch, sequence, feature)"""
bs, seq_sz, _ = x.size()
hidden_seq = []
if init_states is None:
h_t, c_t = (torch.zeros(bs, self.hidden_size).to(x.device),
torch.zeros(bs, self.hidden_size).to(x.device))
else:
h_t, c_t = init_states

HS = self.hidden_size
for t in range(seq_sz):
x_t = x[:, t, :]
# batch the computations into a single matrix multiplication
gates = x_t @ self.W + h_t @ self.U + self.bias
i_t, f_t, g_t, o_t = (
torch.sigmoid(gates[:, :HS]), # input
torch.sigmoid(gates[:, HS:HS*2]), # forget
torch.tanh(gates[:, HS*2:HS*3]),
torch.sigmoid(gates[:, HS*3:]), # output
)
c_t = f_t * c_t + i_t * g_t
h_t = o_t * torch.tanh(c_t)
hidden_seq.append(h_t.unsqueeze(0))
hidden_seq = torch.cat(hidden_seq, dim=0)
# reshape from shape (sequence, batch, feature) to (batch, sequence, feature)
hidden_seq = hidden_seq.transpose(0, 1).contiguous()
return hidden_seq, (h_t, c_t)

我也不太清楚算出每一步的 hidden_seq 有什么用,为什么只存 $h_t$ 而不存 $C_t$ ?

问了 ChatGPT,它是这么回答 hidden_seq 的作用的:

  • 序列分类:如果你的任务是对整个序列进行分类,你可以使用 hidden_seq 中的最后一个时间步的隐藏状态 h_t 或者对整个序列的隐藏状态进行池化操作(例如平均池化或最大池化),然后将其传递给分类器进行分类预测。

  • 序列标注:在自然语言处理中,你可以使用 hidden_seq 来生成每个时间步的标注结果,例如词性标注或命名实体识别。

  • 序列生成:如果你的任务是生成新的序列,如文本生成,你可以使用 hidden_seq 作为生成器的输入,以生成接下来的序列内容。

  • 注意力机制:hidden_seq 可用于计算注意力权重,以确定序列中不同时间步的重要性,然后对序列的不同部分进行加权汇总。

  • 可视化和分析:hidden_seq 可以用于可视化和分析模型在输入序列上的学习过程。你可以查看隐藏状态的变化,了解模型如何处理不同时间步的信息。

嗯,暂时还不理解。

PyTorch API

也可以直接调PyTorch的API,会方便很多

我们用 https://github.com/L1aoXingyu/code-of-learn-deep-learning-with-pytorch/blob/master/chapter5_RNN/time-series/lstm-time-series.ipynb 中的例子来实践一下

十年飞机客流量数据:https://github.com/L1aoXingyu/code-of-learn-deep-learning-with-pytorch/blob/master/chapter5_RNN/time-series/data.csv

1
2
3
4
5
6
7
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

data_csv = pd.read_csv('./data.csv', usecols=[1])
plt.plot(data_csv)

数据如下图所示:

我们的任务目标是通过前几个月的客流量数据去预测后几个月的客流量数据。比如说我们要用前两个月的数据去预测后一个月的数据。

首先进行数据处理,除掉空数据(虽然这个数据集里好像没有),然后执行归一化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 数据预处理
data_csv = data_csv.dropna()
dataset = data_csv.values
dataset = dataset.astype('float32')
max_value = np.max(dataset)
min_value = np.min(dataset)
scalar = max_value - min_value
dataset = list(map(lambda x: x / scalar, dataset))

def create_dataset(dataset, look_back=2):
dataX, dataY = [], []
for i in range(len(dataset) - look_back):
a = dataset[i:(i + look_back)]
dataX.append(a)
dataY.append(dataset[i + look_back])
return np.array(dataX), np.array(dataY)

# 创建好输入输出
data_X, data_Y = create_dataset(dataset)

# 划分训练集和测试集,70% 作为训练集
train_size = int(len(data_X) * 0.7)
test_size = len(data_X) - train_size
train_X = data_X[:train_size]
train_Y = data_Y[:train_size]
test_X = data_X[train_size:]
test_Y = data_Y[train_size:]

看下网络的定义,先来介绍一下 nn.LSTM 的参数:

  • input_size – 输入的特征维度
  • hidden_size – 隐状态的特征维度
  • num_layers – 堆叠LSTM的层数
  • batch_first – 如果为True,那么输入和输出Tensor的形状为(batch, seq, feature),默认为false,是(seq, batch, feature)
  • bidirectional – 是否双向传播,默认为False

LSTM输出: output, (h_n, c_n)

output (seq_len, batch, hidden_size num_directions):LSTM的输出序列,对于每个时间步,它包含了每个样本的隐藏状态。
h_n (num_layers
num_directions, batch, hidden_size):保存着LSTM最后一个时间步的隐状态
c_n (num_layers * num_directions, batch, hidden_size):保存着LSTM最后一个时间步的细胞状态

其中 h_n 应该就是 output[-1] (最后一项)

1
2
# 例
lstm = nn.LSTM(10, 512, 2, bidirectional=False)

所以,我们要把输入的数据先做一下维度变换:

1
2
3
4
5
6
7
8
9
import torch

train_X = train_X.reshape(-1, 1, 2)
train_Y = train_Y.reshape(-1, 1, 1)
test_X = test_X.reshape(-1, 1, 2)

train_x = torch.from_numpy(train_X)
train_y = torch.from_numpy(train_Y)
test_x = torch.from_numpy(test_X)

然后我们定义模型,开始训练。

注意最后一层线性层,我们需要把当前的输出hidden_size转化成我们需要的output_size。

线性层一般接收一个二维输入,忽略第一维的batch_size,转换hidden_size,不过因为前面一层是LSTM的输出,所以是三维的,我们不妨把batch_size和seq_len先“拼起来”,毕竟我们的目的只是想把隐藏层输出转化为目标输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from torch import nn
from torch.autograd import Variable

# 定义模型
class lstm_reg(nn.Module):
def __init__(self, input_size, hidden_size, output_size=1, num_layers=2):
super().__init__()

self.lstm = nn.LSTM(input_size, hidden_size, num_layers) # lstm
self.reg = nn.Linear(hidden_size, output_size) # 回归

def forward(self, x):
x, _ = self.lstm(x) # (seq, batch, hidden)
s, b, h = x.shape
x = x.view(s*b, h) # 转换成线性层的输入格式
x = self.reg(x)
x = x.view(s, b, -1)
return x

model = lstm_reg(2, 4)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

# 开始训练
for step in range(1000):
var_x = Variable(train_x)
var_y = Variable(train_y)
# 前向传播
out = model(var_x)
loss = criterion(out, var_y)
# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()
'''
if (e + 1) % 100 == 0: # 每 100 次输出结果
print('Epoch: {}, Loss: {:.5f}'.format(e + 1, loss.data[0]))
'''
if (step + 1) % 1000 == 0:
print('Step:', '%04d' % (step + 1), 'cost =', '{:.6f}'.format(loss))

训练好之后,我们就可以去做预测了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
net = net.eval() # 转换成测试模式

data_X = data_X.reshape(-1, 1, 2)
data_X = torch.from_numpy(data_X)
var_data = Variable(data_X)
pred_test = net(var_data) # 测试集的预测结果

# 改变输出的格式
pred_test = pred_test.view(-1).data.numpy()

# 画出实际结果和预测的结果
plt.plot(pred_test, 'r', label='prediction')
plt.plot(dataset, 'b', label='real')
plt.legend(loc='best')

结果如下图所示:

https://colah.github.io/posts/2015-08-Understanding-LSTMs/

https://zhuanlan.zhihu.com/p/451985132

https://github.com/L1aoXingyu/code-of-learn-deep-learning-with-pytorch/blob/master/chapter5_RNN/time-series/lstm-time-series.ipynb