AIcomes 发表于 2018-10-24 11:52:40

在易学智能平台上基于分布式tensorflow训练mnist

本帖最后由 AIcomes 于 2018-10-26 09:29 编辑

一、分布式tensorflow   
    tensorflow在训练时除了支持单机单卡、单机多卡以外,还支持分布式(多机多卡)训练。分布式训练里面又分参数同步更新和异步更新。(同步更新是指各个用于并行计算的电脑,计算完各自的batch 后,求取梯度值,再把梯度值统一送到参数服务器中,由参数服务器求取梯度平均值,更新参数服务器上的参数。异步更新是指参数服务器只要收到一台机器计算的梯度值,就直接进行参数更新,无需等待其它机器)。这里我以异步更新为例,介绍如何在易学智能平台上基于分布式tensorflow来训练mnist。工程文件包括dist_tf_mnist_async.py和mnist_inference.py,完整代码见附录。
二、搭建分布式集群
    先在平台上依次租赁2台不同节点上的机器,1台1050ti用来做参数服务器,1台1080ti用来当worker。具体流程见图2-1和图2-2:   
   
图2-1 申请参数服务器

图2-2 申请1台计算服务器(worker)    分布式tensorflow用GRPC作为通信框架,我们除了需要提供ip地址外,还需要使用接口。每台机器的端口查看方式如图2-3(点击“远程调试”即可查看端口)

图2-3 查看端口
    将这些机器的ip地址和端口信息整理如下:    #ps    60.205.219.151:10003
    #worker    60.205.219.151:10005三、上传代码    打开jupyter notebook,上传工程代码,具体流程见图3-1    图3-1 上传工程代码四、开始分布式训练
    4-1 在参数服务器运行代码
      打开1050ti虚拟机终端,先进入/home/ubuntu/MyFiles目录下。然后执行下面的命令(具体流程见图4-1):
CUDA_VISIBLE_DEVICES='' python3 dist_tf_mnist_async.py --job_name='ps' --task_id=0 --ps_hosts='localhost:10003' --worker_hosts='60.205.219.151:10005'   
图4-1 在参数服务器上运行代码
    4-2 在worker上运行代码
      打开端口号为10005的1080ti虚拟机终端,先进入/home/ubuntu/MyFiles目录下,执行下面的命令(具体流程见图4-2):
CUDA_VISIBLE_DEVICES=0 python3 dist_tf_mnist_async.py --job_name='worker' --task_id=0 --ps_hosts='60.205.219.151:10003' -worker_hosts='localhost:10005'
图4-2 在worker上运行代码
温馨提提示:在该虚拟机执行命令时,对应的ip地址变为localhost。
#1-ps(端口为10003)
CUDA_VISIBLE_DEVICES='' python3 dist_tf_mnist_async.py --job_name='ps' --task_id=0 --ps_hosts='localhost:10003' --worker_hosts='60.205.219.151:10005'

#2-worker
worker1(端口为10005)
CUDA_VISIBLE_DEVICES=0 python3 dist_tf_mnist_async.py --job_name='worker' --task_id=0 --ps_hosts='60.205.219.151:10003' --worker_hosts='localhost:10005'
五、训练过程截图
图5-1 worker训练日志
附录:
dist_tf_mnist_async.py完整代码
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

import mnist_inference

# 配置神经网络的参数。
BATCH_SIZE = 128
LEARNING_RATE_BASE = 0.01
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 1000
MOVING_AVERAGE_DECAY = 0.99

# 模型保存的路径和文件名。
MODEL_SAVE_PATH = "log_async"
DATA_PATH = "MNIST_data"

FLAGS = tf.app.flags.FLAGS

# 指定当前程序是参数服务器还是计算服务器。
# 参数服务器负责TensorFlow中变量的维护和管理
# 计算服务器负责每一轮迭代时运行反向传播过程
tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ')
# 指定集群中的参数服务器地址。
tf.app.flags.DEFINE_string('ps_hosts', ' tf-ps0:2222,tf-ps1:1111',
    'Comma-separated list of hostname:port for the parameter server jobs. e.g. "tf-ps0:2222,tf-ps1:1111" ')
# 指定集群中的计算服务器地址。
tf.app.flags.DEFINE_string(
    'worker_hosts', ' tf-worker0:2222,tf-worker1:1111',
    'Comma-separated list of hostname:port for the worker jobs. e.g. "tf-worker0:2222,tf-worker1:1111" ')
# 指定当前程序的任务ID。
# TensorFlow会自动根据参数服务器/计算服务器列表中的端口号来启动服务。
# 注意参数服务器和计算服务器的编号都是从0开始的。
tf.app.flags.DEFINE_integer('task_id', 0, 'Task ID of the worker/replica running the training.')


# 定义TensorFlow的计算图,并返回每一轮迭代时需要运行的操作。
def train_model(x, y_, is_chief):
    # 计算神经网络前向传播的结果
    y = mnist_inference.build_model(x)
    global_step = tf.Variable(0, trainable=False)
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
    loss = tf.reduce_mean(cross_entropy)
    learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step,60000 / BATCH_SIZE, LEARNING_RATE_DECAY)
    train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step)
    # 定义每一轮迭代需要运行的操作。
    if is_chief:
      # 计算变量的滑动平均值。
      variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
      variables_averages_op = variable_averages.apply(tf.trainable_variables())
      with tf.control_dependencies():
            train_op = tf.no_op()
    return global_step, loss, train_op


def main(argv=None):
    # 解析flags并通过tf.train.ClusterSpec配置TensorFlow集群。
    ps_hosts = FLAGS.ps_hosts.split(',')
    worker_hosts = FLAGS.worker_hosts.split(',')
    print('PS hosts are: %s' % ps_hosts)
    print('Worker hosts are: %s' % worker_hosts)
    n_workers = len(worker_hosts)

    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
    # 通过tf.train.ClusterSpec以及当前任务创建tf.train.Server。
    server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id)

    # 参数服务器只需要管理TensorFlow中的变量,不需要执行训练的过程。server.join()会一致停在这条语句上。
    if FLAGS.job_name == 'ps':
      with tf.device("/cpu:0"):
            server.join()

    # 定义计算服务器需要运行的操作。
    # 在所有的计算服务器中有一个是主计算服务器
    # 它除了负责计算反向传播的结果,还负责日志和保存模块
    is_chief = (FLAGS.task_id == 0)
    mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)

    # 通过tf.train.replica_device_setter函数来指定执行每一个运算的设备。
    # tf.train.replica_device_setter函数会自动将所有的参数分配到参数服务器上,而
    # 计算分配到当前的计算服务器上,
    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_id, cluster=cluster)):

      # 定义输入并得到每一轮迭代需要运行的操作。
      x = tf.placeholder(tf.float32, , name='x-input')
      y_ = tf.placeholder(tf.float32, , name='y-input')
      global_step, loss, train_op = train_model(x, y_, is_chief)

      # 定义用于保存模型的saver。
      saver = tf.train.Saver()
      # 定义日志输出操作。
      summary_op = tf.summary.merge_all()
      # 定义变量初始化操作。
      init_op = tf.global_variables_initializer()
      # 通过tf.train.Supervisor管理训练深度学习模型时的通用功能。
      # tf.train.Supervisor能统一管理队列操作、模型保存、日志输出以及会话的生成
      sv = tf.train.Supervisor(
            is_chief=is_chief,          # 定义当前计算服务器是否为祝计算服务器,只有主服务器会保存模型以及输出日志
            logdir=MODEL_SAVE_PATH,   # 指定保存模型和输出日志的地址
            init_op=init_op,            # 指定初始化操作
            summary_op=summary_op,      # 指定日志生成操作
            saver=saver,                # 指定用于保存模型的saver
            global_step=global_step,    # 指定当前迭代的轮次,这个会用于保存模型文件
            save_model_secs=60,         # 指定保存模型的时间间隔
            save_summaries_secs=60)   # 指定日志输出的时间间隔
      sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
      # 通过tf.train.Supervisor生成会话。
      sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
      step = 0
      start_time = time.time()

      # 执行迭代过程。在迭代过程中,Supervisor会帮助输出日志并保存模型,不需要直接调用
      while not sv.should_stop():
            xs, ys = mnist.train.next_batch(BATCH_SIZE)
            _, loss_value, global_step_value = sess.run(, feed_dict={x: xs, y_: ys})
            if global_step_value >= TRAINING_STEPS:
                break
            # 每隔一段时间输出训练信息。
            if step > 0 and step % 10 == 0:
                duration = time.time() - start_time
                sec_per_batch = duration / global_step_value
                format_str = ('After %d training steps (%d global steps), loss on training batch is %g. (%.3f sec/batch)')
                print(format_str %(step, global_step_value, loss_value, sec_per_batch))
            step += 1
    sv.stop()

if __name__ == "__main__":
    tf.app.run()mnist_inference.py完整代码
import tensorflow as tf

INPUT_NODE=784
OUTPUT_NODE=10

def get_weight_varible(name,shape):
    return tf.get_variable(name, shape=shape,
                     initializer=tf.contrib.layers.xavier_initializer())

def get_bias_varible(name,shape):
    return tf.get_variable(name, shape=shape,
                     initializer=tf.contrib.layers.xavier_initializer())

#filter_shape:
def conv2d(layer_name, x, filter_shape):
    with tf.variable_scope(layer_name):
      w = get_weight_varible('w', filter_shape)
      b = get_bias_varible('b', filter_shape[-1])
      y = tf.nn.bias_add(tf.nn.conv2d(input=x, filter=w, strides=, padding='SAME'), b)
      return y

def pool2d(layer_name, x):
    with tf.variable_scope(layer_name):
      y = tf.nn.max_pool(x, ksize=, strides=, padding='SAME')
      return y

#inp_shape:
#out_shape:
def fc(layer_name, x, inp_shape, out_shape):
    with tf.variable_scope(layer_name):
      inp_dim = inp_shape[-1]
      out_dim = out_shape[-1]
      y = tf.reshape(x, shape=inp_shape)
      w = get_weight_varible('w', )
      b = get_bias_varible('b', )
      y = tf.add(tf.matmul(y, w), b)
      return y

def build_model(x):
    y = tf.reshape(x,shape=[-1, 28, 28, 1])
    #layer 1
    y = conv2d('conv_1', y, )
    y = pool2d('pool_1', y)
    #layer 2
    y = conv2d('conv_2', y, )
    y = pool2d('pool_2', y)
    #layer fc
    y = fc('fc', y, [-1, 7*7*16], [-1, 10])
    print('y',y)
    return y












shaoheshaohe 发表于 2018-10-28 21:45:13

太好了
找了好久,
终于找到这样的文章了,
分布式训练,yeah
页: [1]
查看完整版本: 在易学智能平台上基于分布式tensorflow训练mnist