feat(trainer): v1.2 (#799)
* feat(fedlearner/trainer): master run tensorflow session for save checkpoint and export model

* feat(fedlearner/common/fl_logging): add fedlearner logging handler
whisylan authored May 25, 2021
1 parent cdb36cf commit 2aa5692
Showing 59 changed files with 2,536 additions and 2,219 deletions.
2 changes: 1 addition & 1 deletion deploy/scripts/env_to_args.sh
@@ -24,7 +24,7 @@ pull_code() {
python -c "import base64; f = open('code.tar.gz', 'wb'); f.write(base64.b64decode('$1'[9:])); f.close()"
else
cp $1 code.tar.gz
fi
fi
tar -zxvf code.tar.gz
cd $cwd
}
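Note on the inline-code branch above: pull_code accepts either an inline payload or a file path, and the Python one-liner strips the first nine characters of the argument before decoding — presumably a scheme prefix such as base64:// (an assumption; the prefix itself is not shown in this hunk). A minimal standalone sketch of that decode branch:

import base64

def decode_inline_code(payload, out_path='code.tar.gz'):
    # Assumes a 9-character scheme prefix (e.g. 'base64://');
    # everything after it is the base64-encoded tarball.
    with open(out_path, 'wb') as f:
        f.write(base64.b64decode(payload[9:]))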
73 changes: 64 additions & 9 deletions deploy/scripts/trainer/run_trainer_master.sh
@@ -20,12 +20,67 @@ export CUDA_VISIBLE_DEVICES=
source /app/deploy/scripts/hdfs_common.sh || true
source /app/deploy/scripts/env_to_args.sh

epoch_num=$(normalize_env_to_args "--epoch_num" $EPOCH_NUM)
start_date=$(normalize_env_to_args "-start_date" $START_DATE)
end_date=$(normalize_env_to_args "-end_date" $END_DATE)

python -m fedlearner.trainer_master.${ROLE}_tm \
-app_id=$APPLICATION_ID \
-data_source=$DATA_SOURCE \
-p 50051 \
$start_date $end_date $epoch_num $ONLINE_TRAINING $SUFFLE_DATA_BLOCK
sparse_estimator=$(normalize_env_to_args "--sparse-estimator" "$SPARSE_ESTIMATOR")
save_checkpoint_steps=$(normalize_env_to_args "--save-checkpoint-steps" "$SAVE_CHECKPOINT_STEPS")
save_checkpoint_secs=$(normalize_env_to_args "--save-checkpoint-secs" "$SAVE_CHECKPOINT_SECS")
summary_save_steps=$(normalize_env_to_args "--summary-save-steps" "$SUMMARY_SAVE_STEPS")
summary_save_secs=$(normalize_env_to_args "--summary-save-secs" "$SUMMARY_SAVE_SECS")
epoch_num=$(normalize_env_to_args "--epoch-num" $EPOCH_NUM)
start_date=$(normalize_env_to_args "--start-date" $START_DATE)
end_date=$(normalize_env_to_args "--end-date" $END_DATE)
shuffle=$(normalize_env_to_args "--shuffle" $SUFFLE_DATA_BLOCK)

if [ -n "$CHECKPOINT_PATH" ]; then
checkpoint_path="$CHECKPOINT_PATH"
else
checkpoint_path="$OUTPUT_BASE_DIR/checkpoints"
fi
load_checkpoint_filename=$(normalize_env_to_args "--load-checkpoint-filename" "$LOAD_CHECKPOINT_FILENAME")
load_checkpoint_filename_with_path=$(normalize_env_to_args "--load-checkpoint-filename-with-path" "$LOAD_CHECKPOINT_FILENAME_WITH_PATH")

if [[ -n "$EXPORT_PATH" ]]; then
export_path="$EXPORT_PATH"
else
export_path="$OUTPUT_BASE_DIR/exported_models"
fi

if [ -n "$CLUSTER_SPEC" ]; then
# Rewrite the TensorFlow ClusterSpec for compatibility: port 50051 is
# reserved for the fedlearner master/worker gRPC servers, so the
# TensorFlow endpoints are moved to 50052.
CLUSTER_SPEC=`python -c """
import json
def rewrite_port(address, old, new):
    (host, port) = address.rsplit(':', 1)
    if port == old:
        return host + ':' + new
    return address
cluster_spec = json.loads('$CLUSTER_SPEC')['clusterSpec']
for i, master in enumerate(cluster_spec.get('Master', [])):
    cluster_spec['Master'][i] = rewrite_port(master, '50051', '50052')
for i, worker in enumerate(cluster_spec.get('Worker', [])):
    cluster_spec['Worker'][i] = rewrite_port(worker, '50051', '50052')
print(json.dumps({'clusterSpec': cluster_spec}))
"""`
fi

if [[ -n "${CODE_KEY}" ]]; then
pull_code ${CODE_KEY} $PWD
else
pull_code ${CODE_TAR} $PWD
fi
cd ${ROLE}

python main.py --master \
--application-id=$APPLICATION_ID \
--data-source=$DATA_SOURCE \
--master-addr=0.0.0.0:50051 \
--cluster-spec="$CLUSTER_SPEC" \
--checkpoint-path=$checkpoint_path \
$load_checkpoint_filename $load_checkpoint_filename_with_path \
--export-path=$export_path \
$sparse_estimator \
$save_checkpoint_steps $save_checkpoint_secs \
$summary_save_steps $summary_save_secs \
$epoch_num $start_date $end_date $shuffle
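The rewrite only touches addresses whose port is exactly 50051 and leaves everything else alone. A quick sketch of the transformation on a hypothetical two-node spec (host names invented for illustration):

import json

def rewrite_port(address, old, new):
    host, port = address.rsplit(':', 1)
    return host + ':' + new if port == old else address

spec = {'Master': ['master-0:50051'],
        'Worker': ['worker-0:50051', 'worker-1:2222']}
for role in ('Master', 'Worker'):
    spec[role] = [rewrite_port(a, '50051', '50052') for a in spec.get(role, [])]
print(json.dumps({'clusterSpec': spec}))
# {"clusterSpec": {"Master": ["master-0:50052"], "Worker": ["worker-0:50052", "worker-1:2222"]}}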
77 changes: 30 additions & 47 deletions deploy/scripts/trainer/run_trainer_worker.sh
@@ -52,62 +52,45 @@ fi
cd ${ROLE}

mode=$(normalize_env_to_args "--mode" "$MODE")
verbosity=$(normalize_env_to_args "--verbosity" "$VERBOSITY")
save_checkpoint_steps=$(normalize_env_to_args "--save-checkpoint-steps" "$SAVE_CHECKPOINT_STEPS")
save_checkpoint_secs=$(normalize_env_to_args "--save-checkpoint-secs" "$SAVE_CHECKPOINT_SECS")
sparse_estimator=$(normalize_env_to_args "--sparse-estimator" "$SPARSE_ESTIMATOR")
summary_save_steps=$(normalize_env_to_args "--summary-save-steps" "$SUMMARY_SAVE_STEPS")
batch_size=$(normalize_env_to_args "--batch-size" "$BATCH_SIZE")
learning_rate=$(normalize_env_to_args "--learning-rate" "$LEARNING_RATE")

if [ -n "$CHECKPOINT_PATH" ]; then
checkpoint_path="$CHECKPOINT_PATH"
else
checkpoint_path="$OUTPUT_BASE_DIR/checkpoints"
fi
load_checkpoint_filename=$(normalize_env_to_args "--load-checkpoint-filename" "$LOAD_CHECKPOINT_FILENAME")
load_checkpoint_filename_with_path=$(normalize_env_to_args "--load-checkpoint-filename-with-path" "$LOAD_CHECKPOINT_FILENAME_WITH_PATH")
if [ -n "$CLUSTER_SPEC" ]; then
    # get master address from clusterSpec['Master']
    MASTER_HOST=`python -c "
import json
cluster_spec = json.loads('$CLUSTER_SPEC')['clusterSpec']
if 'Master' in cluster_spec:
    print(cluster_spec['Master'][0].split(':')[0])
"`

if [[ -n "$LOAD_CHECKPOINT_FROM" ]] && (( $WORKER_RANK == 0 )); then
python -c "
try:
import tensorflow.compat.v1 as tf
except ImportError:
import tensorflow as tf
import tensorflow_io
src = '${STORAGE_ROOT_PATH}/job_output/${LOAD_CHECKPOINT_FROM}/checkpoints'
dst = '${checkpoint_path}'
for root, _, files in tf.io.gfile.walk(src):
root = root[len(src):]
print('makedirs', dst + '/' + root)
tf.io.gfile.makedirs(dst + '/' + root)
for f in files:
src_file = src + '/' + root + '/' + f
dst_file = dst + '/' + root + '/' + f
print('copy', src_file, 'to', dst_file)
tf.io.gfile.copy(src_file, dst_file)
"
fi
if [[ -n "$EXPORT_PATH" ]]; then
    export_path="$EXPORT_PATH"
else
    export_path="$OUTPUT_BASE_DIR/exported_models"
fi

# Rewrite the TensorFlow ClusterSpec for compatibility: port 50051 is
# reserved for the fedlearner master/worker gRPC servers, so the
# TensorFlow endpoints are moved to 50052.
CLUSTER_SPEC=`python -c """
import json
def rewrite_port(address, old, new):
    (host, port) = address.rsplit(':', 1)
    if port == old:
        return host + ':' + new
    return address
cluster_spec = json.loads('$CLUSTER_SPEC')['clusterSpec']
for i, master in enumerate(cluster_spec.get('Master', [])):
    cluster_spec['Master'][i] = rewrite_port(master, '50051', '50052')
for i, worker in enumerate(cluster_spec.get('Worker', [])):
    cluster_spec['Worker'][i] = rewrite_port(worker, '50051', '50052')
print(json.dumps({'clusterSpec': cluster_spec}))
"""`
fi


python main.py \
--data-path="$DATA_PATH" \
python main.py --worker \
--application-id="$APPLICATION_ID" \
--master-addr="$MASTER_HOST:50051" \
--cluster-spec="$CLUSTER_SPEC" \
--tf-addr="$POD_IP:50052" \
--local-addr="$POD_IP:50051" \
--worker-rank="$WORKER_RANK" \
--peer-addr="$PEER_ADDR" \
--checkpoint-path=$checkpoint_path \
--export-path=$export_path \
$mode $verbosity \
$save_checkpoint_steps $sparse_estimator $summary_save_steps \
$save_checkpoint_secs $batch_size $learning_rate \
$load_checkpoint_filename \
$load_checkpoint_filename_with_path
--worker-rank="$WORKER_RANK" \
$mode $batch_size \
$sparse_estimator $learning_rate
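The rank-0 warm-start block above mirrors a previous job's checkpoint tree with tf.io.gfile; as written it can produce doubled slashes when it joins the stripped root back onto dst. A cleaned-up, self-contained sketch of the same recursive copy (function name ours; assumes the compat.v1 API is available):

try:
    import tensorflow.compat.v1 as tf
except ImportError:
    import tensorflow as tf

def copy_checkpoint_tree(src, dst):
    # tf.io.gfile understands remote schemes (hdfs://, gs://), so the
    # same walk/copy works for local and distributed storage.
    for root, _, files in tf.io.gfile.walk(src):
        rel = root[len(src):].lstrip('/')
        out_dir = dst + '/' + rel if rel else dst
        tf.io.gfile.makedirs(out_dir)
        for name in files:
            tf.io.gfile.copy(root + '/' + name, out_dir + '/' + name,
                             overwrite=True)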
17 changes: 7 additions & 10 deletions example/sparse_model/follower.py
@@ -15,13 +15,12 @@
# coding: utf-8
# pylint: disable=no-else-return, inconsistent-return-statements

import logging
import tensorflow.compat.v1 as tf
import fedlearner.trainer as flt

ROLE = 'follower'
parser = flt.trainer_worker.create_argument_parser()
parser.add_argument('--batch-size', type=int, default=256,
parser.add_argument('--batch-size', type=int, default=8,
help='Training batch size.')
parser.add_argument('--fid_version', type=int, default=1,
help="the version of fid")
@@ -41,15 +40,14 @@ def parse_fn(example):
return dataset

def serving_input_receiver_fn():
    feature_map = {}
    feature_map['fids_indices'] = tf.placeholder(dtype=tf.int64, shape=[None],
                                                 name='fids_indices')
    feature_map['fids_values'] = tf.placeholder(dtype=tf.int64, shape=[None],
                                                name='fids_values')
    feature_map['fids_dense_shape'] = tf.placeholder(dtype=tf.int64,
        shape=[None], name='fids_dense_shape')
    return tf.estimator.export.ServingInputReceiver(
        feature_map, feature_map)

    features = {}
    features['fids_indices'] = tf.placeholder(dtype=tf.int64, shape=[None],
                                              name='fids_indices')
    features['fids_values'] = tf.placeholder(dtype=tf.int64, shape=[None],
                                             name='fids_values')
    features['fids_dense_shape'] = tf.placeholder(dtype=tf.int64, shape=[None],
                                                  name='fids_dense_shape')
    return tf.estimator.export.build_raw_serving_input_receiver_fn(features)()

def model_fn(model, features, labels, mode):
global_step = tf.train.get_or_create_global_step()
@@ -102,7 +100,6 @@ def model_fn(model, features, labels, mode):


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    flt.trainer_worker.train(
        ROLE, args, input_fn,
        model_fn, serving_input_receiver_fn)
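Both sparse-model examples now build the receiver with build_raw_serving_input_receiver_fn, which wires receiver_tensors and features to the same placeholders instead of constructing a ServingInputReceiver by hand. A minimal sketch of that equivalence (graph mode assumed, TF1-style Estimator API available):

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

features = {
    'fids_indices': tf.placeholder(tf.int64, shape=[None], name='fids_indices'),
    'fids_values': tf.placeholder(tf.int64, shape=[None], name='fids_values'),
}
# The builder returns a callable; invoking it yields a ServingInputReceiver
# whose receiver_tensors are exactly the placeholders above.
receiver = tf.estimator.export.build_raw_serving_input_receiver_fn(features)()
assert set(receiver.receiver_tensors) == set(features)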
23 changes: 9 additions & 14 deletions example/sparse_model/leader.py
@@ -15,14 +15,13 @@
# coding: utf-8
# pylint: disable=no-else-return, inconsistent-return-statements

import logging
import tensorflow.compat.v1 as tf
import fedlearner.trainer as flt

ROLE = 'leader'

parser = flt.trainer_worker.create_argument_parser()
parser.add_argument('--batch-size', type=int, default=256,
parser.add_argument('--batch-size', type=int, default=8,
help='Training batch size.')
parser.add_argument('--fid_version', type=int, default=1,
help="the version of fid")
@@ -46,19 +45,16 @@


def serving_input_receiver_fn():
    feature_map = {}
    feature_map['fids_indices'] = tf.placeholder(dtype=tf.int64, shape=[None],
                                                 name='fids_indices')
    feature_map['fids_values'] = tf.placeholder(dtype=tf.int64, shape=[None],
                                                name='fids_values')
    feature_map['fids_dense_shape'] = tf.placeholder(dtype=tf.int64,
        shape=[None], name='fids_dense_shape')
    feature_map['act1_f'] = tf.placeholder(dtype=tf.float32, name='act1_f')
    receiver_tensors = {
        'act1_f': feature_map['act1_f']
    }
    return tf.estimator.export.ServingInputReceiver(
        feature_map, receiver_tensors)

    features = {}
    features['fids_indices'] = tf.placeholder(dtype=tf.int64, shape=[None],
                                              name='fids_indices')
    features['fids_values'] = tf.placeholder(dtype=tf.int64, shape=[None],
                                             name='fids_values')
    features['fids_dense_shape'] = tf.placeholder(dtype=tf.int64, shape=[None],
                                                  name='fids_dense_shape')
    features['act1_f'] = tf.placeholder(dtype=tf.float32, shape=[None, 64],
                                        name='act1_f')
    return tf.estimator.export.build_raw_serving_input_receiver_fn(features)()

def model_fn(model, features, labels, mode):
"""Model Builder of wide&deep learning models
@@ -137,7 +133,6 @@ def model_fn(model, features, labels, mode):
return model.make_spec(mode, predictions=logits)

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    flt.trainer_worker.train(
        ROLE, args, input_fn,
        model_fn, serving_input_receiver_fn)
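The leader's receiver now pins act1_f to shape [None, 64], so the exported signature rejects mis-shaped follower activations at feed time. A small validation sketch (batch size and values invented; assumes the follower's first hidden layer is 64 units wide):

import numpy as np

def check_act1_f(batch):
    # Must match the [None, 64] 'act1_f' placeholder in the receiver.
    arr = np.asarray(batch, dtype=np.float32)
    assert arr.ndim == 2 and arr.shape[1] == 64, arr.shape
    return arr

check_act1_f(np.zeros((8, 64)))  # OK: hypothetical batch of 8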
7 changes: 2 additions & 5 deletions example/sparse_model/test.sh
@@ -4,11 +4,11 @@ cd "$( dirname "${BASH_SOURCE[0]}" )"
rm -rf data model

export CUDA_VISIBLE_DEVICES=""
set -e

python make_data.py --fid_version=1
python follower.py --local-addr=localhost:50010 \
--peer-addr=localhost:50011 \
--worker-rank=0 \
--data-path=data/follower/ \
--checkpoint-path=model/follower \
--save-checkpoint-steps=100 \
@@ -17,7 +17,6 @@ python follower.py --local-addr=localhost:50010 \

python leader.py --local-addr=localhost:50011 \
--peer-addr=localhost:50010 \
--worker-rank=0 \
--data-path=data/leader/ \
--checkpoint-path=model/leader \
--save-checkpoint-steps=100 \
@@ -30,7 +29,6 @@ rm -rf data model
python make_data.py --fid_version=2
python follower.py --local-addr=localhost:50010 \
--peer-addr=localhost:50011 \
--worker-rank=0 \
--data-path=data/follower/ \
--checkpoint-path=model/follower \
--save-checkpoint-steps=100 \
@@ -40,12 +38,11 @@ python follower.py --local-addr=localhost:50010 \

python leader.py --local-addr=localhost:50011 \
--peer-addr=localhost:50010 \
--worker-rank=0 \
--data-path=data/leader/ \
--checkpoint-path=model/leader \
--save-checkpoint-steps=100 \
--export-path=model/leader/saved_model \
--sparse-estimator=True \
--fid_version=2
rm -rf data model
wait
wait
6 changes: 1 addition & 5 deletions example/wide_n_deep/follower.py
@@ -15,15 +15,14 @@
# coding: utf-8
# pylint: disable=no-else-return, inconsistent-return-statements

import logging
import tensorflow.compat.v1 as tf
import fedlearner.trainer as flt


ROLE = 'follower'

parser = flt.trainer_worker.create_argument_parser()
parser.add_argument('--batch-size', type=int, default=256,
parser.add_argument('--batch-size', type=int, default=32,
help='Training batch size.')
args = parser.parse_args()

@@ -103,9 +102,6 @@ def model_fn(model, features, labels, mode):


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG,
                        format="[%(levelname)s] %(asctime)s: %(message)s "
                               "in %(pathname)s:%(lineno)d")
    flt.trainer_worker.train(
        ROLE, args, input_fn,
        model_fn, serving_input_receiver_fn)
20 changes: 14 additions & 6 deletions example/wide_n_deep/leader.py
@@ -15,14 +15,13 @@
# coding: utf-8
# pylint: disable=no-else-return, inconsistent-return-statements

import logging
import tensorflow.compat.v1 as tf
import fedlearner.trainer as flt

ROLE = 'leader'

parser = flt.trainer_worker.create_argument_parser()
parser.add_argument('--batch-size', type=int, default=256,
parser.add_argument('--batch-size', type=int, default=32,
help='Training batch size.')
args = parser.parse_args()

@@ -143,10 +142,19 @@ def model_fn(model, features, labels, mode):
return model.make_spec(mode, loss=loss, train_op=train_op,
training_hooks=[logging_hook])

class ExportModelHook(flt.trainer_worker.ExportModelHook):
    def after_save(self, sess, model, export_dir, inputs, outputs):
        print("**************export model hook**************")
        print("sess:", sess)
        print("model:", model)
        print("export_dir:", export_dir)
        print("inputs:", inputs)
        print("outputs:", outputs)
        print("*********************************************")


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG,
                        format="[%(levelname)s] %(asctime)s: %(message)s "
                               "in %(pathname)s:%(lineno)d")
    flt.trainer_worker.train(
        ROLE, args, input_fn,
        model_fn, serving_input_receiver_fn)

    flt.trainer_worker.train(
        ROLE, args, input_fn,
        model_fn, serving_input_receiver_fn,
        export_model_hook=ExportModelHook())
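The new export_model_hook argument gives the caller a callback after each SavedModel export. As a sketch of that contract (hook name and marker file are ours, not part of the commit), a hook that drops a completion marker a downstream loader could poll for:

import os
import fedlearner.trainer as flt

class DoneFileHook(flt.trainer_worker.ExportModelHook):
    # after_save receives the live session, the model, the export
    # directory, and the signature's input/output tensors.
    def after_save(self, sess, model, export_dir, inputs, outputs):
        # Assumes a locally mounted export path; remote filesystems
        # would need tf.io.gfile instead of open().
        marker = os.path.join(str(export_dir), '_DONE')
        with open(marker, 'w') as f:
            f.write('export complete\n')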
2 changes: 1 addition & 1 deletion example/wide_n_deep/make_data.py
@@ -29,7 +29,7 @@
os.makedirs(os.path.join(current_dir, 'data/follower'))

N = 10
chunk_size = 10000
chunk_size = 200

for i in range(N):
filename_l = os.path.join(current_dir, 'data/leader/%02d.tfrecord'%i)
