# -*- coding: utf-8 -*-
#################################################################################
# The Institute for the Design of Advanced Energy Systems Integrated Platform
# Framework (IDAES IP) was produced under the DOE Institute for the
# Design of Advanced Energy Systems (IDAES).
#
# Copyright (c) 2018-2024 by the software owners: The Regents of the
# University of California, through Lawrence Berkeley National Laboratory,
# National Technology & Engineering Solutions of Sandia, LLC, Carnegie Mellon
# University, West Virginia University Research Corporation, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md
# for full copyright and license information.
#################################################################################
"""
This module contains a collection of tools for diagnosing modeling issues.
"""
__author__ = "Alexander Dowling, Douglas Allan, Andrew Lee, Robby Parker, Ben Knueven"
from operator import itemgetter
import sys
from inspect import signature
from math import log, log10, isclose, inf, isfinite
import json
from typing import List
import logging
from itertools import combinations, chain
import numpy as np
from scipy.linalg import svd
from scipy.sparse.linalg import svds, norm
from scipy.sparse import issparse, find
from pyomo.environ import (
Binary,
Integers,
Block,
check_optimal_termination,
ComponentMap,
ConcreteModel,
Constraint,
Expression,
Objective,
Param,
RangeSet,
Set,
SolverFactory,
value,
Var,
)
from pyomo.core.expr.numeric_expr import (
DivisionExpression,
NPV_DivisionExpression,
PowExpression,
NPV_PowExpression,
UnaryFunctionExpression,
NPV_UnaryFunctionExpression,
NumericExpression,
)
from pyomo.core.base.block import BlockData
from pyomo.core.base.var import VarData
from pyomo.core.base.constraint import ConstraintData
from pyomo.core.base.expression import ExpressionData
from pyomo.repn.standard_repn import ( # pylint: disable=no-name-in-module
generate_standard_repn,
)
from pyomo.common.collections import ComponentSet
from pyomo.common.config import (
ConfigDict,
ConfigValue,
document_kwargs_from_configdict,
PositiveInt,
NonNegativeFloat,
NonNegativeInt,
)
from pyomo.util.check_units import identify_inconsistent_units
from pyomo.contrib.incidence_analysis import IncidenceGraphInterface
from pyomo.core.expr.visitor import identify_variables, StreamBasedExpressionVisitor
from pyomo.contrib.pynumero.interfaces.pyomo_nlp import PyomoNLP
from pyomo.contrib.pynumero.asl import AmplInterface
from pyomo.contrib.fbbt.fbbt import compute_bounds_on_expr
from pyomo.contrib.iis import mis
from pyomo.common.deprecation import deprecation_warning
from pyomo.common.errors import PyomoException
from pyomo.common.tempfiles import TempfileManager
from pyomo.core import expr as EXPR
from pyomo.common.numeric_types import native_types
from pyomo.core.base.units_container import _PyomoUnit
from idaes.core.solvers.get_solver import get_solver
from idaes.core.util.misc import compact_expression_to_string
from idaes.core.util.model_statistics import (
activated_blocks_set,
deactivated_blocks_set,
activated_equalities_set,
deactivated_equalities_set,
activated_inequalities_set,
deactivated_inequalities_set,
activated_objectives_set,
deactivated_objectives_set,
variables_in_activated_constraints_set,
variables_not_in_activated_constraints_set,
degrees_of_freedom,
large_residuals_set,
variables_near_bounds_set,
)
from idaes.core.util.scaling import (
get_jacobian,
extreme_jacobian_columns,
extreme_jacobian_rows,
extreme_jacobian_entries,
jacobian_cond,
)
from idaes.core.util.parameter_sweep import (
SequentialSweepRunner,
ParameterSweepBase,
is_psweepspec,
)
import idaes.logger as idaeslog
_log = idaeslog.getLogger(__name__)
MAX_STR_LENGTH = 84
TAB = " " * 4
# Constants for Degeneracy Hunter
YTOL = 0.9
MMULT = 0.99
# TODO: Add suggested steps to cautions - how?
[docs]
def svd_callback_validator(val):
"""Domain validator for SVD callbacks
Args:
val : value to be checked
Returns:
TypeError if val is not a valid callback
"""
if callable(val):
sig = signature(val)
if len(sig.parameters) >= 2:
return val
_log.error(
f"SVD callback {val} must be a callable which takes at least two arguments."
)
raise ValueError(
"SVD callback must be a callable which takes at least two arguments."
)
[docs]
def svd_dense(jacobian, number_singular_values):
"""
Callback for performing SVD analysis using scipy.linalg.svd
Args:
jacobian: Jacobian to be analysed
number_singular_values: number of singular values to compute
Returns:
u, s and v numpy arrays
"""
u, s, vT = svd(jacobian.todense(), full_matrices=False)
# Reorder singular values and vectors so that the singular
# values are from least to greatest
u = np.flip(u[:, -number_singular_values:], axis=1)
s = np.flip(s[-number_singular_values:], axis=0)
vT = np.flip(vT[-number_singular_values:, :], axis=0)
return u, s, vT.transpose()
[docs]
def svd_sparse(jacobian, number_singular_values):
"""
Callback for performing SVD analysis using scipy.sparse.linalg.svds
Args:
jacobian: Jacobian to be analysed
number_singular_values: number of singular values to compute
Returns:
u, s and v numpy arrays
"""
u, s, vT = svds(jacobian, k=number_singular_values, which="SM")
return u, s, vT.transpose()
CONFIG = ConfigDict()
CONFIG.declare(
"variable_bounds_absolute_tolerance",
ConfigValue(
default=1e-4,
domain=NonNegativeFloat,
description="Absolute tolerance for considering a variable to be close "
"to its bounds.",
),
)
CONFIG.declare(
"variable_bounds_relative_tolerance",
ConfigValue(
default=1e-4,
domain=NonNegativeFloat,
description="Relative tolerance for considering a variable to be close "
"to its bounds.",
),
)
CONFIG.declare(
"variable_bounds_violation_tolerance",
ConfigValue(
default=0,
domain=NonNegativeFloat,
description="Absolute tolerance for considering a variable to violate its bounds.",
doc="Absolute tolerance for considering a variable to violate its bounds. "
"Some solvers relax bounds on variables thus allowing a small violation to be "
"considered acceptable.",
),
)
CONFIG.declare(
"constraint_residual_tolerance",
ConfigValue(
default=1e-5,
domain=NonNegativeFloat,
description="Absolute tolerance to use when checking constraint residuals.",
),
)
CONFIG.declare(
"constraint_term_mismatch_tolerance",
ConfigValue(
default=1e6,
domain=NonNegativeFloat,
description="Magnitude difference to use when checking for mismatched additive terms in constraints.",
),
)
CONFIG.declare(
"constraint_term_cancellation_tolerance",
ConfigValue(
default=1e-4,
domain=NonNegativeFloat,
description="Absolute tolerance to use when checking for canceling additive terms in constraints.",
),
)
CONFIG.declare(
"max_canceling_terms",
ConfigValue(
default=5,
domain=NonNegativeInt,
description="Maximum number of terms to consider when looking for canceling combinations in expressions.",
),
)
CONFIG.declare(
"constraint_term_zero_tolerance",
ConfigValue(
default=1e-10,
domain=NonNegativeFloat,
description="Absolute tolerance to use when determining if a constraint term is equal to zero.",
),
)
CONFIG.declare(
"variable_large_value_tolerance",
ConfigValue(
default=1e4,
domain=NonNegativeFloat,
description="Absolute tolerance for considering a value to be large.",
),
)
CONFIG.declare(
"variable_small_value_tolerance",
ConfigValue(
default=1e-4,
domain=NonNegativeFloat,
description="Absolute tolerance for considering a value to be small.",
),
)
CONFIG.declare(
"variable_zero_value_tolerance",
ConfigValue(
default=1e-8,
domain=NonNegativeFloat,
description="Absolute tolerance for considering a value to be near to zero.",
),
)
CONFIG.declare(
"jacobian_large_value_caution",
ConfigValue(
default=1e4,
domain=NonNegativeFloat,
description="Tolerance for raising a caution for large Jacobian values.",
),
)
CONFIG.declare(
"jacobian_large_value_warning",
ConfigValue(
default=1e8,
domain=NonNegativeFloat,
description="Tolerance for raising a warning for large Jacobian values.",
),
)
CONFIG.declare(
"jacobian_small_value_caution",
ConfigValue(
default=1e-4,
domain=NonNegativeFloat,
description="Tolerance for raising a caution for small Jacobian values.",
),
)
CONFIG.declare(
"jacobian_small_value_warning",
ConfigValue(
default=1e-8,
domain=NonNegativeFloat,
description="Tolerance for raising a warning for small Jacobian values.",
),
)
CONFIG.declare(
"warn_for_evaluation_error_at_bounds",
ConfigValue(
default=True,
domain=bool,
description="If False, warnings will not be generated for things like log(x) with x >= 0",
),
)
CONFIG.declare(
"parallel_component_tolerance",
ConfigValue(
default=1e-8,
domain=NonNegativeFloat,
description="Tolerance for identifying near-parallel Jacobian rows/columns",
),
)
CONFIG.declare(
"absolute_feasibility_tolerance",
ConfigValue(
default=1e-6,
domain=NonNegativeFloat,
description="Feasibility tolerance for identifying infeasible constraints and bounds",
),
)
SVDCONFIG = ConfigDict()
SVDCONFIG.declare(
"number_of_smallest_singular_values",
ConfigValue(
domain=PositiveInt,
description="Number of smallest singular values to compute",
),
)
SVDCONFIG.declare(
"svd_callback",
ConfigValue(
default=svd_dense,
domain=svd_callback_validator,
description="Callback to SVD method of choice (default = svd_dense)",
doc="Callback to SVD method of choice (default = svd_dense). "
"Callbacks should take the Jacobian and number of singular values "
"to compute as options, plus any method specific arguments, and should "
"return the u, s and v matrices as numpy arrays.",
),
)
SVDCONFIG.declare(
"svd_callback_arguments",
ConfigValue(
default=None,
domain=dict,
description="Optional arguments to pass to SVD callback (default = None)",
),
)
SVDCONFIG.declare(
"singular_value_tolerance",
ConfigValue(
default=1e-6,
domain=NonNegativeFloat,
description="Tolerance for defining a small singular value",
),
)
SVDCONFIG.declare(
"size_cutoff_in_singular_vector",
ConfigValue(
default=0.1,
domain=NonNegativeFloat,
description="Size below which to ignore constraints and variables in "
"the singular vector",
),
)
DHCONFIG = ConfigDict()
DHCONFIG.declare(
"solver",
ConfigValue(
default="scip",
domain=str,
description="MILP solver to use for finding irreducible degenerate sets.",
),
)
DHCONFIG.declare(
"solver_options",
ConfigValue(
domain=None,
description="Options to pass to MILP solver.",
),
)
DHCONFIG.declare(
"M", # TODO: Need better name
ConfigValue(
default=1e5,
domain=NonNegativeFloat,
description="Maximum value for nu in MILP models.",
),
)
DHCONFIG.declare(
"m_small", # TODO: Need better name
ConfigValue(
default=1e-5,
domain=NonNegativeFloat,
description="Smallest value for nu to be considered non-zero in MILP models.",
),
)
DHCONFIG.declare(
"trivial_constraint_tolerance",
ConfigValue(
default=1e-6,
domain=NonNegativeFloat,
description="Tolerance for identifying non-zero rows in Jacobian.",
),
)
def _get_bounds_with_inf(node: NumericExpression):
lb, ub = compute_bounds_on_expr(node)
if lb is None:
lb = -inf
if ub is None:
ub = inf
return lb, ub
def _check_eval_error_division(
node: NumericExpression, warn_list: List[str], config: ConfigDict
):
lb, ub = _get_bounds_with_inf(node.args[1])
if (config.warn_for_evaluation_error_at_bounds and (lb <= 0 <= ub)) or (
lb < 0 < ub
):
msg = f"Potential division by 0 in {node}; Denominator bounds are ({lb}, {ub})"
warn_list.append(msg)
def _check_eval_error_pow(
node: NumericExpression, warn_list: List[str], config: ConfigDict
):
arg1, arg2 = node.args
lb1, ub1 = _get_bounds_with_inf(arg1)
lb2, ub2 = _get_bounds_with_inf(arg2)
integer_domains = ComponentSet([Binary, Integers])
integer_exponent = False
# if the exponent is an integer, there should not be any evaluation errors
if isinstance(arg2, VarData) and arg2.domain in integer_domains:
# The exponent is an integer variable
# check if the base can be zero
integer_exponent = True
if lb2 == ub2 and lb2 == round(lb2):
# The exponent is fixed to an integer
integer_exponent = True
repn = generate_standard_repn(arg2, quadratic=True)
if (
repn.nonlinear_expr is None
and repn.constant == round(repn.constant)
and all(i.domain in integer_domains for i in repn.linear_vars)
and all(i[0].domain in integer_domains for i in repn.quadratic_vars)
and all(i[1].domain in integer_domains for i in repn.quadratic_vars)
and all(i == round(i) for i in repn.linear_coefs)
and all(i == round(i) for i in repn.quadratic_coefs)
):
# The exponent is a linear or quadratic expression containing
# only integer variables with integer coefficients
integer_exponent = True
if integer_exponent and (
(lb1 > 0 or ub1 < 0)
or (not config.warn_for_evaluation_error_at_bounds and (lb1 >= 0 or ub1 <= 0))
):
# life is good; the exponent is an integer and the base is nonzero
return None
elif integer_exponent and lb2 >= 0:
# life is good; the exponent is a nonnegative integer
return None
# if the base is positive, there should not be any evaluation errors
if lb1 > 0 or (not config.warn_for_evaluation_error_at_bounds and lb1 >= 0):
return None
if lb1 >= 0 and lb2 >= 0:
return None
msg = f"Potential evaluation error in {node}; "
msg += f"base bounds are ({lb1}, {ub1}); "
msg += f"exponent bounds are ({lb2}, {ub2})"
warn_list.append(msg)
def _check_eval_error_log(
node: NumericExpression, warn_list: List[str], config: ConfigDict
):
lb, ub = _get_bounds_with_inf(node.args[0])
if (config.warn_for_evaluation_error_at_bounds and lb <= 0) or lb < 0:
msg = f"Potential log of a non-positive number in {node}; Argument bounds are ({lb}, {ub})"
warn_list.append(msg)
def _check_eval_error_tan(
node: NumericExpression, warn_list: List[str], config: ConfigDict
):
lb, ub = _get_bounds_with_inf(node)
if not (isfinite(lb) and isfinite(ub)):
msg = f"{node} may evaluate to -inf or inf; Argument bounds are {_get_bounds_with_inf(node.args[0])}"
warn_list.append(msg)
def _check_eval_error_asin(
node: NumericExpression, warn_list: List[str], config: ConfigDict
):
lb, ub = _get_bounds_with_inf(node.args[0])
if lb < -1 or ub > 1:
msg = f"Potential evaluation of asin outside [-1, 1] in {node}; Argument bounds are ({lb}, {ub})"
warn_list.append(msg)
def _check_eval_error_acos(
node: NumericExpression, warn_list: List[str], config: ConfigDict
):
lb, ub = _get_bounds_with_inf(node.args[0])
if lb < -1 or ub > 1:
msg = f"Potential evaluation of acos outside [-1, 1] in {node}; Argument bounds are ({lb}, {ub})"
warn_list.append(msg)
def _check_eval_error_sqrt(
node: NumericExpression, warn_list: List[str], config: ConfigDict
):
lb, ub = _get_bounds_with_inf(node.args[0])
if lb < 0:
msg = f"Potential square root of a negative number in {node}; Argument bounds are ({lb}, {ub})"
warn_list.append(msg)
_unary_eval_err_handler = dict()
_unary_eval_err_handler["log"] = _check_eval_error_log
_unary_eval_err_handler["log10"] = _check_eval_error_log
_unary_eval_err_handler["tan"] = _check_eval_error_tan
_unary_eval_err_handler["asin"] = _check_eval_error_asin
_unary_eval_err_handler["acos"] = _check_eval_error_acos
_unary_eval_err_handler["sqrt"] = _check_eval_error_sqrt
def _check_eval_error_unary(
node: NumericExpression, warn_list: List[str], config: ConfigDict
):
if node.getname() in _unary_eval_err_handler:
_unary_eval_err_handler[node.getname()](node, warn_list, config)
_eval_err_handler = dict()
_eval_err_handler[DivisionExpression] = _check_eval_error_division
_eval_err_handler[NPV_DivisionExpression] = _check_eval_error_division
_eval_err_handler[PowExpression] = _check_eval_error_pow
_eval_err_handler[NPV_PowExpression] = _check_eval_error_pow
_eval_err_handler[UnaryFunctionExpression] = _check_eval_error_unary
_eval_err_handler[NPV_UnaryFunctionExpression] = _check_eval_error_unary
class _EvalErrorWalker(StreamBasedExpressionVisitor):
def __init__(self, config: ConfigDict):
super().__init__()
self._warn_list = list()
self._config = config
def exitNode(self, node, data):
"""
callback to be called as the visitor moves from the leaf
nodes back to the root node.
Args:
node: a pyomo expression node
data: not used in this walker
"""
if type(node) in _eval_err_handler:
_eval_err_handler[type(node)](node, self._warn_list, self._config)
return self._warn_list
# TODO: Rename and redirect once old DegeneracyHunter is removed
[docs]
@document_kwargs_from_configdict(DHCONFIG)
class DegeneracyHunter2:
"""
Degeneracy Hunter is a tool for identifying Irreducible Degenerate Sets (IDS) in
Pyomo models.
Original implementation by Alex Dowling.
Args:
model: model to be diagnosed. The DegeneracyHunter does not support indexed Blocks.
"""
def __init__(self, model, **kwargs):
# TODO: In future may want to generalise this to accept indexed blocks
# However, for now some of the tools do not support indexed blocks
if not isinstance(model, BlockData):
raise TypeError(
"model argument must be an instance of a Pyomo BlockData object "
"(either a scalar Block or an element of an indexed Block)."
)
self._model = model
self.config = DHCONFIG(kwargs)
# Get Jacobian and NLP
self.jacobian, self.nlp = get_jacobian(
self._model, scaled=False, equality_constraints_only=True
)
# Placeholder for solver - deferring construction lets us unit test more easily
self.solver = None
# Create placeholders for results
self.degenerate_set = {}
self.irreducible_degenerate_sets = []
def _get_solver(self):
if self.solver is None:
self.solver = SolverFactory(self.config.solver)
options = self.config.solver_options
if options is None:
options = {}
self.solver.options = options
return self.solver
def _prepare_candidates_milp(self):
"""
Prepare MILP to find candidate equations for consider for IDS
Args:
None
Returns:
m_fc: Pyomo model to find candidates
"""
_log.info("Building MILP model.")
# Create Pyomo model for irreducible degenerate set
m_dh = ConcreteModel()
# Create index for constraints
m_dh.C = Set(initialize=range(self.jacobian.shape[0]))
m_dh.V = Set(initialize=range(self.jacobian.shape[1]))
# Specify minimum size for nu to be considered non-zero
M = self.config.M
m_small = self.config.m_small
# Add variables
m_dh.nu = Var(
m_dh.C,
bounds=(-M - m_small, M + m_small),
initialize=1.0,
)
m_dh.y_pos = Var(m_dh.C, domain=Binary)
m_dh.y_neg = Var(m_dh.C, domain=Binary)
m_dh.abs_nu = Var(m_dh.C, bounds=(0, M + m_small))
m_dh.pos_xor_neg = Constraint(m_dh.C)
# Constraint to enforce set is degenerate
if issparse(self.jacobian):
J = self.jacobian.tocsc()
def eq_degenerate(m_dh, v):
if np.sum(np.abs(J[:, v])) > self.config.trivial_constraint_tolerance:
# Find the columns with non-zero entries
C_ = find(J[:, v])[0]
return sum(J[c, v] * m_dh.nu[c] for c in C_) == 0
else:
# This variable does not appear in any constraint
return Constraint.Skip
else:
J = self.jacobian
def eq_degenerate(m_dh, v):
if np.sum(np.abs(J[:, v])) > self.config.trivial_constraint_tolerance:
return sum(J[c, v] * m_dh.nu[c] for c in m_dh.C) == 0
else:
# This variable does not appear in any constraint
return Constraint.Skip
m_dh.degenerate = Constraint(m_dh.V, rule=eq_degenerate)
# When y_pos = 1, nu >= m_small
# When y_pos = 0, nu >= - m_small
def eq_pos_lower(b, c):
return b.nu[c] >= -m_small + 2 * m_small * b.y_pos[c]
m_dh.pos_lower = Constraint(m_dh.C, rule=eq_pos_lower)
# When y_pos = 1, nu <= M + m_small
# When y_pos = 0, nu <= m_small
def eq_pos_upper(b, c):
return b.nu[c] <= M * b.y_pos[c] + m_small
m_dh.pos_upper = Constraint(m_dh.C, rule=eq_pos_upper)
# When y_neg = 1, nu <= -m_small
# When y_neg = 0, nu <= m_small
def eq_neg_upper(b, c):
return b.nu[c] <= m_small - 2 * m_small * b.y_neg[c]
m_dh.neg_upper = Constraint(m_dh.C, rule=eq_neg_upper)
# When y_neg = 1, nu >= -M - m_small
# When y_neg = 0, nu >= - m_small
def eq_neg_lower(b, c):
return b.nu[c] >= -M * b.y_neg[c] - m_small
m_dh.neg_lower = Constraint(m_dh.C, rule=eq_neg_lower)
# Absolute value
def eq_abs_lower(b, c):
return -b.abs_nu[c] <= b.nu[c]
m_dh.abs_lower = Constraint(m_dh.C, rule=eq_abs_lower)
def eq_abs_upper(b, c):
return b.nu[c] <= b.abs_nu[c]
m_dh.abs_upper = Constraint(m_dh.C, rule=eq_abs_upper)
# At least one constraint must be in the degenerate set
m_dh.degenerate_set_nonempty = Constraint(
expr=sum(m_dh.y_pos[c] + m_dh.y_neg[c] for c in m_dh.C) >= 1
)
# Minimize the L1-norm of nu
m_dh.obj = Objective(expr=sum(m_dh.abs_nu[c] for c in m_dh.C))
self.candidates_milp = m_dh
def _identify_candidates(self):
eq_con_list = self.nlp.get_pyomo_equality_constraints()
for i in self.candidates_milp.C:
# Check if constraint is included
if self.candidates_milp.abs_nu[i]() > self.config.m_small * MMULT:
# If it is, save the value of nu
if eq_con_list is None:
name = i
else:
name = eq_con_list[i]
self.degenerate_set[name] = self.candidates_milp.nu[i]()
def _solve_candidates_milp(self, tee: bool = False):
"""Solve MILP to generate set of candidate equations
Arguments:
tee: print solver output (default = False)
"""
_log.info("Solving Candidates MILP model.")
results = self._get_solver().solve(self.candidates_milp, tee=tee)
self.degenerate_set = {}
if check_optimal_termination(results):
# We found a degenerate set
self._identify_candidates()
else:
_log.debug(
"Solver did not return an optimal termination condition for "
"Candidates MILP. This probably indicates the system is full rank."
)
def _prepare_ids_milp(self):
"""
Prepare MILP to compute the irreducible degenerate set
"""
_log.info("Building MILP model to compute irreducible degenerate set.")
n_eq = self.jacobian.shape[0]
n_var = self.jacobian.shape[1]
# Create Pyomo model for irreducible degenerate set
m_dh = ConcreteModel()
# Create index for constraints
m_dh.C = Set(initialize=range(n_eq))
m_dh.V = Set(initialize=range(n_var))
# Specify minimum size for nu to be considered non-zero
M = self.config.M
# Add variables
m_dh.nu = Var(m_dh.C, bounds=(-M, M), initialize=1.0)
m_dh.y = Var(m_dh.C, domain=Binary)
# Constraint to enforce set is degenerate
if issparse(self.jacobian):
J = self.jacobian.tocsc()
def eq_degenerate(m_dh, v):
# Find the columns with non-zero entries
C = find(J[:, v])[0]
if len(C) == 0:
# Catch for edge-case of trivial constraint 0==0
return Constraint.Skip
return sum(J[c, v] * m_dh.nu[c] for c in C) == 0
else:
J = self.jacobian
def eq_degenerate(m_dh, v):
return sum(J[c, v] * m_dh.nu[c] for c in m_dh.C) == 0
m_dh.degenerate = Constraint(m_dh.V, rule=eq_degenerate)
def eq_lower(m_dh, c):
return -M * m_dh.y[c] <= m_dh.nu[c]
m_dh.lower = Constraint(m_dh.C, rule=eq_lower)
def eq_upper(m_dh, c):
return m_dh.nu[c] <= M * m_dh.y[c]
m_dh.upper = Constraint(m_dh.C, rule=eq_upper)
m_dh.obj = Objective(expr=sum(m_dh.y[c] for c in m_dh.C))
self.ids_milp = m_dh
def _get_ids(self):
# Create empty dictionary
ids_ = {}
eq_con_list = self.nlp.get_pyomo_equality_constraints()
for i in self.ids_milp.C:
# Check if constraint is included
if self.ids_milp.y[i]() > YTOL:
# If it is, save the value of nu
ids_[eq_con_list[i]] = self.ids_milp.nu[i]()
return ids_
def _solve_ids_milp(self, cons: Constraint, tee: bool = False):
"""Solve MILP to check if equation 'cons' is a significant component
in an irreducible degenerate set
Args:
cons: constraint to consider
tee: Boolean, print solver output (default = False)
Returns:
ids: dictionary containing the IDS
"""
_log.info(f"Solving IDS MILP for constraint {cons.name}.")
eq_con_list = self.nlp.get_pyomo_equality_constraints()
cons_idx = eq_con_list.index(cons)
# Fix weight on candidate equation
self.ids_milp.nu[cons_idx].fix(1.0)
# Solve MILP
results = self._get_solver().solve(self.ids_milp, tee=tee)
self.ids_milp.nu[cons_idx].unfix()
if check_optimal_termination(results):
# We found an irreducible degenerate set
return self._get_ids()
else:
raise ValueError(
f"Solver did not return an optimal termination condition for "
f"IDS MILP with constraint {cons.name}."
)
[docs]
def find_irreducible_degenerate_sets(self, tee=False):
"""
Compute irreducible degenerate sets
Args:
tee: Print solver output logs to screen (default=False)
"""
# Solve to find candidate equations
_log.info("Searching for Candidate Equations")
self._prepare_candidates_milp()
self._solve_candidates_milp(tee=tee)
# Find irreducible degenerate sets
# Check if degenerate_set is not empty
if self.degenerate_set:
_log.info("Searching for Irreducible Degenerate Sets")
self._prepare_ids_milp()
# Loop over candidate equations
count = 1
for k in self.degenerate_set:
print(f"Solving MILP {count} of {len(self.degenerate_set)}.")
_log.info_high(f"Solving MILP {count} of {len(self.degenerate_set)}.")
# Check if equation is a major element of an IDS
ids_ = self._solve_ids_milp(cons=k, tee=tee)
if ids_ is not None:
self.irreducible_degenerate_sets.append(ids_)
count += 1
else:
_log.debug("No candidate equations found")
[docs]
def report_irreducible_degenerate_sets(self, stream=None, tee: bool = False):
"""
Print a report of all the Irreducible Degenerate Sets (IDS) identified in
model.
Args:
stream: I/O object to write report to (default = stdout)
tee: whether to write solver output logs to screen
Returns:
None
"""
if stream is None:
stream = sys.stdout
self.find_irreducible_degenerate_sets(tee=tee)
stream.write("=" * MAX_STR_LENGTH + "\n")
stream.write("Irreducible Degenerate Sets\n")
if self.irreducible_degenerate_sets:
for i, s in enumerate(self.irreducible_degenerate_sets):
stream.write(f"\n{TAB}Irreducible Degenerate Set {i}")
stream.write(f"\n{TAB*2}nu{TAB}Constraint Name")
for k, v in s.items():
value_string = f"{v:.1f}"
sep = (2 + len(TAB) - len(value_string)) * " "
stream.write(f"\n{TAB*2}{value_string}{sep}{k.name}")
stream.write("\n")
else:
stream.write(
f"\n{TAB}No candidate equations. The Jacobian is likely full rank.\n"
)
stream.write("\n" + "=" * MAX_STR_LENGTH + "\n")
[docs]
class DegeneracyHunter:
"""
Degeneracy Hunter is a collection of utility functions to assist in mathematical
modeling in Pyomo.
"""
def __init__(self, block_or_jac, solver=None):
"""Initialize Degeneracy Hunter Object
Args:
block_or_jac: Pyomo model or Jacobian
solver: Pyomo SolverFactory
Notes:
Passing a Jacobian to Degeneracy Hunter is current untested.
"""
msg = (
"DegeneracyHunter is being deprecated in favor of the new "
"DiagnosticsToolbox."
)
deprecation_warning(msg=msg, logger=_log, version="2.2.0", remove_in="3.0.0")
block_like = False
try:
block_like = issubclass(block_or_jac.ctype, Block)
except AttributeError:
pass
if block_like:
# Add Pyomo model to the object
self.block = block_or_jac
# setup pynumero interface
if not AmplInterface.available():
raise RuntimeError("Pynumero not available.")
self.nlp = PyomoNLP(self.block)
# Get the scaled Jacobian of equality constraints
self.jac_eq = get_jacobian(self.block, equality_constraints_only=True)[0]
# Create a list of equality constraint names
self._eq_con_list = self.nlp.get_pyomo_equality_constraints()
self._var_list = self.nlp.get_pyomo_variables()
self.candidate_eqns = None
elif type(block_or_jac) is np.array: # pylint: disable=unidiomatic-typecheck
raise NotImplementedError(
"Degeneracy Hunter currently only supports analyzing a Pyomo model"
)
# # TODO: Need to refactor, document, and test support for Jacobian
# self.jac_eq = block_or_jac
# self._eq_con_list = None
else:
raise TypeError("Check the type for 'block_or_jac'")
# number of equality constraints, variables
self.n_eq = self.jac_eq.shape[0]
self.n_var = self.jac_eq.shape[1]
# Initialize solver
if solver is None:
# TODO: Test performance with open solvers such as cbc
self.solver = SolverFactory("gurobi")
self.solver.options = {"NumericFocus": 3}
else:
# TODO: Make this a custom exception following IDAES standards
# assert type(solver) is SolverFactory, "Argument solver should be type SolverFactory"
self.solver = solver
# Create spot to store singular values
self.s = None
# Set constants for MILPs
self.max_nu = 1e5
self.min_nonzero_nu = 1e-5
[docs]
def check_residuals(self, tol=1e-5, print_level=1, sort=True):
"""
Method to return a ComponentSet of all Constraint components with a
residual greater than a given threshold which appear in a model.
Args:
block: model to be studied
tol: residual threshold for inclusion in ComponentSet
print_level: controls to extend of output to the screen:
* 0 - nothing printed
* 1 - only name of constraint printed
* 2 - each constraint is pretty printed
* 3 - pretty print each constraint, then print value for included variable
sort: sort residuals in descending order for printing
Returns:
A ComponentSet including all Constraint components with a residual
greater than tol which appear in block
"""
if print_level > 0:
residual_values = large_residuals_set(self.block, tol, True)
else:
return large_residuals_set(self.block, tol, False)
print(" ")
if len(residual_values) > 0:
print("All constraints with residuals larger than", tol, ":")
if print_level == 1:
print("Count\tName\t|residual|")
if sort:
residual_values = dict(
sorted(residual_values.items(), key=itemgetter(1), reverse=True)
)
for i, (c, r) in enumerate(residual_values.items()):
if print_level == 1:
# Basic print statement. count, constraint, residual
print(i, "\t", c, "\t", r)
else:
# Pretty print constraint
print("\ncount =", i, "\t|residual| =", r)
c.pprint()
if print_level == 2:
# print values and bounds for each variable in the constraint
print("variable\tlower\tvalue\tupper")
for v in identify_variables(c.body):
self.print_variable_bounds(v)
else:
print("No constraints with residuals larger than", tol, "!")
return residual_values.keys()
[docs]
def check_variable_bounds(
self, tol=1e-5, relative=False, skip_lb=False, skip_ub=False, verbose=True
):
"""
Return a ComponentSet of all variables within a tolerance of their bounds.
Args:
block: model to be studied
tol: residual threshold for inclusion in ComponentSet (default = 1e-5)
relative : Boolean, use relative tolerance (default = False)
skip_lb: Boolean to skip lower bound (default = False)
skip_ub: Boolean to skip upper bound (default = False)
verbose: Boolean to toggle on printing to screen (default = True)
Returns:
A ComponentSet including all Constraint components with a residual
greater than tol which appear in block
"""
vnbs = variables_near_bounds_set(self.block, tol, relative, skip_lb, skip_ub)
if verbose:
print(" ")
if relative:
s = "(relative)"
else:
s = "(absolute)"
if len(vnbs) > 0:
print("Variables within", tol, s, "of their bounds:")
print("variable\tlower\tvalue\tupper")
for v in vnbs:
self.print_variable_bounds(v)
else:
print("No variables within", tol, s, "of their bounds.")
return vnbs
[docs]
def check_rank_equality_constraints(self, tol=1e-6, dense=False):
"""
Method to check the rank of the Jacobian of the equality constraints
Args:
tol: Tolerance for smallest singular value (default=1E-6)
dense: If True, use a dense svd to perform singular value analysis,
which tends to be slower but more reliable than svds
Returns:
Number of singular values less than tolerance (-1 means error)
"""
print("\nChecking rank of Jacobian of equality constraints...")
print(
"Model contains",
self.n_eq,
"equality constraints and",
self.n_var,
"variables.",
)
counter = 0
if self.n_eq > 1:
if self.s is None:
self.svd_analysis(dense=dense)
n = len(self.s)
print("Smallest singular value(s):")
for i in range(n):
print("%.3E" % self.s[i])
if self.s[i] < tol:
counter += 1
else:
print(f"Only singular value: {norm(self.jac_eq,'fro')}")
return counter
# TODO: Refactor, this should not be a staticmethod
@staticmethod
def _prepare_ids_milp(jac_eq, M=1e5):
"""
Prepare MILP to compute the irreducible degenerate set
Args:
jac_eq Jacobian of equality constraints [matrix]
M: largest value for nu
Returns:
m_dh: Pyomo model to calculate irreducible degenerate sets
"""
n_eq = jac_eq.shape[0]
n_var = jac_eq.shape[1]
# Create Pyomo model for irreducible degenerate set
m_dh = ConcreteModel()
# Create index for constraints
m_dh.C = Set(initialize=range(n_eq))
m_dh.V = Set(initialize=range(n_var))
# Add variables
m_dh.nu = Var(m_dh.C, bounds=(-M, M), initialize=1.0)
m_dh.y = Var(m_dh.C, domain=Binary)
# Constraint to enforce set is degenerate
if issparse(jac_eq):
m_dh.J = jac_eq.tocsc()
def eq_degenerate(m_dh, v):
# Find the columns with non-zero entries
C_ = find(m_dh.J[:, v])[0]
return sum(m_dh.J[c, v] * m_dh.nu[c] for c in C_) == 0
else:
m_dh.J = jac_eq
def eq_degenerate(m_dh, v):
return sum(m_dh.J[c, v] * m_dh.nu[c] for c in m_dh.C) == 0
m_dh.degenerate = Constraint(m_dh.V, rule=eq_degenerate)
def eq_lower(m_dh, c):
return -M * m_dh.y[c] <= m_dh.nu[c]
m_dh.lower = Constraint(m_dh.C, rule=eq_lower)
def eq_upper(m_dh, c):
return m_dh.nu[c] <= M * m_dh.y[c]
m_dh.upper = Constraint(m_dh.C, rule=eq_upper)
m_dh.obj = Objective(expr=sum(m_dh.y[c] for c in m_dh.C))
return m_dh
# TODO: Refactor, this should not be a staticmethod
@staticmethod
def _prepare_find_candidates_milp(jac_eq, M=1e5, m_small=1e-5):
"""
Prepare MILP to find candidate equations for consider for IDS
Args:
jac_eq Jacobian of equality constraints [matrix]
M: maximum value for nu
m_small: smallest value for nu to be considered non-zero
Returns:
m_fc: Pyomo model to find candidates
"""
n_eq = jac_eq.shape[0]
n_var = jac_eq.shape[1]
# Create Pyomo model for irreducible degenerate set
m_dh = ConcreteModel()
# Create index for constraints
m_dh.C = Set(initialize=range(n_eq))
m_dh.V = Set(initialize=range(n_var))
# Specify minimum size for nu to be considered non-zero
m_dh.m_small = m_small
# Add variables
m_dh.nu = Var(m_dh.C, bounds=(-M - m_small, M + m_small), initialize=1.0)
m_dh.y_pos = Var(m_dh.C, domain=Binary)
m_dh.y_neg = Var(m_dh.C, domain=Binary)
m_dh.abs_nu = Var(m_dh.C, bounds=(0, M + m_small))
m_dh.pos_xor_neg = Constraint(m_dh.C)
# Constraint to enforce set is degenerate
if issparse(jac_eq):
m_dh.J = jac_eq.tocsc()
def eq_degenerate(m_dh, v):
if np.sum(np.abs(m_dh.J[:, v])) > 1e-6:
# Find the columns with non-zero entries
C_ = find(m_dh.J[:, v])[0]
return sum(m_dh.J[c, v] * m_dh.nu[c] for c in C_) == 0
else:
# This variable does not appear in any constraint
return Constraint.Skip
else:
m_dh.J = jac_eq
def eq_degenerate(m_dh, v):
if np.sum(np.abs(m_dh.J[:, v])) > 1e-6:
return sum(m_dh.J[c, v] * m_dh.nu[c] for c in m_dh.C) == 0
else:
# This variable does not appear in any constraint
return Constraint.Skip
m_dh.degenerate = Constraint(m_dh.V, rule=eq_degenerate)
# When y_pos = 1, nu >= m_small
# When y_pos = 0, nu >= - m_small
def eq_pos_lower(m_dh, c):
return m_dh.nu[c] >= -m_small + 2 * m_small * m_dh.y_pos[c]
m_dh.pos_lower = Constraint(m_dh.C, rule=eq_pos_lower)
# When y_pos = 1, nu <= M + m_small
# When y_pos = 0, nu <= m_small
def eq_pos_upper(m_dh, c):
return m_dh.nu[c] <= M * m_dh.y_pos[c] + m_small
m_dh.pos_upper = Constraint(m_dh.C, rule=eq_pos_upper)
# When y_neg = 1, nu <= -m_small
# When y_neg = 0, nu <= m_small
def eq_neg_upper(m_dh, c):
return m_dh.nu[c] <= m_small - 2 * m_small * m_dh.y_neg[c]
m_dh.neg_upper = Constraint(m_dh.C, rule=eq_neg_upper)
# When y_neg = 1, nu >= -M - m_small
# When y_neg = 0, nu >= - m_small
def eq_neg_lower(m_dh, c):
return m_dh.nu[c] >= -M * m_dh.y_neg[c] - m_small
m_dh.neg_lower = Constraint(m_dh.C, rule=eq_neg_lower)
# Absolute value
def eq_abs_lower(m_dh, c):
return -m_dh.abs_nu[c] <= m_dh.nu[c]
m_dh.abs_lower = Constraint(m_dh.C, rule=eq_abs_lower)
def eq_abs_upper(m_dh, c):
return m_dh.nu[c] <= m_dh.abs_nu[c]
m_dh.abs_upper = Constraint(m_dh.C, rule=eq_abs_upper)
# At least one constraint must be in the degenerate set
m_dh.degenerate_set_nonempty = Constraint(
expr=sum(m_dh.y_pos[c] + m_dh.y_neg[c] for c in m_dh.C) >= 1
)
# Minimize the L1-norm of nu
m_dh.obj = Objective(expr=sum(m_dh.abs_nu[c] for c in m_dh.C))
return m_dh
# TODO: Refactor, this should not be a staticmethod
@staticmethod
def _check_candidate_ids(ids_milp, solver, c, eq_con_list=None, tee=False):
"""Solve MILP to check if equation 'c' is a significant component in an irreducible
degenerate set
Args:
ids_milp: Pyomo model to calculate IDS
solver: Pyomo solver (must support MILP)
c: index for the constraint to consider [integer]
eq_con_list: names of equality constraints. If none, use elements of ids_milp (default=None)
tee: Boolean, print solver output (default = False)
Returns:
ids: either None or dictionary containing the IDS
"""
# Fix weight on candidate equation
ids_milp.nu[c].fix(1.0)
# Solve MILP
results = solver.solve(ids_milp, tee=tee)
ids_milp.nu[c].unfix()
if check_optimal_termination(results):
# We found an irreducible degenerate set
# Create empty dictionary
ids_ = {}
for i in ids_milp.C:
# Check if constraint is included
if ids_milp.y[i]() > 0.9:
# If it is, save the value of nu
if eq_con_list is None:
name = i
else:
name = eq_con_list[i]
ids_[name] = ids_milp.nu[i]()
return ids_
else:
return None
# TODO: Refactor, this should not be a staticmethod
@staticmethod
def _find_candidate_eqs(candidates_milp, solver, eq_con_list=None, tee=False):
"""Solve MILP to generate set of candidate equations
Arguments:
candidates_milp: Pyomo model to calculate IDS
solver: Pyomo solver (must support MILP)
eq_con_list: names of equality constraints. If none, use elements of ids_milp (default=None)
tee: Boolean, print solver output (default = False)
Returns:
candidate_eqns: either None or list of indices
degenerate_set: either None or dictionary containing the degenerate_set
"""
results = solver.solve(candidates_milp, tee=tee)
if check_optimal_termination(results):
# We found a degenerate set
# Create empty dictionary
ds_ = {}
# Create empty list
candidate_eqns = []
for i in candidates_milp.C:
# Check if constraint is included
if candidates_milp.abs_nu[i]() > candidates_milp.m_small * 0.99:
# If it is, save the value of nu
if eq_con_list is None:
name = i
else:
name = eq_con_list[i]
ds_[name] = candidates_milp.nu[i]()
candidate_eqns.append(i)
return candidate_eqns, ds_
else:
return None, None
[docs]
def svd_analysis(self, n_sv=None, dense=False):
"""
Perform SVD analysis of the constraint Jacobian
Args:
n_sv: number of smallest singular values to compute
dense: If True, use a dense svd to perform singular value analysis,
which tends to be slower but more reliable than svds
Returns:
None
Actions:
Stores SVD results in object
"""
if n_sv is None:
# Determine the number of singular values to compute
# The "-1" is needed to avoid an error with svds
n_sv = min(10, min(self.n_eq, self.n_var) - 1)
if self.n_eq > 1:
if n_sv >= min(self.n_eq, self.n_var):
raise ValueError(
f"For a {self.n_eq} by {self.n_var} system, svd_analysis "
f"can compute at most {min(self.n_eq, self.n_var) - 1} "
f"singular values and vectors, but {n_sv} were called for."
)
if n_sv < 1:
raise ValueError(f"Nonsense value for n_sv={n_sv} received.")
print("Computing the", n_sv, "smallest singular value(s)")
# Perform SVD
# Recall J is a n_eq x n_var matrix
# Thus U is a n_eq x n_eq matrix
# And V is a n_var x n_var
# (U or V may be smaller in economy mode)
if dense:
u, s, vT = svd(self.jac_eq.todense(), full_matrices=False)
# Reorder singular values and vectors so that the singular
# values are from least to greatest
u = np.flip(u[:, -n_sv:], axis=1)
s = np.flip(s[-n_sv:], axis=0)
vT = np.flip(vT[-n_sv:, :], axis=0)
else:
# svds does not guarantee the order in which it generates
# singular values, but typically generates them least-to-greatest.
# Maybe the warning is for singular values of nearly equal
# magnitude or a similar edge case?
u, s, vT = svds(self.jac_eq, k=n_sv, which="SM") # , solver='lobpcg')
# Save results
self.u = u
self.s = s
self.v = vT.transpose()
else:
raise ValueError(
"Model needs at least 2 equality constraints to perform svd_analysis."
)
[docs]
def underdetermined_variables_and_constraints(self, n_calc=1, tol=0.1, dense=False):
"""
Determines constraints and variables associated with the smallest
singular values by having large components in the left and right
singular vectors, respectively, associated with those singular values.
Args:
n_calc: The singular value, as ordered from least to greatest
starting from 1, to calculate associated constraints and variables
tol: Size below which to ignore constraints and variables in
the singular vector
dense: If True, use a dense svd to perform singular value analysis,
which tends to be slower but more reliable than svds
Returns:
None
"""
if self.s is None:
self.svd_analysis(
n_sv=max(n_calc, min(10, min(self.n_eq, self.n_var) - 1)), dense=dense
)
n_sv = len(self.s)
if n_sv < n_calc:
raise ValueError(
f"User wanted constraints and variables associated "
f"with the {n_calc}-th smallest singular value, "
f"but only {n_sv} small singular values have been "
f"calculated. Run svd_analysis again and specify "
f"n_sv>={n_calc}."
)
print("Column: Variable")
for i in np.where(abs(self.v[:, n_calc - 1]) > tol)[0]:
print(str(i) + ": " + self._var_list[i].name)
print("")
print("Row: Constraint")
for i in np.where(abs(self.u[:, n_calc - 1]) > tol)[0]:
print(str(i) + ": " + self._eq_con_list[i].name)
[docs]
def find_candidate_equations(self, verbose=True, tee=False):
"""
Solve MILP to find a degenerate set and candidate equations
Args:
verbose: Print information to the screen (default=True)
tee: Print solver output to screen (default=True)
Returns:
ds: either None or dictionary of candidate equations
"""
if verbose:
print("*** Searching for a Single Degenerate Set ***")
print("Building MILP model...")
self.candidates_milp = self._prepare_find_candidates_milp(
self.jac_eq, self.max_nu, self.min_nonzero_nu
)
if verbose:
print("Solving MILP model...")
ce, ds = self._find_candidate_eqs(
self.candidates_milp, self.solver, self._eq_con_list, tee
)
if ce is not None:
self.candidate_eqns = ce
return ds
[docs]
def find_irreducible_degenerate_sets(self, verbose=True, tee=False):
"""
Compute irreducible degenerate sets
Args:
verbose: Print information to the screen (default=True)
tee: Print solver output to screen (default=True)
Returns:
irreducible_degenerate_sets: list of irreducible degenerate sets
"""
# If there are no candidate equations, find them!
if not self.candidate_eqns:
self.find_candidate_equations()
irreducible_degenerate_sets = []
# Check if it is empty or None
if self.candidate_eqns:
if verbose:
print("*** Searching for Irreducible Degenerate Sets ***")
print("Building MILP model...")
self.dh_milp = self._prepare_ids_milp(self.jac_eq, self.max_nu)
# Loop over candidate equations
for i, c in enumerate(self.candidate_eqns):
if verbose:
print("Solving MILP", i + 1, "of", len(self.candidate_eqns), "...")
# Check if equation 'c' is a major element of an IDS
ids_ = self._check_candidate_ids(
self.dh_milp, self.solver, c, self._eq_con_list, tee
)
if ids_ is not None:
irreducible_degenerate_sets.append(ids_)
if verbose:
for i, s in enumerate(irreducible_degenerate_sets):
print("\nIrreducible Degenerate Set", i)
print("nu\tConstraint Name")
for k, v in s.items():
print(v, "\t", k)
else:
print("No candidate equations. The Jacobian is likely full rank.")
return irreducible_degenerate_sets
### Helper Functions
# Note: This makes sense as a static method
[docs]
@staticmethod
def print_variable_bounds(v):
"""
Print variable, bounds, and value
Args:
v: variable
Returns:
None
"""
print(v, "\t\t", v.lb, "\t", v.value, "\t", v.ub)
[docs]
def psweep_runner_validator(val):
"""Domain validator for Parameter Sweep runners
Args:
val : value to be checked
Returns:
TypeError if val is not a valid callback
"""
if issubclass(val, ParameterSweepBase):
return val
raise ValueError("Workflow runner must be a subclass of ParameterSweepBase.")
CACONFIG = ConfigDict()
CACONFIG.declare(
"input_specification",
ConfigValue(
domain=is_psweepspec,
doc="ParameterSweepSpecification object defining inputs to be sampled",
),
)
CACONFIG.declare(
"workflow_runner",
ConfigValue(
default=SequentialSweepRunner,
domain=psweep_runner_validator,
doc="Parameter sweep workflow runner",
),
)
CACONFIG.declare(
"solver_options",
ConfigValue(
domain=None,
description="Options to pass to IPOPT.",
),
)
CACONFIG.declare(
"halt_on_error",
ConfigValue(
default=False,
domain=bool,
doc="Whether to halt execution of parameter sweep on encountering a solver error (default=False).",
),
)
[docs]
class IpoptConvergenceAnalysis:
"""
Tool to perform a parameter sweep of model checking for numerical issues and
convergence characteristics. Users may specify an IDAES ParameterSweep class to
perform the sweep (default is SequentialSweepRunner).
"""
CONFIG = CACONFIG()
def __init__(self, model, **kwargs):
# TODO: In future may want to generalise this to accept indexed blocks
# However, for now some of the tools do not support indexed blocks
if not isinstance(model, BlockData):
raise TypeError(
"model argument must be an instance of a Pyomo BlockData object "
"(either a scalar Block or an element of an indexed Block)."
)
self.config = self.CONFIG(kwargs)
self._model = model
solver = SolverFactory("ipopt")
if self.config.solver_options is not None:
solver.options = self.config.solver_options
self._psweep = self.config.workflow_runner(
input_specification=self.config.input_specification,
build_model=self._build_model,
rebuild_model=True,
run_model=self._run_model,
build_outputs=self._build_outputs,
halt_on_error=self.config.halt_on_error,
handle_solver_error=self._recourse,
solver=solver,
)
@property
def results(self):
"""
Returns the results of the IpoptConvergenceAnalysis run
"""
return self._psweep.results
@property
def samples(self):
"""
Returns the set of input samples for convergence analysis (pandas DataFrame)
"""
return self._psweep.get_input_samples()
[docs]
def run_convergence_analysis(self):
"""
Execute convergence analysis sweep by calling execute_parameter_sweep
method in specified runner.
Returns:
dict of results from parameter sweep
"""
return self._psweep.execute_parameter_sweep()
[docs]
def run_convergence_analysis_from_dict(self, input_dict: dict):
"""
Execute convergence analysis sweep using specification defined in dict.
Args:
input_dict: dict to load specification from
Returns:
dict of results from parameter sweep
"""
self.from_dict(input_dict)
return self.run_convergence_analysis()
[docs]
def run_convergence_analysis_from_file(self, filename: str):
"""
Execute convergence analysis sweep using specification defined in json file.
Args:
filename: name of file to load specification from as string
Returns:
dict of results from parameter sweep
"""
self.from_json_file(filename)
return self.run_convergence_analysis()
[docs]
def compare_convergence_to_baseline(
self, filename: str, rel_tol: float = 0.1, abs_tol: float = 1
):
"""
Run convergence analysis and compare results to those defined in baseline file.
Args:
filename: name of baseline file to load specification from as string
rel_tol: relative tolerance to use for comparing number of iterations
abs_tol: absolute tolerance to use for comparing number of iterations
Returns:
dict containing lists of differences between convergence analysis run and baseline
"""
with open(filename, "r") as f:
# Load file manually so we have saved results
jdict = json.load(f)
f.close()
# Run convergence analysis from dict
self.run_convergence_analysis_from_dict(jdict)
# Compare results
return self._compare_results_to_dict(jdict, rel_tol=rel_tol, abs_tol=abs_tol)
[docs]
def assert_baseline_comparison(
self, filename: str, rel_tol: float = 0.1, abs_tol: float = 1
):
"""
Run convergence analysis and assert no differences in results to those defined
in baseline file.
Args:
filename: name of baseline file to load specification from as string
rel_tol: relative tolerance to use for comparing number of iterations
abs_tol: absolute tolerance to use for comparing number of iterations
Raises:
AssertionError if results of convergence analysis do not match baseline
"""
diffs = self.compare_convergence_to_baseline(
filename, rel_tol=rel_tol, abs_tol=abs_tol
)
if any(len(v) != 0 for v in diffs.values()):
raise AssertionError("Convergence analysis does not match baseline")
[docs]
def report_convergence_summary(self, stream=None):
"""
Reports a brief summary of the model convergence run.
Args:
stream: Optional output stream to print results to.
Returns:
None
"""
if stream is None:
stream = sys.stdout
successes = 0
failures = 0
runs_w_restoration = 0
runs_w_regulariztion = 0
runs_w_num_iss = 0
for v in self.results.values():
# Check for differences
if v["success"]:
successes += 1
else:
failures += 1
if v["results"]["iters_in_restoration"] > 0:
runs_w_restoration += 1
if v["results"]["iters_w_regularization"] > 0:
runs_w_regulariztion += 1
if v["results"]["numerical_issues"] > 0:
runs_w_num_iss += 1
stream.write(
f"Successes: {successes}, Failures {failures} ({100*successes/(successes+failures)}%)\n"
)
stream.write(f"Runs with Restoration: {runs_w_restoration}\n")
stream.write(f"Runs with Regularization: {runs_w_regulariztion}\n")
stream.write(f"Runs with Numerical Issues: {runs_w_num_iss}\n")
[docs]
def to_dict(self):
"""
Serialize specification and current results to dict form
Returns:
dict
"""
return self._psweep.to_dict()
[docs]
def from_dict(self, input_dict):
"""
Load specification and results from dict.
Args:
input_dict: dict to load from
Returns:
None
"""
return self._psweep.from_dict(input_dict)
[docs]
def to_json_file(self, filename):
"""
Write specification and results to json file.
Args:
filename: name of file to write to as string
Returns:
None
"""
return self._psweep.to_json_file(filename)
[docs]
def from_json_file(self, filename):
"""
Load specification and results from json file.
Args:
filename: name of file to load from as string
Returns:
None
"""
return self._psweep.from_json_file(filename)
def _build_model(self):
# Create new instance of model by cloning
return self._model.clone()
def _run_model(self, model, solver):
# Run model using IPOPT and collect stats
(
status,
iters,
iters_in_restoration,
iters_w_regularization,
time,
) = self._run_ipopt_with_stats(model, solver)
run_stats = [
iters,
iters_in_restoration,
iters_w_regularization,
time,
]
success = check_optimal_termination(status)
return success, run_stats
@staticmethod
def _build_outputs(model, run_stats):
# Run model diagnostics numerical checks
dt = DiagnosticsToolbox(model=model)
warnings = False
try:
dt.assert_no_numerical_warnings()
except AssertionError:
warnings = True
# Compile Results
return {
"iters": run_stats[0],
"iters_in_restoration": run_stats[1],
"iters_w_regularization": run_stats[2],
"time": run_stats[3],
"numerical_issues": warnings,
}
@staticmethod
def _recourse(model):
# Return a default dict indicating no results
return {
"iters": -1,
"iters_in_restoration": -1,
"iters_w_regularization": -1,
"time": -1,
"numerical_issues": -1,
}
@staticmethod
def _parse_ipopt_output(ipopt_file):
# PArse IPOPT logs and return key metrics
# ToDO: Check for final iteration with regularization or restoration
iters = 0
iters_in_restoration = 0
iters_w_regularization = 0
time = 0
# parse the output file to get the iteration count, solver times, etc.
with open(ipopt_file, "r") as f:
parseline = False
for line in f:
if line.startswith("iter"):
# This marks the start of the iteration logging, set parseline True
parseline = True
elif line.startswith("Number of Iterations....:"):
# Marks end of iteration logging, set parseline False
parseline = False
tokens = line.split()
iters = int(tokens[3])
elif parseline:
# Line contains details of an iteration, look for restoration or regularization
tokens = line.split()
try:
if not tokens[6] == "-":
# Iteration with regularization
iters_w_regularization += 1
if tokens[0].endswith("r"):
# Iteration in restoration
iters_in_restoration += 1
except IndexError:
# Blank line at end of iteration list, so assume we hit this
pass
elif line.startswith(
"Total CPU secs in IPOPT (w/o function evaluations) ="
):
tokens = line.split()
time += float(tokens[9])
elif line.startswith(
"Total CPU secs in NLP function evaluations ="
):
tokens = line.split()
time += float(tokens[8])
return iters, iters_in_restoration, iters_w_regularization, time
def _run_ipopt_with_stats(self, model, solver, max_iter=500, max_cpu_time=120):
# Solve model using provided solver (assumed to be IPOPT) and parse logs
# ToDo: Check that the "solver" is, in fact, IPOPT
TempfileManager.push()
tempfile = TempfileManager.create_tempfile(suffix="ipopt_out", text=True)
opts = {
"output_file": tempfile,
"max_iter": max_iter,
"max_cpu_time": max_cpu_time,
}
status_obj = solver.solve(model, options=opts, tee=True)
(
iters,
iters_in_restoration,
iters_w_regularization,
time,
) = self._parse_ipopt_output(tempfile)
TempfileManager.pop(remove=True)
return status_obj, iters, iters_in_restoration, iters_w_regularization, time
def _compare_results_to_dict(
self,
compare_dict: dict,
rel_tol: float = 0.1,
abs_tol: float = 1,
):
# Compare results
success_diff = []
iters_diff = []
restore_diff = []
reg_diff = []
num_iss_diff = []
for k, v in self.results.items():
# Get sample result from compare dict
try:
comp = compare_dict["results"][k]
except KeyError:
# Reading from json often converts ints to strings
# Check to see if the index as a string works
comp = compare_dict["results"][str(k)]
# Check for differences
if v["success"] != comp["success"]:
success_diff.append(k)
if not isclose(
v["results"]["iters"],
comp["results"]["iters"],
rel_tol=rel_tol,
abs_tol=abs_tol,
):
iters_diff.append(k)
if not isclose(
v["results"]["iters_in_restoration"],
comp["results"]["iters_in_restoration"],
rel_tol=rel_tol,
abs_tol=abs_tol,
):
restore_diff.append(k)
if not isclose(
v["results"]["iters_w_regularization"],
comp["results"]["iters_w_regularization"],
rel_tol=rel_tol,
abs_tol=abs_tol,
):
reg_diff.append(k)
if v["results"]["numerical_issues"] != comp["results"]["numerical_issues"]:
num_iss_diff.append(k)
return {
"success": success_diff,
"iters": iters_diff,
"iters_in_restoration": restore_diff,
"iters_w_regularization": reg_diff,
"numerical_issues": num_iss_diff,
}
[docs]
def get_valid_range_of_component(component):
"""
Return the valid range for a component as specified in the model metadata.
Args:
component: Pyomo component to get valid range for
Returns:
valid range for component if found. This will either be a 2-tuple (low, high) or None.
Raises:
AttributeError if metadata object not found
"""
# Get metadata for component
parent = component.parent_block()
try:
if hasattr(parent, "params"):
meta = parent.params.get_metadata().properties
else:
meta = parent.get_metadata().properties
except AttributeError:
raise AttributeError(f"Could not find metadata for component {component.name}")
# Get valid range from metadata
try:
n, i = meta.get_name_and_index(component.parent_component().local_name)
cmeta = getattr(meta, n)[i]
valid_range = cmeta.valid_range
except ValueError:
# Assume no metadata for this property
_log.debug(f"No metadata entry for component {component.name}; returning None")
valid_range = None
return valid_range
[docs]
def set_bounds_from_valid_range(component, descend_into=True):
"""
Set bounds on Pyomo components based on valid range recorded in model metadata.
WARNING - this function will overwrite any bounds already set on the component/model.
This function will iterate over component data objects in Blocks and indexed components.
Args:
component: Pyomo component to set bounds on. This can be a Block, Var or Param.
descend_into: (optional) Whether to descend into components on child Blocks (default=True)
Returns:
None
"""
if component.is_indexed():
for k in component:
set_bounds_from_valid_range(component[k])
elif isinstance(component, BlockData):
for i in component.component_data_objects(
ctype=[Var, Param], descend_into=descend_into
):
set_bounds_from_valid_range(i)
elif not hasattr(component, "bounds"):
raise TypeError(
f"Component {component.name} does not have bounds. Only Vars and Params have bounds."
)
else:
valid_range = get_valid_range_of_component(component)
if valid_range is None:
valid_range = (None, None)
component.setlb(valid_range[0])
component.setub(valid_range[1])
[docs]
def list_components_with_values_outside_valid_range(component, descend_into=True):
"""
Return a list of component objects with values outside the valid range specified in the model
metadata.
This function will iterate over component data objects in Blocks and indexed components.
Args:
component: Pyomo component to search for component outside of range on.
This can be a Block, Var or Param.
descend_into: (optional) Whether to descend into components on child Blocks (default=True)
Returns:
list of component objects found with values outside the valid range.
"""
comp_list = []
if component.is_indexed():
for k in component:
comp_list.extend(
list_components_with_values_outside_valid_range(component[k])
)
elif isinstance(component, BlockData):
for i in component.component_data_objects(
ctype=[Var, Param], descend_into=descend_into
):
comp_list.extend(list_components_with_values_outside_valid_range(i))
else:
valid_range = get_valid_range_of_component(component)
if valid_range is not None:
cval = value(component)
if cval is not None and (cval < valid_range[0] or cval > valid_range[1]):
comp_list.append(component)
return comp_list
[docs]
def ipopt_solve_halt_on_error(model, options=None):
"""
Run IPOPT to solve model with debugging output enabled.
This function calls IPOPT to solve the model provided with settings
to halt on AMPL evaluation errors and report these with symbolic names.
Args:
model: Pyomo model to be solved.
options: solver options to be passed to IPOPT
Returns:
Pyomo solver results dict
"""
if options is None:
options = {}
solver = SolverFactory("ipopt")
solver.options = options
solver.options["halt_on_ampl_error"] = "yes"
return solver.solve(
model, tee=True, symbolic_solver_labels=True, export_defined_variables=False
)
[docs]
def check_parallel_jacobian(
model,
tolerance: float = 1e-4,
direction: str = "row",
jac=None,
nlp=None,
):
"""
Check for near-parallel rows or columns in the Jacobian.
Near-parallel rows or columns indicate a potential degeneracy in the model,
as this means that the associated constraints or variables are (near)
duplicates of each other.
For efficiency, the ``jac`` and ``nlp`` arguments may be provided if they are
already available. If these are provided, the provided model is not used. If
either ``jac`` or ``nlp`` is not provided, a Jacobian and ``PyomoNLP`` are
computed using the model.
This method is based on work published in:
Klotz, E., Identification, Assessment, and Correction of Ill-Conditioning and
Numerical Instability in Linear and Integer Programs, Informs 2014, pgs. 54-108
https://pubsonline.informs.org/doi/epdf/10.1287/educ.2014.0130
Args:
model: model to be analysed
tolerance: tolerance to use to determine if constraints/variables are parallel
direction: 'row' (default, constraints) or 'column' (variables)
jac: model Jacobian as a ``scipy.sparse.coo_matrix``, optional
nlp: ``PyomoNLP`` of model, optional
Returns:
list of 2-tuples containing parallel Pyomo components
"""
# Thanks to Robby Parker for the sparse matrix implementation and
# significant performance improvements
if direction not in ["row", "column"]:
raise ValueError(
f"Unrecognised value for direction ({direction}). "
"Must be 'row' or 'column'."
)
if jac is None or nlp is None:
jac, nlp = get_jacobian(model, scaled=False)
# Get vectors that we will check, and the Pyomo components
# they correspond to.
if direction == "row":
components = nlp.get_pyomo_constraints()
csrjac = jac.tocsr()
# Make everything a column vector (CSC) for consistency
vectors = [csrjac[i, :].transpose().tocsc() for i in range(len(components))]
elif direction == "column":
components = nlp.get_pyomo_variables()
cscjac = jac.tocsc()
vectors = [cscjac[:, i] for i in range(len(components))]
# List to store pairs of parallel components
parallel = []
vectors_by_nz = {}
for vecidx, vec in enumerate(vectors):
maxval = max(np.abs(vec.data))
# Construct tuple of sorted col/row indices that participate
# in this vector (with non-negligible coefficient).
nz = tuple(
sorted(
idx
for idx, val in zip(vec.indices, vec.data)
if abs(val) > tolerance and abs(val) / maxval > tolerance
)
)
if nz in vectors_by_nz:
# Store the index as well so we know what component this
# correrponds to.
vectors_by_nz[nz].append((vec, vecidx))
else:
vectors_by_nz[nz] = [(vec, vecidx)]
for vecs in vectors_by_nz.values():
for idx, (u, uidx) in enumerate(vecs):
# idx is the "local index", uidx is the "global index"
# Frobenius norm of the matrix is 2-norm of this column vector
unorm = norm(u, ord="fro")
for v, vidx in vecs[idx + 1 :]:
vnorm = norm(v, ord="fro")
# Explicitly multiply a row vector * column vector
prod = u.transpose().dot(v)
absprod = abs(prod[0, 0])
diff = abs(absprod - unorm * vnorm)
if diff <= tolerance or diff <= tolerance * max(unorm, vnorm):
parallel.append((uidx, vidx))
parallel = [(components[uidx], components[vidx]) for uidx, vidx in parallel]
return parallel
[docs]
def compute_ill_conditioning_certificate(
model,
target_feasibility_tol: float = 1e-06,
ratio_cutoff: float = 1e-04,
direction: str = "row",
):
"""
Finds constraints (rows) or variables (columns) in the model Jacobian that
may be contributing to ill conditioning.
This method is based on work published in:
Klotz, E., Identification, Assessment, and Correction of Ill-Conditioning and
Numerical Instability in Linear and Integer Programs, Informs 2014, pgs. 54-108
https://pubsonline.informs.org/doi/epdf/10.1287/educ.2014.0130
Args:
model: model to be analysed
target_feasibility_tol: target tolerance for solving ill conditioning problem
ratio_cutoff: cut-off for reporting ill conditioning
direction: 'row' (default, constraints) or 'column' (variables)
Returns:
list of strings reporting ill-conditioned variables/constraints and their
associated y values
"""
_log.warning(
"Ill conditioning checks are a beta capability. Please be aware that "
"the name, location, and API for this may change in future releases."
)
# Thanks to B. Knueven for this implementation
if direction not in ["row", "column"]:
raise ValueError(
f"Unrecognised value for direction ({direction}). "
"Must be 'row' or 'column'."
)
jac, nlp = get_jacobian(model, scaled=False)
inverse_target_kappa = 1e-16 / target_feasibility_tol
# Set up the components we will analyze, either row or column
if direction == "row":
components = nlp.get_pyomo_constraints()
components_set = RangeSet(0, len(components) - 1)
results_set = RangeSet(0, nlp.n_primals() - 1)
jac = jac.transpose().tocsr()
elif direction == "column":
components = nlp.get_pyomo_variables()
components_set = RangeSet(0, len(components) - 1)
results_set = RangeSet(0, nlp.n_constraints() - 1)
jac = jac.tocsr()
# Build test problem
inf_prob = ConcreteModel()
inf_prob.y_pos = Var(components_set, bounds=(0, None))
inf_prob.y_neg = Var(components_set, bounds=(0, None))
inf_prob.y = Expression(components_set, rule=lambda m, i: m.y_pos[i] - m.y_neg[i])
inf_prob.res_pos = Var(results_set, bounds=(0, None))
inf_prob.res_neg = Var(results_set, bounds=(0, None))
inf_prob.res = Expression(
results_set, rule=lambda m, i: m.res_pos[i] - m.res_neg[i]
)
def b_rule(b, i):
lhs = 0.0
row = jac.getrow(i)
for j, val in zip(row.indices, row.data):
lhs += val * b.y[j]
return lhs == b.res[i]
inf_prob.by = Constraint(results_set, rule=b_rule)
# Normalization of y
inf_prob.normalize = Constraint(
expr=1 == sum(inf_prob.y_pos.values()) - sum(inf_prob.y_neg.values())
)
inf_prob.y_norm = Var()
inf_prob.y_norm_constr = Constraint(
expr=inf_prob.y_norm
== sum(inf_prob.y_pos.values()) + sum(inf_prob.y_neg.values())
)
inf_prob.res_norm = Var()
inf_prob.res_norm_constr = Constraint(
expr=inf_prob.res_norm
== sum(inf_prob.res_pos.values()) + sum(inf_prob.res_neg.values())
)
# Objective -- minimize residual
inf_prob.min_res = Objective(expr=inf_prob.res_norm)
solver = SolverFactory("cbc") # TODO: Consider making this an option
# tighten tolerances # TODO: If solver is an option, need to allow user options
solver.options["primalT"] = target_feasibility_tol * 1e-1
solver.options["dualT"] = target_feasibility_tol * 1e-1
results = solver.solve(inf_prob, tee=False)
if not check_optimal_termination(results):
# TODO: maybe we should tighten tolerances first?
raise RuntimeError("Ill conditioning diagnostic problem infeasible")
result_norm = inf_prob.res_norm.value
if result_norm < 0.0:
# TODO: try again with tighter tolerances?
raise RuntimeError(
"Ill conditioning diagnostic problem has numerically troublesome solution"
)
if result_norm >= inverse_target_kappa:
return []
# find an equivalent solution which minimizes y_norm
inf_prob.min_res.deactivate()
inf_prob.res_norm.fix()
inf_prob.min_y = Objective(expr=inf_prob.y_norm)
# if this problem is numerically infeasible, we can still report something to the user
results = solver.solve(inf_prob, tee=False, load_solutions=False)
if check_optimal_termination(results):
inf_prob.solutions.load_from(results)
ill_cond = []
slist = sorted(
inf_prob.y, key=lambda dict_key: abs(value(inf_prob.y[dict_key])), reverse=True
)
cutoff = None
for i in slist:
if cutoff is None:
cutoff = abs(value(inf_prob.y[i])) * ratio_cutoff
val = value(inf_prob.y[i])
if abs(val) < cutoff:
break
ill_cond.append((components[i], val))
return ill_cond
# -------------------------------------------------------------------------------------------
# Private module functions
def _var_in_block(var, block):
parent = var.parent_block()
while parent is not None:
if parent is block:
return True
parent = parent.parent_block()
return False
def _vars_fixed_to_zero(model):
# Set of variables fixed to 0
zero_vars = ComponentSet()
for v in model.component_data_objects(Var, descend_into=True):
if v.fixed and value(v) == 0:
zero_vars.add(v)
return zero_vars
def _vars_near_zero(model, variable_zero_value_tolerance):
# Set of variables with values close to 0
near_zero_vars = ComponentSet()
for v in model.component_data_objects(Var, descend_into=True):
if v.value is not None and abs(value(v)) <= variable_zero_value_tolerance:
near_zero_vars.add(v)
return near_zero_vars
def _vars_violating_bounds(model, tolerance):
violated_bounds = ComponentSet()
for v in model.component_data_objects(Var, descend_into=True):
if v.value is not None:
if v.lb is not None and v.value <= v.lb - tolerance:
violated_bounds.add(v)
elif v.ub is not None and v.value >= v.ub + tolerance:
violated_bounds.add(v)
return violated_bounds
def _vars_with_none_value(model):
none_value = ComponentSet()
for v in model.component_data_objects(Var, descend_into=True):
if v.value is None:
none_value.add(v)
return none_value
def _vars_with_extreme_values(model, large, small, zero):
extreme_vars = ComponentSet()
for v in model.component_data_objects(Var, descend_into=True):
if v.value is not None:
mag = abs(value(v))
if mag > abs(large):
extreme_vars.add(v)
elif mag < abs(small) and mag > abs(zero):
extreme_vars.add(v)
return extreme_vars
def _write_report_section(
stream,
lines_list,
title=None,
line_if_empty=None,
end_line=None,
header="-",
footer=None,
):
"""
Writes output in standard format for report and display methods.
Args:
stream: stream to write to
lines_list: list containing lines to be written in body of report
title: title to be put at top of report
line_if_empty: line to be written if lines_list is empty
end_line: line to be written at end of report
header: character to use to write header separation line
footer: character to use to write footer separation line
Returns:
None
"""
stream.write(f"{header * MAX_STR_LENGTH}\n")
if title is not None:
stream.write(f"{title}\n\n")
if len(lines_list) > 0:
for i in lines_list:
stream.write(f"{TAB}{i}\n")
elif line_if_empty is not None:
stream.write(f"{TAB}{line_if_empty}\n")
stream.write("\n")
if end_line is not None:
stream.write(f"{end_line}\n")
if footer is not None:
stream.write(f"{footer * MAX_STR_LENGTH}\n")
def _collect_model_statistics(model):
vars_in_constraints = variables_in_activated_constraints_set(model)
fixed_vars_in_constraints = ComponentSet()
free_vars_in_constraints = ComponentSet()
free_vars_lb = ComponentSet()
free_vars_ub = ComponentSet()
free_vars_lbub = ComponentSet()
ext_fixed_vars_in_constraints = ComponentSet()
ext_free_vars_in_constraints = ComponentSet()
for v in vars_in_constraints:
if v.fixed:
fixed_vars_in_constraints.add(v)
if not _var_in_block(v, model):
ext_fixed_vars_in_constraints.add(v)
else:
free_vars_in_constraints.add(v)
if not _var_in_block(v, model):
ext_free_vars_in_constraints.add(v)
if v.lb is not None:
if v.ub is not None:
free_vars_lbub.add(v)
else:
free_vars_lb.add(v)
elif v.ub is not None:
free_vars_ub.add(v)
# Generate report
# TODO: Binary and boolean vars
stats = []
stats.append(
f"{TAB}Activated Blocks: {len(activated_blocks_set(model))} "
f"(Deactivated: {len(deactivated_blocks_set(model))})"
)
stats.append(
f"{TAB}Free Variables in Activated Constraints: "
f"{len(free_vars_in_constraints)} "
f"(External: {len(ext_free_vars_in_constraints)})"
)
stats.append(f"{TAB * 2}Free Variables with only lower bounds: {len(free_vars_lb)}")
stats.append(f"{TAB * 2}Free Variables with only upper bounds: {len(free_vars_ub)}")
stats.append(
f"{TAB * 2}Free Variables with upper and lower bounds: "
f"{len(free_vars_lbub)}"
)
stats.append(
f"{TAB}Fixed Variables in Activated Constraints: "
f"{len(fixed_vars_in_constraints)} "
f"(External: {len(ext_fixed_vars_in_constraints)})"
)
stats.append(
f"{TAB}Activated Equality Constraints: {len(activated_equalities_set(model))} "
f"(Deactivated: {len(deactivated_equalities_set(model))})"
)
stats.append(
f"{TAB}Activated Inequality Constraints: {len(activated_inequalities_set(model))} "
f"(Deactivated: {len(deactivated_inequalities_set(model))})"
)
stats.append(
f"{TAB}Activated Objectives: {len(activated_objectives_set(model))} "
f"(Deactivated: {len(deactivated_objectives_set(model))})"
)
return stats
[docs]
class ConstraintTermAnalysisVisitor(EXPR.StreamBasedExpressionVisitor):
"""
Expression walker for checking Constraints for problematic terms.
This walker will walk the expression and look for summation terms
with mismatched magnitudes or potential cancellations.
Args:
term_mismatch_tolerance: tolerance to use when determining mismatched
terms
term_cancellation_tolerance: tolerance to use when identifying
possible cancellation of terms
term_zero_tolerance: tolerance for considering terms equal to zero
max_canceling_terms: maximum number of terms to consider when looking
for canceling combinations (None = consider all possible combinations)
max_cancellations_per_node: maximum number of cancellations to collect
for a single node. Collection will terminate when this many cancellations
have been identified (None = collect all cancellations)
Returns:
list of values for top-level summation terms
list of terms with mismatched magnitudes
list of terms with potential cancellations
bool indicating whether expression is a constant
"""
def __init__(
self,
term_mismatch_tolerance: float = 1e6,
term_cancellation_tolerance: float = 1e-4,
term_zero_tolerance: float = 1e-10,
max_canceling_terms: int = 4,
max_cancellations_per_node: int = 5,
):
super().__init__()
# Tolerance attributes
self._log_mm_tol = log10(term_mismatch_tolerance)
self._sum_tol = term_cancellation_tolerance
self._zero_tolerance = term_zero_tolerance
self._max_canceling_terms = max_canceling_terms
self._max_cancellations_per_node = max_cancellations_per_node
# Placeholders for collecting results
self.canceling_terms = ComponentMap()
self.mismatched_terms = ComponentMap()
# Flag for if cancellation collection hit limit
self._cancellation_tripped = False
def _get_value_for_sum_subexpression(self, child_data):
# child_data is a tuple, with the 0-th element being the node values
if isinstance(child_data[0][0], str):
# Values may be a list containing a string in some cases (e.g. external functions)
# Return the string in this case
return child_data[0][0]
return sum(i for i in child_data[0])
def _generate_combinations(self, inputs, equality=False):
# We want to test all combinations of terms for cancellation
# The number of combinations we check depends on whether this is an (in)equality
# expression or a sum node deeper in the expression tree.
# We expect (in)equalities to generally sum to 0 (0 == expr) thus we want to
# check if any subset of the sum terms sum to zero (i.e. are any terms unnecessary).
# For other sum nodes, we need to check for any combination of terms.
# Maximum number of terms to include in combinations
max_comb = len(inputs)
if equality:
# Subtract 1 if (in)equality node
max_comb += -1
# We also have a limit on the maximum number of terms to consider
if self._max_canceling_terms is not None:
max_comb = min(max_comb, self._max_canceling_terms)
# Single terms cannot cancel, thus we want all combinations of length 2 to max terms
# Note the need for +1 due to way range works
combo_range = range(2, max_comb + 1)
# Collect combinations of terms in an iterator
# In order to identify the terms in each cancellation, we will pair each value
# with its index in the input set as a tuple using enumerate
for i in chain.from_iterable(
combinations(enumerate(inputs), r) for r in combo_range
):
# Yield each combination in the set
yield i
def _check_sum_cancellations(self, values_list, equality=False):
# First, strip any terms with value 0 as they do not contribute to cancellation
# We do this to keep the number of possible combinations as small as possible
stripped = [i for i in values_list if abs(i) >= self._zero_tolerance]
cancellations = []
if len(stripped) == 0:
# If the stripped list is empty, there are no non-zero terms
# We can stop here and return False as there are no possible cancellations
return cancellations
# For scaling of tolerance, we want to compare to the largest absolute value of
# the input values
max_value = abs(max(stripped, key=abs))
for i in self._generate_combinations(stripped, equality):
# Generate combinations will return a list of combinations of input terms
# each element of the list will be a 2-tuple representing a term in the
# input list with the first value being the position in the input set and
# the second being the value
# Check if the sum of values in the combination are below tolerance
if abs(sum(j[1] for j in i)) <= self._sum_tol * max_value:
# If so, record combination as canceling
cancellations.append(i)
# Terminate loop if we have reached the max cancellations to collect
if len(cancellations) >= self._max_cancellations_per_node:
self._cancellation_tripped = True
break
return cancellations
def _perform_checks(self, node, child_data):
# Perform checks for problematic expressions
# First, need to check to see if any child data is a list
# This indicates a sum expression
const = True
for d in child_data:
# We will check for canceling terms here, rather than the sum itself, to handle special cases
# We want to look for cases where a sum term results in a value much smaller
# than the terms of the sum
# Each element of child_data is a tuple where the 0-th element is the node values
if isinstance(d[0][0], str):
# Values may be a list containing a string in some cases (e.g. external functions)
# Skip if this is the case
pass
else:
for c in self._check_sum_cancellations(d[0]):
if node in self.canceling_terms.keys():
self.canceling_terms[node].append(c)
else:
self.canceling_terms[node] = [c]
# Expression is not constant if any child is not constant
# Element 1 is a bool indicating if the child node is constant
if not d[1]:
const = False
# Return any problematic terms found
return const
def _check_base_type(self, node):
if isinstance(node, VarData):
const = node.fixed
else:
const = True
return [value(node)], const, False
def _check_equality_expression(self, node, child_data):
# (In)equality expressions are a special case of sum expressions
# child_data has two elements; 0 is the LHS of the (in)equality and 1 is the RHS
# Each of these then contains three elements; 0 is a list of values for the sum components,
# 1 is a bool indicating if the child node term is constant, and 2 is a bool indicating if
# the child ode is a sum expression.
# First, to check for cancellation we need to negate one side of the expression
# mdata will contain the new set of child_data with negated values
mdata = []
# child_data[0][0] has the values of the LHS of the (in)equality, and we will negate these
vals = []
for j in child_data[0][0]:
vals.append(-j)
# Append the negated values along with whether the node is constant to mdata
mdata.append((vals, child_data[0][1]))
# child_data[1] is the RHS, so we take this as it appears and append to mdata
mdata.append(child_data[1])
# Next, call the method to check the sum expression
vals, const, _ = self._check_sum_expression(node, mdata)
# Next, we need to check for canceling terms.
# child_data[x][2] indicates if a node is a sum expression or not
if not child_data[0][2] and not child_data[1][2]:
# If both sides are not sum expressions, we have the form a == b
# Simple lLinking constraints do not need to be checked
pass
# Next, we can ignore any term that has already been flagged as mismatched
elif node in self.mismatched_terms.keys():
pass
# We can also ignore any case where one side of the (in)equality is constant
# I.e. if either child_node[x][1] is True
elif any(d[1] for d in mdata):
pass
else:
# Check for cancellation
# First, collect terms from both sides
# Note: outer loop comes first in list comprehension
t = [i for d in mdata for i in d[0]]
# Then check for cancellations
for c in self._check_sum_cancellations(t, equality=True):
if node in self.canceling_terms.keys():
self.canceling_terms[node].append(c)
else:
self.canceling_terms[node] = [c]
return vals, const, False
def _check_product_expr(self, node, child_data):
# We expect child_data to be a tuple of len 2 (a*b)
# If both or neither a and b are sums (xor), handle like other expressions
if not child_data[0][2] ^ child_data[1][2]:
return self._check_general_expr(node, child_data)
else:
# Here we have the case of a sum and a multiplier
# For this case, we will make the result look like a sum with the
# multiplier applied
# This is important for scaling of the sum terms, as cancellation should be
# Checked on the scaled value
# First, check if both terms are constants - if so we can just get the value of
# this node and move on.
if child_data[0][1] and child_data[1][1]:
return self._check_general_expr(node, child_data)
# The length of the values (child_data[i][0]) of the multiplier will be 1
# We can just iterate over all terms in both value lists and multiply
# each pair
vals = [i * j for i in child_data[0][0] for j in child_data[1][0]]
# Return the list of values, not constant, is a sum expression (apparent)
return vals, False, True
def _check_div_expr(self, node, child_data):
# We expect child_data to be a tuple of len 2 (a/b)
# If the numerator is not a sum, handle like general expression
# child_data[0] is numerator, child_data[1] is denominator
if not child_data[0][2]:
return self._check_general_expr(node, child_data)
else:
# If the numerator is a sum, we will treat this as if the division
# were applied to each term in the numerator separately
# This is important for scaling of the sum terms, as cancellation should be
# Checked on the scaled value
# First, check if the numerator is constant - if so we can just get the value of
# this node and move on.
if child_data[0][1]:
return self._check_general_expr(node, child_data)
# Next, we need to get the value of the denominator
denom = self._get_value_for_sum_subexpression(child_data[1])
try:
vals = [i / denom for i in child_data[0][0]]
except ZeroDivisionError:
raise ZeroDivisionError(
f"Error in ConstraintTermAnalysisVisitor: found division with denominator of 0 "
f"({str(node)})."
)
# Return the list of values, not constant, is a sum expression (apparent)
return vals, False, True
def _check_negation_expr(self, node, child_data):
# Here we need to defer checking of cancellations too due to different ways
# these can appear in an expression.
# We will simply negate all values on the child node values (child_data[0][0])
# and pass on the rest.
vals = [-i for i in child_data[0][0]]
# Return the list of values, not constant, is a sum expression (apparent)
return vals, child_data[0][1], child_data[0][2]
def _check_general_expr(self, node, child_data):
const = self._perform_checks(node, child_data)
try:
# pylint: disable-next=protected-access
val = node._apply_operation(
list(map(self._get_value_for_sum_subexpression, child_data))
)
except ValueError:
raise ValueError(
f"Error in ConstraintTermAnalysisVisitor: error evaluating {str(node)}."
)
except ZeroDivisionError:
raise ZeroDivisionError(
f"Error in ConstraintTermAnalysisVisitor: found division with denominator of 0 "
f"({str(node)})."
)
except Exception as err:
# Catch and re-raise any other error that occurs
_log.exception(f"Unexpected {err=}, {type(err)=}")
raise
return [val], const, False
def _check_other_expression(self, node, child_data):
const = self._perform_checks(node, child_data)
# First, need to get value of input terms, which may be sub-expressions
input_mag = []
for i in child_data:
input_mag.append(self._get_value_for_sum_subexpression(i))
# Next, create a copy of the function with expected magnitudes as inputs
newfunc = node.create_node_with_local_data(input_mag)
# Evaluate new function and return the value along with check results
return [value(newfunc)], const, False
def _check_ranged_expression(self, node, child_data):
# child_data should have 3 elements, LHS, middle term, and RHS
lhs_vals, lhs_const, _ = self._check_equality_expression(node, child_data[:2])
rhs_vals, rhs_const, _ = self._check_equality_expression(node, child_data[1:])
# Constant is a bit vague in terms ranged expressions.
# We will call the term constant only if all parts are constant
const = lhs_const and rhs_const
# For values, we need to avoid double counting the middle term
# Also for sign convention, we will negate the outer terms
vals = lhs_vals + [-rhs_vals[1]]
return vals, const, False
def _check_sum_expression(self, node, child_data):
# Sum expressions need special handling
# For sums, collect all child values into a list
vals = []
# We will check for cancellation in this node at the next level
# Pyomo is generally good at simplifying compound sums
const = True
# Collect data from child nodes
for d in child_data:
vals.append(self._get_value_for_sum_subexpression(d))
# Expression is not constant if any child is not constant
# Element 1 is a bool indicating if node is constant
if not d[1]:
const = False
# Check for mismatched terms
if len(vals) > 1:
absvals = [abs(v) for v in vals if abs(v) >= self._zero_tolerance]
if len(absvals) > 0:
vl = max(absvals)
vs = min(absvals)
if vl != vs:
if vs == 0:
diff = log10(vl)
else:
diff = log10(vl / vs)
if diff >= self._log_mm_tol:
self.mismatched_terms[node] = (vl, vs)
return vals, const, True
node_type_method_map = {
EXPR.EqualityExpression: _check_equality_expression,
EXPR.InequalityExpression: _check_equality_expression,
EXPR.RangedExpression: _check_ranged_expression,
EXPR.SumExpression: _check_sum_expression,
EXPR.NPV_SumExpression: _check_sum_expression,
EXPR.ProductExpression: _check_product_expr,
EXPR.MonomialTermExpression: _check_product_expr,
EXPR.NPV_ProductExpression: _check_product_expr,
EXPR.DivisionExpression: _check_div_expr,
EXPR.NPV_DivisionExpression: _check_div_expr,
EXPR.PowExpression: _check_general_expr,
EXPR.NPV_PowExpression: _check_general_expr,
EXPR.NegationExpression: _check_negation_expr,
EXPR.NPV_NegationExpression: _check_negation_expr,
EXPR.AbsExpression: _check_general_expr,
EXPR.NPV_AbsExpression: _check_general_expr,
EXPR.UnaryFunctionExpression: _check_general_expr,
EXPR.NPV_UnaryFunctionExpression: _check_general_expr,
EXPR.Expr_ifExpression: _check_other_expression,
EXPR.ExternalFunctionExpression: _check_other_expression,
EXPR.NPV_ExternalFunctionExpression: _check_other_expression,
EXPR.LinearExpression: _check_sum_expression,
}
[docs]
def exitNode(self, node, data):
"""
Method to call when exiting node to check for potential issues.
"""
# Return [node values], constant (bool), sum expression (bool)
# first check if the node is a leaf
nodetype = type(node)
if nodetype in native_types:
return [node], True, False
node_func = self.node_type_method_map.get(nodetype, None)
if node_func is not None:
return node_func(self, node, data)
if not node.is_expression_type():
# this is a leaf, but not a native type
if nodetype is _PyomoUnit:
# Unit have no value, so return 1 as a placeholder
return [1], True, False
# Var or Param
return self._check_base_type(node)
# might want to add other common types here
# not a leaf - check if it is a named expression
if (
hasattr(node, "is_named_expression_type")
and node.is_named_expression_type()
):
return self._check_other_expression(node, data)
raise TypeError(
f"An unhandled expression node type: {str(nodetype)} was encountered while "
f"analyzing constraint terms {str(node)}"
)
[docs]
def walk_expression(self, expr):
"""
Main method to call to walk an expression and return analysis.
Args:
expr - expression to be analyzed
Returns:
list of values of top-level additive terms
ComponentSet containing any mismatched terms
ComponentSet containing any canceling terms
Bool indicating whether expression is a constant
"""
# Create new holders for collected terms
self.canceling_terms = ComponentMap()
self.mismatched_terms = ComponentMap()
# Call parent walk_expression method
vals, const, _ = super().walk_expression(expr)
# Return results
return (
vals,
self.mismatched_terms,
self.canceling_terms,
const,
self._cancellation_tripped,
)