"""This code is taken from https://github.com/CyberZHG/torch-multi-head-attention/blob/master/torch_multi_head_attention/multi_head_attention.py"""
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
__all__ = ['MultiHeadAttention', 'ScaledDotProductAttention']
class ScaledDotProductAttention(nn.Module):
def forward(self, query, key, value, mask=None):
dk = query.size()[-1]
scores = query.matmul(key.transpose(-2, -1)) / math.sqrt(dk)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
attention = F.softmax(scores, dim=-1)
return attention.matmul(value)
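

# Illustrative note (not part of the original file): with query of shape
# (N, L_q, d_k), key of shape (N, L_k, d_k) and value of shape (N, L_k, d_v),
# the scores have shape (N, L_q, L_k) and the returned tensor has shape
# (N, L_q, d_v). An optional mask of shape (N, L_q, L_k) marks disallowed key
# positions with 0, which are pushed to -1e9 before the softmax.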


class MultiHeadAttention(nn.Module):

    def __init__(self,
                 in_features,
                 head_num,
                 bias=True,
                 activation=F.relu):
        """Multi-head attention.

        :param in_features: Size of each input sample.
        :param head_num: Number of heads.
        :param bias: Whether to use the bias term.
        :param activation: The activation after each linear transformation.
        """
        super(MultiHeadAttention, self).__init__()
        if in_features % head_num != 0:
            raise ValueError('`in_features`({}) should be divisible by '
                             '`head_num`({})'.format(in_features, head_num))
        self.in_features = in_features
        self.head_num = head_num
        self.activation = activation
        self.bias = bias
        self.linear_q = nn.Linear(in_features, in_features, bias)
        self.linear_k = nn.Linear(in_features, in_features, bias)
        self.linear_v = nn.Linear(in_features, in_features, bias)
        self.linear_o = nn.Linear(in_features, in_features, bias)

    def forward(self, q, k, v, mask=None):
        # Project the inputs, then optionally apply the activation.
        q, k, v = self.linear_q(q), self.linear_k(k), self.linear_v(v)
        if self.activation is not None:
            q = self.activation(q)
            k = self.activation(k)
            v = self.activation(v)
        # Fold the heads into the batch dimension so attention runs per head.
        q = self._reshape_to_batches(q)
        k = self._reshape_to_batches(k)
        v = self._reshape_to_batches(v)
        if mask is not None:
            mask = mask.repeat(self.head_num, 1, 1)
        y = ScaledDotProductAttention()(q, k, v, mask)
        # Merge the heads back and apply the output projection.
        y = self._reshape_from_batches(y)
        y = self.linear_o(y)
        if self.activation is not None:
            y = self.activation(y)
        return y

    @staticmethod
    def gen_history_mask(x):
        """Generate the mask that only uses history data.

        :param x: Input tensor.
        :return: The mask.
        """
        batch_size, seq_len, _ = x.size()
        return torch.tril(torch.ones(seq_len, seq_len)).view(1, seq_len, seq_len).repeat(batch_size, 1, 1)
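
    # For example (illustration, not part of the original source): with seq_len == 3,
    # each batch element of the returned mask is the lower-triangular pattern
    #   [[1, 0, 0],
    #    [1, 1, 0],
    #    [1, 1, 1]]
    # so position i can only attend to positions <= i.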

    def _reshape_to_batches(self, x):
        # (batch, seq_len, in_feature) -> (batch * head_num, seq_len, in_feature // head_num)
        batch_size, seq_len, in_feature = x.size()
        sub_dim = in_feature // self.head_num
        return x.reshape(batch_size, seq_len, self.head_num, sub_dim)\
                .permute(0, 2, 1, 3)\
                .reshape(batch_size * self.head_num, seq_len, sub_dim)

    def _reshape_from_batches(self, x):
        # (batch * head_num, seq_len, sub_dim) -> (batch, seq_len, sub_dim * head_num)
        batch_size, seq_len, in_feature = x.size()
        batch_size //= self.head_num
        out_dim = in_feature * self.head_num
        return x.reshape(batch_size, self.head_num, seq_len, in_feature)\
                .permute(0, 2, 1, 3)\
                .reshape(batch_size, seq_len, out_dim)

    def extra_repr(self):
        return 'in_features={}, head_num={}, bias={}, activation={}'.format(
            self.in_features, self.head_num, self.bias, self.activation,
        )
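

# A minimal usage sketch (not part of the original module). The sizes below are
# arbitrary; the only requirement is that `in_features` is divisible by
# `head_num`. Run the file directly to check the output shape.
if __name__ == '__main__':
    batch_size, seq_len, in_features, head_num = 2, 5, 16, 4
    x = torch.randn(batch_size, seq_len, in_features)
    attention = MultiHeadAttention(in_features=in_features, head_num=head_num)
    # Self-attention with a causal (history-only) mask.
    mask = MultiHeadAttention.gen_history_mask(x)
    y = attention(x, x, x, mask=mask)
    print(attention)   # repr includes extra_repr()
    print(y.shape)     # expected: torch.Size([2, 5, 16])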