|
本帖最后由 AIcomes 于 2018-10-26 09:29 编辑
一、分布式tensorflow
tensorflow在训练时除了支持单机单卡、单机多卡以外,还支持分布式(多机多卡)训练。分布式训练里面又分参数同步更新和异步更新。(同步更新是指各个用于并行计算的电脑,计算完各自的batch 后,求取梯度值,再把梯度值统一送到参数服务器中,由参数服务器求取梯度平均值,更新参数服务器上的参数。异步更新是指参数服务器只要收到一台机器计算的梯度值,就直接进行参数更新,无需等待其它机器)。这里我以异步更新为例,介绍如何在易学智能平台上基于分布式tensorflow来训练mnist。工程文件包括dist_tf_mnist_async.py和mnist_inference.py,完整代码见附录。
二、搭建分布式集群
先在平台上依次租赁2台不同节点上的机器,1台1050ti用来做参数服务器,1台1080ti用来当worker。具体流程见图2-1和图2-2:
图2-1 申请参数服务器
分布式tensorflow用GRPC作为通信框架,我们除了需要提供ip地址外,还需要使用接口。每台机器的端口查看方式如图2-3(点击“远程调试”即可查看端口)
图2-3 查看端口
将这些机器的ip地址和端口信息整理如下: #ps 60.205.219.151:10003
#worker 60.205.219.151:10005 三、上传代码 打开jupyter notebook,上传工程代码,具体流程见图3-1 四、开始分布式训练
4-1 在参数服务器运行代码
打开1050ti虚拟机终端,先进入/home/ubuntu/MyFiles目录下。然后执行下面的命令(具体流程见图4-1):
- CUDA_VISIBLE_DEVICES='' python3 dist_tf_mnist_async.py --job_name='ps' --task_id=0 --ps_hosts='localhost:10003' --worker_hosts='60.205.219.151:10005'
复制代码
图4-1 在参数服务器上运行代码
4-2 在worker上运行代码
打开端口号为10005的1080ti虚拟机终端,先进入/home/ubuntu/MyFiles目录下,执行下面的命令(具体流程见图4-2):
- CUDA_VISIBLE_DEVICES=0 python3 dist_tf_mnist_async.py --job_name='worker' --task_id=0 --ps_hosts='60.205.219.151:10003' -worker_hosts='localhost:10005'
复制代码
图4-2 在worker上运行代码
温馨提提示:在该虚拟机执行命令时,对应的ip地址变为localhost。
#1-ps(端口为10003)
CUDA_VISIBLE_DEVICES='' python3 dist_tf_mnist_async.py --job_name='ps' --task_id=0 --ps_hosts='localhost:10003' --worker_hosts='60.205.219.151:10005'
#2-worker
worker1(端口为10005)
CUDA_VISIBLE_DEVICES=0 python3 dist_tf_mnist_async.py --job_name='worker' --task_id=0 --ps_hosts='60.205.219.151:10003' --worker_hosts='localhost:10005'
五、训练过程截图
图5-1 worker训练日志
附录:
dist_tf_mnist_async.py完整代码
- import time
- import tensorflow as tf
- from tensorflow.examples.tutorials.mnist import input_data
- import mnist_inference
- # 配置神经网络的参数。
- BATCH_SIZE = 128
- LEARNING_RATE_BASE = 0.01
- LEARNING_RATE_DECAY = 0.99
- REGULARAZTION_RATE = 0.0001
- TRAINING_STEPS = 1000
- MOVING_AVERAGE_DECAY = 0.99
- # 模型保存的路径和文件名。
- MODEL_SAVE_PATH = "log_async"
- DATA_PATH = "MNIST_data"
- FLAGS = tf.app.flags.FLAGS
- # 指定当前程序是参数服务器还是计算服务器。
- # 参数服务器负责TensorFlow中变量的维护和管理
- # 计算服务器负责每一轮迭代时运行反向传播过程
- tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ')
- # 指定集群中的参数服务器地址。
- tf.app.flags.DEFINE_string('ps_hosts', ' tf-ps0:2222,tf-ps1:1111',
- 'Comma-separated list of hostname:port for the parameter server jobs. e.g. "tf-ps0:2222,tf-ps1:1111" ')
- # 指定集群中的计算服务器地址。
- tf.app.flags.DEFINE_string(
- 'worker_hosts', ' tf-worker0:2222,tf-worker1:1111',
- 'Comma-separated list of hostname:port for the worker jobs. e.g. "tf-worker0:2222,tf-worker1:1111" ')
- # 指定当前程序的任务ID。
- # TensorFlow会自动根据参数服务器/计算服务器列表中的端口号来启动服务。
- # 注意参数服务器和计算服务器的编号都是从0开始的。
- tf.app.flags.DEFINE_integer('task_id', 0, 'Task ID of the worker/replica running the training.')
- # 定义TensorFlow的计算图,并返回每一轮迭代时需要运行的操作。
- def train_model(x, y_, is_chief):
- # 计算神经网络前向传播的结果
- y = mnist_inference.build_model(x)
- global_step = tf.Variable(0, trainable=False)
- cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
- loss = tf.reduce_mean(cross_entropy)
- learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step,60000 / BATCH_SIZE, LEARNING_RATE_DECAY)
- train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step)
- # 定义每一轮迭代需要运行的操作。
- if is_chief:
- # 计算变量的滑动平均值。
- variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
- variables_averages_op = variable_averages.apply(tf.trainable_variables())
- with tf.control_dependencies([variables_averages_op, train_op]):
- train_op = tf.no_op()
- return global_step, loss, train_op
- def main(argv=None):
- # 解析flags并通过tf.train.ClusterSpec配置TensorFlow集群。
- ps_hosts = FLAGS.ps_hosts.split(',')
- worker_hosts = FLAGS.worker_hosts.split(',')
- print('PS hosts are: %s' % ps_hosts)
- print('Worker hosts are: %s' % worker_hosts)
- n_workers = len(worker_hosts)
- cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
- # 通过tf.train.ClusterSpec以及当前任务创建tf.train.Server。
- server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id)
- # 参数服务器只需要管理TensorFlow中的变量,不需要执行训练的过程。server.join()会一致停在这条语句上。
- if FLAGS.job_name == 'ps':
- with tf.device("/cpu:0"):
- server.join()
- # 定义计算服务器需要运行的操作。
- # 在所有的计算服务器中有一个是主计算服务器
- # 它除了负责计算反向传播的结果,还负责日志和保存模块
- is_chief = (FLAGS.task_id == 0)
- mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)
- # 通过tf.train.replica_device_setter函数来指定执行每一个运算的设备。
- # tf.train.replica_device_setter函数会自动将所有的参数分配到参数服务器上,而
- # 计算分配到当前的计算服务器上,
- with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_id, cluster=cluster)):
- # 定义输入并得到每一轮迭代需要运行的操作。
- x = tf.placeholder(tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input')
- y_ = tf.placeholder(tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input')
- global_step, loss, train_op = train_model(x, y_, is_chief)
- # 定义用于保存模型的saver。
- saver = tf.train.Saver()
- # 定义日志输出操作。
- summary_op = tf.summary.merge_all()
- # 定义变量初始化操作。
- init_op = tf.global_variables_initializer()
- # 通过tf.train.Supervisor管理训练深度学习模型时的通用功能。
- # tf.train.Supervisor能统一管理队列操作、模型保存、日志输出以及会话的生成
- sv = tf.train.Supervisor(
- is_chief=is_chief, # 定义当前计算服务器是否为祝计算服务器,只有主服务器会保存模型以及输出日志
- logdir=MODEL_SAVE_PATH, # 指定保存模型和输出日志的地址
- init_op=init_op, # 指定初始化操作
- summary_op=summary_op, # 指定日志生成操作
- saver=saver, # 指定用于保存模型的saver
- global_step=global_step, # 指定当前迭代的轮次,这个会用于保存模型文件
- save_model_secs=60, # 指定保存模型的时间间隔
- save_summaries_secs=60) # 指定日志输出的时间间隔
- sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
- # 通过tf.train.Supervisor生成会话。
- sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
- step = 0
- start_time = time.time()
- # 执行迭代过程。在迭代过程中,Supervisor会帮助输出日志并保存模型,不需要直接调用
- while not sv.should_stop():
- xs, ys = mnist.train.next_batch(BATCH_SIZE)
- _, loss_value, global_step_value = sess.run([train_op, loss, global_step], feed_dict={x: xs, y_: ys})
- if global_step_value >= TRAINING_STEPS:
- break
- # 每隔一段时间输出训练信息。
- if step > 0 and step % 10 == 0:
- duration = time.time() - start_time
- sec_per_batch = duration / global_step_value
- format_str = ('After %d training steps (%d global steps), loss on training batch is %g. (%.3f sec/batch)')
- print(format_str %(step, global_step_value, loss_value, sec_per_batch))
- step += 1
- sv.stop()
- if __name__ == "__main__":
- tf.app.run()
复制代码 mnist_inference.py完整代码
- import tensorflow as tf
- INPUT_NODE=784
- OUTPUT_NODE=10
- def get_weight_varible(name,shape):
- return tf.get_variable(name, shape=shape,
- initializer=tf.contrib.layers.xavier_initializer())
- def get_bias_varible(name,shape):
- return tf.get_variable(name, shape=shape,
- initializer=tf.contrib.layers.xavier_initializer())
- #filter_shape: [f_h, f_w, f_ic, f_oc]
- def conv2d(layer_name, x, filter_shape):
- with tf.variable_scope(layer_name):
- w = get_weight_varible('w', filter_shape)
- b = get_bias_varible('b', filter_shape[-1])
- y = tf.nn.bias_add(tf.nn.conv2d(input=x, filter=w, strides=[1, 1, 1, 1], padding='SAME'), b)
- return y
- def pool2d(layer_name, x):
- with tf.variable_scope(layer_name):
- y = tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
- return y
- #inp_shape: [N, L]
- #out_shape: [N, L]
- def fc(layer_name, x, inp_shape, out_shape):
- with tf.variable_scope(layer_name):
- inp_dim = inp_shape[-1]
- out_dim = out_shape[-1]
- y = tf.reshape(x, shape=inp_shape)
- w = get_weight_varible('w', [inp_dim, out_dim])
- b = get_bias_varible('b', [out_dim])
- y = tf.add(tf.matmul(y, w), b)
- return y
- def build_model(x):
- y = tf.reshape(x,shape=[-1, 28, 28, 1])
- #layer 1
- y = conv2d('conv_1', y, [3, 3, 1, 8])
- y = pool2d('pool_1', y)
- #layer 2
- y = conv2d('conv_2', y, [3, 3, 8, 16])
- y = pool2d('pool_2', y)
- #layer fc
- y = fc('fc', y, [-1, 7*7*16], [-1, 10])
- print('y',y)
- return y
复制代码
|
|