Skip to content
This repository was archived by the owner on Feb 8, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions app-packages/tensorflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Environments
* Redhat 7
* Linux Kernel 3.10.0
* Docker 1.12.3
* Hadoop 2.6.5
* Docker image ytensorflow:0.2.1, [Dockerfile](https://github.com/tensorflow/ecosystem/blob/master/docker/Dockerfile.hdfs)

# Use slider to run a tensorflow cluster
1. Make sure slider could work well, see the [Slider Start](https://slider.incubator.apache.org/docs/getting_started.html)
2. Download app-packages/tensorflow to $SLIDER_HOME/app-packages/tensorflow
3. Put your tensorflow scripts under app-packages/tensorflow/package/files
4. Set "site.global.hadoop.conf", "site.global.user.scripts.entry", "site.global.user.data.dir", "site.global.user.checkpoint.dir" according to your situation in "appConfig.default.json"
5. Set resource in resources.default.json if you need
6. As is often the case, there is no need to update metainfo.json
7. Start your tensorflow cluster
```
cd $SLIDER_HOME/app-packages/tensorflow
slider create [app-name] --appdef . --template appConfig-default.json --resources resources.default.json
```

# Use ytensorflow to run a tensorflow cluster
## Introduction
ytensorflow(tensorflow on YARN admin client), is used to submit and manage tensorflow cluster on YARN. It aims to make submit more easier.
## Command
```
ytensorflow cluster -start ./config.json -files ./mnist.py
ytensorflow cluster -stop <appName>
ytensorflow cluster -status <appName>
ytensorflow version
```

# User scripts requirements
The following arguments will be generated by the framework and passed to user script. You should use them in the right positon, just as the "mnist.py"
* job_name, worker or ps
* task_index
* ps_hosts
* worker_hosts
* ckp_dir, directory for storing the checkpoints
* work_dir, directory where the user script is stored
19 changes: 19 additions & 0 deletions app-packages/tensorflow/appConfig.default.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"schema": "http://example.org/specification/v2.0.0",
"metadata": {
},
"global": {
"site.global.slider.allowed.ports": "25000-30000",
"site.global.hadoop.conf": "/etc/hadoop/conf",
"site.global.yarn.cgroup.root": "/hadoop-yarn",
"site.global.user.scripts.entry": "ymnist.py",
"site.global.user.checkpoint.prefix": "hdfs://hdpdev/user/${USER_NAME}/.slider/cluster",
"site.global.user.name": "${USER_NAME}",
"site.global.zookeeper.quorum": "${ZK_HOST}",
"site.global.docker.image": "ytensorflow:0.2.1",
"site.global.ps.port": "${ps.ALLOCATED_PORT}{PER_CONTAINER}",
"site.global.chiefworker.port": "${chiefworker.ALLOCATED_PORT}{PER_CONTAINER}",
"site.global.worker.port": "${worker.ALLOCATED_PORT}{PER_CONTAINER}",
"site.global.tensorboard.port": "${tensorboard.ALLOCATED_PORT}{PER_CONTAINER}"
}
}
85 changes: 85 additions & 0 deletions app-packages/tensorflow/metainfo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{
"schemaVersion": "2.1",
"application": {
"name": "tensorflow",
"version": "0.1.1",
"exportGroups": [
{
"name": "ps",
"exports": [
{
"name": "host_port",
"value": "${ps_HOST}:${site.global.ps.port}"
}
]
},
{
"name": "chiefworker",
"exports": [
{
"name": "host_port",
"value": "${chiefworker_HOST}:${site.global.chiefworker.port}"
}
]
},
{
"name": "worker",
"exports": [
{
"name": "host_port",
"value": "${worker_HOST}:${site.global.worker.port}"
}
]
},
{
"name": "tensorboard",
"exports": [
{
"name": "url",
"value": "http://${tensorboard_HOST}:${site.global.tensorboard.port}"
}
]
}
],
"components": [
{
"name": "ps",
"compExports": "ps-host_port",
"commandScript": {
"script": "scripts/tensorflow.py",
"scriptType": "PYTHON"
}
},
{
"name": "chiefworker",
"compExports": "chiefworker-host_port",
"commandScript": {
"script": "scripts/tensorflow.py",
"scriptType": "PYTHON"
}
},
{
"name": "worker",
"compExports": "worker-host_port",
"commandScript": {
"script": "scripts/tensorflow.py",
"scriptType": "PYTHON"
}
},
{
"name": "tensorboard",
"compExports": "tensorboard-url",
"commandScript": {
"script": "scripts/tensorflow.py",
"scriptType": "PYTHON"
}
}
],
"packages": [
{
"type": "folder",
"name": "files"
}
]
}
}
95 changes: 95 additions & 0 deletions app-packages/tensorflow/package/files/yarn_bootstrap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time
from abc import abstractmethod, ABCMeta
import tensorflow as tf

flags = tf.app.flags
# Flags for configuring the task
flags.DEFINE_string("job_name", None, "job name: worker or ps")
flags.DEFINE_integer("task_index", 0, "Worker task index, should be >= 0")
flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("ckp_dir", None, "Directory for storing the checkpoints")
flags.DEFINE_string("work_dir", "/tmp/tf_on_yarn", "Work directory")

FLAGS = flags.FLAGS

class YarnBootstrap(object):
def __init__(self):
pass

__metaclass__ = ABCMeta

@abstractmethod
def worker_do(self, server, cluster_spec, task_id):
pass

@abstractmethod
def ps_do(self, server, cluster_spec, task_id):
pass

def device_and_server(self):
# If FLAGS.job_name is not set, we're running single-machine TensorFlow.
# Don't set a device.
if FLAGS.job_name is None:
print("Running single-machine training")
return (None, "", "")

# Otherwise we're running distributed TensorFlow.
print("Running distributed training")
if FLAGS.task_index is None or FLAGS.task_index == "":
raise ValueError("Must specify an explicit `task_index`")
if FLAGS.ps_hosts is None or FLAGS.ps_hosts == "":
raise ValueError("Must specify an explicit `ps_hosts`")
if FLAGS.worker_hosts is None or FLAGS.worker_hosts == "":
raise ValueError("Must specify an explicit `worker_hosts`")

cluster_spec = tf.train.ClusterSpec({
"ps": FLAGS.ps_hosts.split(","),
"worker": FLAGS.worker_hosts.split(","),
})
server = tf.train.Server(
cluster_spec, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
time.sleep(60)
if FLAGS.job_name == "ps":
self.ps_do(server, cluster_spec, FLAGS.task_index)
server.join()

worker_device = "/job:worker/task:{}".format(FLAGS.task_index)
return (
tf.train.replica_device_setter(
worker_device=worker_device,
cluster=cluster_spec),
server, cluster_spec
)

def start(self, unused_args):
if FLAGS.ckp_dir is None or FLAGS.ckp_dir == "":
raise ValueError("Must specify an explicit `ckp_dir`")
device, server, cluster_spec = self.device_and_server()
with tf.device(device):
self.worker_do(server, cluster_spec, FLAGS.task_index)

88 changes: 88 additions & 0 deletions app-packages/tensorflow/package/files/ymnist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

"""

from __future__ import division
from __future__ import print_function

import os

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import mnist
from tensorflow.python.training import training_util
from yarn_bootstrap import *

FLAGS = tf.app.flags.FLAGS

class Ymnist(YarnBootstrap):
def worker_do(self, server, cluster_spec, task_id):
print("Checkpoint dir: " + FLAGS.ckp_dir)
images, labels = self.inputs(100)
logits = mnist.inference(images, 128, 128)
loss = mnist.loss(logits, labels)
train_op = mnist.training(loss, 0.01)
target = "" if server == "" else server.target
with tf.train.MonitoredTrainingSession(
master=target,
is_chief=(task_id == 0),
checkpoint_dir=FLAGS.ckp_dir) as sess:
step = 0
while not sess.should_stop() and step < 1000000:
sess.run(train_op)
step = training_util.global_step(sess, training_util.get_global_step(sess.graph))
print("Global step " + str(step))

def ps_do(self, server, cluster_spec, task_id):
print("Starting ps " + str(task_id))

def read_and_decode(self, filename_queue):
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
serialized_example,
# Defaults are not specified since both keys are required.
features={
'image_raw': tf.FixedLenFeature([], tf.string),
'label': tf.FixedLenFeature([], tf.int64),
})
image = tf.decode_raw(features['image_raw'], tf.uint8)
image.set_shape([mnist.IMAGE_PIXELS])
# Convert from [0, 255] -> [-0.5, 0.5] floats.
image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
# Convert label from a scalar uint8 tensor to an int32 scalar.
label = tf.cast(features['label'], tf.int32)
return image, label

def inputs(self, batch_size):
filename = os.path.join("hdfs://hdpdev/user/danrtsey.wy/mnist-data", "train.tfrecords")
with tf.name_scope('input'):
filename_queue = tf.train.string_input_producer([filename])
image, label = self.read_and_decode(filename_queue)
images, sparse_labels = tf.train.shuffle_batch(
[image, label], batch_size=batch_size, num_threads=2,
capacity=1000 + 3 * batch_size,
# Ensures a minimum amount of shuffling of examples.
min_after_dequeue=1000)
return images, sparse_labels

def main(unused_argv):
Ymnist().start(unused_args=unused_argv)

if __name__ == "__main__":
tf.app.run()
Loading