Source code for idaes.core.scaling.scaler_profiling

#################################################################################
# The Institute for the Design of Advanced Energy Systems Integrated Platform
# Framework (IDAES IP) was produced under the DOE Institute for the
# Design of Advanced Energy Systems (IDAES).
#
# Copyright (c) 2018-2023 by the software owners: The Regents of the
# University of California, through Lawrence Berkeley National Laboratory,
# National Technology & Engineering Solutions of Sandia, LLC, Carnegie Mellon
# University, West Virginia University Research Corporation, et al.
# All rights reserved.  Please see the files COPYRIGHT.md and LICENSE.md
# for full copyright and license information.
#################################################################################
"""
Tools for profiling scaling alternatives.
"""
import sys

from pyomo.environ import check_optimal_termination, Constraint
from pyomo.common.tempfiles import TempfileManager

from idaes.core.util.scaling import jacobian_cond
from idaes.core.scaling.autoscaling import AutoScaler
from idaes.core.scaling.custom_scaler_base import (
    CustomScalerBase,
    ConstraintScalingScheme,
)
from idaes.core.solvers import get_solver


[docs] class ScalingProfiler: """ Class for running a set of constraint scaling methods on a model and reporting the effect on model condition number and solver behavior. Users should call the profile_scaling_methods method to generate a dict of results or the report_scaling_profiles method for a stream-based output. Users are expected to provide callback functions to 1) construct an initialized model that will be used for profiling, 2) apply user-defined variable scaling (used for the imperfect information case) and 3) perturb the model from the initialized state to test how well the model solves (optional). Users may also provide a dict of scaling methods they wish to apply using the scaling_methods argument. If this is not provided, the tool will default to applying all the scaling methods defined by the AutoScaler and CustomScalerBase classes. **NOTE** methods from the AutoScaler class are applied to Pyomo Blocks, whilst those from CustomScalerBase are applied to individual ConstraintDatas. The profiling tool assumes that methods will be applied to ConstraintDatas unless the `block_based` keyword argument is set to True for the scaling method. Args: build_model: callback to use to construct initialized model for testing user_scaling: callback to use to apply user-defined scaling to initialized model perturb_states: (optional) callback to use to perturb model state for re-solve tests scaling_methods: (optional) dict of constraint scaling methods to profile. {"Name": (method, kwargs)} solver: (optional) Pyomo solver object to use for re-solve tests """ def __init__( self, build_model, user_scaling, perturb_state=None, scaling_methods: dict = None, solver=None, ): """ Sets up a framework for applying different scaling methods to a model and compiling a report of their effects on the Jacobian condition number and how easily the scaled model can be solved for a perturbed state. """ self._build_model = build_model self._user_scaling = user_scaling self._perturb_state = perturb_state self._scaling_methods = scaling_methods self._solver = solver if self._solver is None: self._solver = get_solver("ipopt_v2", writer_config={"scale_model": True}) if self._scaling_methods is None: ascaler = AutoScaler() cscaler = CustomScalerBase() self._scaling_methods = { "Vars Only": (None, {}), "Harmonic": ( cscaler.scale_constraint_by_nominal_value, {"scheme": ConstraintScalingScheme.harmonicMean}, ), "Inverse Sum": ( cscaler.scale_constraint_by_nominal_value, {"scheme": ConstraintScalingScheme.inverseSum}, ), "Inverse Root Sum Squares": ( cscaler.scale_constraint_by_nominal_value, {"scheme": ConstraintScalingScheme.inverseRSS}, ), "Inverse Maximum": ( cscaler.scale_constraint_by_nominal_value, {"scheme": ConstraintScalingScheme.inverseMaximum}, ), "Inverse Minimum": ( cscaler.scale_constraint_by_nominal_value, {"scheme": ConstraintScalingScheme.inverseMinimum}, ), "Nominal L1 Norm": ( cscaler.scale_constraint_by_nominal_derivative_norm, {"norm": 1}, ), "Nominal L2 Norm": ( cscaler.scale_constraint_by_nominal_derivative_norm, {"norm": 2}, ), "Actual L1 Norm": ( ascaler.scale_constraints_by_jacobian_norm, {"norm": 1, "block_based": True}, ), "Actual L2 Norm": ( ascaler.scale_constraints_by_jacobian_norm, {"norm": 2, "block_based": True}, ), }
[docs] def profile_scaling_methods(self): """ Generate results for all provided scaling methods. For each scaling method, calculate the Jacobian condition number and re-solve the model with both user-provided variable scaling and perfect variable scaling (scaling by inverse magnitude). A base case with no scaling applied is also run for reference. Args: None Returns: dict with results for all scaling methods """ # Generate data for unscaled model m = self._build_model() unscaled = jacobian_cond(m, scaled=False) stats = self._solved_perturbed_state(m) results = { "Unscaled": {"Manual": {"condition_number": unscaled, **stats}}, } # Run other cases for case, (meth, margs) in self._scaling_methods.items(): results[case] = self.run_case(meth, **margs) return results
[docs] def report_scaling_profiles(self, stream=None): """ Run scaling profile workflow nad report results to a stream. Args: stream: StringIO object to write result to (default=stdout) Returns: None """ results = self.profile_scaling_methods() self.write_profile_report(results, stream)
[docs] def write_profile_report(self, results: dict, stream=None): """ Write a report on the comparison of scaling methods to a stream based on existing results dict. Args: results: dict containing results from a scaler profiling run stream: StringIO object to write result to (default=stdout) Returns: None """ # If stream is None, default to stdout if stream is None: stream = sys.stdout # Get length of longest method name for padding max_str = max([len(i) for i in results.keys()]) if max_str < len("Scaling Method"): max_str = len("Scaling Method") # Length of each stats field is 22 characters, plus 4 for column dividers # Max line length is thus longest string name + 2 columns of 22+4 characters max_line = max_str + 26 * 2 # Write header rows stream.write(f"\n{'='*max_line}\n") stream.write("Scaling Profile Report\n") stream.write(f"{'-' * max_line}\n") # Pad User Scaling columns to full column width (22) stream.write( f"{'Scaling Method': <{max_str}} || {'User Scaling': <{22}} || Perfect Scaling\n" ) # Iterate over keys in results and write summary for each scaling method for k, v in results.items(): if v["Manual"]["solved"]: msolved = "Solved" else: msolved = "Failed" # Pad iterations to 3 characters - hopefully we don;t see more than 999 iterations miters = f"{v['Manual']['iterations']: <{3}}" stream.write( f"{k: <{max_str}} || {v['Manual']['condition_number']:.3E} | {msolved} {miters} " ) if "Auto" in v.keys(): if v["Auto"]["solved"]: asolved = "Solved" else: asolved = "Failed" # Pad iterations again aiters = f"{v['Auto']['iterations']: <{3}}" stream.write( f"|| {v['Auto']['condition_number']:.3E} | {asolved} {aiters}\n" ) else: # Add training column divider but no auto column stream.write("||\n") # Write footer row stream.write(f"{'=' * max_line}\n")
[docs] def run_case(self, scaling_method, **kwargs): """ Run case for a given scaling method with both perfect and imperfect scaling information. Args: scaling_method: constraint scaling method to be tested kwargs: keyword argument to be passed to scaling method Returns: dict summarising results of scaling case """ block_based = kwargs.pop("block_based", False) # Imperfect information manual = self._run_scenario( scaling_method, perfect=False, block_based=block_based, **kwargs ) # Perfect information perfect = self._run_scenario( scaling_method, perfect=True, block_based=block_based, **kwargs ) return {"Manual": manual, "Auto": perfect}
def _scale_vars(self, model, perfect=False): """ Apply variable scaling to model. """ if perfect: scaler = AutoScaler() scaler.scale_variables_by_magnitude(model) return model self._user_scaling(model) return model def _apply_scaling(self, model, scaling_method, block_based, **kwargs): """ Collect stats for a given scaling method. """ if scaling_method is not None: if block_based: scaling_method(model, **kwargs) else: for c in model.component_data_objects( ctype=Constraint, descend_into=True ): scaling_method(c, **kwargs) def _run_scenario(self, scaling_method, block_based, perfect, **kwargs): """ Run a single scenario """ m = self._build_model() self._scale_vars(m, perfect=perfect) self._apply_scaling(m, scaling_method, block_based=block_based, **kwargs) cond = jacobian_cond(m, scaled=True) stats = self._solved_perturbed_state(m) return {"condition_number": cond, **stats} def _solved_perturbed_state(self, model): """ Run re-solve tests if perturb_state callback provided. """ if self._perturb_state is None: return {} self._perturb_state(model) TempfileManager.push() tempfile = TempfileManager.create_tempfile(suffix="ipopt_out", text=True) opts = {"output_file": tempfile} try: status_obj = self._solver.solve(model, options=opts, tee=True) solved = True if not check_optimal_termination(status_obj): solved = False iters, iters_in_restoration, iters_w_regularization = ( self._parse_ipopt_output(tempfile) ) return { "solved": solved, "termination_message": status_obj.solver.termination_message, "iterations": iters, "iters_in_restoration": iters_in_restoration, "iters_w_regularization": iters_w_regularization, } except RuntimeError as err: # Likely a critical solver failure return { "solved": False, "termination_message": str(err), "iterations": -1, "iters_in_restoration": -1, "iters_w_regularization": -1, } def _parse_ipopt_output(self, ipopt_file): """ Parse an IPOPT output file and return: * number of iterations * time in IPOPT Returns ------- Returns a tuple with (solve status object, bool (solve successful or not), number of iters, solve time) """ # ToDO: Check for final iteration with regularization or restoration iters = 0 iters_in_restoration = 0 iters_w_regularization = 0 # parse the output file to get the iteration count, solver times, etc. with open(ipopt_file, "r") as f: parseline = False for line in f: if line.startswith("iter"): # This marks the start of the iteration logging, set parseline True parseline = True elif line.startswith("Number of Iterations....:"): # Marks end of iteration logging, set parseline False parseline = False tokens = line.split() iters = int(tokens[3]) elif parseline: # Line contains details of an iteration, look for restoration or regularization tokens = line.split() try: if not tokens[6] == "-": # Iteration with regularization iters_w_regularization += 1 if tokens[0].endswith("r"): # Iteration in restoration iters_in_restoration += 1 except IndexError: # Blank line at end of iteration list, so assume we hit this pass return iters, iters_in_restoration, iters_w_regularization