Source code for idaes.core.base.process_base

#################################################################################
# The Institute for the Design of Advanced Energy Systems Integrated Platform
# Framework (IDAES IP) was produced under the DOE Institute for the
# Design of Advanced Energy Systems (IDAES).
#
# Copyright (c) 2018-2024 by the software owners: The Regents of the
# University of California, through Lawrence Berkeley National Laboratory,
# National Technology & Engineering Solutions of Sandia, LLC, Carnegie Mellon
# University, West Virginia University Research Corporation, et al.
# All rights reserved.  Please see the files COPYRIGHT.md and LICENSE.md
# for full copyright and license information.
#################################################################################
"""
Base for IDAES process model objects.
"""
# TODO: Missing docstrings
# pylint: disable=missing-function-docstring

# TODO: Look into whether this is necessary
# pylint: disable=protected-access

import sys
import logging
import textwrap
from enum import Enum

from pandas import DataFrame

from pyomo.core.base.block import BlockData
from pyomo.common.formatting import tabular_writer
from pyomo.environ import Block
from pyomo.gdp import Disjunct
from pyomo.common.config import ConfigBlock

from idaes.core.base.process_block import declare_process_block_class
from idaes.core.initialization import BlockTriangularizationInitializer
from idaes.core.util.exceptions import (
    ConfigurationError,
    DynamicError,
    PropertyPackageError,
)
from idaes.core.util.tables import stream_table_dataframe_to_string
from idaes.core.util.model_statistics import (
    degrees_of_freedom,
    number_variables,
    number_activated_constraints,
    number_activated_blocks,
)
from idaes.core.util.units_of_measurement import report_quantity


# Some more information about this module
__author__ = "John Eslick, Qi Chen, Andrew Lee"


__all__ = ["ProcessBlockData"]


useDefault = object()


# Set up logger
_log = logging.getLogger(__name__)


# Enumerate options for material flow basis
class MaterialFlowBasis(Enum):
    """
    Enumeration of the options for the basis of material flow terms.

    Members:
        molar: material flow basis is molar
        mass: material flow basis is mass
        other: material flow basis is neither of the above
    """

    molar = 0
    mass = 1
    other = 2


@declare_process_block_class("ProcessBaseBlock")
class ProcessBlockData(BlockData):
    """
    Base class for most IDAES process models and classes.

    Its main job is to create the local config block that holds the
    arguments supplied by the user at construction time, and to make sure
    those arguments end up stored in that config block. It also provides a
    number of methods shared by all IDAES classes.
    """

    CONFIG = ConfigBlock("ProcessBlockData", implicit=False)

    # Default initializer and scaler for this class
    default_initializer = BlockTriangularizationInitializer
    default_scaler = None

    def __init__(self, component):
        """
        Initialize a ProcessBlockData object.

        Args:
            component(Block): container Block instance to which this
                BlockData belongs.
        """
        super().__init__(component=component)
        # Set to True by _get_config_args once the config block has been
        # populated, so that the arguments are only processed once.
        self._pb_configured = False
def build(self):
    """
    Populate the config block and create the basic bookkeeping attributes.

    The default ProcessBlock rule calls this method. When a rule other than
    the default is specified, ProcessBlockData's build method must still be
    called so that the information from the "default" and "initialize"
    arguments of a ProcessBlock derived class is placed in the BlockData
    object's ConfigBlock.

    Subclasses derived from ProcessBlockData usually overload this method to
    add Pyomo components such as variables, expressions and constraints.
    Overloaded build() methods should call build() from the super class.

    Args:
        None

    Returns:
        None
    """
    # Move the user-supplied arguments into self.config
    self._get_config_args()

    # Initialization order list, seeded with the current model
    self.initialization_order = [self]

    # Store for default property scaling factors. These are defined in the
    # parameter block to give a universal default for quantities in a
    # particular kind of state block (e.g. flow scaling can be set once
    # rather than for every state block). Some may be left for the user to
    # set and some may be defined in a property module where reasonable
    # defaults can be chosen a priori. See set_default_scaling,
    # get_default_scaling, and unset_default_scaling.
    self._default_scaling_factors = dict()
@property
def default_scaling_factors(self):
    """
    Dict of default scaling factors for components in model.

    The same dict object that set_default_scaling writes to is returned
    (not a copy); its keys are (attribute, index) tuples.
    """
    return self._default_scaling_factors


@property
def default_scaling_factor(self):
    """
    Alias of default_scaling_factors kept for backwards compatibility
    after the attribute was renamed.
    """
    # TODO: Deprecate once v2.0 is released
    return self._default_scaling_factors
def set_default_scaling(self, attribute: str, value: float, index: str = None):
    """Store a default scaling factor for a property.

    Args:
        attribute: property attribute name
        value: default scaling factor
        index: for indexed properties; when omitted, the factor is stored
            under (attribute, None) and applies to every indexed element
            that has no factor set for its specific index.

    Returns:
        None
    """
    key = (attribute, index)
    self._default_scaling_factors[key] = value
def unset_default_scaling(self, attribute: str, index: str = None):
    """Remove a previously set default scaling factor.

    Nothing happens if no factor is stored for (attribute, index).

    Args:
        attribute: property attribute name
        index: optional index for indexed properties

    Returns:
        None
    """
    # pop with a default never raises, so a missing entry is ignored
    self._default_scaling_factors.pop((attribute, index), None)
def get_default_scaling(self, attribute: str, index: str = None):
    """Look up the default scaling factor for a property.

    The factor stored for the specific (attribute, index) pair is used if
    there is one; otherwise the factor stored for (attribute, None) is used.

    Args:
        attribute: property attribute name
        index: optional index for indexed properties

    Returns:
        The stored default scaling factor, or None if no default could be
        found for the requested attribute.
    """
    factors = self._default_scaling_factors
    # Most specific key first, then the un-indexed fallback
    for key in ((attribute, index), (attribute, None)):
        try:
            return factors[key]
        except KeyError:
            continue
    # Can't find a default scale factor for what was asked for
    return None
def flowsheet(self):
    """
    Return the parent flowsheet of this component, i.e. the flowsheet
    component to which the model is attached.

    The chain of parent blocks is walked upwards and the first block that
    has an is_flowsheet method returning a true value is returned.

    Args:
        None

    Returns:
        Flowsheet object, or None if the component has no parent flowsheet
    """
    candidate = self.parent_block()
    while candidate is not None:
        if hasattr(candidate, "is_flowsheet") and candidate.is_flowsheet():
            return candidate
        candidate = candidate.parent_block()
    # Ran off the top of the model tree without finding a flowsheet
    return None
def _get_config_args(self): """ Get config arguments for this element and put them in the ConfigBlock """ if self._pb_configured: return self._pb_configured = True idx_map = self.parent_component()._idx_map # index map function try: idx = self.index() except AttributeError: idx = None if idx_map is not None: idx = idx_map(idx) initialize = self.parent_component()._block_data_config_initialize if idx in initialize: kwargs = initialize[idx] else: kwargs = self.parent_component()._block_data_config_default self.config = self.CONFIG(kwargs)
def fix_initial_conditions(self, state="steady-state"):
    """Fix the initial conditions for dynamic models.

    For every Block and Disjunct contained in this model, the
    material_accumulation, element_accumulation and energy_accumulation
    terms at the first time point are fixed to 0.0. Terms that a block does
    not have are skipped.

    Args:
        state : initial state to use for simulation (default =
                'steady-state')

    Returns :
        None

    Raises:
        ValueError: if state is not 'steady-state'
    """
    if state != "steady-state":
        raise ValueError(
            "Unrecognised value for argument 'state'. "
            "Valid values are 'steady-state'."
        )

    accumulation_terms = (
        "material_accumulation",
        "element_accumulation",
        "energy_accumulation",
    )
    for blk in self.component_objects((Block, Disjunct), descend_into=True):
        for term in accumulation_terms:
            # Try to fix this accumulation term @ first time point
            try:
                getattr(blk, term)[blk.flowsheet().time.first(), ...].fix(0.0)
            except AttributeError:
                pass
def unfix_initial_conditions(self):
    """Unfix the initial conditions for dynamic models.

    This is the counterpart of fix_initial_conditions: for every Block and
    Disjunct contained in this model, the material_accumulation,
    element_accumulation and energy_accumulation terms at the first time
    point are unfixed. Terms that a block does not have are skipped.

    Args:
        None

    Returns :
        None
    """
    accumulation_terms = (
        "material_accumulation",
        "element_accumulation",
        "energy_accumulation",
    )
    # Visit the same component types as fix_initial_conditions so that
    # everything it fixed (including terms inside Disjuncts) is released.
    for blk in self.component_objects((Block, Disjunct), descend_into=True):
        for term in accumulation_terms:
            # Try to unfix this accumulation term @ first time point
            try:
                getattr(blk, term)[blk.flowsheet().time.first(), ...].unfix()
            except AttributeError:
                pass
def report(self, time_point=0, dof=False, ostream=None, prefix=""): time_point = float(time_point) if ostream is None: ostream = sys.stdout # Get DoF and model stats if dof: dof_stat = degrees_of_freedom(self) nv = number_variables(self) nc = number_activated_constraints(self) nb = number_activated_blocks(self) # Get components to report in performance section performance = self._get_performance_contents(time_point=time_point) # Get stream table stream_table = self._get_stream_table_contents(time_point=time_point) # Set model type output if hasattr(self, "is_flowsheet") and self.is_flowsheet: model_type = "Flowsheet" else: model_type = "Unit" # Write output max_str_length = 84 tab = " " * 4 ostream.write("\n" + "=" * max_str_length + "\n") lead_str = f"{prefix}{model_type} : {self.name}" trail_str = f"Time: {time_point}" mid_str = " " * (max_str_length - len(lead_str) - len(trail_str)) ostream.write(lead_str + mid_str + trail_str) if dof: ostream.write("\n" + "=" * max_str_length + "\n") ostream.write(f"{prefix}{tab}Local Degrees of Freedom: {dof_stat}") ostream.write("\n") ostream.write( f"{prefix}{tab}Total Variables: {nv}{tab}" f"Activated Constraints: {nc}{tab}" f"Activated Blocks: {nb}" ) if performance is not None: # PYLINT-WHY: pylint has no way of knowing that performance is supposed to be dict-like # pylint: disable=unsubscriptable-object # PYLINT-TODO: alternatively, have the function return an empty dict and test with `if performance:` ostream.write("\n" + "-" * max_str_length + "\n") ostream.write(f"{prefix}{tab}Unit Performance") ostream.write("\n" * 2) if "vars" in performance.keys() and len(performance["vars"]) > 0: ostream.write(f"{prefix}{tab}Variables: \n\n") tabular_writer( ostream, prefix + tab, ((k, v) for k, v in performance["vars"].items()), ("Value", "Units", "Fixed", "Bounds"), lambda k, v: [ "{:#.5g}".format(report_quantity(v).m), report_quantity(v).u, v.fixed, v.bounds, ], ) if "exprs" in performance.keys() and len(performance["exprs"]) > 
0: ostream.write("\n") ostream.write(f"{prefix}{tab}Expressions: \n\n") tabular_writer( ostream, prefix + tab, ((k, v) for k, v in performance["exprs"].items()), ( "Value", "Units", ), lambda k, v: [ "{:#.5g}".format(report_quantity(v).m), report_quantity(v).u, ], ) if "params" in performance.keys() and len(performance["params"]) > 0: ostream.write("\n") ostream.write(f"{prefix}{tab}Parameters: \n\n") tabular_writer( ostream, prefix + tab, ((k, v) for k, v in performance["params"].items()), ("Value", "Units", "Mutable"), lambda k, v: [ report_quantity(v).m, report_quantity(v).u, not v.is_constant(), ], ) if stream_table is not None: ostream.write("\n" + "-" * max_str_length + "\n") ostream.write(f"{prefix}{tab}Stream Table") ostream.write("\n") ostream.write( textwrap.indent( stream_table_dataframe_to_string(stream_table), prefix + tab ) ) ostream.write("\n" + "=" * max_str_length + "\n") def _get_performance_contents(self, time_point=0): return None def _get_stream_table_contents(self, time_point=0): return None
def serialize_contents(self, time_point=0):
    """
    Return the performance contents and stream table.

    NOTE: Building the stream table can raise a ConfigurationError because
    the names of the inlets and outlets of the unit model may not be
    standard. In that case a warning is logged and an empty dataframe is
    returned in place of the stream table.

    Args:
        time_point: The time

    Returns:
        performance_contents: whatever _get_performance_contents returns
            for this time point
        stream_table: whatever _get_stream_table_contents returns for this
            time point, or an empty Pandas dataframe if it raised a
            ConfigurationError
    """
    perf = self._get_performance_contents(time_point)
    try:
        table = self._get_stream_table_contents(time_point)
    except ConfigurationError as exc:
        _log.warning(f"Could not serialize stream table: {exc}")
        table = DataFrame()
    return perf, table
def _setup_dynamics(self): """ This method automates the setting of the dynamic flag and time domain for unit models. Performs the following: 1) Determines if this is a top level flowsheet 2) Gets dynamic flag from parent if not top level, or checks validity of argument provided 3) Checks has_holdup flag if present and dynamic = True Args: None Returns: None """ # Get parent object if hasattr(self.parent_block(), "config"): # Parent block has a config block, so use this parent = self.parent_block() else: # Use parent flowsheet try: parent = self.flowsheet() except ConfigurationError: raise DynamicError( "{} has no parent flowsheet from which to " "get dynamic argument. Please provide a " "value for this argument when constructing " "the unit.".format(self.name) ) # Check the dynamic flag, and retrieve if necessary if self.config.dynamic == useDefault: # Get flag from parent flowsheet try: self.config.dynamic = parent.config.dynamic except AttributeError: # No flowsheet, raise exception raise DynamicError( "{} parent flowsheet has no dynamic " "argument. Please provide a " "value for this argument when constructing " "the unit.".format(self.name) ) # Check for case when dynamic=True, but parent dynamic=False if self.config.dynamic and not parent.config.dynamic: raise DynamicError( "{} trying to declare a dynamic model within " "a steady-state flowsheet. This is not " "supported by the IDAES framework. Try " "creating a dynamic flowsheet instead, and " "declaring some models as steady-state.".format(self.name) ) # Set and validate has_holdup argument if self.config.has_holdup == useDefault: # Default to same value as dynamic flag self.config.has_holdup = self.config.dynamic elif self.config.has_holdup is False: if self.config.dynamic is True: # Dynamic model must have has_holdup = True raise ConfigurationError( "{} invalid arguments for dynamic and has_holdup. 
" "If dynamic = True, has_holdup must also be True " "(was False)".format(self.name) ) def _get_property_package(self): """ This method gathers the necessary information about the property package to be used in the control volume block. If a property package has not been provided by the user, the method searches up the model tree until it finds an object with the 'default_property_package' attribute and uses this package for the control volume block. The method also gathers any default construction arguments specified for the property package and combines these with any arguments specified by the user for the control volume block (user specified arguments take priority over defaults). Args: None Returns: None """ # Get property_package block if not provided in arguments parent = self.parent_block() if self.config.property_package == useDefault: # Try to get property_package from parent try: if parent.config.property_package in [None, useDefault]: parent.config.property_package = self._get_default_prop_pack() self.config.property_package = parent.config.property_package except AttributeError: self.config.property_package = self._get_default_prop_pack() # Check for any flowsheet level build arguments for k in self.config.property_package.config.default_arguments: if k not in self.config.property_package_args: self.config.property_package_args[k] = ( self.config.property_package.config.default_arguments[k] ) def _get_default_prop_pack(self): """ This method is used to find a default property package defined at the flowsheet level if a package is not provided as an argument when instantiating the control volume block. 
Args: None Returns: None """ parent = self.flowsheet() while True: if parent is None: raise ConfigurationError( "{} no property package provided and " "no default defined by parent flowsheet(s).".format(self.name) ) elif parent.config.default_property_package is not None: _log.info(f"{self.name} Using default property package") return parent.config.default_property_package parent = parent.flowsheet() def _get_indexing_sets(self): """ This method collects all necessary indexing sets from property parameter block and makes references to these for use within the control volume block. Collected indexing sets are phase_list and component_list. Args: None Returns: None """ # Check for phase list(s) if not hasattr(self.config.property_package, "phase_list"): raise PropertyPackageError( "{} property_package provided does not " "contain a phase_list. " "Please contact the developer of the property package.".format( self.name ) ) # Check for component list(s) if not hasattr(self.config.property_package, "component_list"): raise PropertyPackageError( "{} property_package provided does not " "contain a component_list. " "Please contact the developer of the property package.".format( self.name ) ) def _get_reaction_package(self): """ This method gathers the necessary information about the reaction package to be used in the control volume block (if required). If a reaction package has been provided by the user, the method gathers any default construction arguments specified for the reaction package and combines these with any arguments specified by the user for the control volume block (user specified arguments take priority over defaults). 
Args: None Returns: None """ if self.config.reaction_package is not None: # Check for any flowsheet level build arguments for k in self.config.reaction_package.config.default_arguments: if k not in self.config.reaction_package_args: self.config.reaction_package_args[k] = ( self.config.reaction_package.config.default_arguments[k] ) def calculate_scaling_factors(self): # This lets you call super().calculate_scaling_factors() in a unit # model's calculate_scaling_factors method without worrying about # whether the parent class defines one. This allows for a more standized # form of calculate_scaling_factors() methods. pass