Source code for idaes.core.util.convergence.convergence_base

##############################################################################
# Institute for the Design of Advanced Energy Systems Process Systems
# Engineering Framework (IDAES PSE Framework) Copyright (c) 2018-2019, by the
# software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory,  National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia
# University Research Corporation, et al. All rights reserved.
#
# Please see the files COPYRIGHT.txt and LICENSE.txt for full copyright and
# license information, respectively. Both files are also available online
# at the URL "https://github.com/IDAES/idaes-pse".
##############################################################################
"""
This module provides the base classes and methods for running convergence
evaluations on IDAES models. The convergence evaluation runs a given model over
a set of sample points to ensure reliable convergence over the parameter space.

The module requires the user to provide:
  - a set of inputs along with their lower bound, upper bound, mean,
and standard deviation.
  - an initialized Pyomo model
  - a Pyomo solver with appropriate options

The module executes convergence evaluation in two steps. In the first step, a
json file is created that contains a set of points sampled from the provided
inputs. This step only needs to be done once - up front. The second step, which
should be executed any time there is a major code change that could impact the
model, takes that set of sampled points and solves the model at each of the
points, collecting convergence statistics (success/failure, iterations, and
solution time).

This can be used as a tool to evaluate model convergence reliability over the
defined input space, or to verify that convergence performance is not
decreasing with framework and/or model changes.

In order to write a convergence evaluation for your model, you must create a
class derived from ConvergenceEvaluation and implement three methods (a
minimal sketch follows the list below):

- get_specification: This method should create and return a
    ConvergenceEvaluationSpecification object. There are methods on
    ConvergenceEvaluationSpecification to add inputs. These inputs contain a
    string that identifies a Pyomo Param or Var object, the lower and upper
    bounds, and the mean and standard deviation to be used for sampling. When
    samples are generated, they are drawn from a normal distribution, and then
    truncated by the lower or upper bounds.
- get_initialized_model: This method should create and return a Pyomo model
    object that is already initialized and ready to be solved. This model will
    be modified according to the sampled inputs, and then it will be solved.
- get_solver: This method should return an instance of the Pyomo solver that
    will be used for the analysis.

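A minimal sketch of such a class (all names, paths, and values below are
placeholders for your own model code, not part of this module) might look
like::

    from pyomo.environ import SolverFactory
    from idaes.core.util.convergence.convergence_base import (
        ConvergenceEvaluation, ConvergenceEvaluationSpecification)

    class MyConvEval(ConvergenceEvaluation):
        def get_specification(self):
            spec = ConvergenceEvaluationSpecification()
            spec.add_sampled_input(
                name='feed_flow', pyomo_path='fs.unit.inlet.flow_mol[0]',
                lower=0.5, upper=1.5, mean=1.0, std=0.2)
            return spec

        def get_initialized_model(self):
            m = build_my_flowsheet()      # placeholder: build your model
            initialize_my_flowsheet(m)    # placeholder: initialize it
            return m

        def get_solver(self):
            return SolverFactory('ipopt')
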
Module-level functions are provided to create the sample points file
(write_sample_file), to run a convergence evaluation
(run_convergence_evaluation), and to print the results in table form
(print_convergence_statistics).

These steps can also be executed through the command-line interface; see the
documentation in convergence.py for more information.
"""
# stdlib
from collections import OrderedDict
import getpass
import importlib as il
import json
import logging
import sys
from io import StringIO
# third-party
import numpy as np
# pyomo
import pyutilib.services
from pyutilib.misc import capture_output
from pyomo.core import Param, Var
from pyomo.opt import TerminationCondition
from pyomo.common.log import LoggingIntercept
# idaes
import idaes.core.util.convergence.mpi_utils as mpiu
from idaes.dmf import resource


class ConvergenceEvaluationSpecification(object):
    def __init__(self):
        self._inputs = OrderedDict()

    def add_sampled_input(self, name, pyomo_path, lower, upper, mean, std):
        """
        Add an input that should be sampled when forming the set of
        specific points that need to be run for the convergence evaluation

        The input will be sampled assuming a normal distribution
        (with given mean and standard deviation)
        truncated to the values given by lower and upper bounds

        Parameters
        ----------
        name : str
           The name of the input.
        pyomo_path : str
           A string representation of the path to the variable or parameter to
           be sampled. This string is used with the model's find_component
           method to retrieve the Pyomo component.
        lower : float
           A lower bound on the input variable or parameter.
        upper : float
           An upper bound on the input variable or parameter.
        mean : float
           The mean value to use when generating samples.
        std : float
           The standard deviation to use when generating samples.

        Returns
        -------
           N/A
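
        Example
        -------
        A brief usage sketch; the pyomo_path shown here is a placeholder for
        a component in your own flowsheet::

            spec = ConvergenceEvaluationSpecification()
            spec.add_sampled_input(
                name='reactor_temperature',
                pyomo_path='fs.reactor.control_volume.properties_in[0].temperature',
                lower=550, upper=650, mean=600, std=20)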
        """
        # ToDo: put some error checking here ... Maybe we should have the model
        # ToDo: already? Can use to check if the pyomo_path is valid? check if
        # ToDo: bounds will be violated?
        spec = OrderedDict()
        spec['pyomo_path'] = pyomo_path
        spec['lower'] = lower
        spec['upper'] = upper
        spec['mean'] = mean
        spec['std'] = std
        self._inputs[name] = spec

    @property
    def inputs(self):
        return self._inputs


class ConvergenceEvaluation(object):
    def __init__(self):
        self._sampling_specifications = list()

    def get_specification(self):
        """
        User should override this method to return an instance of the
        ConvergenceEvaluationSpecification for this particular model and test
        set.

        The basic flow for this method is:
           - Create a ConvergenceEvaluationSpecification
           - Call add_sampled_input for every input that should be varied.
           - return the ConvergenceEvaluationSpecification

        Returns
        -------
           ConvergenceEvaluationSpecification
        """
        raise NotImplementedError('Not implemented in the base class. This'
                                  ' should be overridden in the derived class')

    def get_initialized_model(self):
        """
        User should override this method to return an initialized model that is
        ready to solve. The convergence evaluation methods will change the
        values of parameters or variables according to the sampling
        specifications.

        Returns
        -------
           Pyomo model : a Pyomo model object that is initialized and ready
              to solve. This is the model object that will be used in the
              evaluation.
        """
        raise NotImplementedError('Not implemented in the base class. This'
                                  ' should be overridden in the derived class')

    def get_solver(self):
        """
        User should create and return the solver that will be used for the
        convergence evaluation (including any necessary options)

        Returns
        -------
           Pyomo solver

        """
        # ToDo: We may want this to be standardized across all the models
        # within IDAES (should not need different solver options for different
        # unit models)
        raise NotImplementedError('Not implemented in the base class. This'
                                  ' should be overridden in the derived class')


def _class_import(class_path):
    # Note, this method assumes that everything in front
    # of the last dot is a module, followed by one class
    # I don't think this will work for nested classes
    tokens = class_path.split('.')
    modpath = '.'.join(tokens[0:-1])
    mod = il.import_module(modpath)
    ret_class = getattr(mod, tokens[-1])
    return ret_class
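
# For illustration, _class_import('mypackage.mymodule.MyConvEval') imports
# mypackage.mymodule and returns its MyConvEval attribute (the dotted path
# here is a placeholder, not a real package).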


def _run_ipopt_with_stats(model, solver, max_iter=500, max_cpu_time=120):
    """
    Run the solver (must be ipopt) and return the convergence statistics

    Parameters
    ----------
    model : Pyomo model
       The pyomo model to be solved

    solver : Pyomo solver
       The pyomo solver to use - it must be ipopt, but with whichever options
       are preferred

    max_iter : int
       The maximum number of iterations to allow for ipopt

    max_cpu_time : int
       The maximum cpu time to allow for ipopt (in seconds)

    Returns
    -------
       Returns a tuple with (solve status object, bool (solve successful or
       not), number of iters, solve time)
    """
    # ToDo: Check that the "solver" is, in fact, IPOPT

    pyutilib.services.TempfileManager.push()
    tempfile = pyutilib.services.TempfileManager.create_tempfile(
                        suffix='ipopt_out',
                        text=True)
    opts = {'output_file': tempfile,
            'max_iter': max_iter,
            'max_cpu_time': max_cpu_time}

    status_obj = solver.solve(model, options=opts, tee=True)
    solved = True
    if status_obj.solver.termination_condition != TerminationCondition.optimal:
        solved = False

    iters = 0
    time = 0
    # parse the output file to get the iteration count, solver times, etc.
    with open(tempfile, 'r') as f:
        for line in f:
            if line.startswith('Number of Iterations....:'):
                tokens = line.split()
                iters = int(tokens[3])
            elif line.startswith(
                    'Total CPU secs in IPOPT (w/o function evaluations)   ='):
                tokens = line.split()
                time += float(tokens[9])
            elif line.startswith(
                    'Total CPU secs in NLP function evaluations           ='):
                tokens = line.split()
                time += float(tokens[8])

    pyutilib.services.TempfileManager.pop(remove=True)
    return status_obj, solved, iters, time


def _progress_bar(fraction, msg, length=20):
    length = length - 2
    n_complete = int(length*fraction)
    n_remaining = length - n_complete
    characters = ['*']*n_complete         # ['*' for i in range(n_complete)]
    characters.extend(['-']*n_remaining)  # ['-' for i in range(n_remaining)])
    sys.stdout.write('%5.1f%s [%s] %s\n' % (
                            fraction*100.0, '%', ''.join(characters), msg))
    sys.stdout.flush()


def _set_model_parameters_from_sample(model, inputs, sample_point):
    """
    This method takes the parameter values in sample_points and sets them on
    the model to prepare it for the new solve

    Parameters
    ----------
    model : Pyomo model
       The pyomo model that will be modified with values from the new
       sample_points

    inputs : dict
       The inputs dictionary from the sample-file

    sample_point : dict
       The dictionary of var and/or parameter values for the sample point

    Returns
    -------
       N/A
    """
    for k, v in sample_point.items():
        if k == '_name':
            # because parallel task manager does not handle dictionaries,
            # we add an _name entry to the sample point dictionary
            # ToDo: remove this when parallel task manager is fixed
            continue
        # k stores the "name" of the input
        # need to get the pyomo path of the input from the inputs structure
        pyomo_path = inputs[k]['pyomo_path']

        comp = model.find_component(pyomo_path)
        try:
            ctype = comp.type()
        except AttributeError:
            ctype = None

        if ctype is Param:
            if comp.is_constant():
                raise ValueError(
                        'Convergence testing found an input of type Param that'
                        ' was not mutable. Please make sure all sampled inputs'
                        ' are either mutable params or fixed vars.')
            comp.set_value(v)
            # print('Just set', comp,'to', float(v))

        elif ctype is Var:
            if not comp.is_fixed():
                raise ValueError(
                        'Convergence testing found an input of type Var that'
                        ' was not fixed. Please make sure all sampled inputs'
                        ' are either mutable params or fixed vars.')
            comp.set_value(float(v))
            # print('Just set', comp,'to', float(v))
        else:
            raise ValueError('Failed to find a valid input component (must be'
                             ' a fixed Var or a mutable Param). Instead,'
                             ' pyomo_path: {} returned: {}'
                             .format(pyomo_path, comp))


def write_sample_file(eval_spec, filename, convergence_evaluation_class_str,
                      n_points, seed=None):
    """
    Samples the space of the inputs defined in the eval_spec, and creates a
    json file with all the points to be used in executing a convergence
    evaluation

    Parameters
    ----------
    filename : str
       The filename for the json file that will be created containing all
       the points to be run
    eval_spec : ConvergenceEvaluationSpecification
       The convergence evaluation specification object that we would like to
       sample
    convergence_evaluation_class_str : str
       Python string that identifies the convergence evaluation class for
       this specific evaluation. This is usually in the form of
       module.class_name.
    n_points : int
       The total number of points that should be created
    seed : int or None
       The seed to be used when generating samples. If set to None, then the
       seed is not set

    Returns
    -------
       N/A
    """
    if seed is not None:
        np.random.seed(seed)

    # build the samples
    samples = OrderedDict()
    for i in range(n_points):
        sample = samples['Sample-{}'.format(i+1)] = OrderedDict()
        for k, v in eval_spec.inputs.items():
            s = np.random.normal(loc=v['mean'], scale=v['std'])
            if s > v['upper']:
                s = v['upper']
            elif s < v['lower']:
                s = v['lower']
            sample[k] = s

    # create the dictionary storing all the necessary information
    jsondict = OrderedDict()
    jsondict['inputs'] = OrderedDict(eval_spec.inputs)
    jsondict['n_points'] = n_points
    jsondict['seed'] = seed
    jsondict['samples'] = samples
    jsondict['convergence_evaluation_class_str'] = \
        convergence_evaluation_class_str

    with open(filename, 'w') as fd:
        json.dump(jsondict, fd, indent=3)
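

# Illustrative usage sketch (kept as a comment; the evaluation class, module
# path, and file name below are placeholders, not part of this module):
#
#     conv_eval = MyConvEval()
#     spec = conv_eval.get_specification()
#     write_sample_file(
#         eval_spec=spec,
#         filename='my_samples.json',
#         convergence_evaluation_class_str='mypackage.mymodule.MyConvEval',
#         n_points=50,
#         seed=42)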


def run_convergence_evaluation_from_sample_file(sample_file):
    # load the sample file
    try:
        with open(sample_file, 'r') as fd:
            jsondict = json.load(fd, object_pairs_hook=OrderedDict)
    except Exception as e:
        print('Error reading json file: {}'.format(str(e)))
        raise ValueError('Problem reading json file: {}'.format(sample_file))

    # create the convergence evaluation object
    convergence_evaluation_class_str = jsondict[
        'convergence_evaluation_class_str']
    try:
        convergence_evaluation_class = _class_import(
            convergence_evaluation_class_str)
        conv_eval = convergence_evaluation_class()
    except Exception as e:
        print('Error creating the class: {}. Error returned: {}'.format(
            convergence_evaluation_class_str, str(e)))
        raise ValueError(
            'Invalid value specified for convergence_evaluation_class_str:'
            ' {} in sample file: {}'.format(
                convergence_evaluation_class_str, sample_file))

    return run_convergence_evaluation(jsondict, conv_eval)


def run_convergence_evaluation(sample_file_dict, conv_eval):
    """
    Run convergence evaluation and generate the statistics based on
    information in the sample_file.

    Parameters
    ----------
    sample_file_dict : dict
       Dictionary created by write_sample_file that contains the input and
       sample point information

    conv_eval : ConvergenceEvaluation
       The ConvergenceEvaluation object that should be used

    Returns
    -------
       tuple : (inputs, samples, global_results), where inputs and samples
       are the corresponding entries from the sample file dictionary, and
       global_results is the list of result dictionaries gathered across all
       sample points
    """
    inputs = sample_file_dict['inputs']
    samples = sample_file_dict['samples']

    # current parallel task manager code does not work with dictionaries, so
    # convert samples to a list
    # ToDo: fix and test parallel task manager with dictionaries and change
    # this
    samples_list = list()
    for k, v in samples.items():
        v['_name'] = k
        samples_list.append(v)
    n_samples = len(samples_list)

    task_mgr = mpiu.ParallelTaskManager(n_samples)
    local_samples_list = task_mgr.global_to_local_data(samples_list)

    results = list()
    for (si, ss) in enumerate(local_samples_list):
        sample_name = ss['_name']

        # print progress on the rank-0 process
        if task_mgr.is_root():
            _progress_bar(float(si) / float(len(local_samples_list)),
                          'Root Process: {}'.format(sample_name))

        # capture the output
        # ToDo: make this an option and turn off for single sample execution
        output_buffer = StringIO()
        with LoggingIntercept(output_buffer, 'idaes', logging.ERROR):
            with capture_output():  # as str_out:
                model = conv_eval.get_initialized_model()
                _set_model_parameters_from_sample(model, inputs, ss)
                solver = conv_eval.get_solver()
                (status_obj, solved, iters, time) = \
                    _run_ipopt_with_stats(model, solver)

        # run without output capture
        # model = conv_eval.get_initialized_model()
        # _set_model_parameters_from_sample(model, inputs, ss)
        # solver = conv_eval.get_solver()
        # (status_obj, solved, iters, time) = \
        #     _run_ipopt_with_stats(model, solver)

        if not solved:
            print('Sample: {} failed to converge.'.format(sample_name))

        results_dict = OrderedDict()
        results_dict['name'] = sample_name
        results_dict['sample_point'] = ss
        results_dict['solved'] = solved
        results_dict['iters'] = iters
        results_dict['time'] = time
        results.append(results_dict)

    global_results = task_mgr.gather_global_data(results)
    return inputs, samples, global_results
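

# Illustrative usage sketch (kept as a comment; 'my_samples.json' is a
# placeholder for a file created earlier by write_sample_file):
#
#     inputs, samples, results = \
#         run_convergence_evaluation_from_sample_file('my_samples.json')
#     save_convergence_statistics(inputs, results)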


def save_convergence_statistics(inputs, results, dmf=None):
    s = Stats(results)
    if dmf is None:
        print_convergence_statistics(inputs, results, s)
    else:
        save_results_to_dmf(dmf, inputs, results, s)


class Stats(object):
    def __init__(self, results):
        self.notable_cases, self.failed_cases = [], []
        self.iters_successful = list()
        self.time_successful = list()

        # loop through and gather some data
        for r in results:
            if r['solved'] is True:
                self.iters_successful.append(r['iters'])
                self.time_successful.append(r['time'])
            else:
                self.failed_cases.append(r)

        # data for summary table
        self.iters_min = 0
        self.iters_mean = 0
        self.iters_std = 0
        self.iters_max = 0
        if len(self.iters_successful) > 0:
            self.iters_min = int(np.min(self.iters_successful))
            self.iters_mean = int(np.mean(self.iters_successful))
            self.iters_std = int(np.std(self.iters_successful))
            self.iters_max = int(np.max(self.iters_successful))

        self.time_min = 0
        self.time_mean = 0
        self.time_std = 0
        self.time_max = 0
        if len(self.time_successful) > 0:
            self.time_min = float(np.min(self.time_successful))
            self.time_mean = float(np.mean(self.time_successful))
            self.time_std = float(np.std(self.time_successful))
            self.time_max = float(np.max(self.time_successful))


def save_results_to_dmf(dmf, inputs, results, stats):
    """Save results of run, along with stats, to DMF.

    Args:
        dmf (DMF): Data management framework object
        inputs (dict): Run inputs
        results (dict): Run results
        stats (Stats): Calculated result statistics

    Returns:
        None
    """
    d = {}
    # data
    x = {}
    for k, v in inputs.items():
        values = [results[i]['sample_point'][k] for i in range(len(results))]
        x[k] = {
            'min': float(np.min(values)),
            'mean': float(np.mean(values)),
            'stdev': float(np.std(values)),
            'max': float(np.max(values))
        }
    d['scenarios'] = x
    d['summary'] = {
        'cases_success': len(results) - len(stats.failed_cases),
        'cases_total': len(results),
        'iters': {
            'min': stats.iters_min,
            '-1std': stats.iters_mean - stats.iters_std,
            'mean': stats.iters_mean,
            '+1std': stats.iters_mean + stats.iters_std,
            'max': stats.iters_max
        },
        'time': {
            'min': stats.time_min,
            '-1std': stats.time_mean - stats.time_std,
            'mean': stats.time_mean,
            '+1std': stats.time_mean + stats.time_std,
            'max': stats.time_max
        }
    }
    tbl = []
    for r in results:
        notable = False
        solved = r['solved']
        item = {
            'solved': solved,
            'name': r['name'],
            'iters': r['iters'],
            'time': r['time']
        }
        for vtype in 'time', 'iters':
            mean, std = [getattr(stats, a)
                         for a in ('{}_mean'.format(vtype),
                                   '{}_std'.format(vtype))]
            if r[vtype] > mean + 2. * std and r[vtype] > mean + 5:
                # record the outlier flag on both the table entry and the
                # result itself so the notable-case loop below can report it
                item['outlier_{}'.format(vtype)] = True
                r['outlier_{}'.format(vtype)] = True
                notable = True
        if not solved:
            stats.failed_cases.append(r)
            notable = True
        else:
            stats.iters_successful.append(r['iters'])
            stats.time_successful.append(r['time'])
        tbl.append(item)
        if notable:
            stats.notable_cases.append(r)
    d['results_table'] = tbl
    # notable cases
    cases = []
    for r in stats.notable_cases:
        msg, is_outlier = [], False
        if not r['solved']:
            msg.append('failed solve')
        else:
            is_outlier = True
            # use .get() since a case may be an outlier in only one metric
            if r.get('outlier_time'):
                msg.append('long runtime')
            if r.get('outlier_iters'):
                msg.append('high iteration count')
        case = {'messages': msg}
        if is_outlier:
            outliers = {}
            for k, v in r['sample_point'].items():
                if k == '_name':
                    # we need this because the parallel task manager does not
                    # handle dicts and we add _name to identify the sample name
                    # TODO: Fix this when the parallel task manager is extended
                    # TODO: to handle dicts
                    continue
                mean = inputs[k]['mean']
                std = inputs[k]['std']
                alpha = (v - mean) / std
                outliers[k] = (v, alpha)
            case['outliers'] = outliers
        else:
            case['outliers'] = []
        cases.append(case)
    d['notable_cases'] = cases
    # Build and save resource object
    rsrc = resource.Resource(value={
        'name': 'convergence_results',
        'desc': 'statistics returned from run_convergence_evaluation',
        'creator': {'name': getpass.getuser()},
        'data': d},
        type_=resource.TY_DATA)
    dmf.add(rsrc)