查看: 2731|回复: 1

[远程开发] 在易学智能平台上基于分布式tensorflow训练mnist

[复制链接]

66

主题

151

帖子

1033

积分

xdtech

Rank: 5Rank: 5

积分
1033
发表于 2018-10-24 11:52:40 | 显示全部楼层 |阅读模式
本帖最后由 AIcomes 于 2018-10-26 09:29 编辑

一、分布式tensorflow   
    tensorflow在训练时除了支持单机单卡、单机多卡以外,还支持分布式(多机多卡)训练。分布式训练里面又分参数同步更新和异步更新。(同步更新是指各个用于并行计算的电脑,计算完各自的batch 后,求取梯度值,再把梯度值统一送到参数服务器中,由
参数服务器求取梯度平均值,更新参数服务器上的参数。异步更新是指参数服务器只要收到一台机器计算的梯度值,就直接进行参数更新,无需等待其它机器)。这里我以异步更新为例,介绍如何在易学智能平台上基于分布式tensorflow来训练mnist。工程文件包括dist_tf_mnist_async.py和mnist_inference.py,完整代码见附录。
二、搭建分布式集群
    先在平台上依次租赁2台不同节点上的机器,1台1050ti用来做参数服务器,1台1080ti用来当worker。具体流程见图2-1和图2-2:   
   
申请参数服务器.gif
图2-1 申请参数服务器
申请1台worker.gif

图2-2 申请1台计算服务器(worker)
    分布式tensorflow用GRPC作为通信框架,我们除了需要提供ip地址外,还需要使用接口。每台机器的端口查看方式如图2-3(点击“远程调试”即可查看端口)
查看端口.gif
图2-3 查看端口

    将这些机器的ip地址和端口信息整理如下:
    #ps
    60.205.219.151:10003

    #worker
    60.205.219.151:10005
三、上传代码
    打开jupyter notebook,上传工程代码,具体流程见图3-1
    上传代码.gif
图3-1 上传工程代码
四、开始分布式训练
    4-1 在参数服务器运行代码
        打开1050ti虚拟机终端,先进入/home/ubuntu/MyFiles目录下。然后执行下面的命令(具体流程见图4-1):
  1. CUDA_VISIBLE_DEVICES='' python3 dist_tf_mnist_async.py --job_name='ps' --task_id=0 --ps_hosts='localhost:10003' --worker_hosts='60.205.219.151:10005'
复制代码
    在参数服务器上运行代码.gif
图4-1 在参数服务器上运行代码

    4-2 在worker上运行代码
        打开端口号为10005的1080ti虚拟机终端,先进入/home/ubuntu/MyFiles目录下,执行下面的命令(具体流程见图4-2):
  1. CUDA_VISIBLE_DEVICES=0 python3 dist_tf_mnist_async.py --job_name='worker' --task_id=0 --ps_hosts='60.205.219.151:10003' -worker_hosts='localhost:10005'
复制代码
在worker上运行代码.gif
图4-2 在worker上运行代码

温馨提提示:在该虚拟机执行命令时,对应的ip地址变为localhost。
#1-ps(端口为10003)
CUDA_VISIBLE_DEVICES='' python3 dist_tf_mnist_async.py --job_name='ps' --task_id=0 --ps_hosts='localhost:10003' --worker_hosts='60.205.219.151:10005'

#2-worker
worker1(端口为10005)
CUDA_VISIBLE_DEVICES=0 python3 dist_tf_mnist_async.py --job_name='worker' --task_id=0 --ps_hosts='60.205.219.151:10003' --worker_hosts='localhost:10005'
五、训练过程截图
训练中间结果.png
图5-1 worker训练日志

附录:
dist_tf_mnist_async.py完整代码
  1. import time
  2. import tensorflow as tf
  3. from tensorflow.examples.tutorials.mnist import input_data

  4. import mnist_inference

  5. # 配置神经网络的参数。
  6. BATCH_SIZE = 128
  7. LEARNING_RATE_BASE = 0.01
  8. LEARNING_RATE_DECAY = 0.99
  9. REGULARAZTION_RATE = 0.0001
  10. TRAINING_STEPS = 1000
  11. MOVING_AVERAGE_DECAY = 0.99

  12. # 模型保存的路径和文件名。
  13. MODEL_SAVE_PATH = "log_async"
  14. DATA_PATH = "MNIST_data"

  15. FLAGS = tf.app.flags.FLAGS

  16. # 指定当前程序是参数服务器还是计算服务器。
  17. # 参数服务器负责TensorFlow中变量的维护和管理
  18. # 计算服务器负责每一轮迭代时运行反向传播过程
  19. tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ')
  20. # 指定集群中的参数服务器地址。
  21. tf.app.flags.DEFINE_string('ps_hosts', ' tf-ps0:2222,tf-ps1:1111',
  22.     'Comma-separated list of hostname:port for the parameter server jobs. e.g. "tf-ps0:2222,tf-ps1:1111" ')
  23. # 指定集群中的计算服务器地址。
  24. tf.app.flags.DEFINE_string(
  25.     'worker_hosts', ' tf-worker0:2222,tf-worker1:1111',
  26.     'Comma-separated list of hostname:port for the worker jobs. e.g. "tf-worker0:2222,tf-worker1:1111" ')
  27. # 指定当前程序的任务ID。
  28. # TensorFlow会自动根据参数服务器/计算服务器列表中的端口号来启动服务。
  29. # 注意参数服务器和计算服务器的编号都是从0开始的。
  30. tf.app.flags.DEFINE_integer('task_id', 0, 'Task ID of the worker/replica running the training.')


  31. # 定义TensorFlow的计算图,并返回每一轮迭代时需要运行的操作。
  32. def train_model(x, y_, is_chief):
  33.     # 计算神经网络前向传播的结果
  34.     y = mnist_inference.build_model(x)
  35.     global_step = tf.Variable(0, trainable=False)
  36.     cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
  37.     loss = tf.reduce_mean(cross_entropy)
  38.     learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step,60000 / BATCH_SIZE, LEARNING_RATE_DECAY)
  39.     train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step)
  40.     # 定义每一轮迭代需要运行的操作。
  41.     if is_chief:
  42.         # 计算变量的滑动平均值。
  43.         variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
  44.         variables_averages_op = variable_averages.apply(tf.trainable_variables())
  45.         with tf.control_dependencies([variables_averages_op, train_op]):
  46.             train_op = tf.no_op()
  47.     return global_step, loss, train_op


  48. def main(argv=None):
  49.     # 解析flags并通过tf.train.ClusterSpec配置TensorFlow集群。
  50.     ps_hosts = FLAGS.ps_hosts.split(',')
  51.     worker_hosts = FLAGS.worker_hosts.split(',')
  52.     print('PS hosts are: %s' % ps_hosts)
  53.     print('Worker hosts are: %s' % worker_hosts)
  54.     n_workers = len(worker_hosts)

  55.     cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
  56.     # 通过tf.train.ClusterSpec以及当前任务创建tf.train.Server。
  57.     server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id)

  58.     # 参数服务器只需要管理TensorFlow中的变量,不需要执行训练的过程。server.join()会一致停在这条语句上。
  59.     if FLAGS.job_name == 'ps':
  60.         with tf.device("/cpu:0"):
  61.             server.join()

  62.     # 定义计算服务器需要运行的操作。
  63.     # 在所有的计算服务器中有一个是主计算服务器
  64.     # 它除了负责计算反向传播的结果,还负责日志和保存模块
  65.     is_chief = (FLAGS.task_id == 0)
  66.     mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)

  67.     # 通过tf.train.replica_device_setter函数来指定执行每一个运算的设备。
  68.     # tf.train.replica_device_setter函数会自动将所有的参数分配到参数服务器上,而
  69.     # 计算分配到当前的计算服务器上,
  70.     with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_id, cluster=cluster)):

  71.         # 定义输入并得到每一轮迭代需要运行的操作。
  72.         x = tf.placeholder(tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input')
  73.         y_ = tf.placeholder(tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input')
  74.         global_step, loss, train_op = train_model(x, y_, is_chief)

  75.         # 定义用于保存模型的saver。
  76.         saver = tf.train.Saver()
  77.         # 定义日志输出操作。
  78.         summary_op = tf.summary.merge_all()
  79.         # 定义变量初始化操作。
  80.         init_op = tf.global_variables_initializer()
  81.         # 通过tf.train.Supervisor管理训练深度学习模型时的通用功能。
  82.         # tf.train.Supervisor能统一管理队列操作、模型保存、日志输出以及会话的生成
  83.         sv = tf.train.Supervisor(
  84.             is_chief=is_chief,          # 定义当前计算服务器是否为祝计算服务器,只有主服务器会保存模型以及输出日志
  85.             logdir=MODEL_SAVE_PATH,     # 指定保存模型和输出日志的地址
  86.             init_op=init_op,            # 指定初始化操作
  87.             summary_op=summary_op,      # 指定日志生成操作
  88.             saver=saver,                # 指定用于保存模型的saver
  89.             global_step=global_step,    # 指定当前迭代的轮次,这个会用于保存模型文件
  90.             save_model_secs=60,         # 指定保存模型的时间间隔
  91.             save_summaries_secs=60)     # 指定日志输出的时间间隔
  92.         sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
  93.         # 通过tf.train.Supervisor生成会话。
  94.         sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
  95.         step = 0
  96.         start_time = time.time()

  97.         # 执行迭代过程。在迭代过程中,Supervisor会帮助输出日志并保存模型,不需要直接调用
  98.         while not sv.should_stop():
  99.             xs, ys = mnist.train.next_batch(BATCH_SIZE)
  100.             _, loss_value, global_step_value = sess.run([train_op, loss, global_step], feed_dict={x: xs, y_: ys})
  101.             if global_step_value >= TRAINING_STEPS:
  102.                 break
  103.             # 每隔一段时间输出训练信息。
  104.             if step > 0 and step % 10 == 0:
  105.                 duration = time.time() - start_time
  106.                 sec_per_batch = duration / global_step_value
  107.                 format_str = ('After %d training steps (%d global steps), loss on training batch is %g. (%.3f sec/batch)')
  108.                 print(format_str %(step, global_step_value, loss_value, sec_per_batch))
  109.             step += 1
  110.     sv.stop()

  111. if __name__ == "__main__":
  112.     tf.app.run()
复制代码
mnist_inference.py完整代码
  1. import tensorflow as tf

  2. INPUT_NODE=784
  3. OUTPUT_NODE=10

  4. def get_weight_varible(name,shape):
  5.     return tf.get_variable(name, shape=shape,
  6.                        initializer=tf.contrib.layers.xavier_initializer())

  7. def get_bias_varible(name,shape):
  8.     return tf.get_variable(name, shape=shape,
  9.                        initializer=tf.contrib.layers.xavier_initializer())

  10. #filter_shape: [f_h, f_w, f_ic, f_oc]
  11. def conv2d(layer_name, x, filter_shape):
  12.     with tf.variable_scope(layer_name):
  13.         w = get_weight_varible('w', filter_shape)
  14.         b = get_bias_varible('b', filter_shape[-1])
  15.         y = tf.nn.bias_add(tf.nn.conv2d(input=x, filter=w, strides=[1, 1, 1, 1], padding='SAME'), b)
  16.         return y

  17. def pool2d(layer_name, x):
  18.     with tf.variable_scope(layer_name):
  19.         y = tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
  20.         return y

  21. #inp_shape: [N, L]
  22. #out_shape: [N, L]
  23. def fc(layer_name, x, inp_shape, out_shape):
  24.     with tf.variable_scope(layer_name):
  25.         inp_dim = inp_shape[-1]
  26.         out_dim = out_shape[-1]
  27.         y = tf.reshape(x, shape=inp_shape)
  28.         w = get_weight_varible('w', [inp_dim, out_dim])
  29.         b = get_bias_varible('b', [out_dim])
  30.         y = tf.add(tf.matmul(y, w), b)
  31.         return y

  32. def build_model(x):
  33.     y = tf.reshape(x,shape=[-1, 28, 28, 1])
  34.     #layer 1
  35.     y = conv2d('conv_1', y, [3, 3, 1, 8])
  36.     y = pool2d('pool_1', y)
  37.     #layer 2
  38.     y = conv2d('conv_2', y, [3, 3, 8, 16])
  39.     y = pool2d('pool_2', y)
  40.     #layer fc
  41.     y = fc('fc', y, [-1, 7*7*16], [-1, 10])
  42.     print('y',y)
  43.     return y
复制代码













回复

使用道具 举报

665

主题

1234

帖子

6670

积分

xdtech

Rank: 5Rank: 5

积分
6670
发表于 2018-10-28 21:45:13 | 显示全部楼层
太好了
找了好久,
终于找到这样的文章了,
分布式训练,yeah
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表