#################################################################################
# The Institute for the Design of Advanced Energy Systems Integrated Platform
# Framework (IDAES IP) was produced under the DOE Institute for the
# Design of Advanced Energy Systems (IDAES).
#
# Copyright (c) 2018-2026 by the software owners: The Regents of the
# University of California, through Lawrence Berkeley National Laboratory,
# National Technology & Engineering Solutions of Sandia, LLC, Carnegie Mellon
# University, West Virginia University Research Corporation, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md
# for full copyright and license information.
#################################################################################
"""
Utility functions for scaling.
Authors: Andrew Lee, Douglas Allan
"""
from copy import deepcopy
import math
import sys
import json
import numpy as np
from scipy.linalg import norm, pinv
from scipy.sparse import diags
from scipy.sparse.linalg import inv as spinv, norm as spnorm
from pyomo.environ import (
Binary,
Block,
Boolean,
Constraint,
Expression,
NegativeIntegers,
NegativeReals,
NonNegativeIntegers,
NonNegativeReals,
NonPositiveIntegers,
NonPositiveReals,
Objective,
PositiveIntegers,
PositiveReals,
Reference,
Suffix,
value,
Var,
)
from pyomo.core.base.block import BlockData
from pyomo.core.base.var import VarData
from pyomo.core.base.constraint import ConstraintData
from pyomo.core.base.expression import ExpressionData
from pyomo.core.base.param import ParamData
from pyomo.core import expr as EXPR
from pyomo.core.base.suffix import SuffixFinder
from pyomo.core.base.units_container import _PyomoUnit
from pyomo.common.collections import ComponentSet
from pyomo.common.modeling import unique_component_name
from pyomo.common.numeric_types import native_types
from pyomo.dae import DerivativeVar
from pyomo.dae.flatten import slice_component_along_sets
from pyomo.contrib.pynumero.interfaces.pyomo_nlp import PyomoNLP
from pyomo.contrib.pynumero.asl import AmplInterface
from idaes.core.util.exceptions import BurntToast
import idaes.logger as idaeslog
_log = idaeslog.getLogger(__name__)
TAB = " " * 4
def _filter_unknown(block_data):
# It can be confusing to users to see a block named "unknown" appear in
# an error message, but that's, unfortunately, what Pyomo uses as the
# default name of ConcreteModels. Therefore, filter out that case.
block_name = block_data.name
if block_name == "unknown" and block_data.model() is block_data:
return "model"
return f"block {block_name}"
[docs]
def get_scaling_factor_suffix(blk: BlockData):
"""
Get scaling suffix from block.
Args:
blk: component to get scaling factor suffix for
Returns:
Pyomo scaling Suffix
Raises:
TypeError if component is an IndexedBlock
"""
if isinstance(blk, BlockData):
pass
elif isinstance(blk, Block):
raise TypeError(
"IndexedBlocks cannot have scaling factors attached to them. "
"Please assign scaling factors to the elements of the IndexedBlock."
)
else:
raise TypeError(
f"Component {blk.name} was not a BlockData, instead it was a {type(blk)}"
)
try:
sfx = blk.scaling_factor
except AttributeError:
# No existing suffix, create one
_log.debug(f"Created new scaling suffix for {_filter_unknown(blk)}")
sfx = blk.scaling_factor = Suffix(direction=Suffix.EXPORT)
if not sfx.active:
raise RuntimeError(
f"The scaling suffix on {_filter_unknown(blk)} has been deactivated. "
"Typically, this means that the user has performed a scaling transformation "
"on the model."
)
return sfx
[docs]
def get_scaling_hint_suffix(blk: BlockData):
"""
Get scaling hint suffix from block.
Creates a new suffix if one is not found.
Args:
blk: block to get suffix for
Returns:
Pyomo scaling hint Suffix
Raises:
TypeError if component is an IndexedBlock or non-block.
"""
if isinstance(blk, BlockData):
pass
elif isinstance(blk, Block):
raise TypeError(
"IndexedBlocks cannot have scaling hints attached to them. "
"Please assign scaling hints to the elements of the IndexedBlock."
)
else:
raise TypeError(
f"Component {blk.name} was not a BlockData, instead it was a {type(blk)}"
)
try:
sfx = blk.scaling_hint
except AttributeError:
# No existing suffix, create one
_log.debug(f"Created new scaling hint suffix for {_filter_unknown(blk)}")
sfx = blk.scaling_hint = Suffix()
return sfx
[docs]
def get_component_scaling_suffix(component):
"""
Get scaling suffix appropriate to component type from parent block.
Creates a new suffix if one is not found.
Args:
component: component to get suffix for
Returns:
Pyomo scaling factor Suffix (for VarData and ConstraintData)
or Pyomo scaling hint Suffix (for ExpressionData)
Raises:
TypeError if component isn't a VarData, ConstraintData, or ExpressionData.
"""
blk = component.parent_block()
if isinstance(component, (VarData, ConstraintData)):
return get_scaling_factor_suffix(blk)
elif isinstance(component, ExpressionData):
return get_scaling_hint_suffix(blk)
else:
raise TypeError(
"Can get scaling factors for only VarData, ConstraintData, and (hints from) ExpressionData. "
f"Component {component.name} is instead {type(component)}."
)
[docs]
def scaling_factors_to_dict(blk_or_suffix, descend_into: bool = True):
"""
Write scaling factor and/or scaling hint suffixes to a serializable
json dict. If a Block, indexed or otherwise is passed, this function
collects both scaling factors and hints. If a suffix is passed
directly, it serializes only that suffix (factors or hints) and leaves
the other suffix (hints or factors) out of the resulting dict.
Component objects are replaced by their local names so they can be
serialized.
Args:
blk_or_suffix: Pyomo Block or Suffix object to covert to dict
descend_into: for Blocks, whether to descend into any child blocks
Returns
dict containing scaling factors and/or scaling hints indexed by
component names
Raises:
TypeError if blk_or_suffix is not an instance of Block or Suffix
"""
# First, determine what type of component we have
if isinstance(blk_or_suffix, Suffix):
out_dict = {"suffix": _suffix_to_dict(blk_or_suffix)}
blk = blk_or_suffix.parent_block()
elif isinstance(blk_or_suffix, BlockData):
# Scalar block or element of indexed block
out_dict = _collect_block_suffixes(blk_or_suffix, descend_into=descend_into)
blk = blk_or_suffix
elif isinstance(blk_or_suffix, Block):
# Indexed block
blk = blk_or_suffix
out_dict = {"block_data": {}}
for bd in blk_or_suffix.values():
out_dict["block_data"][bd.name] = _collect_block_suffixes(
bd, descend_into=descend_into
)
else:
# Not a block or suffix
raise TypeError(
f"{blk_or_suffix.name} is not an instance of a Block of Suffix."
)
# Attach block name for future verification
out_dict["block_name"] = blk.name
return out_dict
[docs]
def scaling_factors_from_dict(
blk_or_suffix,
json_dict: dict,
overwrite: bool = False,
verify_names: bool = True,
):
"""
Set scaling factors and/or scaling hints based on values in a serializable json dict.
This method expects components to be referenced by their local names.
Args:
blk_or_suffix: Pyomo Block or Suffix object to set scaling factors on
json_dict: dict of scaling factors and/or scaling hints to load into model
overwrite: (bool) whether to overwrite existing scaling factors/hints or not
verify_names: (bool) whether to verify that all names in dict exist on block
Returns
None
Raises:
TypeError if blk_or_suffix is not an instance of Block or Suffix
"""
# First, copy json_dict so we do not mutate original
sdict = deepcopy(json_dict)
# Pop block name for verification
block_name = sdict.pop("block_name")
# Next, determine what type of component we have
if isinstance(blk_or_suffix, Suffix):
# Suffix
if verify_names and block_name != blk_or_suffix.parent_block().name:
raise ValueError(
f"Name of parent block ({blk_or_suffix.parent_block().name}) does "
f"not match that recorded in json_dict ({block_name})"
)
_suffix_from_dict(
blk_or_suffix,
sdict["suffix"],
overwrite=overwrite,
verify_names=verify_names,
)
elif isinstance(blk_or_suffix, BlockData):
# Scalar block or element of indexed block
if verify_names and block_name != blk_or_suffix.name:
raise ValueError(
f"Block name ({blk_or_suffix.name}) does "
f"not match that recorded in json_dict ({block_name})"
)
_set_block_suffixes_from_dict(
blk_or_suffix, sdict, overwrite=overwrite, verify_names=verify_names
)
elif isinstance(blk_or_suffix, Block):
# Indexed block
if verify_names and block_name != blk_or_suffix.name:
raise ValueError(
f"Block name ({blk_or_suffix.name}) does "
f"not match that recorded in json_dict ({block_name})"
)
for bd_name, bd_dict in sdict["block_data"].items():
bd = blk_or_suffix.parent_block().find_component(bd_name)
_set_block_suffixes_from_dict(
bd, bd_dict, overwrite=overwrite, verify_names=verify_names
)
else:
# Not a block or suffix
raise TypeError(
f"{blk_or_suffix.name} is not an instance of a Block of Suffix."
)
[docs]
def scaling_factors_to_json_file(blk_or_suffix, filename: str):
"""
Serialize scaling factors to file in json format.
Args:
blk_of_suffix: Block or Suffix to save scaling factors for
filename: name of file to write to as string
Returns:
None
Raises:
TypeError if blk_or_suffix is not an instance of Block or Suffix
"""
with open(filename, "w") as fd:
json.dump(scaling_factors_to_dict(blk_or_suffix), fd, indent=3)
[docs]
def scaling_factors_from_json_file(
blk_or_suffix, filename: str, overwrite: bool = False, verify_names: bool = True
):
"""
Load scaling factors from json file.
Args:
blk_of_suffix: Block or Suffix to load scaling factors for
filename: name of file to load as string
overwrite: (bool) whether to overwrite existing scaling factors or not
verify_names: (bool) whether to verify that all names in dict exist on block
Returns:
None
Raises:
TypeError if blk_or_suffix is not an instance of Block or Suffix
"""
with open(filename, "r") as f:
scaling_factors_from_dict(
blk_or_suffix, json.load(f), overwrite=overwrite, verify_names=verify_names
)
f.close()
def _collect_block_suffixes(block_data, descend_into=True):
sf_suffix = get_scaling_factor_suffix(block_data)
sh_suffix = get_scaling_hint_suffix(block_data)
out_dict = {
"scaling_factor_suffix": _suffix_to_dict(sf_suffix),
"scaling_hint_suffix": _suffix_to_dict(sh_suffix),
}
if descend_into:
out_dict["subblock_suffixes"] = {}
for sb in block_data.component_data_objects(Block, descend_into=False):
out_dict["subblock_suffixes"][sb.local_name] = _collect_block_suffixes(
sb, descend_into
)
return out_dict
def _set_block_suffixes_from_dict(
block_data, json_dict, verify_names=True, overwrite=False
):
# First, copy dict so we can take it apart
sdict = deepcopy(json_dict)
# Pop any subblock suffixes
sb_dict = sdict.pop("subblock_suffixes", None)
sf_dict = sdict.pop("scaling_factor_suffix", None)
sh_dict = sdict.pop("scaling_hint_suffix", None)
# Set local suffix values
sf_suffix = get_scaling_factor_suffix(block_data)
sh_suffix = get_scaling_hint_suffix(block_data)
if sf_dict is not None:
_suffix_from_dict(
sf_suffix,
sf_dict,
verify_names=verify_names,
overwrite=overwrite,
valid_types=[VarData, ConstraintData],
)
elif verify_names:
raise KeyError(
f"Missing scaling factor dictionary for {_filter_unknown(block_data)}."
)
if sh_dict is not None:
_suffix_from_dict(
sh_suffix,
sh_dict,
verify_names=verify_names,
overwrite=overwrite,
valid_types=[ExpressionData],
)
elif verify_names:
raise KeyError(
f"Missing scaling hint dictionary for {_filter_unknown(block_data)}."
)
if sb_dict is not None:
# Get each subblock and apply function recursively
for sb, sb_dict_value in sb_dict.items():
subblock = block_data.find_component(sb)
if subblock is not None:
_set_block_suffixes_from_dict(
subblock,
sb_dict_value,
verify_names=verify_names,
overwrite=overwrite,
)
elif verify_names:
raise AttributeError(
f"{_filter_unknown(block_data)} does not have a subblock named {sb}.".capitalize()
)
def _suffix_to_dict(suffix):
sdict = {}
for k, v in suffix.items():
# Record components by their local name so we can use
# find_Component to retrieve them later
sdict[k.local_name] = v
return sdict
def _suffix_from_dict(
suffix, json_dict, verify_names=True, overwrite=False, valid_types=None
):
parent_block = suffix.parent_block()
for k, v in json_dict.items():
comp = parent_block.find_component(k)
if comp is not None:
if valid_types is not None and not any(
[isinstance(comp, cls) for cls in valid_types]
):
raise TypeError(
f"Expected {comp.name} to be a subclass of {valid_types}, "
f"but it was instead {type(comp)}"
)
if overwrite or comp not in suffix:
suffix[comp] = v
elif verify_names:
raise ValueError(
f"Could not find component {k} on {_filter_unknown(parent_block)}."
)
[docs]
def get_scaling_factor(component, default: float = None, warning: bool = False):
"""
Get scaling factor for component.
Args:
component: component to get scaling factor for
default: scaling factor to return if no scaling factor
exists for component
warning: Bool to determine whether a warning should be
returned if no scaling factor is found
Returns:
float scaling factor
Raises:
TypeError if component is not VarData, ConstraintData, or ExpressionData
"""
if component.is_indexed():
raise TypeError(
f"Component {component.name} is indexed. It is ambiguous which scaling factor to return."
)
if component.is_expression_type() and not component.is_named_expression_type():
raise TypeError(
"Can only get scaling hints for named expressions, but component was an unnamed expression."
)
if isinstance(component, (VarData, ConstraintData)):
sfx_finder = SuffixFinder("scaling_factor")
elif isinstance(component, ExpressionData):
sfx_finder = SuffixFinder("scaling_hint")
else:
raise TypeError(
f"Can get scaling factors for only VarData, ConstraintData, and (hints from) ExpressionData. "
f"Component {component.name} is instead {type(component)}."
)
sf = sfx_finder.find(component_data=component)
if sf is None:
if warning:
_log.warning(f"Missing scaling factor for {component.name}")
return default
else:
return sf
[docs]
def set_scaling_factor(component, scaling_factor: float, overwrite: bool = False):
"""
Set scaling factor for component.
Scaling factors must be positive, non-zero floats.
Args:
component: component to set scaling factor for
scaling_factor: scaling factor to assign
overwrite: (bool) whether to overwrite existing scaling factor
Returns:
None
Raises:
ValueError is scaling_factor is 0 or negative
"""
# Cast to float to catch any garbage inputs
scaling_factor = float(scaling_factor)
# Check for negative or 0 scaling factors
if scaling_factor < 0:
raise ValueError(
f"Scaling factor for {component.name} is negative ({scaling_factor}). "
"Scaling factors must be strictly positive."
)
elif scaling_factor == 0:
raise ValueError(
f"Scaling factor for {component.name} is zero. "
"Scaling factors must be strictly positive."
)
elif scaling_factor == float("inf"):
raise ValueError(
f"Scaling factor for {component.name} is infinity. "
"Scaling factors must be finite."
)
elif math.isnan(scaling_factor):
raise ValueError(f"Scaling factor for {component.name} is NaN.")
if component.is_indexed():
# What if a scaling factor already exists for the indexed component?
# for idx in component:
# set_scaling_factor(component[idx], scaling_factor=scaling_factor, overwrite=overwrite)
raise TypeError(
f"Component {component.name} is indexed. Set scaling factors for individual indices instead."
)
try:
sfx = get_component_scaling_suffix(component)
except RuntimeError as err:
raise RuntimeError(
f"Cannot set a scaling factor for {component.name} because the scaling_factor "
"suffix has been deactivated."
) from err
if not overwrite and component in sfx:
_log.debug(
f"Existing scaling factor for {component.name} found and overwrite=False. "
"Scaling factor unchanged."
)
else:
sfx[component] = scaling_factor
[docs]
def del_scaling_factor(component, delete_empty_suffix: bool = False):
"""
Delete scaling factor for component.
Args:
component: component to delete scaling factor for
delete_empty_suffix: (bool) whether to delete scaling Suffix if it is
empty after deletion.
"""
if component.is_indexed():
raise TypeError(
f"Component {component.name} is indexed. It is ambiguous which scaling factor to delete."
)
# Get suffix
parent = component.parent_block()
# TODO what if a scaling factor exists in a non-standard place?
sfx = get_component_scaling_suffix(component)
# Delete entry for component if it exists
# Pyomo handles case where value does not exist in suffix with a no-op
sfx.clear_value(component)
if delete_empty_suffix:
# Check if Suffix is empty (i.e. length 0)
if len(sfx) == 0:
# If so, delete suffix from parent block of component
if sfx.name == "scaling_factor":
_log.debug(f"Deleting empty scaling suffix from {parent.name}")
elif sfx.name == "scaling_hint":
_log.debug(f"Deleting empty scaling hint suffix from {parent.name}")
else:
raise BurntToast(
"This branch should be inaccessible, please report this issue "
"to the IDAES developers."
)
parent.del_component(sfx)
[docs]
def report_scaling_factors(
blk: Block, ctype=None, descend_into: bool = False, stream=None
):
"""
Write the scaling factors for all components in a Block to a stream.
Args:
blk: Block to get scaling factors and/or scaling hints from.
ctype: None, Var, Constraint, or Expression. Type of component to show scaling factors for
(if None, shows all elements).
descend_into: whether to show scaling factors for components in sub-blocks.
stream: StringIO object to write results to. If not provided, writes to stdout.
Raises:
TypeError if blk is not a Pyomo Block.
ValueError is ctype is not None, Var or Constraint.
"""
if stream is None:
stream = sys.stdout
if ctype not in [None, Var, Constraint, Expression]:
raise ValueError(
f"report_scaling_factors only supports None, Var, Constraint, or Expression for argument ctype: "
f"received {ctype}."
)
if not isinstance(blk, (Block, BlockData)):
raise TypeError(
"report_scaling_factors: blk must be an instance of a Pyomo Block."
)
stream.write(f"Scaling Factors for {_filter_unknown(blk)}\n")
# We will report Vars and Constraint is separate sections for clarity - iterate separately
if ctype == Var or ctype is None:
# Collect Vars
vdict = {}
for blkdata in blk.values():
for vardata in blkdata.component_data_objects(
Var, descend_into=descend_into
):
val = vardata.value
sf = get_scaling_factor(vardata)
if sf is not None:
sfstr = "{:.3E}".format(sf)
else:
sfstr = "None "
if val is not None:
vstr = "{:.3E}".format(val)
if sf is not None:
sval = "{:.3E}".format(value(vardata * sf))
else:
sval = vstr
else:
vstr = "None "
sval = "None"
vdict[vardata.name] = (sfstr, vstr, sval)
# Write Var section - skip if no Vars
if len(vdict) > 0:
# Get longest var name
header = "Variable"
maxname = len(max(vdict.keys(), key=len))
if maxname < len(header):
maxname = len(header)
stream.write(
f"\n{header}{' '*(maxname-len(header))}{TAB}Scaling Factor{TAB}Value{' '*4}{TAB}Scaled Value\n"
)
for n, i in vdict.items():
# Pad name to length
stream.write(
f"{n + ' '*(maxname-len(n))}{TAB}{i[0]}{' '*5}{TAB}{i[1]}{TAB}{i[2]}\n"
)
if ctype == Constraint or ctype is None:
# Collect Constraints
cdict = {}
for blkdata in blk.values():
for condata in blkdata.component_data_objects(
Constraint, descend_into=descend_into
):
sf = get_scaling_factor(condata)
if sf is not None:
sfstr = "{:.3E}".format(sf)
else:
sfstr = "None"
cdict[condata.name] = sfstr
# Write Constraint section - skip if no Constraints
if len(cdict) > 0:
# Get longest con name
header = "Constraint"
maxname = len(max(cdict.keys(), key=len))
if maxname < len(header):
maxname = len(header)
stream.write(
f"\n{header}{' ' * (maxname - len(header))}{TAB}Scaling Factor\n"
)
for n, i in cdict.items():
# Pad name to length
stream.write(f"{n + ' ' * (maxname - len(n))}{TAB}{i}\n")
if ctype == Expression or ctype is None:
# Collect Expressions
edict = {}
for blkdata in blk.values():
for exprdata in blkdata.component_data_objects(
Expression, descend_into=descend_into
):
sf = get_scaling_factor(exprdata)
if sf is not None:
sfstr = "{:.3E}".format(sf)
else:
sfstr = "None"
edict[exprdata.name] = sfstr
# Write Expression section - skip if no Expressions
if len(edict) > 0:
# Get longest con name
header = "Expression"
maxname = len(max(edict.keys(), key=len))
if maxname < len(header):
maxname = len(header)
stream.write(
f"\n{header}{' ' * (maxname - len(header))}{TAB}Scaling Hint\n"
)
for n, i in edict.items():
# Pad name to length
stream.write(f"{n + ' ' * (maxname - len(n))}{TAB}{i}\n")
# Unscaled variables and constraints generators adopted from old scaling tools,
# originally by John Eslick
[docs]
def unscaled_variables_generator(
blk: Block, descend_into: Boolean = True, include_fixed: Boolean = False
):
"""Generator for unscaled variables
Args:
block
Yields:
variables with no scale factor
"""
for v in blk.component_data_objects(Var, descend_into=descend_into):
if v.fixed and not include_fixed:
continue
if get_scaling_factor(v) is None:
yield v
[docs]
def list_unscaled_variables(
blk: Block, descend_into: bool = True, include_fixed: bool = False
):
"""
Return a list of variables which do not have a scaling factor assigned
Args:
blk: block to check for unscaled variables
descend_into: bool indicating whether to check variables in sub-blocks
include_fixed: bool indicating whether to include fixed Vars in list
Returns:
list of unscaled variable data objects
"""
return [c for c in unscaled_variables_generator(blk, descend_into, include_fixed)]
[docs]
def unscaled_constraints_generator(blk: Block, descend_into=True):
"""Generator for unscaled constraints
Args:
block
Yields:
constraints with no scale factor
"""
for c in blk.component_data_objects(
Constraint, active=True, descend_into=descend_into
):
if get_scaling_factor(c) is None:
yield c
[docs]
def list_unscaled_constraints(blk: Block, descend_into: bool = True):
"""
Return a list of constraints which do not have a scaling factor assigned
Args:
blk: block to check for unscaled constraints
descend_into: bool indicating whether to check constraints in sub-blocks
Returns:
list of unscaled constraint data objects
"""
return [c for c in unscaled_constraints_generator(blk, descend_into)]
[docs]
def get_nominal_value(component):
"""
Get the signed nominal value for a VarData or ParamData component.
For Params, the current value of the component will be returned.
For Vars, the nominal value is determined using the assigned scaling factor
and the sign determined based on the bounds and domain of the variable (defaulting to
positive). If no scaling factor is set, then the current value will be used if set,
otherwise the absolute nominal value will be equal to 1.
Args:
component: component to determine nominal value for
Returns:
signed float with nominal value
Raises:
TypeError if component is not instance of VarData or ParamData
"""
# Determine if Var or Param
if isinstance(component, VarData):
# Get scaling factor for Var
sf = get_scaling_factor(component)
if sf is None:
# No scaling factor - see if Var has a value
if component.value is not None:
# If it has a value, use that as the nominal value
# As we are using the actual value, do not need to determine sign
return value(component)
else:
# Otherwise assign a nominal value of 1
sf = 1
# Try to determine expected sign of node
ub = component.ub
lb = component.lb
domain = component.domain
# To avoid NoneType errors, assign dummy values in place of None
if ub is None:
# No upper bound, take a positive value
ub = 1000
if lb is None:
# No lower bound, take a negative value
lb = -1000
if lb >= 0 or domain in [
NonNegativeReals,
PositiveReals,
PositiveIntegers,
NonNegativeIntegers,
Boolean,
Binary,
]:
# Strictly positive
sign = 1
elif ub <= 0 or domain in [
NegativeReals,
NonPositiveReals,
NegativeIntegers,
NonPositiveIntegers,
]:
# Strictly negative
sign = -1
else:
# Unbounded, see if there is a current value
# Assume positive until proven otherwise
sign = 1
if component.value is not None:
val = value(component)
if val < 0:
# Assigned negative value, assume value will remain negative
sign = -1
return sign / sf
elif isinstance(component, ParamData):
# Nominal value of a parameter is always its value
return value(component)
else:
# Not a Var or Param - invalid component type
raise TypeError(
f"get_nominal_value - {component.name} is not an instance of a Var or Param."
)
class NominalValueExtractionVisitor(EXPR.StreamBasedExpressionVisitor):
"""
Expression walker for collecting scaling factors in an expression and determining the
expected value of the expression using the scaling factors as nominal inputs.
By default, the get_nominal_value method is used to determine the nominal value for
all Vars and Params in the expression, however this can be changed by setting the
nominal_value_callback argument.
Returns a list of expected values for each additive term in the expression.
In order to properly assess the expected value of terms within functions, the sign
of each term is maintained throughout thus returned values may be negative. Functions
using this walker should handle these appropriately.
"""
def __init__(self, nominal_value_callback=get_nominal_value):
"""
Visitor class used to determine nominal values of all terms in an expression based on
scaling factors assigned to the associated variables. Do not use this class directly.
Args:
nominal_value_callback - method to use to get nominal value of root nodes.
Notes
-----
This class inherits from the :class:`StreamBasedExpressionVisitor` to implement
a walker that returns the nominal value corresponding to all additive terms in an
expression.
There are class attributes (dicts) that map the expression node type to the
particular method that should be called to return the nominal value of the node based
on the nominal value of its child arguments. This map is used in exitNode.
"""
super().__init__()
self._nominal_value_callback = nominal_value_callback
def _get_magnitude_base_type(self, node):
try:
return [self._nominal_value_callback(node)]
except TypeError:
# Not a Var or Param - something went wrong
raise BurntToast(
"NominalValueExtractionVisitor found root node that was not a Var or Param. "
"This should never happen - please contact the developers with this bug."
)
def _get_nominal_value_for_sum_subexpression(self, child_nominal_values):
return sum(i for i in child_nominal_values)
def _get_nominal_value_for_sum(self, node, child_nominal_values):
# For sums, collect all child values into a list
mag = []
for i in child_nominal_values:
for j in i:
mag.append(j)
return mag
def _get_nominal_value_for_product(self, node, child_nominal_values):
mag = []
for i in child_nominal_values[0]:
for j in child_nominal_values[1]:
mag.append(i * j)
return mag
def _get_nominal_value_for_division(self, node, child_nominal_values):
numerator = self._get_nominal_value_for_sum_subexpression(
child_nominal_values[0]
)
denominator = self._get_nominal_value_for_sum_subexpression(
child_nominal_values[1]
)
if denominator == 0:
# Assign a nominal value of 1 so that we can continue
denominator = 1
# Log a warning for the user
_log.warning(
"Nominal value of 0 found in denominator of division expression. "
"Assigning a value of 1. You should check you scaling factors and models to "
"ensure there are no values of 0 that can appear in these functions."
)
return [numerator / denominator]
def _get_nominal_value_for_power(self, node, child_nominal_values):
# Use the absolute value of the base term to avoid possible complex numbers
base = abs(
self._get_nominal_value_for_sum_subexpression(child_nominal_values[0])
)
exponent = self._get_nominal_value_for_sum_subexpression(
child_nominal_values[1]
)
return [base**exponent]
def _get_nominal_value_single_child(self, node, child_nominal_values):
return child_nominal_values[0]
def _get_nominal_value_abs(self, node, child_nominal_values):
return [abs(i) for i in child_nominal_values[0]]
def _get_nominal_value_negation(self, node, child_nominal_values):
return [-i for i in child_nominal_values[0]]
def _get_nominal_value_for_unary_function(self, node, child_nominal_values):
func_name = node.getname()
func_nominal = self._get_nominal_value_for_sum_subexpression(
child_nominal_values[0]
)
func = getattr(math, func_name)
try:
return [func(func_nominal)]
except ValueError:
raise ValueError(
f"Evaluation error occurred when getting nominal value in {func_name} "
f"expression with input {func_nominal}. You should check you scaling factors "
f"and model to address any numerical issues or scale this constraint manually."
)
def _get_nominal_value_expr_if(self, node, child_nominal_values):
return child_nominal_values[1] + child_nominal_values[2]
def _get_nominal_value_external_function(self, node, child_nominal_values):
# First, need to get expected magnitudes of input terms, which may be sub-expressions
input_mag = []
for i in child_nominal_values:
if isinstance(i[0], str):
# Sometimes external functions might have string arguments
# Check here, and return the string if true
input_mag.append(i[0])
else:
input_mag.append(self._get_nominal_value_for_sum_subexpression(i))
# Next, create a copy of the external function with expected magnitudes as inputs
newfunc = node.create_node_with_local_data(input_mag)
# Evaluate new function and return the absolute value
return [value(newfunc)]
node_type_method_map = {
EXPR.EqualityExpression: _get_nominal_value_for_sum,
EXPR.InequalityExpression: _get_nominal_value_for_sum,
EXPR.RangedExpression: _get_nominal_value_for_sum,
EXPR.SumExpression: _get_nominal_value_for_sum,
EXPR.NPV_SumExpression: _get_nominal_value_for_sum,
EXPR.ProductExpression: _get_nominal_value_for_product,
EXPR.MonomialTermExpression: _get_nominal_value_for_product,
EXPR.NPV_ProductExpression: _get_nominal_value_for_product,
EXPR.DivisionExpression: _get_nominal_value_for_division,
EXPR.NPV_DivisionExpression: _get_nominal_value_for_division,
EXPR.PowExpression: _get_nominal_value_for_power,
EXPR.NPV_PowExpression: _get_nominal_value_for_power,
EXPR.NegationExpression: _get_nominal_value_negation,
EXPR.NPV_NegationExpression: _get_nominal_value_negation,
EXPR.AbsExpression: _get_nominal_value_abs,
EXPR.NPV_AbsExpression: _get_nominal_value_abs,
EXPR.UnaryFunctionExpression: _get_nominal_value_for_unary_function,
EXPR.NPV_UnaryFunctionExpression: _get_nominal_value_for_unary_function,
EXPR.Expr_ifExpression: _get_nominal_value_expr_if,
EXPR.ExternalFunctionExpression: _get_nominal_value_external_function,
EXPR.NPV_ExternalFunctionExpression: _get_nominal_value_external_function,
EXPR.LinearExpression: _get_nominal_value_for_sum,
}
def beforeChild(self, node, child, child_idx):
"""
Callback for :class:`pyomo.core.current.StreamBasedExpressionVisitor`. This method
is called before entering a child node. If we encounter a named Expression with
a scaling hint, then we use that scaling hint instead of descending further into
the expression tree.
"""
if isinstance(child, ExpressionData):
sf = get_scaling_factor(child, warning=False)
if sf is not None:
# Crude way to determine sign of expression. Maybe fbbt could be used here?
try:
val = value(child)
except ValueError:
# Some variable isn't defined, etc.
val = 1
if val < 0:
return (False, [-1 / sf])
else:
return (False, [1 / sf])
return (True, None)
def exitNode(self, node, data):
"""Callback for :class:`pyomo.core.current.StreamBasedExpressionVisitor`. This
method is called when moving back up the tree in a depth first search."""
# first check if the node is a leaf
nodetype = type(node)
if nodetype in native_types:
return [node]
node_func = self.node_type_method_map.get(nodetype, None)
if node_func is not None:
return node_func(self, node, data)
elif not node.is_expression_type():
# this is a leaf, but not a native type
if nodetype is _PyomoUnit:
return [1]
else:
return self._get_magnitude_base_type(node)
# might want to add other common types here
# not a leaf - check if it is a named expression
if (
hasattr(node, "is_named_expression_type")
and node.is_named_expression_type()
):
return self._get_nominal_value_single_child(node, data)
raise TypeError(
f"An unhandled expression node type: {str(nodetype)} was encountered while "
f"retrieving the nominal value of expression {str(node)}"
)
# Based off of John Eslick's functions in the old scaling tools
[docs]
def get_jacobian(
m,
include_scaling_factors=True,
include_ipopt_autoscaling=False,
equality_constraints_only=False,
max_grad=100,
min_scale=1e-8,
):
"""
Get the Jacobian matrix at the current model values. This function also
returns the Pynumero NLP which can be used to identify the constraints and
variables corresponding to the rows and columns.
Args:
m: model to get Jacobian from
include_scaling_factors: if True scale the rows and columns of the Jacobian
using the user-defined scaling factors in the scaling_factor suffix.
include_ipopt_autoscaling: if True, include the gradient-based autoscaling
step that IPOPT uses to scale the rows of the Jacobian
equality_constraints_only: Option to include only equality constraints
in the calculated Jacobian.
max_grad: The value of the norm of the constraint gradient above which
gradient-based autoscaling will be performed by IPOPT. This option
corresponds to the IPOPT option nlp_scaling_max_gradient. Used
only if include_ipopt_autoscaling is True.
min_scale: The smallest scaling factor that IPOPT gradient-based scaling
is allowed to produce. This option corresponds to the IPOPT option
nlp_scaling_min_value. Used only if include_ipopt_autoscaling is True.
Returns:
Jacobian matrix in Scipy CSR format, Pynumero nlp
"""
# Pynumero requires an objective, but I don't, so let's see if we have one
n_obj = 0
for _ in m.component_data_objects(Objective, active=True):
n_obj += 1
# Add an objective if there isn't one
if n_obj == 0:
dummy_objective_name = unique_component_name(m, "objective")
setattr(m, dummy_objective_name, Objective(expr=0))
# Create NLP and calculate the objective
if not AmplInterface.available():
raise RuntimeError("Pynumero not available.")
nlp = PyomoNLP(m)
if equality_constraints_only:
jac = nlp.evaluate_jacobian_eq().tocsr()
else:
jac = nlp.evaluate_jacobian().tocsr()
# Get lists of variables and constraints to translate Jacobian indexes
# save them on the NLP for later, since generating them seems to take a while
if equality_constraints_only:
clist = nlp.clist = nlp.get_pyomo_equality_constraints()
else:
clist = nlp.clist = nlp.get_pyomo_constraints()
vlist = nlp.vlist = nlp.get_pyomo_variables()
if include_scaling_factors:
# Preallocating arrays with NaNs to make it apparent if
# some element doesn't get set for some reason
var_scaling_factors = np.nan * np.ones(len(vlist))
for i, var in enumerate(vlist):
var_scaling_factors[i] = get_scaling_factor(var, default=1, warning=False)
con_scaling_factors = np.nan * np.ones(len(clist))
for i, con in enumerate(clist):
con_scaling_factors[i] = get_scaling_factor(con, default=1, warning=False)
# Scale the jacobian by multiplying each row/constraint by its scaling factor
# and dividing each column/variable by its scaling factor.
# Perform sparse matrix multiplication to take advantage of vectorized
# operations implemented in C++.
jac = diags(con_scaling_factors) @ (jac @ diags(1 / var_scaling_factors))
if include_ipopt_autoscaling:
row_norms = spnorm(jac, ord=np.inf, axis=1)
grad_scaling_factors = np.ones(len(clist))
for i, _ in enumerate(clist):
if row_norms[i] > max_grad:
grad_scaling_factors[i] = max(min_scale, max_grad / row_norms[i])
jac = diags(grad_scaling_factors) @ jac
# delete dummy objective
if n_obj == 0:
delattr(m, dummy_objective_name)
return jac, nlp
# TODO should we calculate the 2-norm condition number from the SVD
# once #1566 is merged?
[docs]
def jacobian_cond(m=None, scaled=True, jac=None):
"""
Get the Frobenius condition number of the scaled or unscaled Jacobian matrix
of a model.
Args:
m: calculate the condition number of the Jacobian from this model.
scaled: if True use scaled Jacobian, else use unscaled
jac: (optional) previously calculated Jacobian
Returns:
(float) Condition number
"""
if jac is None:
if m is None:
raise RuntimeError(
"User must provide either a Pyomo model or a Jacobian "
"to calculate the condition number."
)
jac, _ = get_jacobian(m, scaled)
jac = jac.tocsc()
if jac.shape[0] != jac.shape[1]:
_log.info(
"Nonsquare Jacobian. Using pseudoinverse to calculate Frobenius norm."
)
jac_inv = pinv(jac.toarray())
return spnorm(jac, ord="fro") * norm(jac_inv, ord="fro")
else:
jac_inv = spinv(jac)
return spnorm(jac, ord="fro") * spnorm(jac_inv, ord="fro")
[docs]
def scale_time_discretization_equations(blk, time_set, time_scaling_factor):
"""
Scales time discretization equations generated via a Pyomo discretization
transformation. Also scales continuity equations for collocation methods
of discretization that require them.
Args:
blk: Block whose time discretization equations are being scaled
time_set: Time set object. For an IDAES flowsheet object fs, this is fs.time.
time_scaling_factor: Scaling factor to use for time
Returns:
None
"""
tname = time_set.local_name
# Adapted from solvers.petsc.find_discretization_equations
for var in blk.component_objects(Var):
if isinstance(var, DerivativeVar):
cont_set_set = ComponentSet(var.get_continuousset_list())
if time_set in cont_set_set:
if len(cont_set_set) > 1:
_log.warning(
"IDAES presently does not support automatically scaling discretization equations for "
f"second order or higher derivatives like {var.name} that are differentiated at least once with "
"respect to time. Please scale the corresponding discretization equation yourself."
)
continue
state_var = var.get_state_var()
parent_block = var.parent_block()
disc_eq = getattr(parent_block, var.local_name + "_disc_eq")
# Look for continuity equation, which exists only for collocation with certain sets of polynomials
try:
cont_eq = getattr(
parent_block, state_var.local_name + "_" + tname + "_cont_eq"
)
except AttributeError:
cont_eq = None
deriv_dict = dict(
(key, Reference(slc))
for key, slc in slice_component_along_sets(var, (time_set,))
)
state_dict = dict(
(key, Reference(slc))
for key, slc in slice_component_along_sets(state_var, (time_set,))
)
disc_dict = dict(
(key, Reference(slc))
for key, slc in slice_component_along_sets(disc_eq, (time_set,))
)
if cont_eq is not None:
cont_dict = dict(
(key, Reference(slc))
for key, slc in slice_component_along_sets(cont_eq, (time_set,))
)
for key, deriv in deriv_dict.items():
state = state_dict[key]
disc = disc_dict[key]
if cont_eq is not None:
cont = cont_dict[key]
for t in time_set:
s_state = get_scaling_factor(state[t], default=1, warning=True)
set_scaling_factor(
deriv[t], s_state / time_scaling_factor, overwrite=False
)
s_deriv = get_scaling_factor(deriv[t])
# Check time index to decide what constraints to scale
if cont_eq is None:
if t == time_set.first() or t == time_set.last():
try:
set_scaling_factor(
disc[t], s_deriv, overwrite=False
)
except KeyError:
# Discretization and continuity equations may or may not exist at the first or last time
# points depending on the method. Backwards skips first, forwards skips last, central skips
# both (which means the user needs to provide additional equations)
pass
else:
set_scaling_factor(disc[t], s_deriv, overwrite=False)
else:
# Lagrange-Legendre is a pain, because it has continuity equations on the edges of finite elements
# instead of discretization equations, but no intermediate continuity equations, so we have to look
# for both at every timepoint
try:
set_scaling_factor(disc[t], s_deriv, overwrite=False)
except KeyError:
if t != time_set.first():
set_scaling_factor(
# pylint: disable-next=possibly-used-before-assignment
cont[t],
s_state,
overwrite=False,
)