在易学智能平台上基于分布式tensorflow训练mnist
本帖最后由 AIcomes 于 2018-10-26 09:29 编辑一、分布式tensorflow
tensorflow在训练时除了支持单机单卡、单机多卡以外,还支持分布式(多机多卡)训练。分布式训练里面又分参数同步更新和异步更新。(同步更新是指各个用于并行计算的电脑,计算完各自的batch 后,求取梯度值,再把梯度值统一送到参数服务器中,由参数服务器求取梯度平均值,更新参数服务器上的参数。异步更新是指参数服务器只要收到一台机器计算的梯度值,就直接进行参数更新,无需等待其它机器)。这里我以异步更新为例,介绍如何在易学智能平台上基于分布式tensorflow来训练mnist。工程文件包括dist_tf_mnist_async.py和mnist_inference.py,完整代码见附录。
二、搭建分布式集群
先在平台上依次租赁2台不同节点上的机器,1台1050ti用来做参数服务器,1台1080ti用来当worker。具体流程见图2-1和图2-2:
图2-1 申请参数服务器
图2-2 申请1台计算服务器(worker) 分布式tensorflow用GRPC作为通信框架,我们除了需要提供ip地址外,还需要使用接口。每台机器的端口查看方式如图2-3(点击“远程调试”即可查看端口)
图2-3 查看端口
将这些机器的ip地址和端口信息整理如下: #ps 60.205.219.151:10003
#worker 60.205.219.151:10005三、上传代码 打开jupyter notebook,上传工程代码,具体流程见图3-1 图3-1 上传工程代码四、开始分布式训练
4-1 在参数服务器运行代码
打开1050ti虚拟机终端,先进入/home/ubuntu/MyFiles目录下。然后执行下面的命令(具体流程见图4-1):
CUDA_VISIBLE_DEVICES='' python3 dist_tf_mnist_async.py --job_name='ps' --task_id=0 --ps_hosts='localhost:10003' --worker_hosts='60.205.219.151:10005'
图4-1 在参数服务器上运行代码
4-2 在worker上运行代码
打开端口号为10005的1080ti虚拟机终端,先进入/home/ubuntu/MyFiles目录下,执行下面的命令(具体流程见图4-2):
CUDA_VISIBLE_DEVICES=0 python3 dist_tf_mnist_async.py --job_name='worker' --task_id=0 --ps_hosts='60.205.219.151:10003' -worker_hosts='localhost:10005'
图4-2 在worker上运行代码
温馨提提示:在该虚拟机执行命令时,对应的ip地址变为localhost。
#1-ps(端口为10003)
CUDA_VISIBLE_DEVICES='' python3 dist_tf_mnist_async.py --job_name='ps' --task_id=0 --ps_hosts='localhost:10003' --worker_hosts='60.205.219.151:10005'
#2-worker
worker1(端口为10005)
CUDA_VISIBLE_DEVICES=0 python3 dist_tf_mnist_async.py --job_name='worker' --task_id=0 --ps_hosts='60.205.219.151:10003' --worker_hosts='localhost:10005'
五、训练过程截图
图5-1 worker训练日志
附录:
dist_tf_mnist_async.py完整代码
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import mnist_inference
# 配置神经网络的参数。
BATCH_SIZE = 128
LEARNING_RATE_BASE = 0.01
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 1000
MOVING_AVERAGE_DECAY = 0.99
# 模型保存的路径和文件名。
MODEL_SAVE_PATH = "log_async"
DATA_PATH = "MNIST_data"
FLAGS = tf.app.flags.FLAGS
# 指定当前程序是参数服务器还是计算服务器。
# 参数服务器负责TensorFlow中变量的维护和管理
# 计算服务器负责每一轮迭代时运行反向传播过程
tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ')
# 指定集群中的参数服务器地址。
tf.app.flags.DEFINE_string('ps_hosts', ' tf-ps0:2222,tf-ps1:1111',
'Comma-separated list of hostname:port for the parameter server jobs. e.g. "tf-ps0:2222,tf-ps1:1111" ')
# 指定集群中的计算服务器地址。
tf.app.flags.DEFINE_string(
'worker_hosts', ' tf-worker0:2222,tf-worker1:1111',
'Comma-separated list of hostname:port for the worker jobs. e.g. "tf-worker0:2222,tf-worker1:1111" ')
# 指定当前程序的任务ID。
# TensorFlow会自动根据参数服务器/计算服务器列表中的端口号来启动服务。
# 注意参数服务器和计算服务器的编号都是从0开始的。
tf.app.flags.DEFINE_integer('task_id', 0, 'Task ID of the worker/replica running the training.')
# 定义TensorFlow的计算图,并返回每一轮迭代时需要运行的操作。
def train_model(x, y_, is_chief):
# 计算神经网络前向传播的结果
y = mnist_inference.build_model(x)
global_step = tf.Variable(0, trainable=False)
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
loss = tf.reduce_mean(cross_entropy)
learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step,60000 / BATCH_SIZE, LEARNING_RATE_DECAY)
train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step)
# 定义每一轮迭代需要运行的操作。
if is_chief:
# 计算变量的滑动平均值。
variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
variables_averages_op = variable_averages.apply(tf.trainable_variables())
with tf.control_dependencies():
train_op = tf.no_op()
return global_step, loss, train_op
def main(argv=None):
# 解析flags并通过tf.train.ClusterSpec配置TensorFlow集群。
ps_hosts = FLAGS.ps_hosts.split(',')
worker_hosts = FLAGS.worker_hosts.split(',')
print('PS hosts are: %s' % ps_hosts)
print('Worker hosts are: %s' % worker_hosts)
n_workers = len(worker_hosts)
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# 通过tf.train.ClusterSpec以及当前任务创建tf.train.Server。
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id)
# 参数服务器只需要管理TensorFlow中的变量,不需要执行训练的过程。server.join()会一致停在这条语句上。
if FLAGS.job_name == 'ps':
with tf.device("/cpu:0"):
server.join()
# 定义计算服务器需要运行的操作。
# 在所有的计算服务器中有一个是主计算服务器
# 它除了负责计算反向传播的结果,还负责日志和保存模块
is_chief = (FLAGS.task_id == 0)
mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)
# 通过tf.train.replica_device_setter函数来指定执行每一个运算的设备。
# tf.train.replica_device_setter函数会自动将所有的参数分配到参数服务器上,而
# 计算分配到当前的计算服务器上,
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_id, cluster=cluster)):
# 定义输入并得到每一轮迭代需要运行的操作。
x = tf.placeholder(tf.float32, , name='x-input')
y_ = tf.placeholder(tf.float32, , name='y-input')
global_step, loss, train_op = train_model(x, y_, is_chief)
# 定义用于保存模型的saver。
saver = tf.train.Saver()
# 定义日志输出操作。
summary_op = tf.summary.merge_all()
# 定义变量初始化操作。
init_op = tf.global_variables_initializer()
# 通过tf.train.Supervisor管理训练深度学习模型时的通用功能。
# tf.train.Supervisor能统一管理队列操作、模型保存、日志输出以及会话的生成
sv = tf.train.Supervisor(
is_chief=is_chief, # 定义当前计算服务器是否为祝计算服务器,只有主服务器会保存模型以及输出日志
logdir=MODEL_SAVE_PATH, # 指定保存模型和输出日志的地址
init_op=init_op, # 指定初始化操作
summary_op=summary_op, # 指定日志生成操作
saver=saver, # 指定用于保存模型的saver
global_step=global_step, # 指定当前迭代的轮次,这个会用于保存模型文件
save_model_secs=60, # 指定保存模型的时间间隔
save_summaries_secs=60) # 指定日志输出的时间间隔
sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
# 通过tf.train.Supervisor生成会话。
sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
step = 0
start_time = time.time()
# 执行迭代过程。在迭代过程中,Supervisor会帮助输出日志并保存模型,不需要直接调用
while not sv.should_stop():
xs, ys = mnist.train.next_batch(BATCH_SIZE)
_, loss_value, global_step_value = sess.run(, feed_dict={x: xs, y_: ys})
if global_step_value >= TRAINING_STEPS:
break
# 每隔一段时间输出训练信息。
if step > 0 and step % 10 == 0:
duration = time.time() - start_time
sec_per_batch = duration / global_step_value
format_str = ('After %d training steps (%d global steps), loss on training batch is %g. (%.3f sec/batch)')
print(format_str %(step, global_step_value, loss_value, sec_per_batch))
step += 1
sv.stop()
if __name__ == "__main__":
tf.app.run()mnist_inference.py完整代码
import tensorflow as tf
INPUT_NODE=784
OUTPUT_NODE=10
def get_weight_varible(name,shape):
return tf.get_variable(name, shape=shape,
initializer=tf.contrib.layers.xavier_initializer())
def get_bias_varible(name,shape):
return tf.get_variable(name, shape=shape,
initializer=tf.contrib.layers.xavier_initializer())
#filter_shape:
def conv2d(layer_name, x, filter_shape):
with tf.variable_scope(layer_name):
w = get_weight_varible('w', filter_shape)
b = get_bias_varible('b', filter_shape[-1])
y = tf.nn.bias_add(tf.nn.conv2d(input=x, filter=w, strides=, padding='SAME'), b)
return y
def pool2d(layer_name, x):
with tf.variable_scope(layer_name):
y = tf.nn.max_pool(x, ksize=, strides=, padding='SAME')
return y
#inp_shape:
#out_shape:
def fc(layer_name, x, inp_shape, out_shape):
with tf.variable_scope(layer_name):
inp_dim = inp_shape[-1]
out_dim = out_shape[-1]
y = tf.reshape(x, shape=inp_shape)
w = get_weight_varible('w', )
b = get_bias_varible('b', )
y = tf.add(tf.matmul(y, w), b)
return y
def build_model(x):
y = tf.reshape(x,shape=[-1, 28, 28, 1])
#layer 1
y = conv2d('conv_1', y, )
y = pool2d('pool_1', y)
#layer 2
y = conv2d('conv_2', y, )
y = pool2d('pool_2', y)
#layer fc
y = fc('fc', y, [-1, 7*7*16], [-1, 10])
print('y',y)
return y
太好了
找了好久,
终于找到这样的文章了,
分布式训练,yeah
页:
[1]