Source code for greenlearning.utils.external_optimizer

"""TensorFlow interface for third-party optimizers.

Code below is taken from https://github.com/tensorflow/tensorflow/blob/v1.15.2/tensorflow/contrib/opt/python/training/external_optimizer.py,
because the ``tf.contrib`` module is not included in TensorFlow 2.

Another solution is using TensorFlow Probability, see the following references.
But the following solution requires setting the weights before building the network and loss,
which is not consistent with other optimizers in graph mode.
A possible solution Could be adding a TFPOptimizerInterface similar to ScipyOptimizerInterface.

- https://www.tensorflow.org/probability/api_docs/python/tfp/optimizer/lbfgs_minimize
- https://github.com/tensorflow/probability/blob/master/tensorflow_probability/python/optimizer/lbfgs_test.py
- https://stackoverflow.com/questions/58591562/how-can-we-use-lbfgs-minimize-in-tensorflow-2-0
- https://stackoverflow.com/questions/59029854/use-scipy-optimizer-with-tensorflow-2-0-for-neural-network-training
- https://pychao.com/2019/11/02/optimize-tensorflow-keras-models-with-l-bfgs-from-tensorflow-probability/
- https://gist.github.com/piyueh/712ec7d4540489aad2dcfb80f9a54993
- https://github.com/pierremtb/PINNs-TF2.0/blob/master/utils/neuralnetwork.py
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np

from .backend import tf

__all__ = ['ExternalOptimizerInterface', 'ScipyOptimizerInterface']


[docs]class ExternalOptimizerInterface(object): """Base class for interfaces with external optimization algorithms. Subclass this and implement `_minimize` in order to wrap a new optimization algorithm. `ExternalOptimizerInterface` should not be instantiated directly; instead use e.g. `ScipyOptimizerInterface`. @@__init__ @@minimize """ def __init__(self, loss, var_list=None, equalities=None, inequalities=None, var_to_bounds=None, **optimizer_kwargs): """Initialize a new interface instance. Args: loss: A scalar `Tensor` to be minimized. var_list: Optional `list` of `Variable` objects to update to minimize `loss`. Defaults to the list of variables collected in the graph under the key `GraphKeys.TRAINABLE_VARIABLES`. equalities: Optional `list` of equality constraint scalar `Tensor`s to be held equal to zero. inequalities: Optional `list` of inequality constraint scalar `Tensor`s to be held nonnegative. var_to_bounds: Optional `dict` where each key is an optimization `Variable` and each corresponding value is a length-2 tuple of `(low, high)` bounds. Although enforcing this kind of simple constraint could be accomplished with the `inequalities` arg, not all optimization algorithms support general inequality constraints, e.g. L-BFGS-B. Both `low` and `high` can either be numbers or anything convertible to a NumPy array that can be broadcast to the shape of `var` (using `np.broadcast_to`). To indicate that there is no bound, use `None` (or `+/- np.infty`). For example, if `var` is a 2x3 matrix, then any of the following corresponding `bounds` could be supplied: * `(0, np.infty)`: Each element of `var` held positive. * `(-np.infty, [1, 2])`: First column less than 1, second column less than 2. * `(-np.infty, [[1], [2], [3]])`: First row less than 1, second row less than 2, etc. * `(-np.infty, [[1, 2, 3], [4, 5, 6]])`: Entry `var[0, 0]` less than 1, `var[0, 1]` less than 2, etc. **optimizer_kwargs: Other subclass-specific keyword arguments. """ self._loss = loss self._equalities = equalities or [] self._inequalities = inequalities or [] if var_list is None: self._vars = tf.trainable_variables() else: self._vars = list(var_list) packed_bounds = None if var_to_bounds is not None: left_packed_bounds = [] right_packed_bounds = [] for var in self._vars: shape = var.get_shape().as_list() bounds = (-np.infty, np.infty) if var in var_to_bounds: bounds = var_to_bounds[var] left_packed_bounds.extend( list(np.broadcast_to(bounds[0], shape).flat)) right_packed_bounds.extend( list(np.broadcast_to(bounds[1], shape).flat)) packed_bounds = list(zip(left_packed_bounds, right_packed_bounds)) self._packed_bounds = packed_bounds self._update_placeholders = [ tf.placeholder(var.dtype) for var in self._vars ] self._var_updates = [ var.assign(tf.reshape(placeholder, _get_shape_tuple(var))) for var, placeholder in zip(self._vars, self._update_placeholders) ] loss_grads = _compute_gradients(loss, self._vars) equalities_grads = [ _compute_gradients(equality, self._vars) for equality in self._equalities ] inequalities_grads = [ _compute_gradients(inequality, self._vars) for inequality in self._inequalities ] self.optimizer_kwargs = optimizer_kwargs self._packed_var = self._pack(self._vars) self._packed_loss_grad = self._pack(loss_grads) self._packed_equality_grads = [ self._pack(equality_grads) for equality_grads in equalities_grads ] self._packed_inequality_grads = [ self._pack(inequality_grads) for inequality_grads in inequalities_grads ] dims = [_prod(_get_shape_tuple(var)) for var in self._vars] accumulated_dims = list(_accumulate(dims)) self._packing_slices = [ slice(start, end) for start, end in zip(accumulated_dims[:-1], accumulated_dims[1:]) ]
[docs] def minimize(self, session=None, feed_dict=None, fetches=None, step_callback=None, loss_callback=None, **run_kwargs): """Minimize a scalar `Tensor`. Variables subject to optimization are updated in-place at the end of optimization. Note that this method does *not* just return a minimization `Op`, unlike `Optimizer.minimize()`; instead it actually performs minimization by executing commands to control a `Session`. Args: session: A `Session` instance. feed_dict: A feed dict to be passed to calls to `session.run`. fetches: A list of `Tensor`s to fetch and supply to `loss_callback` as positional arguments. step_callback: A function to be called at each optimization step; arguments are the current values of all optimization variables flattened into a single vector. loss_callback: A function to be called every time the loss and gradients are computed, with evaluated fetches supplied as positional arguments. **run_kwargs: kwargs to pass to `session.run`. """ session = session or tf.get_default_session() feed_dict = feed_dict or {} fetches = fetches or [] loss_callback = loss_callback or (lambda *fetches: None) step_callback = step_callback or (lambda xk: None) # Construct loss function and associated gradient. loss_grad_func = self._make_eval_func([self._loss, self._packed_loss_grad], session, feed_dict, fetches, loss_callback) # Construct equality constraint functions and associated gradients. equality_funcs = self._make_eval_funcs(self._equalities, session, feed_dict, fetches) equality_grad_funcs = self._make_eval_funcs(self._packed_equality_grads, session, feed_dict, fetches) # Construct inequality constraint functions and associated gradients. inequality_funcs = self._make_eval_funcs(self._inequalities, session, feed_dict, fetches) inequality_grad_funcs = self._make_eval_funcs(self._packed_inequality_grads, session, feed_dict, fetches) # Get initial value from TF session. initial_packed_var_val = session.run(self._packed_var) # Perform minimization. packed_var_val = self._minimize( initial_val=initial_packed_var_val, loss_grad_func=loss_grad_func, equality_funcs=equality_funcs, equality_grad_funcs=equality_grad_funcs, inequality_funcs=inequality_funcs, inequality_grad_funcs=inequality_grad_funcs, packed_bounds=self._packed_bounds, step_callback=step_callback, optimizer_kwargs=self.optimizer_kwargs) var_vals = [ packed_var_val[packing_slice] for packing_slice in self._packing_slices ] # Set optimization variables to their new values. session.run( self._var_updates, feed_dict=dict(zip(self._update_placeholders, var_vals)), **run_kwargs)
def _minimize(self, initial_val, loss_grad_func, equality_funcs, equality_grad_funcs, inequality_funcs, inequality_grad_funcs, packed_bounds, step_callback, optimizer_kwargs): """Wrapper for a particular optimization algorithm implementation. It would be appropriate for a subclass implementation of this method to raise `NotImplementedError` if unsupported arguments are passed: e.g. if an algorithm does not support constraints but `len(equality_funcs) > 0`. Args: initial_val: A NumPy vector of initial values. loss_grad_func: A function accepting a NumPy packed variable vector and returning two outputs, a loss value and the gradient of that loss with respect to the packed variable vector. equality_funcs: A list of functions each of which specifies a scalar quantity that an optimizer should hold exactly zero. equality_grad_funcs: A list of gradients of equality_funcs. inequality_funcs: A list of functions each of which specifies a scalar quantity that an optimizer should hold >= 0. inequality_grad_funcs: A list of gradients of inequality_funcs. packed_bounds: A list of bounds for each index, or `None`. step_callback: A callback function to execute at each optimization step, supplied with the current value of the packed variable vector. optimizer_kwargs: Other key-value arguments available to the optimizer. Returns: The optimal variable vector as a NumPy vector. """ raise NotImplementedError( 'To use ExternalOptimizerInterface, subclass from it and implement ' 'the _minimize() method.') @classmethod def _pack(cls, tensors): """Pack a list of `Tensor`s into a single, flattened, rank-1 `Tensor`.""" if not tensors: return None elif len(tensors) == 1: return tf.reshape(tensors[0], [-1]) else: flattened = [tf.reshape(tensor, [-1]) for tensor in tensors] return tf.concat(flattened, 0) def _make_eval_func(self, tensors, session, feed_dict, fetches, callback=None): """Construct a function that evaluates a `Tensor` or list of `Tensor`s.""" if not isinstance(tensors, list): tensors = [tensors] num_tensors = len(tensors) def eval_func(x): """Function to evaluate a `Tensor`.""" augmented_feed_dict = { var: x[packing_slice].reshape(_get_shape_tuple(var)) for var, packing_slice in zip(self._vars, self._packing_slices) } augmented_feed_dict.update(feed_dict) augmented_fetches = tensors + fetches augmented_fetch_vals = session.run( augmented_fetches, feed_dict=augmented_feed_dict) if callable(callback): callback(*augmented_fetch_vals[num_tensors:]) return augmented_fetch_vals[:num_tensors] return eval_func def _make_eval_funcs(self, tensors, session, feed_dict, fetches, callback=None): return [ self._make_eval_func(tensor, session, feed_dict, fetches, callback) for tensor in tensors ]
[docs]class ScipyOptimizerInterface(ExternalOptimizerInterface): """Wrapper allowing `scipy.optimize.minimize` to operate a `tf.compat.v1.Session`. Example: .. code-block:: python vector = tf.Variable([7., 7.], 'vector') # Make vector norm as small as possible. loss = tf.reduce_sum(tf.square(vector)) optimizer = ScipyOptimizerInterface(loss, options={'maxiter': 100}) with tf.compat.v1.Session() as session: optimizer.minimize(session) # The value of vector should now be [0., 0.]. Example with simple bound constraints: .. code-block:: python vector = tf.Variable([7., 7.], 'vector') # Make vector norm as small as possible. loss = tf.reduce_sum(tf.square(vector)) optimizer = ScipyOptimizerInterface(loss, var_to_bounds={vector: ([1, 2], np.infty)}) with tf.compat.v1.Session() as session: optimizer.minimize(session) # The value of vector should now be [1., 2.]. Example with more complicated constraints: .. code-block:: python vector = tf.Variable([7., 7.], 'vector') # Make vector norm as small as possible. loss = tf.reduce_sum(tf.square(vector)) # Ensure the vector's y component is = 1. equalities = [vector[1] - 1.] # Ensure the vector's x component is >= 1. inequalities = [vector[0] - 1.] # Our default SciPy optimization algorithm, L-BFGS-B, does not support # general constraints. Thus we use SLSQP instead. optimizer = ScipyOptimizerInterface(loss, equalities=equalities, inequalities=inequalities, method='SLSQP') with tf.compat.v1.Session() as session: optimizer.minimize(session) # The value of vector should now be [1., 1.]. """ _DEFAULT_METHOD = 'L-BFGS-B' def _minimize(self, initial_val, loss_grad_func, equality_funcs, equality_grad_funcs, inequality_funcs, inequality_grad_funcs, packed_bounds, step_callback, optimizer_kwargs): def loss_grad_func_wrapper(x): # SciPy's L-BFGS-B Fortran implementation requires gradients as doubles. loss, gradient = loss_grad_func(x) return loss, gradient.astype('float64') optimizer_kwargs = dict(optimizer_kwargs.items()) method = optimizer_kwargs.pop('method', self._DEFAULT_METHOD) constraints = [] for func, grad_func in zip(equality_funcs, equality_grad_funcs): constraints.append({'type': 'eq', 'fun': func, 'jac': grad_func}) for func, grad_func in zip(inequality_funcs, inequality_grad_funcs): constraints.append({'type': 'ineq', 'fun': func, 'jac': grad_func}) minimize_args = [loss_grad_func_wrapper, initial_val] minimize_kwargs = { 'jac': True, 'callback': step_callback, 'method': method, 'constraints': constraints, 'bounds': packed_bounds, } for kwarg in minimize_kwargs: if kwarg in optimizer_kwargs: if kwarg == 'bounds': # Special handling for 'bounds' kwarg since ability to specify bounds # was added after this module was already publicly released. raise ValueError( 'Bounds must be set using the var_to_bounds argument') raise ValueError( 'Optimizer keyword arg \'{}\' is set ' 'automatically and cannot be injected manually'.format(kwarg)) minimize_kwargs.update(optimizer_kwargs) import scipy.optimize # pylint: disable=g-import-not-at-top result = scipy.optimize.minimize(*minimize_args, **minimize_kwargs) message_lines = [ 'Optimization terminated with:', ' Message: %s', ' Objective function value: %f', ] message_args = [result.message, result.fun] if hasattr(result, 'nit'): # Some optimization methods might not provide information such as nit and # nfev in the return. Logs only available information. message_lines.append(' Number of iterations: %d') message_args.append(result.nit) if hasattr(result, 'nfev'): message_lines.append(' Number of functions evaluations: %d') message_args.append(result.nfev) tf.logging.info('\n'.join(message_lines), *message_args) return result['x']
def _accumulate(list_): total = 0 yield total for x in list_: total += x yield total def _get_shape_tuple(tensor): return tuple(tensor.get_shape().as_list()) def _prod(array): prod = 1 for value in array: prod *= value return prod def _compute_gradients(tensor, var_list): grads = tf.gradients(tensor, var_list) # tf.gradients sometimes returns `None` when it should return 0. return [ grad if grad is not None else tf.zeros_like(var) for var, grad in zip(var_list, grads) ]