#################################################################################
# The Institute for the Design of Advanced Energy Systems Integrated Platform
# Framework (IDAES IP) was produced under the DOE Institute for the
# Design of Advanced Energy Systems (IDAES).
#
# Copyright (c) 2018-2023 by the software owners: The Regents of the
# University of California, through Lawrence Berkeley National Laboratory,
# National Technology & Engineering Solutions of Sandia, LLC, Carnegie Mellon
# University, West Virginia University Research Corporation, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md
# for full copyright and license information.
#################################################################################
"""
Utility functions for scaling.
Author: Andrew Lee
"""
from copy import deepcopy
import math
import sys
import json
from pyomo.environ import (
Binary,
Block,
Boolean,
Constraint,
NegativeIntegers,
NegativeReals,
NonNegativeIntegers,
NonNegativeReals,
NonPositiveIntegers,
NonPositiveReals,
PositiveIntegers,
PositiveReals,
Suffix,
value,
Var,
)
from pyomo.core.base.block import BlockData
from pyomo.core.base.var import VarData
from pyomo.core.base.param import ParamData
from pyomo.core import expr as EXPR
from pyomo.common.numeric_types import native_types
from pyomo.core.base.units_container import _PyomoUnit
from idaes.core.util.exceptions import BurntToast
import idaes.logger as idaeslog
_log = idaeslog.getLogger(__name__)
TAB = " " * 4
[docs]
def get_scaling_suffix(component):
"""
Get scaling suffix for component.
If component is not a Block, gets scaling suffix from parent block.
Creates a new suffix if one is not found.
Args:
component: component to get suffix for
Returns:
Pyomo scaling Suffix
Raises:
TypeError is component is an IndexedBlock
"""
if isinstance(component, BlockData):
blk = component
elif isinstance(component, Block):
raise TypeError(
"IndexedBlocks cannot have scaling factors attached to them. "
"Please assign scaling factors to the elements of the IndexedBlock."
)
else:
blk = component.parent_block()
try:
sfx = blk.scaling_factor
except AttributeError:
# No existing suffix, create one
_log.debug(f"Created new scaling suffix for {blk.name}")
sfx = blk.scaling_factor = Suffix(direction=Suffix.EXPORT)
return sfx
[docs]
def scaling_factors_to_dict(blk_or_suffix, descend_into: bool = True):
"""
Write scaling suffixes to a serializable json dict.
Component objects are replaced by their local names so they can be
serialized.
Args:
blk_or_suffix: Pyomo Block or Suffix object to covert to dict
descend_into: for Blocks, whether to descend into any child blocks
Returns
dict containing scaling factors indexed by component names
Raises:
TypeError if blk_or_suffix is not an instance of Block or Suffix
"""
# First, determine what type of component we have
if isinstance(blk_or_suffix, Suffix):
# Suffix
sdict = _suffix_to_dict(blk_or_suffix)
blk = blk_or_suffix.parent_block()
elif isinstance(blk_or_suffix, BlockData):
# Scalar block or element of indexed block
sdict = _collect_block_suffixes(blk_or_suffix, descend_into=descend_into)
blk = blk_or_suffix
elif isinstance(blk_or_suffix, Block):
# Indexed block
blk = blk_or_suffix
sdict = {}
sdict["block_datas"] = {}
for bd in blk_or_suffix.values():
sdict["block_datas"][bd.name] = _collect_block_suffixes(
bd, descend_into=descend_into
)
else:
# Not a block or suffix
raise TypeError(
f"{blk_or_suffix.name} is not an instance of a Block of Suffix."
)
# Attach block name for future verification
sdict["block_name"] = blk.name
return sdict
[docs]
def scaling_factors_from_dict(
blk_or_suffix, json_dict: dict, overwrite: bool = False, verify_names: bool = True
):
"""
Set scaling factors based on values in a serializable json dict.
This method expects components to be referenced by their local names.
Args:
blk_or_suffix: Pyomo Block or Suffix object to set scaling factors on
json_dict: dict of scaling factors to load into model
overwrite: (bool) whether to overwrite existing scaling factors or not
verify_names: (bool) whether to verify that all names in dict exist on block
Returns
None
Raises:
TypeError if blk_or_suffix is not an instance of Block or Suffix
"""
# First, copy json_dict so we do not mutate original
sdict = deepcopy(json_dict)
# Pop block name for verification
block_name = sdict.pop("block_name")
# Next, determine what type of component we have
if isinstance(blk_or_suffix, Suffix):
# Suffix
if verify_names and block_name != blk_or_suffix.parent_block().name:
raise ValueError(
f"Name of parent block ({blk_or_suffix.parent_block().name}) does "
f"not match that recorded in json_dict ({block_name})"
)
_suffix_from_dict(
blk_or_suffix, sdict, overwrite=overwrite, verify_names=verify_names
)
elif isinstance(blk_or_suffix, BlockData):
# Scalar block or element of indexed block
if verify_names and block_name != blk_or_suffix.name:
raise ValueError(
f"Block name ({blk_or_suffix.name}) does "
f"not match that recorded in json_dict ({block_name})"
)
_set_block_suffixes_from_dict(
blk_or_suffix, sdict, overwrite=overwrite, verify_names=verify_names
)
elif isinstance(blk_or_suffix, Block):
# Indexed block
if verify_names and block_name != blk_or_suffix.name:
raise ValueError(
f"Block name ({blk_or_suffix.name}) does "
f"not match that recorded in json_dict ({block_name})"
)
for bd_name, bd_dict in sdict["block_datas"].items():
bd = blk_or_suffix.parent_block().find_component(bd_name)
_set_block_suffixes_from_dict(
bd, bd_dict, overwrite=overwrite, verify_names=verify_names
)
else:
# Not a block or suffix
raise TypeError(
f"{blk_or_suffix.name} is not an instance of a Block of Suffix."
)
[docs]
def scaling_factors_to_json_file(blk_or_suffix, filename: str):
"""
Serialize scaling factors to file in json format.
Args:
blk_of_suffix: Block or Suffix to save scaling factors for
filename: name of file to write to as string
Returns:
None
Raises:
TypeError if blk_or_suffix is not an instance of Block or Suffix
"""
with open(filename, "w") as fd:
json.dump(scaling_factors_to_dict(blk_or_suffix), fd, indent=3)
[docs]
def scaling_factors_from_json_file(
blk_or_suffix, filename: str, overwrite: bool = False, verify_names: bool = True
):
"""
Load scaling factors from json file.
Args:
blk_of_suffix: Block or Suffix to load scaling factors for
filename: name of file to load as string
overwrite: (bool) whether to overwrite existing scaling factors or not
verify_names: (bool) whether to verify that all names in dict exist on block
Returns:
None
Raises:
TypeError if blk_or_suffix is not an instance of Block or Suffix
"""
with open(filename, "r") as f:
scaling_factors_from_dict(
blk_or_suffix, json.load(f), overwrite=overwrite, verify_names=verify_names
)
f.close()
def _collect_block_suffixes(block_data, descend_into=True):
suffix = get_scaling_suffix(block_data)
sdict = _suffix_to_dict(suffix)
if descend_into:
sdict["subblock_suffixes"] = {}
for sb in block_data.component_data_objects(Block, descend_into=False):
sdict["subblock_suffixes"][sb.local_name] = _collect_block_suffixes(
sb, descend_into
)
return sdict
def _set_block_suffixes_from_dict(
block_data, json_dict, verify_names=True, overwrite=False
):
# First, copy dict so we can take it apart
sdict = deepcopy(json_dict)
# Pop any subblock suffixes
sb_dict = sdict.pop("subblock_suffixes", None)
# Set local suffix values
suffix = get_scaling_suffix(block_data)
_suffix_from_dict(suffix, sdict, verify_names=verify_names, overwrite=overwrite)
if sb_dict is not None:
# Get each subblock and apply function recursively
for sb, sb_dict_value in sb_dict.items():
subblock = block_data.find_component(sb)
if subblock is not None:
_set_block_suffixes_from_dict(
subblock,
sb_dict_value,
verify_names=verify_names,
overwrite=overwrite,
)
elif verify_names:
raise AttributeError(
f"Block {block_data.name} does not have a subblock named {sb}."
)
def _suffix_to_dict(suffix):
sdict = {}
for k, v in suffix.items():
# Record components by their local name so we can use
# find_Component to retrieve them later
sdict[k.local_name] = v
return sdict
def _suffix_from_dict(suffix, json_dict, verify_names=True, overwrite=False):
parent_block = suffix.parent_block()
for k, v in json_dict.items():
# Safety catch in case this gets left in by other functions
if k == "parent_name":
continue
comp = parent_block.find_component(k)
if comp is not None:
if overwrite or comp not in suffix:
suffix[comp] = v
elif verify_names:
raise ValueError(
f"Could not find component {k} on block {parent_block.name}."
)
[docs]
def get_scaling_factor(component):
"""
Get scaling factor for component.
Args:
component: component to get scaling factor for
Returns:
float scaling factor
Raises:
TypeError if component is a Block
"""
if isinstance(component, (Block, BlockData)):
raise TypeError("Blocks cannot have scaling factors.")
try:
sfx = get_scaling_suffix(component)
return sfx[component]
except (AttributeError, KeyError):
# No scaling factor found, return None
return None
[docs]
def set_scaling_factor(component, scaling_factor: float, overwrite: bool = False):
"""
Set scaling factor for component.
Scaling factors must be positive, non-zero floats.
Args:
component: component to set scaling factor for
scaling_factor: scaling factor to assign
overwrite: (bool) whether to overwrite existing scaling factor
Returns:
None
Raises:
ValueError is scaling_factor is 0 or negative
"""
# Cast to float to catch any garbage inputs
scaling_factor = float(scaling_factor)
# Check for negative or 0 scaling factors
if scaling_factor < 0:
raise ValueError(
f"scaling factor for {component.name} is negative ({scaling_factor}). "
"Scaling factors must be strictly positive."
)
elif scaling_factor == 0:
raise ValueError(
f"scaling factor for {component.name} is zero. "
"Scaling factors must be strictly positive."
)
# Get suffix and assign scaling factor
sfx = get_scaling_suffix(component)
if not overwrite and component in sfx:
_log.debug(
f"Existing scaling factor for {component.name} found and overwrite=False. "
"Scaling factor unchanged."
)
else:
sfx[component] = scaling_factor
[docs]
def del_scaling_factor(component, delete_empty_suffix: bool = False):
"""
Delete scaling factor for component.
Args:
component: component to delete scaling factor for
delete_empty_suffix: (bool) whether to delete scaling Suffix if it is
empty after deletion.
"""
# Get suffix
parent = component.parent_block()
sfx = get_scaling_suffix(parent)
# Delete entry for component if it exists
# Pyomo handles case where value does not exist in suffix with a no-op
sfx.clear_value(component)
if delete_empty_suffix:
# Check if Suffix is empty (i.e. length 0)
if len(sfx) == 0:
# If so, delete suffix from parent block of component
_log.debug(f"Deleting empty scaling suffix from {parent.name}")
parent.del_component(sfx)
[docs]
def report_scaling_factors(
blk: Block, ctype=None, descend_into: bool = False, stream=None
):
"""
Write the scaling factors for all components in a Block to a stream.
Args:
blk: Block to get scaling factors from.
ctype: None, Var or Constraint. Type of component to show scaling factors for
(if None, shows both Vars and Constraints).
descend_into: whether to show scaling factors for components in sub-blocks.
stream: StringIO object to write results to. If not provided, writes to stdout.
Raises:
TypeError if blk is not a Pyomo Block.
ValueError is ctype is not None, Var or Constraint.
"""
if stream is None:
stream = sys.stdout
if ctype not in [None, Var, Constraint]:
raise ValueError(
f"report_scaling_factors only supports None, Var or Constraint for argument ctype: "
f"received {ctype}."
)
if not isinstance(blk, (Block, BlockData)):
raise TypeError(
"report_scaling_factors: blk must be an instance of a Pyomo Block."
)
stream.write(f"Scaling Factors for {blk.name}\n")
# We will report Vars and Constraint is separate sections for clarity - iterate separately
if ctype != Constraint:
# Collect Vars
vdict = {}
for blkdata in blk.values():
for vardata in blkdata.component_data_objects(
Var, descend_into=descend_into
):
val = vardata.value
sf = get_scaling_factor(vardata)
if sf is not None:
sfstr = "{:.3E}".format(sf)
else:
sfstr = "None "
if val is not None:
vstr = "{:.3E}".format(val)
if sf is not None:
sval = "{:.3E}".format(value(vardata * sf))
else:
sval = vstr
else:
vstr = "None "
sval = "None"
vdict[vardata.name] = (sfstr, vstr, sval)
# Write Var section - skip if no Vars
if len(vdict) > 0:
# Get longest var name
header = "Variable"
maxname = len(max(vdict.keys(), key=len))
if maxname < len(header):
maxname = len(header)
stream.write(
f"\n{header}{' '*(maxname-len(header))}{TAB}Scaling Factor{TAB}Value{' '*4}{TAB}Scaled Value\n"
)
for n, i in vdict.items():
# Pad name to length
stream.write(
f"{n + ' '*(maxname-len(n))}{TAB}{i[0]}{' '*5}{TAB}{i[1]}{TAB}{i[2]}\n"
)
if ctype != Var:
# Collect Constraints
for blkdata in blk.values():
cdict = {}
for condata in blkdata.component_data_objects(
Constraint, descend_into=descend_into
):
sf = get_scaling_factor(condata)
if sf is not None:
sfstr = "{:.3E}".format(sf)
else:
sfstr = "None"
cdict[condata.name] = sfstr
# Write Constraint section - skip if no Constraints
if len(cdict) > 0:
# Get longest con name
header = "Constraint"
maxname = len(max(cdict.keys(), key=len))
if maxname < len(header):
maxname = len(header)
stream.write(
f"\n{header}{' ' * (maxname - len(header))}{TAB}Scaling Factor\n"
)
for n, i in cdict.items():
# Pad name to length
stream.write(f"{n + ' ' * (maxname - len(n))}{TAB}{i}\n")
[docs]
def get_nominal_value(component):
"""
Get the signed nominal value for a VarData or ParamData component.
For fixed Vars and Params, the current value of the component will be returned.
For unfixed Vars, the nominal value is determined using the assigned scaling factor
and the sign determined based on the bounds and domain of the variable (defaulting to
positive). If no scaling factor is set, then the current value will be used if set,
otherwise the absolute nominal value will be equal to 1.
Args:
component: component to determine nominal value for
Returns:
signed float with nominal value
Raises:
TypeError if component is not instance of VarData or ParamData
"""
# Determine if Var or Param
if isinstance(component, VarData):
if component.fixed:
# Nominal value of a fixed Var is its value
return value(component)
# Get scaling factor for Var
sf = get_scaling_factor(component)
if sf is None:
# No scaling factor - see if Var has a value
if component.value is not None:
# If it has a value, use that as the nominal value
# As we are using the actual value, do not need to determine sign
return value(component)
else:
# Otherwise assign a nominal value of 1
sf = 1
# Try to determine expected sign of node
ub = component.ub
lb = component.lb
domain = component.domain
# To avoid NoneType errors, assign dummy values in place of None
if ub is None:
# No upper bound, take a positive value
ub = 1000
if lb is None:
# No lower bound, take a negative value
lb = -1000
if lb >= 0 or domain in [
NonNegativeReals,
PositiveReals,
PositiveIntegers,
NonNegativeIntegers,
Boolean,
Binary,
]:
# Strictly positive
sign = 1
elif ub <= 0 or domain in [
NegativeReals,
NonPositiveReals,
NegativeIntegers,
NonPositiveIntegers,
]:
# Strictly negative
sign = -1
else:
# Unbounded, see if there is a current value
# Assume positive until proven otherwise
sign = 1
if component.value is not None:
val = value(component)
if val < 0:
# Assigned negative value, assume value will remain negative
sign = -1
return sign / sf
elif isinstance(component, ParamData):
# Nominal value of a parameter is always its value
return value(component)
else:
# Not a Var or Param - invalid component type
raise TypeError(
f"get_nominal_value - {component.name} is not an instance of a Var or Param."
)
class NominalValueExtractionVisitor(EXPR.StreamBasedExpressionVisitor):
"""
Expression walker for collecting scaling factors in an expression and determining the
expected value of the expression using the scaling factors as nominal inputs.
By default, the get_nominal_value method is used to determine the nominal value for
all Vars and Params in the expression, however this can be changed by setting the
nominal_value_callback argument.
Returns a list of expected values for each additive term in the expression.
In order to properly assess the expected value of terms within functions, the sign
of each term is maintained throughout thus returned values may be negative. Functions
using this walker should handle these appropriately.
"""
def __init__(self, nominal_value_callback=get_nominal_value):
"""
Visitor class used to determine nominal values of all terms in an expression based on
scaling factors assigned to the associated variables. Do not use this class directly.
Args:
nominal_value_callback - method to use to get nominal value of root nodes.
Notes
-----
This class inherits from the :class:`StreamBasedExpressionVisitor` to implement
a walker that returns the nominal value corresponding to all additive terms in an
expression.
There are class attributes (dicts) that map the expression node type to the
particular method that should be called to return the nominal value of the node based
on the nominal value of its child arguments. This map is used in exitNode.
"""
super().__init__()
self._nominal_value_callback = nominal_value_callback
def _get_magnitude_base_type(self, node):
try:
return [self._nominal_value_callback(node)]
except TypeError:
# Not a Var or Param - something went wrong
raise BurntToast(
"NominalValueExtractionVisitor found root node that was not a Var or Param. "
"This should never happen - please contact the developers with this bug."
)
def _get_nominal_value_for_sum_subexpression(self, child_nominal_values):
return sum(i for i in child_nominal_values)
def _get_nominal_value_for_sum(self, node, child_nominal_values):
# For sums, collect all child values into a list
mag = []
for i in child_nominal_values:
for j in i:
mag.append(j)
return mag
def _get_nominal_value_for_product(self, node, child_nominal_values):
mag = []
for i in child_nominal_values[0]:
for j in child_nominal_values[1]:
mag.append(i * j)
return mag
def _get_nominal_value_for_division(self, node, child_nominal_values):
numerator = self._get_nominal_value_for_sum_subexpression(
child_nominal_values[0]
)
denominator = self._get_nominal_value_for_sum_subexpression(
child_nominal_values[1]
)
if denominator == 0:
# Assign a nominal value of 1 so that we can continue
denominator = 1
# Log a warning for the user
_log.warning(
"Nominal value of 0 found in denominator of division expression. "
"Assigning a value of 1. You should check you scaling factors and models to "
"ensure there are no values of 0 that can appear in these functions."
)
return [numerator / denominator]
def _get_nominal_value_for_power(self, node, child_nominal_values):
# Use the absolute value of the base term to avoid possible complex numbers
base = abs(
self._get_nominal_value_for_sum_subexpression(child_nominal_values[0])
)
exponent = self._get_nominal_value_for_sum_subexpression(
child_nominal_values[1]
)
return [base**exponent]
def _get_nominal_value_single_child(self, node, child_nominal_values):
return child_nominal_values[0]
def _get_nominal_value_abs(self, node, child_nominal_values):
return [abs(i) for i in child_nominal_values[0]]
def _get_nominal_value_negation(self, node, child_nominal_values):
return [-i for i in child_nominal_values[0]]
def _get_nominal_value_for_unary_function(self, node, child_nominal_values):
func_name = node.getname()
func_nominal = self._get_nominal_value_for_sum_subexpression(
child_nominal_values[0]
)
func = getattr(math, func_name)
try:
return [func(func_nominal)]
except ValueError:
raise ValueError(
f"Evaluation error occurred when getting nominal value in {func_name} "
f"expression with input {func_nominal}. You should check you scaling factors "
f"and model to address any numerical issues or scale this constraint manually."
)
def _get_nominal_value_expr_if(self, node, child_nominal_values):
return child_nominal_values[1] + child_nominal_values[2]
def _get_nominal_value_external_function(self, node, child_nominal_values):
# First, need to get expected magnitudes of input terms, which may be sub-expressions
input_mag = [
self._get_nominal_value_for_sum_subexpression(i)
for i in child_nominal_values
]
# Next, create a copy of the external function with expected magnitudes as inputs
newfunc = node.create_node_with_local_data(input_mag)
# Evaluate new function and return the absolute value
return [value(newfunc)]
node_type_method_map = {
EXPR.EqualityExpression: _get_nominal_value_for_sum,
EXPR.InequalityExpression: _get_nominal_value_for_sum,
EXPR.RangedExpression: _get_nominal_value_for_sum,
EXPR.SumExpression: _get_nominal_value_for_sum,
EXPR.NPV_SumExpression: _get_nominal_value_for_sum,
EXPR.ProductExpression: _get_nominal_value_for_product,
EXPR.MonomialTermExpression: _get_nominal_value_for_product,
EXPR.NPV_ProductExpression: _get_nominal_value_for_product,
EXPR.DivisionExpression: _get_nominal_value_for_division,
EXPR.NPV_DivisionExpression: _get_nominal_value_for_division,
EXPR.PowExpression: _get_nominal_value_for_power,
EXPR.NPV_PowExpression: _get_nominal_value_for_power,
EXPR.NegationExpression: _get_nominal_value_negation,
EXPR.NPV_NegationExpression: _get_nominal_value_negation,
EXPR.AbsExpression: _get_nominal_value_abs,
EXPR.NPV_AbsExpression: _get_nominal_value_abs,
EXPR.UnaryFunctionExpression: _get_nominal_value_for_unary_function,
EXPR.NPV_UnaryFunctionExpression: _get_nominal_value_for_unary_function,
EXPR.Expr_ifExpression: _get_nominal_value_expr_if,
EXPR.ExternalFunctionExpression: _get_nominal_value_external_function,
EXPR.NPV_ExternalFunctionExpression: _get_nominal_value_external_function,
EXPR.LinearExpression: _get_nominal_value_for_sum,
}
def exitNode(self, node, data):
"""Callback for :class:`pyomo.core.current.StreamBasedExpressionVisitor`. This
method is called when moving back up the tree in a depth first search."""
# first check if the node is a leaf
nodetype = type(node)
if nodetype in native_types:
return [node]
node_func = self.node_type_method_map.get(nodetype, None)
if node_func is not None:
return node_func(self, node, data)
elif not node.is_expression_type():
# this is a leaf, but not a native type
if nodetype is _PyomoUnit:
return [1]
else:
return self._get_magnitude_base_type(node)
# might want to add other common types here
# not a leaf - check if it is a named expression
if (
hasattr(node, "is_named_expression_type")
and node.is_named_expression_type()
):
return self._get_nominal_value_single_child(node, data)
raise TypeError(
f"An unhandled expression node type: {str(nodetype)} was encountered while "
f"retrieving the nominal value of expression {str(node)}"
)