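Whether training is synchronous or asynchronous is determined by the optimizer.
1. Synchronous training
Synchronous training requires SyncReplicasOptimizer; see https://www.tensorflow.org/api_docs/python/tf/train/SyncReplicasOptimizer . All other optimizers train asynchronously.
Synchronous training is implemented in the apply_gradients() method in sync_replicas_optimizer.py. Suppose there are n parameters: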
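For the PS: n gradient accumulators are created, one per parameter. Each worker pushes the gradients it computed to the accumulators (the push is implemented with Send/Recv ops). Once an accumulator has received a value from every worker, it averages the values and updates the parameter. After all parameters have been updated, global_step is incremented by 1 and pushed into the token_queue for each worker; the workers pick up the new global_step and start the next training step.
For the worker: it pulls the parameters it needs from the PS, computes the gradients, and pushes each gradient to the corresponding accumulator. It then pulls the new global_step from the token_queue (the dequeue blocks until a new global_step is available) and continues with the next training step.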
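The PS/worker flow above can be sketched in plain Python. This is only a single-process illustration under simplifying assumptions, not the SyncReplicasOptimizer implementation: GradientAccumulator, sync_step and the use of a deque as the token queue are hypothetical names chosen for this example, and gradients are plain floats.

```python
from collections import deque


class GradientAccumulator:
    """Hypothetical stand-in for the per-parameter accumulator on the PS."""

    def __init__(self, num_required):
        self.num_required = num_required
        self.grads = []

    def push(self, grad):
        self.grads.append(grad)

    def ready(self):
        return len(self.grads) >= self.num_required

    def take_average(self):
        avg = sum(self.grads) / len(self.grads)
        self.grads = []
        return avg


def sync_step(params, worker_grads, global_step, learning_rate, token_queue):
    """Run one synchronous step; worker_grads is a list of {name: grad} dicts."""
    num_workers = len(worker_grads)
    accumulators = {name: GradientAccumulator(num_workers) for name in params}
    # Each worker pushes its gradient for every parameter.
    for grads in worker_grads:
        for name, grad in grads.items():
            accumulators[name].push(grad)
    # A parameter is updated only after every worker has reported.
    for name, acc in accumulators.items():
        assert acc.ready()
        params[name] -= learning_rate * acc.take_average()
    global_step += 1
    # One token per worker releases it into the next step.
    for _ in range(num_workers):
        token_queue.append(global_step)
    return global_step


params = {"w": 1.0, "b": 0.0}
tokens = deque()
step = sync_step(
    params,
    worker_grads=[{"w": 1.0, "b": 2.0}, {"w": 3.0, "b": 4.0}],
    global_step=0,
    learning_rate=0.5,
    token_queue=tokens,
)
print(params, step, list(tokens))
```

With two workers, each parameter moves by the learning rate times the mean of the two gradients, and two tokens carrying the new global_step are queued.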
2. Asynchronous training
The training code uses GradientDescentOptimizer (a subclass of Optimizer) and calls its minimize() method. minimize() first calls compute_gradients() and then calls apply_gradients().
Asynchronous training is implemented in the apply_gradients() method in optimizer.py (GradientDescentOptimizer does not override Optimizer's apply_gradients()). See https://stackoverflow.com/questions/43147435/how-does-asynchronous-training-work-in-distributed-tensorflow .
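To make the two-phase structure of minimize() concrete, here is a toy optimizer in plain Python. It is a sketch under assumptions, not TensorFlow code: TinyGradientDescent is a hypothetical class, variables live in a dict, and gradients are estimated with central differences instead of automatic differentiation.

```python
class TinyGradientDescent:
    """Hypothetical toy optimizer, not the TensorFlow class."""

    def __init__(self, learning_rate):
        self.learning_rate = learning_rate

    def compute_gradients(self, loss_fn, variables, eps=1e-6):
        """Return (gradient, name) pairs estimated by central differences."""
        pairs = []
        for name in variables:
            plus = dict(variables, **{name: variables[name] + eps})
            minus = dict(variables, **{name: variables[name] - eps})
            grad = (loss_fn(plus) - loss_fn(minus)) / (2 * eps)
            pairs.append((grad, name))
        return pairs

    def apply_gradients(self, grads_and_vars, variables):
        """Update each variable in place from its (gradient, name) pair."""
        for grad, name in grads_and_vars:
            variables[name] -= self.learning_rate * grad

    def minimize(self, loss_fn, variables):
        # minimize() is just the two phases run back to back.
        grads_and_vars = self.compute_gradients(loss_fn, variables)
        self.apply_gradients(grads_and_vars, variables)


def loss(v):
    return (v["w"] - 3.0) ** 2


variables = {"w": 0.0}
opt = TinyGradientDescent(learning_rate=0.1)
for _ in range(100):
    opt.minimize(loss, variables)
print(round(variables["w"], 3))
```

Keeping the two phases separate is what lets a caller modify the gradients between them, or lets a different class replace only the apply phase.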
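For the worker: it pulls the parameters it needs from the PS. The pull takes no lock, so the values it reads may or may not include updates from other workers. It then computes the gradients and sends each gradient to the corresponding PS.
For the PS: after receiving a gradient, it updates the parameter according to the optimization algorithm (e.g. SGD, SGD with Momentum, Adagrad, Adam).
Note: in asynchronous training, suppose worker1 reads parameter w1, worker2 then reads w1, worker1 applies its update, and then worker2 applies its update. worker2's gradient was computed from a value that does not include worker1's update, and because the update op runs without a lock by default, concurrent updates to the same variable can interleave so that part of worker1's update is overwritten. To serialize the updates, the GradientDescentOptimizer constructor provides the use_locking parameter.
The code logic is as follows: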
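The ordering in the note can be simulated without TensorFlow. The sketch below models only the order of reads and updates (stale gradients), not torn writes from truly concurrent updates; the toy loss 0.5 * w**2 and the function names are assumptions made for this example.

```python
def gradient(w):
    # Gradient of the toy loss 0.5 * w**2.
    return w


def run_serial(w, learning_rate, num_workers):
    """Each worker reads the parameter only after the previous update."""
    for _ in range(num_workers):
        w -= learning_rate * gradient(w)
    return w


def run_async_stale(w, learning_rate, num_workers):
    """All workers read the same value first, then push their updates."""
    snapshots = [w for _ in range(num_workers)]  # lock-free reads
    grads = [gradient(s) for s in snapshots]     # computed on stale values
    for g in grads:                              # the PS applies each update
        w -= learning_rate * g
    return w


serial = run_serial(1.0, 0.5, 2)
stale = run_async_stale(1.0, 0.5, 2)
print(serial, stale)
```

Starting from w = 1.0 with a learning rate of 0.5, the serial run ends at 0.25 while the stale run ends at 0.0, because the second worker's gradient ignores the first worker's update.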
def apply_gradients(self, grads_and_vars, global_step=None, name=None):
  """Apply gradients to variables.
  This is the second part of `minimize()`. It returns an `Operation` that
  applies gradients.
  Args:
    grads_and_vars: List of (gradient, variable) pairs as returned by
      `compute_gradients()`.
    global_step: Optional `Variable` to increment by one after the
      variables have been updated.
    name: Optional name for the returned operation. Default to the
      name passed to the `Optimizer` constructor.
  Returns:
    An `Operation` that applies the specified gradients. If `global_step`
    was not None, that operation also increments `global_step`.
  Raises:
    TypeError: If `grads_and_vars` is malformed.
    ValueError: If none of the variables have gradients.
    RuntimeError: If you should use `_distributed_apply()` instead.
  """
  # This is a default implementation of apply_gradients() that can be shared
  # by most optimizers. It relies on the subclass implementing the following
  # methods: _create_slots(), _prepare(), _apply_dense(), and _apply_sparse().
  # Handle DistributionStrategy case.
  if distribute_lib.get_cross_tower_context():
    raise RuntimeError("Use `_distributed_apply()` instead of "
                       "`apply_gradients()` in a cross-tower context.")
  # TODO(isaprykin): Get rid of `has_distribution_strategy()` check by
  # always calling _distributed_apply(), using the default distribution
  # as needed.
  if distribute_lib.has_distribution_strategy():
    grads_and_vars = get_filtered_grad_fn(lambda _: grads_and_vars)()
    return distribute_lib.get_tower_context().merge_call(
        self._distributed_apply, grads_and_vars, global_step, name)
  # No DistributionStrategy case.
  grads_and_vars = tuple(grads_and_vars)  # Make sure repeat iteration works.
  if not grads_and_vars:
    raise ValueError("No variables provided.")
  converted_grads_and_vars = []
  for g, v in grads_and_vars:
    if g is not None:
      try:
        # Convert the grad to Tensor or IndexedSlices if necessary.
        g = ops.convert_to_tensor_or_indexed_slices(g)
      except TypeError:
        raise TypeError(
            "Gradient must be convertible to a Tensor"
            " or IndexedSlices, or None: %s" % g)
      if not isinstance(g, (ops.Tensor, ops.IndexedSlices)):
        raise TypeError(
            "Gradient must be a Tensor, IndexedSlices, or None: %s" % g)
    p = _get_processor(v)  # _RefVariableProcessor
    converted_grads_and_vars.append((g, v, p))  # v._ref() = Tensor("weights/Variable:0", shape=(784, 10), dtype=float32_ref, device=/job:ps/task:0)
  # ((<tf.Tensor 'train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0' shape=(784, 10) dtype=float32>, <tf.Variable 'weights/Variable:0' shape=(784, 10) dtype=float32_ref>, <tensorflow.python.training.optimizer._RefVariableProcessor object at 0x7f6798012410>), (<tf.Tensor 'train/gradients/softmax/Add_grad/tuple/control_dependency_1:0' shape=(10,) dtype=float32>, <tf.Variable 'biases/Variable:0' shape=(10,) dtype=float32_ref>, <tensorflow.python.training.optimizer._RefVariableProcessor object at 0x7f67980124d0>))
  converted_grads_and_vars = tuple(converted_grads_and_vars)
  var_list = [v for g, v, _ in converted_grads_and_vars if g is not None]
  if not var_list:
    raise ValueError("No gradients provided for any variable: %s." %
                     ([str(v) for _, _, v in converted_grads_and_vars],))
  with ops.init_scope():
    self._create_slots(var_list)
  update_ops = []
  with ops.name_scope(name, self._name) as name:
    self._prepare()
    for grad, var, processor in converted_grads_and_vars:
      if grad is None:
        continue
      # We colocate all ops created in _apply_dense or _apply_sparse
      # on the same device as the variable.
      # TODO(apassos): figure out how to get the variable name here.
      if context.executing_eagerly() or isinstance(
          var,
          resource_variable_ops.ResourceVariable) and not var._in_graph_mode:  # pylint: disable=protected-access
        scope_name = ""
      else:
        scope_name = var.op.name  # var.op = {name: "weights/Variable", op: "VariableV2", device: "/job:ps/task:0"}
      with ops.name_scope("update_" + scope_name), ops.colocate_with(var):
        update_ops.append(processor.update_op(self, grad))  # line 111: def update_op() builds the update op, worker -> ps
    if global_step is None:
      apply_updates = self._finish(update_ops, name)
    else:
      with ops.control_dependencies([self._finish(update_ops, "update")]):
        with ops.colocate_with(global_step):
          if isinstance(global_step, resource_variable_ops.ResourceVariable):
            # TODO(apassos): the implicit read in assign_add is slow; consider
            # making it less so.
            apply_updates = resource_variable_ops.assign_add_variable_op(
                global_step.handle,
                ops.convert_to_tensor(1, dtype=global_step.dtype),
                name=name)
          else:
            apply_updates = state_ops.assign_add(global_step, 1, name=name)
    if not context.executing_eagerly():
      if isinstance(apply_updates, ops.Tensor):
        apply_updates = apply_updates.op
      train_op = ops.get_collection_ref(ops.GraphKeys.TRAIN_OP)
      if apply_updates not in train_op:
        train_op.append(apply_updates)
    return apply_updates
apply_gradients() calls update_ops.append(processor.update_op(self, grad)); update_op() is defined as follows:
def update_op(self, optimizer, g):
  if isinstance(g, ops.Tensor):  # update_op = {name: "train/GradientDescent/update_weights/Variable/ApplyGradientDescent", op: "ApplyGradientDescent", input: "weights/Variable", input: "train/GradientDescent/learning_rate", input: "train/gradients/softmax/MatMul_grad/tuple/control_dependency_1", device: "/job:ps/task:0"}
    update_op = optimizer._apply_dense(g, self._v)  # pylint: disable=protected-access
    if self._v.constraint is not None:
      with ops.control_dependencies([update_op]):
        return self._v.assign(self._v.constraint(self._v))
    else:
      return update_op  # return
  else:
    assert isinstance(g, ops.IndexedSlices), ("Gradient ", g, " is neither a "
                                              "tensor nor IndexedSlices.")
    if self._v.constraint is not None:
      raise RuntimeError(
          "Cannot use a constraint function on a sparse variable.")
    # pylint: disable=protected-access
    return optimizer._apply_sparse_duplicate_indices(g, self._v)
update_op() calls the optimizer's _apply_dense() method. Since the optimizer here is GradientDescentOptimizer, this is GradientDescentOptimizer's _apply_dense():
def _apply_dense(self, grad, var):
  return training_ops.apply_gradient_descent(
      var,  # <tf.Variable 'weights/Variable:0' shape=(784, 10) dtype=float32_ref>
      math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype),  # <tf.Tensor 'train/GradientDescent/learning_rate:0' shape=() dtype=float32>
      grad,  # Tensor("train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0", shape=(784, 10), dtype=float32, device=/job:worker/task:0)
      use_locking=self._use_locking).op  # False
This in turn calls apply_gradient_descent():
def apply_gradient_descent(var, alpha, delta, use_locking=False, name=None):
  r"""Update '*var' by subtracting 'alpha' * 'delta' from it.
  Args:
    var: A mutable `Tensor`. Must be one of the following types: `float32`, `float64`, `int32`, `uint8`, `int16`, `int8`, `complex64`, `int64`, `qint8`, `quint8`, `qint32`, `bfloat16`, `uint16`, `complex128`, `half`, `uint32`, `uint64`.
      Should be from a Variable().
    alpha: A `Tensor`. Must have the same type as `var`.
      Scaling factor. Must be a scalar.
    delta: A `Tensor`. Must have the same type as `var`. The change.
    use_locking: An optional `bool`. Defaults to `False`.
      If `True`, the subtraction will be protected by a lock;
      otherwise the behavior is undefined, but may exhibit less contention.
    name: A name for the operation (optional).
  Returns:
    A mutable `Tensor`. Has the same type as `var`.
  """
  _ctx = _context._context
  if _ctx is None or not _ctx._eager_context.is_eager:
    if use_locking is None:
      use_locking = False
    use_locking = _execute.make_bool(use_locking, "use_locking")
    _, _, _op = _op_def_lib._apply_op_helper(
        "ApplyGradientDescent", var=var, alpha=alpha, delta=delta,
        use_locking=use_locking, name=name)
    _result = _op.outputs[:]
    _inputs_flat = _op.inputs
    _attrs = ("T", _op.get_attr("T"), "use_locking",
              _op.get_attr("use_locking"))
    _execute.record_gradient(
        "ApplyGradientDescent", _inputs_flat, _attrs, _result, name)
    _result, = _result
    return _result
  else:
    raise RuntimeError("apply_gradient_descent op does not support eager execution. Arg 'out' is a ref.")
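The docstring above says the op subtracts alpha * delta from var in place, optionally under a lock. A plain-Python analogue of that behaviour might look like the sketch below. It is only an illustration: toy_apply_gradient_descent is a hypothetical function, a Python list stands in for the variable, and threading.Lock stands in for whatever locking the real kernel uses.

```python
import threading


def toy_apply_gradient_descent(var, alpha, delta, lock=None):
    """Toy analogue: update the list `var` in place by subtracting alpha * delta."""
    if lock is not None:
        # With a lock, the whole element-wise update is one critical section.
        with lock:
            for i in range(len(var)):
                var[i] -= alpha * delta[i]
    else:
        # Without a lock, concurrent callers may interleave element updates.
        for i in range(len(var)):
            var[i] -= alpha * delta[i]
    return var


weights = [8.0, 8.0, 8.0]
lock = threading.Lock()
threads = [
    threading.Thread(
        target=toy_apply_gradient_descent,
        args=(weights, 0.5, [1.0, 2.0, 3.0], lock),
    )
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(weights)
```

With the lock, four updates of 0.5 * [1.0, 2.0, 3.0] always leave the weights at [6.0, 4.0, 2.0], regardless of thread scheduling.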
This in turn calls _apply_op_helper():
# keywords = {'var': <tf.Variable 'weights/Variable:0' shape=(784, 10) dtype=float32_ref>, 'alpha': <tf.Tensor 'train/GradientDescent/learning_rate:0' shape=() dtype=float32>, 'use_locking': False, 'delta': <tf.Tensor 'train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0' shape=(784, 10) dtype=float32>}
def _apply_op_helper(self, op_type_name, name=None, **keywords):
  """Implementation of apply_op that returns output_structure, op."""
  op_info = self._ops.get(op_type_name, None)
  if op_info is None:
    raise RuntimeError("Unrecognized Op name " + op_type_name)
  op_def = op_info.op_def
  # Fill in the list of default types for all "type" attrs. This
  # will be used to choose a preferred dtype to convert to in the
  # absence of input type information.
  #
  # TODO(b/31302892): Currently the defaults don't work in the right
  # way if you have two inputs, one of whose type resolution depends
  # on the other. Handling this will require restructuring this code
  # significantly.
  default_type_attr_map = {}
  for attr_def in op_def.attr:
    if attr_def.type != "type":
      continue
    key = attr_def.name
    if attr_def.HasField("default_value"):
      default_type_attr_map[key] = dtypes.as_dtype(
          attr_def.default_value.type)
  # Requires that op_def has passed validation (using the C++
  # ValidateOpDef() from ../framework/op_def_util.h).
  attrs = {}
  inputs = []
  input_types = []
  with g.as_default(), ops.name_scope(name) as scope:
    # keywords = {'var': <tf.Variable 'weights/Variable:0' shape=(784, 10) dtype=float32_ref>, 'alpha': <tf.Tensor 'train/GradientDescent/learning_rate:0' shape=() dtype=float32>, 'use_locking': False, 'delta': <tf.Tensor 'train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0' shape=(784, 10) dtype=float32>}
    ...
    # NOTE(mrry): We add an explicit colocation constraint between
    # the newly created op and any of its reference-typed inputs.
    must_colocate_inputs = [val for arg, val in zip(op_def.input_arg, inputs)
                            if arg.is_ref]
    with _MaybeColocateWith(must_colocate_inputs):
      # Add Op to graph
      op = g.create_op(op_type_name, inputs, output_types, name=scope,
                       input_types=input_types, attrs=attr_protos,
                       op_def=op_def)
    return output_structure, op_def.is_stateful, op
This method is fairly long; at the end it calls create_op():
def create_op(
    self,
    op_type,
    inputs,
    dtypes,  # pylint: disable=redefined-outer-name
    input_types=None,
    name=None,
    attrs=None,
    op_def=None,
    compute_shapes=True,
    compute_device=True):
  """Creates an `Operation` in this graph.
  This is a low-level interface for creating an `Operation`. Most
  programs will not call this method directly, and instead use the
  Python op constructors, such as `tf.constant()`, which add ops to
  the default graph.
  Args:
    op_type: The `Operation` type to create. This corresponds to the
      `OpDef.name` field for the proto that defines the operation.
    inputs: A list of `Tensor` objects that will be inputs to the `Operation`.
    dtypes: A list of `DType` objects that will be the types of the tensors
      that the operation produces.
    input_types: (Optional.) A list of `DType`s that will be the types of
      the tensors that the operation consumes. By default, uses the base
      `DType` of each input in `inputs`. Operations that expect
      reference-typed inputs must specify `input_types` explicitly.
    name: (Optional.) A string name for the operation. If not specified, a
      name is generated based on `op_type`.
    attrs: (Optional.) A dictionary where the key is the attribute name (a
      string) and the value is the respective `attr` attribute of the
      `NodeDef` proto that will represent the operation (an `AttrValue`
      proto).
    op_def: (Optional.) The `OpDef` proto that describes the `op_type` that
      the operation will have.
    compute_shapes: (Optional.) Deprecated. Has no effect (shapes are always
      computed).
    compute_device: (Optional.) If True, device functions will be executed
      to compute the device property of the Operation.
  Raises:
    TypeError: if any of the inputs is not a `Tensor`.
    ValueError: if colocation conflicts with existing device assignment.
  Returns:
    An `Operation` object.
  """
  del compute_shapes
  self._check_not_finalized()
  for idx, a in enumerate(inputs):
    if not isinstance(a, Tensor):
      raise TypeError("Input #%d is not a tensor: %s" % (idx, a))
  if name is None:
    name = op_type
  # If a names ends with a '/' it is a "name scope" and we use it as-is,
  # after removing the trailing '/'.
  if name and name[-1] == "/":
    name = _name_from_scope_name(name)
  else:
    name = self.unique_name(name)
  node_def = _NodeDef(op_type, name, device=None, attrs=attrs)
  input_ops = set([t.op for t in inputs])
  control_inputs = self._control_dependencies_for_inputs(input_ops)
  # _create_op_helper mutates the new Operation. `_mutation_lock` ensures a
  # Session.run call cannot occur between creating and mutating the op.
  with self._mutation_lock():
    ret = Operation(
        node_def,
        self,
        inputs=inputs,
        output_types=dtypes,
        control_inputs=control_inputs,
        input_types=input_types,
        original_op=self._default_original_op,
        op_def=op_def)
    self._create_op_helper(ret, compute_device=compute_device)
  return ret
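The name handling at the top of create_op() (default to the op type, use a name scope ending in '/' as-is, otherwise make the name unique) can be sketched in isolation. This is a simplified model, not tf.Graph: ToyGraph and op_name are hypothetical, and the counter-based unique_name here ignores corner cases that the real method handles, such as a generated name colliding with one the user already chose.

```python
class ToyGraph:
    """Hypothetical miniature of the naming logic, not tf.Graph."""

    def __init__(self):
        self._names_in_use = {}

    def unique_name(self, name):
        """Return `name` on first use, then `name_1`, `name_2`, and so on."""
        count = self._names_in_use.get(name, 0)
        self._names_in_use[name] = count + 1
        return name if count == 0 else "%s_%d" % (name, count)

    def op_name(self, op_type, name=None):
        if name is None:
            name = op_type
        # A trailing '/' marks a name scope, which is used as-is without it.
        if name and name[-1] == "/":
            return name[:-1]
        return self.unique_name(name)


g = ToyGraph()
names = [g.op_name("ApplyGradientDescent") for _ in range(3)]
scoped = g.op_name("ApplyGradientDescent", "train/update_weights/")
print(names, scoped)
```

In the walkthrough above, _apply_op_helper() passes name=scope, so the update op takes its name from the enclosing name scope rather than from the uniquifying branch.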
This in turn calls _create_op_helper():
def _create_op_helper(self, op, compute_device=True):
  """Common logic for creating an op in this graph."""
  # Apply any additional attributes requested. Do not overwrite any existing
  # attributes.
  for key, value in self._attr_scope_map.items():
    try:
      op.get_attr(key)
    except ValueError:
      if callable(value):
        value = value(op.node_def)
        if not isinstance(value, (type(None), attr_value_pb2.AttrValue)):
          raise TypeError(
              "Callable for scope map key '%s' must return either None or "
              "an AttrValue protocol buffer; but it returned: %s" % (key,
                                                                     value))
      if value:
        op._set_attr(key, value)  # pylint: disable=protected-access
  # Apply a kernel label if one has been specified for this op type.
  try:
    kernel_label = self._op_to_kernel_label_map[op.type]
    op._set_attr("_kernel",  # pylint: disable=protected-access
                 attr_value_pb2.AttrValue(s=compat.as_bytes(kernel_label)))
  except KeyError:
    pass
  # Apply the overriding op type for gradients if one has been specified for
  # this op type.
  try:
    mapped_op_type = self._gradient_override_map[op.type]
    op._set_attr("_gradient_op_type",  # pylint: disable=protected-access
                 attr_value_pb2.AttrValue(s=compat.as_bytes(mapped_op_type)))
  except KeyError:
    pass
  self._record_op_seen_by_control_dependencies(op)
  if compute_device:
    self._apply_device_functions(op)
  if self._colocation_stack:
    all_colocation_groups = []
    for colocation_op in self._colocation_stack:
      all_colocation_groups.extend(colocation_op.colocation_groups())
      if colocation_op.device:
        # Make this device match the device of the colocated op, to provide
        # consistency between the device and the colocation property.
        if (op.device and pydev.canonical_name(op.device) !=
            pydev.canonical_name(colocation_op.device)):
          logging.warning("Tried to colocate %s with an op %s that had "
                          "a different device: %s vs %s. Postponing "
                          "error-checking until all devices are assigned.",
                          op.name, colocation_op.name, op.device,
                          colocation_op.device)
        else:
          op._set_device(colocation_op.device)  # pylint: disable=protected-access
    all_colocation_groups = sorted(set(all_colocation_groups))
    # pylint: disable=protected-access
    op._set_attr("_class", attr_value_pb2.AttrValue(
        list=attr_value_pb2.AttrValue.ListValue(s=all_colocation_groups)))
    # pylint: enable=protected-access
  # Sets "container" attribute if
  # (1) self._container is not None
  # (2) "is_stateful" is set in OpDef
  # (3) "container" attribute is in OpDef
  # (4) "container" attribute is None
  if self._container and op.op_def.is_stateful:
    try:
      container_attr = op.get_attr("container")
    except ValueError:
      # "container" attribute is not in OpDef
      pass
    else:
      if not container_attr:
        op._set_attr("container", attr_value_pb2.AttrValue(  # pylint: disable=protected-access
            s=compat.as_bytes(self._container)))
It contains this piece of logic:
  if compute_device:
    self._apply_device_functions(op)
Since compute_device = True, _apply_device_functions(op) is called next:
def _apply_device_functions(self, op):
  """Applies the current device function stack to the given operation."""
  # Apply any device functions in reverse order, so that the most recently
  # pushed function has the first chance to apply a device to the op.
  # We apply here because the result can depend on the Operation's
  # signature, which is computed in the Operation constructor.
  for device_function in reversed(self._device_function_stack):
    if device_function is None:
      break
    op._set_device(device_function(op))  # pylint: disable=protected-access
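This method assigns a device to the op. The placement policy is the one set up by replica_device_setter(). Note that the update op is created under ops.colocate_with(var), so the colocation logic in _create_op_helper() shown above also sets its device to the variable's device, which is how the update ends up on the PS.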
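The interaction between the device function stack and a PS-style placement policy can be illustrated with a small stand-alone model. This is a sketch under assumptions rather than the TensorFlow implementation: ToyOp, make_replica_device_fn and apply_device_functions are hypothetical names, and the placement rule (variables round-robin over PS tasks, everything else on the worker) is a simplification of what a replica device setter does.

```python
class ToyOp:
    """Hypothetical op with just the fields the placement logic needs."""

    def __init__(self, name, op_type):
        self.name = name
        self.type = op_type
        self.device = ""


def make_replica_device_fn(num_ps, worker_device):
    """Toy placement policy: variables round-robin over PS tasks."""
    next_task = [0]

    def device_fn(op):
        if op.device:
            return op.device  # keep an explicit assignment
        if op.type in ("Variable", "VariableV2"):
            task = next_task[0]
            next_task[0] = (task + 1) % num_ps
            return "/job:ps/task:%d" % task
        return worker_device

    return device_fn


def apply_device_functions(op, device_function_stack):
    # The most recently pushed function goes first; None stops the walk.
    for device_function in reversed(device_function_stack):
        if device_function is None:
            break
        op.device = device_function(op)


stack = [make_replica_device_fn(num_ps=2, worker_device="/job:worker/task:0")]
toy_ops = [
    ToyOp("weights", "VariableV2"),
    ToyOp("biases", "VariableV2"),
    ToyOp("extra", "VariableV2"),
    ToyOp("matmul", "MatMul"),
]
for op in toy_ops:
    apply_device_functions(op, stack)
placement = [(op.name, op.device) for op in toy_ops]
print(placement)
```

In this model the three variables land on PS tasks 0, 1 and 0, while the MatMul op stays on the worker.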
References:
[1] http://jcf94.com/2018/01/13/2018-01-13-tfunpacking/ (session.run())
[2] http://jcf94.com/2018/01/23/2018-01-23-tfunpacking2/ (TF dataflow model and automatic differentiation)
[3] http://jcf94.com/2018/02/28/2018-02-28-tfunpacking3/ (graph and node)
[4] http://jcf94.com/2018/03/07/2018-03-07-tfunpacking4/ (device)
[5] http://jcf94.com/2018/03/09/2018-03-09-tfunpacking5/ (distributed)
[6] https://www.tensorflow.org/deploy/distributed (distributed tensorflow)
[7] https://stackoverflow.com/questions/43147435/how-does-asynchronous-training-work-in-distributed-tensorflow (asynchronous training in distributed tensorflow)
Original article: https://www.cnblogs.com/lixiaolun/p/9818347.html