attempt to add ParallelExecutor to transformer model #804

Open · wants to merge 2 commits into base: dev-static
2 changes: 1 addition & 1 deletion fluid/neural_machine_translation/transformer/config.py
@@ -1,5 +1,5 @@
class TrainTaskConfig(object):
use_gpu = False
use_gpu = True
# the epoch number to train.
pass_num = 2

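A minimal sketch of how the `use_gpu` flag flipped above typically drives device selection, assuming the `config` module from this directory; since the `train.py` changes below hard-code `use_cuda=True` in `ParallelExecutor`, the same boolean could be reused there to keep the two in sync:

```python
import paddle.fluid as fluid
from config import TrainTaskConfig

# use_gpu picks the device used for parameter initialization; the same flag
# could also be passed as ParallelExecutor's use_cuda argument (see train.py).
place = fluid.CUDAPlace(0) if TrainTaskConfig.use_gpu else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.framework.default_startup_program())
```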
98 changes: 28 additions & 70 deletions fluid/neural_machine_translation/transformer/model.py
@@ -387,60 +387,34 @@ def transformer(
src_pad_idx,
trg_pad_idx,
pos_pad_idx, ):
# The shapes here act as placeholders.
# The shapes set here are only to pass the infer-shape check at compile time.
# The actual shape of src_word in runtime is:
# [batch_size * max_src_length_in_a_batch, 1].
src_word = layers.data(
name=input_data_names[0],
shape=[batch_size * max_length, 1],
dtype="int64",
append_batch_size=False)
# The actual shape of src_pos in runtime is:
# [batch_size * max_src_length_in_a_batch, 1].
src_pos = layers.data(
name=input_data_names[1],
shape=[batch_size * max_length, 1],
dtype="int64",
append_batch_size=False)
# The actual shape of trg_word in runtime is:
# [batch_size * max_trg_length_in_a_batch, 1].
trg_word = layers.data(
name=input_data_names[2],
shape=[batch_size * max_length, 1],
dtype="int64",
append_batch_size=False)
# The actual shape of trg_pos in runtime is:
# [batch_size * max_trg_length_in_a_batch, 1].
trg_pos = layers.data(
name=input_data_names[3],
shape=[batch_size * max_length, 1],
dtype="int64",
append_batch_size=False)
# The actual shape of src_slf_attn_bias in runtime is:
# [batch_size, n_head, max_src_length_in_a_batch, max_src_length_in_a_batch].
# This input is used to remove attention weights on paddings.
src_slf_attn_bias = layers.data(
name=input_data_names[4],
shape=[batch_size, n_head, max_length, max_length],
dtype="float32",
append_batch_size=False)
# The actual shape of trg_slf_attn_bias in runtime is:
# [batch_size, n_head, max_trg_length_in_batch, max_trg_length_in_batch].
# This is used to remove attention weights on paddings and subsequent words.
trg_slf_attn_bias = layers.data(
name=input_data_names[5],
shape=[batch_size, n_head, max_length, max_length],
dtype="float32",
append_batch_size=False)
# The actual shape of trg_src_attn_bias in runtime is:
# [batch_size, n_head, max_trg_length_in_batch, max_src_length_in_batch].
# This is used to remove attention weights on paddings.
trg_src_attn_bias = layers.data(
name=input_data_names[6],
shape=[batch_size, n_head, max_length, max_length],
dtype="float32",
append_batch_size=False)
file_obj = fluid.layers.open_recordio_file(
filename='./wmt16.recordio',
shapes=[
[batch_size * max_length, 1],
[batch_size * max_length, 1],
[batch_size * max_length, 1],
[batch_size * max_length, 1],
[batch_size, n_head, max_length, max_length],
[batch_size, n_head, max_length, max_length],
[batch_size, n_head, max_length, max_length],
[batch_size * max_length, 1],
[batch_size * max_length, 1],
],
dtypes=[
'int64',
'int64',
'int64',
'int64',
'float32',
'float32',
'float32',
'int64',
'float32',
],
lod_levels=[0] * 9)

src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias, trg_slf_attn_bias, trg_src_attn_bias, gold, weights = fluid.layers.read_file(
file_obj)

enc_input = prepare_encoder(
src_word,
@@ -492,22 +466,6 @@ def transformer(
num_flatten_dims=2),
shape=[-1, trg_vocab_size],
act="softmax")
# The actual shape of gold in runtime is:
# [batch_size * max_trg_length_in_a_batch, 1].
gold = layers.data(
name=input_data_names[7],
shape=[batch_size * max_length, 1],
dtype="int64",
append_batch_size=False)
cost = layers.cross_entropy(input=predict, label=gold)
# The actual shape of weights in runtime is:
# [batch_size * max_trg_length_in_a_batch, 1].
# Padding indices do not contribute to the total loss. This weight is used to
# mask out padding indices when calculating the loss.
weights = layers.data(
name=input_data_names[8],
shape=[batch_size * max_length, 1],
dtype="float32",
append_batch_size=False)
weighted_cost = cost * weights
return layers.reduce_sum(weighted_cost)
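
The hunk above replaces the seven `layers.data` inputs (plus `gold` and `weights` further down) with a single RecordIO reader. For reference, a reduced, self-contained sketch of that write-then-read pattern, using two illustrative fields instead of the model's nine, with toy shapes and a toy file name; the key constraint is that the shapes, dtypes and field order given to `open_recordio_file` must exactly match what the writer appends per batch:

```python
import numpy as np
import paddle.fluid as fluid

BATCH = 8  # toy batch layout, not the real [batch_size * max_length, 1] shapes

# Writing: append_tensor() once per field, complete_append_tensor() per batch.
with fluid.recordio_writer.create_recordio_writer("./toy.recordio") as writer:
    for _ in range(4):  # four toy batches
        for arr in (np.zeros((BATCH, 1), "int64"),
                    np.ones((BATCH, 1), "float32")):
            t = fluid.LoDTensor()
            t.set(arr, fluid.CPUPlace())
            writer.append_tensor(t)
        writer.complete_append_tensor()

# Reading: shapes, dtypes and order must mirror what was written above.
file_obj = fluid.layers.open_recordio_file(
    filename='./toy.recordio',
    shapes=[[BATCH, 1], [BATCH, 1]],
    dtypes=['int64', 'float32'],
    lod_levels=[0, 0])
ids, weights = fluid.layers.read_file(file_obj)
```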
100 changes: 52 additions & 48 deletions fluid/neural_machine_translation/transformer/train.py
@@ -1,16 +1,19 @@
import numpy as np
import sys
import time

import paddle.v2 as paddle
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.dataset.wmt16 as wmt16

from model import transformer, position_encoding_init
from optim import LearningRateScheduler
from config import TrainTaskConfig, ModelHyperParams, \
pos_enc_param_names, input_data_names


def prepare_batch_input(insts, input_data_names, src_pad_idx, trg_pad_idx,
max_length, n_head, place):
def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and attention bias. Then, convert the numpy
@@ -25,9 +28,9 @@ def __pad_batch_data(insts,
return_attn_bias=True,
return_max_len=True):
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and attention bias.
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and attention bias.
"""
return_list = []
max_len = max(len(inst) for inst in insts)
inst_data = np.array(
@@ -63,13 +66,6 @@ def __pad_batch_data(insts,
return_list += [max_len]
return return_list if len(return_list) > 1 else return_list[0]

def data_to_tensor(data_list, name_list, input_dict, place):
assert len(data_list) == len(name_list)
for i in range(len(name_list)):
tensor = fluid.LoDTensor()
tensor.set(data_list[i], place)
input_dict[name_list[i]] = tensor

src_word, src_pos, src_slf_attn_bias, src_max_len = __pad_batch_data(
[inst[0] for inst in insts], src_pad_idx, is_target=False)
trg_word, trg_pos, trg_slf_attn_bias, trg_max_len = __pad_batch_data(
@@ -80,18 +76,13 @@ def data_to_tensor(data_list, name_list, input_dict, place):
False, False, False)
lbl_weight = (lbl_word != trg_pad_idx).astype("float32").reshape([-1, 1])

data_to_tensor([
return [
src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias,
trg_slf_attn_bias, trg_src_attn_bias, lbl_word, lbl_weight
], input_data_names, input_dict, place)

return input_dict
]


def main():
place = fluid.CUDAPlace(0) if TrainTaskConfig.use_gpu else fluid.CPUPlace()
exe = fluid.Executor(place)

cost = transformer(
ModelHyperParams.src_vocab_size + 1,
ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1,
@@ -101,11 +92,8 @@ def main():
ModelHyperParams.dropout, ModelHyperParams.src_pad_idx,
ModelHyperParams.trg_pad_idx, ModelHyperParams.pos_pad_idx)

lr_scheduler = LearningRateScheduler(ModelHyperParams.d_model,
TrainTaskConfig.warmup_steps, place,
TrainTaskConfig.learning_rate)
optimizer = fluid.optimizer.Adam(
learning_rate=lr_scheduler.learning_rate,
learning_rate=TrainTaskConfig.learning_rate,
beta1=TrainTaskConfig.beta1,
beta2=TrainTaskConfig.beta2,
epsilon=TrainTaskConfig.eps)
@@ -118,32 +106,48 @@ def main():
buf_size=100000),
batch_size=TrainTaskConfig.batch_size)

# Initialize the parameters.
exe.run(fluid.framework.default_startup_program())
for pos_enc_param_name in pos_enc_param_names:
pos_enc_param = fluid.global_scope().find_var(
pos_enc_param_name).get_tensor()
pos_enc_param.set(
position_encoding_init(ModelHyperParams.max_length + 1,
ModelHyperParams.d_model), place)

reader = paddle.batch(
wmt16.train(ModelHyperParams.src_vocab_size,
ModelHyperParams.trg_vocab_size),
batch_size=TrainTaskConfig.batch_size)

with fluid.recordio_writer.create_recordio_writer(
"./wmt16.recordio") as writer:
for batch in reader():
for tensor in prepare_batch_input(
batch, ModelHyperParams.src_pad_idx,
ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head):
t = fluid.LoDTensor()
t.set(tensor, fluid.CPUPlace())
writer.append_tensor(t)
writer.complete_append_tensor()

exe = fluid.ParallelExecutor(loss_name=cost.name, use_cuda=True)
def fn(pass_id, batch_id):
t1 = time.time()
outs = exe.run([cost.name])
cost_val = np.array(outs[0])
print("pass_id = " + str(pass_id) + " batch = " + str(batch_id) +
" cost = " + str(cost_val))
return time.time() - t1

# with open('/tmp/program', 'w') as f:
# f.write('%s' % fluid.framework.default_main_program())
total_time = 0.0
count = 0
for pass_id in xrange(TrainTaskConfig.pass_num):
for batch_id, data in enumerate(train_data()):
# The current program desc is coupled with batch_size, thus all
# mini-batches must have the same number of instances currently.
if len(data) != TrainTaskConfig.batch_size:
continue
data_input = prepare_batch_input(
data, input_data_names, ModelHyperParams.src_pad_idx,
ModelHyperParams.trg_pad_idx, ModelHyperParams.max_length,
ModelHyperParams.n_head, place)
lr_scheduler.update_learning_rate(data_input)
outs = exe.run(fluid.framework.default_main_program(),
feed=data_input,
fetch_list=[cost])
cost_val = np.array(outs[0])
print("pass_id = " + str(pass_id) + " batch = " + str(batch_id) +
" cost = " + str(cost_val))
for batch_id in xrange(10000):
if batch_id == 1:
with profiler.profiler('All', 'total', '/tmp/transformer'):
duration = fn(pass_id, batch_id)
duration = fn(pass_id, batch_id)
else:
duration = fn(pass_id, batch_id)
count += 1
total_time += duration
print("avg: " + str(total_time / count) + " cur: " + str(duration))
sys.stdout.flush()


if __name__ == "__main__":
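
Putting the training-side changes together, a condensed sketch of the new loop under the same assumptions as the diff (`cost` is the loss variable returned by `transformer()` in model.py, and the parameters are assumed already initialized via the startup program): the in-program RecordIO reader supplies the data, so `ParallelExecutor.run()` takes only a fetch list of variable names, and a single early batch is wrapped in the profiler to dump a timeline to /tmp/transformer.

```python
import time
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler

# Assumed to already exist: `cost`, the loss built by transformer() in
# model.py, and an initialized startup program (see the diff above).
exe = fluid.ParallelExecutor(loss_name=cost.name, use_cuda=True)

def run_one_batch(step):
    start = time.time()
    # No feed dict: data comes from the RecordIO reader inside the program.
    cost_val = np.array(exe.run([cost.name])[0])
    print("step = " + str(step) + " cost = " + str(cost_val))
    return time.time() - start

total_time, count = 0.0, 0
for step in range(1000):
    if step == 1:
        # Profile one early batch and write the timeline to /tmp/transformer.
        with profiler.profiler('All', 'total', '/tmp/transformer'):
            duration = run_one_batch(step)
    else:
        duration = run_one_batch(step)
    count += 1
    total_time += duration
    print("avg: " + str(total_time / count) + " cur: " + str(duration))
```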