Feat: multiple data sources for trainer (#976)
* feat(common/stats): add stats for trainer

* feat(trainer): support multiple data source

* feat(trainer): support multiple data sources

* polish

* fix: fix script input

* fix shuffle in day.

* fix trainer worker bug

* fix: remove type echo

* fix(trainer): output log to app directory

* disable bridge suicide

* sleep 5 seconds while waiting for data block

* feat[trainer]: support multiple local workers

* fix: remove stats hook

* fix: add multiple local workers to cluster_spec

* fix: use fake bridge for local worker

* fix: wait local worker bug

* feat: add local train worker

* fix(trainer): split local worker out

* fix(trainer): use dynamic port

* fix: use SERVICE_ID for PEER_ADDR

* feat: sort data blocks by end_time

* feat: use two queue for local and remote data

* fix: remove unused code

* feat: support local data path

* polish

Co-authored-by: whisylan <[email protected]>
nolanliou and whisylan authored Aug 30, 2022
1 parent 7463a12 commit 9aa6198
Showing 19 changed files with 1,310 additions and 127 deletions.
90 changes: 90 additions & 0 deletions deploy/scripts/trainer/run_trainer_local_worker.sh
@@ -0,0 +1,90 @@
#!/bin/bash

# Copyright 2020 The FedLearner Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -ex

export CUDA_VISIBLE_DEVICES=
export MODEL_NAME=${APPLICATION_ID}

source /app/deploy/scripts/hdfs_common.sh || true
source /app/deploy/scripts/pre_start_hook.sh || true
source /app/deploy/scripts/env_to_args.sh

if [[ -n "${CODE_KEY}" ]]; then
pull_code ${CODE_KEY} $PWD
else
pull_code ${CODE_TAR} $PWD
fi

cd ${ROLE}

mode=$(normalize_env_to_args "--mode" "$MODE")
sparse_estimator=$(normalize_env_to_args "--sparse-estimator" "$SPARSE_ESTIMATOR")
batch_size=$(normalize_env_to_args "--batch-size" "$BATCH_SIZE")
learning_rate=$(normalize_env_to_args "--learning-rate" "$LEARNING_RATE")

if [ -n "$CLUSTER_SPEC" ]; then
# get master address from clusterSpec['Master']
MASTER_HOST=`python -c "
import json
cluster_spec = json.loads('$CLUSTER_SPEC')['clusterSpec']
if 'Master' in cluster_spec:
    print(cluster_spec['Master'][0].split(':')[0])
"`
NUM_WORKER=`python -c """
import json
cluster_spec = json.loads('$CLUSTER_SPEC')['clusterSpec']
print(len(cluster_spec.get('Worker', [])))
"""`

# rewrite tensorflow ClusterSpec for compatibility
# master port 50051 is used for fedlearner master server, so rewrite to 50052
# worker port 50051 is used for fedlearner worker server, so rewrite to 50052
CLUSTER_SPEC=`python -c """
import json
def rewrite_port(address, old, new):
    (host, port) = address.rsplit(':', 1)
    if port == old:
        return host + ':' + new
    return address
cluster_spec = json.loads('$CLUSTER_SPEC')['clusterSpec']
for i, ps in enumerate(cluster_spec.get('PS', [])):
    cluster_spec['PS'][i] = rewrite_port(ps, '50051', '50052')
for i, master in enumerate(cluster_spec.get('Master', [])):
    cluster_spec['Master'][i] = rewrite_port(master, '50051', '50052')
for i, worker in enumerate(cluster_spec.get('Worker', [])):
    cluster_spec['Worker'][i] = rewrite_port(worker, '50051', '50052')
if 'LocalWorker' in cluster_spec:
    for i, worker in enumerate(cluster_spec.get('LocalWorker', [])):
        cluster_spec['Worker'].append(rewrite_port(worker, '50051', '50052'))
    del cluster_spec['LocalWorker']
print(json.dumps({'clusterSpec': cluster_spec}))
"""`
fi

server_port=$(normalize_env_to_args "--server-port" "$PORT1")

WORKER_RANK=`python -c "print($INDEX + $NUM_WORKER)"`

python main.py --worker \
    --local-worker \
    --application-id="$APPLICATION_ID" \
    --master-addr="$MASTER_HOST:50051" \
    --cluster-spec="$CLUSTER_SPEC" \
    --worker-rank="$WORKER_RANK" \
    $server_port $mode $batch_size \
    $sparse_estimator $learning_rate
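
For reference, the standalone sketch below (not part of the commit) walks the same rewrite through a hypothetical CLUSTER_SPEC value: LocalWorker entries are folded into the Worker list, port 50051 is rewritten to 50052, and a local worker's rank is then offset past the remote workers via INDEX + NUM_WORKER.

# Illustrative only; the sample spec and values are hypothetical.
import json

def rewrite_port(address, old, new):
    host, port = address.rsplit(':', 1)
    return host + ':' + new if port == old else address

raw = ('{"clusterSpec": {"Master": ["master-0:50051"], '
       '"Worker": ["worker-0:50051"], "LocalWorker": ["local-0:50051"]}}')
spec = json.loads(raw)['clusterSpec']
for role in ('PS', 'Master', 'Worker'):
    for i, addr in enumerate(spec.get(role, [])):
        spec[role][i] = rewrite_port(addr, '50051', '50052')
for addr in spec.pop('LocalWorker', []):
    spec['Worker'].append(rewrite_port(addr, '50051', '50052'))
print(json.dumps({'clusterSpec': spec}))
# {"clusterSpec": {"Master": ["master-0:50052"],
#                  "Worker": ["worker-0:50052", "local-0:50052"]}}

# With NUM_WORKER=1 remote worker, a local worker started with INDEX=0
# gets WORKER_RANK = 0 + 1 = 1, i.e. local workers rank after remote ones.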
15 changes: 12 additions & 3 deletions deploy/scripts/trainer/run_trainer_master.sh
@@ -30,9 +30,13 @@ summary_save_secs=$(normalize_env_to_args "--summary-save-secs" "$SUMMARY_SAVE_S
epoch_num=$(normalize_env_to_args "--epoch-num" $EPOCH_NUM)
start_date=$(normalize_env_to_args "--start-date" $START_DATE)
end_date=$(normalize_env_to_args "--end-date" $END_DATE)
shuffle=$(normalize_env_to_args "--shuffle" $SUFFLE_DATA_BLOCK)
extra_params=$(normalize_env_to_args "--extra-params" "$EXTRA_PARAMS")
export_model=$(normalize_env_to_args "--export-model" $EXPORT_MODEL)
shuffle=$(normalize_env_to_args "--shuffle" $SUFFLE_DATA_BLOCK)
shuffle_in_day=$(normalize_env_to_args "--shuffle-in-day" $SHUFFLE_IN_DAY)
local_data_source=$(normalize_env_to_args "--local-data-source" $LOCAL_DATA_SOURCE)
local_start_date=$(normalize_env_to_args "--local-start-date" $LOCAL_START_DATE)
local_end_date=$(normalize_env_to_args "--local-end-date" $LOCAL_END_DATE)

if [ -n "$CHECKPOINT_PATH" ]; then
checkpoint_path="--checkpoint-path=$CHECKPOINT_PATH"
@@ -69,6 +73,10 @@ for i, master in enumerate(cluster_spec.get('Master', [])):
    cluster_spec['Master'][i] = rewrite_port(master, '50051', '50052')
for i, worker in enumerate(cluster_spec.get('Worker', [])):
    cluster_spec['Worker'][i] = rewrite_port(worker, '50051', '50052')
if 'LocalWorker' in cluster_spec:
    for i, worker in enumerate(cluster_spec.get('LocalWorker', [])):
        cluster_spec['Worker'].append(rewrite_port(worker, '50051', '50052'))
    del cluster_spec['LocalWorker']
print(json.dumps({'clusterSpec': cluster_spec}))
"""`
fi
@@ -101,5 +109,6 @@ python main.py --master \
    $mode $sparse_estimator \
    $save_checkpoint_steps $save_checkpoint_secs \
    $summary_save_steps $summary_save_secs \
    $epoch_num $start_date $end_date $shuffle $extra_params \
    $export_model
    $local_data_source $local_start_date $local_end_date \
    $epoch_num $start_date $end_date $shuffle $shuffle_in_day \
    $extra_params $export_model
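
Note: normalize_env_to_args is defined in deploy/scripts/env_to_args.sh, which is not shown in this diff. The sketch below only illustrates its assumed behaviour: a flag is emitted only when the corresponding environment variable is non-empty, so an unset LOCAL_DATA_SOURCE (or LOCAL_START_DATE / LOCAL_END_DATE) simply drops out of the final python main.py --master command line.

# Assumed behaviour only; the real helper lives in env_to_args.sh.
import os

def normalize_env_to_args(flag, value):
    # empty or unset value -> no argument at all
    return '%s=%s' % (flag, value) if value else ''

print(normalize_env_to_args('--local-data-source',
                            os.getenv('LOCAL_DATA_SOURCE', '')))  # '' when unset
print(normalize_env_to_args('--shuffle-in-day', 'true'))  # --shuffle-in-day=true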
29 changes: 7 additions & 22 deletions deploy/scripts/trainer/run_trainer_worker.sh
@@ -23,26 +23,7 @@ source /app/deploy/scripts/hdfs_common.sh || true
source /app/deploy/scripts/pre_start_hook.sh || true
source /app/deploy/scripts/env_to_args.sh

# When the WORKER_GROUPS is "2,4", this script would update the WORKER_RANK
# to the worker's index within their own group, e.g.
#
# + WORKER_RANK 0 -> 0
# + WORKER_RANK 1 -> 1
# + WORKER_RANK 2 -> 0
# + WORKER_RANK 3 -> 1
# + WORKER_RANK 4 -> 2
# + WORKER_RANK 5 -> 3
#
if [ -n "$WORKER_GROUPS" ]; then
  IFS=',' read -ra WORKER_GROUPS <<< "$WORKER_GROUPS"
  for i in "${WORKER_GROUPS[@]}"; do
    if (( $WORKER_RANK - $i < 0 )); then
      break
    else
      WORKER_RANK=$( expr $WORKER_RANK - $i )
    fi
  done
fi
PEER_ADDR=$SERVICE_ID

if [[ -n "${CODE_KEY}" ]]; then
  pull_code ${CODE_KEY} $PWD
@@ -85,6 +66,10 @@ for i, master in enumerate(cluster_spec.get('Master', [])):
    cluster_spec['Master'][i] = rewrite_port(master, '50051', '50052')
for i, worker in enumerate(cluster_spec.get('Worker', [])):
    cluster_spec['Worker'][i] = rewrite_port(worker, '50051', '50052')
if 'LocalWorker' in cluster_spec:
    for i, worker in enumerate(cluster_spec.get('LocalWorker', [])):
        cluster_spec['Worker'].append(rewrite_port(worker, '50051', '50052'))
    del cluster_spec['LocalWorker']
print(json.dumps({'clusterSpec': cluster_spec}))
"""`
fi
@@ -102,6 +87,6 @@ python main.py --worker \
    --cluster-spec="$CLUSTER_SPEC" \
    --local-addr="$POD_IP:${LISTEN_PORT}" \
    --peer-addr="$PEER_ADDR" \
    --worker-rank="$WORKER_RANK" \
    --worker-rank="$INDEX" \
    $server_port $mode $batch_size \
    $sparse_estimator $learning_rate $extra_params
    $sparse_estimator $learning_rate
13 changes: 10 additions & 3 deletions example/sparse_model/leader.py
@@ -15,6 +15,8 @@
# coding: utf-8
# pylint: disable=no-else-return, inconsistent-return-statements

import logging

import tensorflow.compat.v1 as tf
import fedlearner.trainer as flt

@@ -25,6 +27,8 @@
                    help='Training batch size.')
parser.add_argument('--fid_version', type=int, default=1,
                    help="the version of fid")
parser.add_argument('--local-worker', action='store_true',
                    help="is local worker")
args = parser.parse_args()

def input_fn(bridge, trainer_master=None):
@@ -35,6 +39,7 @@ def parse_fn(example):
        feature_map = dict()
        feature_map['fids'] = tf.VarLenFeature(tf.int64)
        feature_map['example_id'] = tf.FixedLenFeature([], tf.string)
        feature_map["act1_f"] = tf.FixedLenFeature([64], tf.float32)
        feature_map["y"] = tf.FixedLenFeature([], tf.int64)
        features = tf.parse_example(example, features=feature_map)
        return features, dict(y=features.pop('y'))
@@ -114,12 +119,14 @@ def model_fn(model, features, labels, mode):
    act1_l = tf.nn.relu(tf.nn.bias_add(tf.matmul(embed_output, w1l), b1l))
    act2_l = tf.nn.bias_add(tf.matmul(act1_l, w2), b2)

    if mode == tf.estimator.ModeKeys.TRAIN:
    if mode == tf.estimator.ModeKeys.TRAIN and not args.local_worker:
        act1_f = model.recv('act1_f', tf.float32, require_grad=True)
    else:
        act1_f = features['act1_f']
    output = tf.concat([act2_l, act1_f], axis=1)
    logits = tf.matmul(output, w3)
    if mode == tf.estimator.ModeKeys.PREDICT:
        return model.make_spec(mode, predictions=logits)

    if mode == tf.estimator.ModeKeys.TRAIN:
        y = labels['y']
@@ -134,10 +141,10 @@ def model_fn(model, features, labels, mode):
        train_op = model.minimize(optimizer, loss, global_step=global_step)
        return model.make_spec(mode, loss=loss, train_op=train_op,
                               training_hooks=[logging_hook])
    if mode == tf.estimator.ModeKeys.PREDICT:
        return model.make_spec(mode, predictions=logits)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    flt.trainer_worker.train(
        ROLE, args, input_fn,
        model_fn, serving_input_receiver_fn)
5 changes: 4 additions & 1 deletion example/sparse_model/make_data.py
@@ -27,7 +27,7 @@

from tensorflow.core.example.example_pb2 import Example
from tensorflow.core.example.feature_pb2 import Features, Feature, \
    Int64List, BytesList
    Int64List, BytesList, FloatList

current_dir = os.path.dirname(__file__)
shutil.rmtree(os.path.join(current_dir, 'data'), ignore_errors=True)
@@ -82,6 +82,9 @@ def _fake_sample(slots):
        Feature(int64_list=Int64List(value=[random.randint(0, 1)]))
    features_l['fids'] = \
        Feature(int64_list=Int64List(value=_fake_sample(LEADER_SLOTS)))
    features_l['act1_f'] = \
        Feature(float_list=FloatList(value=np.random.uniform(
            low=0., high=1., size=(64,))))
    fl.write(Example(features=Features(feature=features_l))
             .SerializeToString())
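
A small round-trip sketch (illustrative, not part of the commit) showing that the 64-float act1_f feature written here matches the FixedLenFeature([64], tf.float32) that leader.py now parses when running as a local worker:

import numpy as np
import tensorflow.compat.v1 as tf
from tensorflow.core.example.example_pb2 import Example
from tensorflow.core.example.feature_pb2 import Features, Feature, FloatList

# write one fake act1_f exactly the way make_data.py does
act1_f = Feature(float_list=FloatList(
    value=np.random.uniform(low=0., high=1., size=(64,))))
record = Example(features=Features(feature={'act1_f': act1_f})).SerializeToString()

# parse it back with the schema leader.py uses for local-worker training
parsed = tf.parse_single_example(
    record, features={'act1_f': tf.FixedLenFeature([64], tf.float32)})
print(parsed['act1_f'].shape)  # (64,)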

19 changes: 19 additions & 0 deletions example/sparse_model/test_local_worker.sh
@@ -0,0 +1,19 @@
#!/bin/bash

cd "$( dirname "${BASH_SOURCE[0]}" )"
rm -rf data model

export CUDA_VISIBLE_DEVICES=""
set -e

rm -rf data model
python make_data.py --fid_version=2
python leader.py --local-addr=localhost:50011 \
    --local-worker \
    --data-path=data/leader/ \
    --checkpoint-path=model/leader \
    --save-checkpoint-steps=100 \
    --export-path=model/leader/saved_model \
    --sparse-estimator=True \
    --fid_version=2
rm -rf data model
60 changes: 60 additions & 0 deletions fedlearner/trainer/bridge.py
@@ -505,3 +505,63 @@ def func():
            return tf.convert_to_tensor(self.receive(name), dtype=dtype)

        return tf.py_function(func=func, inp=[], Tout=dtype, name='recv_'+name)


class FakeBridge(object):
    def __init__(self):
        self._condition = threading.Condition()
        self._current_iter_id = None
        self._next_iter_id = 0
        self._iter_started_at = 0
        self._terminated_at = None

    @property
    def terminated_at(self):
        return self._terminated_at

    def connect(self):
        fl_logging.debug("[Fake Bridge] connected")

    def start(self):
        with self._condition:
            fl_logging.debug("[Fake Bridge] started")
            self._current_iter_id = self._next_iter_id
            self._next_iter_id += 1
            self._iter_started_at = time.time()

    def commit(self):
        with self._condition:
            fl_logging.debug("[Fake Bridge] send commit iter_id: %d",
                             self._current_iter_id)
            iter_id = self._current_iter_id
            duration = (time.time() - self._iter_started_at) * 1000
            self._current_iter_id = None

        with _gctx.stats_client.pipeline() as pipe:
            pipe.gauge("trainer.fake_bridge.iterator_step", iter_id)
            pipe.timing("trainer.fake_bridge.iterator_timing", duration)

    def terminate(self):
        fl_logging.debug("[Fake Bridge] terminated")
        with self._condition:
            self._terminated_at = int(time.time())

    def load_data_block(self, count, block_id):
        fl_logging.debug("[Fake Bridge] load DataBlock with id %s", block_id)
        return True

    def send_op(self, name, x):
        def func(x):
            raise RuntimeError("Unexpected call to send op")

        out = tf.py_function(func=func, inp=[x], Tout=[], name='send_' + name)
        return out

    def receive_op(self, name, dtype):
        def func():
            raise RuntimeError("Unexpected call to receive op")

        return tf.py_function(func=func, inp=[], Tout=[dtype])[0]

    def register_data_block_handler(self, handler):
        pass
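
FakeBridge mirrors the Bridge interface used by trainer workers but never talks to a peer; a hypothetical call sequence for a local worker is sketched below. It assumes the trainer's global stats client (_gctx.stats_client) has been initialized, since commit() reports timing through it.

bridge = FakeBridge()
bridge.connect()                 # logs only; no gRPC channel is opened
for _ in range(3):               # a few fake iterations
    bridge.start()               # assigns the next iter_id and stamps the start time
    # ... run one local training step here; send_op/receive_op would raise ...
    bridge.commit()              # reports iter_id and duration to the stats client
bridge.terminate()               # records terminated_at; there is no peer to notify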
5 changes: 3 additions & 2 deletions fedlearner/trainer/data/data_block_loader.py
@@ -66,7 +66,7 @@ def get_next_block(self):
        block = self._block_queue.get()
        return block

    def make_dataset(self):
    def make_dataset(self, compression_type=None):
        def gen():
            while True:
                block = self.get_next_block()
@@ -75,7 +75,8 @@ def gen():
                yield block.data_path

        dataset = tf.data.Dataset.from_generator(gen, tf.string)
        dataset = tf.data.TFRecordDataset(dataset)
        dataset = tf.data.TFRecordDataset(dataset,
                                          compression_type=compression_type)
        dataset = dataset.batch(self._batch_size, drop_remainder=True)
        dataset = dataset.prefetch(1)
        return dataset
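
The compression_type argument is forwarded straight to TFRecordDataset, so data blocks written as compressed TFRecord files can be read by passing 'GZIP' or 'ZLIB', while None keeps the previous uncompressed behaviour. A minimal illustration follows; the file name and batch size are hypothetical.

import tensorflow.compat.v1 as tf

# mimic what make_dataset() builds for gzip-compressed data blocks
paths = tf.data.Dataset.from_tensor_slices(['data_block-0.tfrecord.gz'])
dataset = tf.data.TFRecordDataset(paths, compression_type='GZIP')
dataset = dataset.batch(64, drop_remainder=True).prefetch(1)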
