##############################################################################
# Institute for the Design of Advanced Energy Systems Process Systems
# Engineering Framework (IDAES PSE Framework) Copyright (c) 2018-2019, by the
# software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory, National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia
# University Research Corporation, et al. All rights reserved.
#
# Please see the files COPYRIGHT.txt and LICENSE.txt for full copyright and
# license information, respectively. Both files are also available online
# at the URL "https://github.com/IDAES/idaes-pse".
##############################################################################
"""
Base for IDAES process model objects.
"""
from __future__ import absolute_import # disable implicit relative imports
from __future__ import division # No integer division
from __future__ import print_function # Python 3 style print
import sys
import logging
import textwrap
from pyomo.core.base.block import _BlockData
from pyomo.core.base.misc import tabular_writer
from pyomo.environ import Block, value
from pyomo.gdp import Disjunct
from pyomo.common.config import ConfigBlock
from pyutilib.enum import Enum
from idaes.core.process_block import declare_process_block_class
from idaes.core.util.exceptions import (ConfigurationError,
DynamicError,
PropertyPackageError)
from idaes.core.util.tables import stream_table_dataframe_to_string
from idaes.core.util.model_statistics import (degrees_of_freedom,
number_variables,
number_activated_constraints,
number_activated_blocks)
# Some more information about this module
__author__ = "John Eslick, Qi Chen, Andrew Lee"
__all__ = ['ProcessBlockData']
# Sentinel object: the ProcessBlockData helper methods in this module compare
# config values (dynamic, has_holdup, property_package) against it to detect
# arguments that were left unset and must be taken from a parent instead.
useDefault = object()
# Set up logger
_log = logging.getLogger(__name__)
# Enumerate options for material flow basis
MaterialFlowBasis = Enum(
'molar',
'mass',
'other')
@declare_process_block_class("ProcessBaseBlock")
class ProcessBlockData(_BlockData):
    """
    Base class for most IDAES process models and classes.

    The primary purpose of this class is to create the local config block to
    handle arguments provided by the user when constructing an object and to
    ensure that these arguments are stored in the config block.

    Additionally, this class contains a number of methods common to all IDAES
    classes.
    """
    CONFIG = ConfigBlock("ProcessBlockData", implicit=False)

    def __init__(self, component):
        """
        Initialize a ProcessBlockData object.

        Args:
            component(Block): container Block instance to which this
                _BlockData belongs.

        Returns:
            (ProcessBlockData): A new instance
        """
        super(ProcessBlockData, self).__init__(component=component)
        # Guards _get_config_args so the config block is only built once
        self._pb_configured = False

    def build(self):
        """
        The build method is called by the default ProcessBlock rule. If a
        rule is specified other than the default it is important to call
        ProcessBlockData's build method to put information from the "default"
        and "initialize" arguments to a ProcessBlock derived class into the
        BlockData object's ConfigBlock.

        The build method should usually be overloaded in a subclass derived
        from ProcessBlockData. This method would generally add Pyomo
        components such as variables, expressions, and constraints to the
        object. It is important for build() methods implemented in derived
        classes to call build() from the super class.

        Args:
            None

        Returns:
            None
        """
        self._get_config_args()

    def flowsheet(self):
        """
        This method returns the components parent flowsheet object, i.e. the
        flowsheet component to which the model is attached. If the component
        has no parent flowsheet, the method returns None.

        Args:
            None

        Returns:
            Flowsheet object or None
        """
        parent = self.parent_block()
        # Walk up the block tree until a block that reports itself as a
        # flowsheet is found, or the top of the tree is reached.
        while parent is not None:
            if hasattr(parent, 'is_flowsheet') and parent.is_flowsheet():
                return parent
            parent = parent.parent_block()
        return None

    def _get_config_args(self):
        """
        Get config arguments for this element and put them in the ConfigBlock.

        The arguments are looked up on the parent component: the entry of
        ``_block_data_config_initialize`` for this element's (mapped) index
        if there is one, otherwise ``_block_data_config_default``. Calling
        the method more than once has no further effect.
        """
        if self._pb_configured:
            return
        self._pb_configured = True

        container = self.parent_component()
        idx_map = container._idx_map  # index map function
        try:
            idx = self.index()
        except Exception:
            # Fall back to None when no index can be determined. Narrowed
            # from a bare except so KeyboardInterrupt/SystemExit propagate.
            idx = None
        if idx_map is not None:
            idx = idx_map(idx)

        initialize = container._block_data_config_initialize
        if idx in initialize:
            kwargs = initialize[idx]
        else:
            kwargs = container._block_data_config_default
        self.config = self.CONFIG(kwargs)

    def fix_initial_conditions(self, state="steady-state"):
        """This method fixes the initial conditions for dynamic models.

        For every Block and Disjunct found by component_objects, the
        material, element and enthalpy accumulation terms (where present)
        are fixed to zero at the first point of the flowsheet time domain.

        Args:
            state : initial state to use for simulation (default =
                    'steady-state')

        Returns :
            None

        Raises:
            ValueError: if state is anything other than 'steady-state'
        """
        if state != 'steady-state':
            raise ValueError("Unrecognised value for argument 'state'. "
                             "Valid values are 'steady-state'.")

        accumulation_terms = ("material_accumulation",
                              "element_accumulation",
                              "enthalpy_accumulation")
        for obj in self.component_objects((Block, Disjunct),
                                          descend_into=True):
            for term in accumulation_terms:
                # Try to fix this accumulation term @ first time point;
                # objects lacking the term (or a flowsheet) are skipped.
                try:
                    getattr(obj, term)[
                        obj.flowsheet().config.time.first(), ...].fix(0.0)
                except AttributeError:
                    pass

    def unfix_initial_conditions(self):
        """This method unfixes the initial conditions for dynamic models.

        Mirrors fix_initial_conditions: the same component types (Block and
        Disjunct) and the same accumulation terms are visited, so everything
        fixed by that method is released again.

        Args:
            None

        Returns :
            None
        """
        accumulation_terms = ("material_accumulation",
                              "element_accumulation",
                              "enthalpy_accumulation")
        for obj in self.component_objects((Block, Disjunct),
                                          descend_into=True):
            for term in accumulation_terms:
                # Try to unfix this accumulation term @ first time point;
                # objects lacking the term (or a flowsheet) are skipped.
                try:
                    getattr(obj, term)[
                        obj.flowsheet().config.time.first(), ...].unfix()
                except AttributeError:
                    pass

    def report(self, time_point=0, dof=False, ostream=None, prefix=""):
        """
        Write a text report for this model to a stream.

        The report consists of a header line, optional model statistics, a
        performance section built from _get_performance_contents and a
        stream table built from _get_stream_table_contents. Sections whose
        contents method returns None are omitted.

        Args:
            time_point : point in time to report at; converted to float
                    (default = 0)
            dof : whether to include degrees of freedom and model
                    statistics in the report (default = False)
            ostream : stream to write to (default = sys.stdout)
            prefix : string prepended to each reported line (default = "")

        Returns:
            None
        """
        time_point = float(time_point)

        if ostream is None:
            ostream = sys.stdout

        # Get DoF and model stats
        if dof:
            dof_stat = degrees_of_freedom(self)
            nv = number_variables(self)
            nc = number_activated_constraints(self)
            nb = number_activated_blocks(self)

        # Get components to report in performance section
        performance = self._get_performance_contents(time_point=time_point)

        # Get stream table
        stream_table = self._get_stream_table_contents(time_point=time_point)

        # Set model type output. is_flowsheet is a method (see flowsheet()),
        # so it has to be called - the bound method itself is always truthy.
        if hasattr(self, "is_flowsheet") and self.is_flowsheet():
            model_type = "Flowsheet"
        else:
            model_type = "Unit"

        # Write output
        max_str_length = 84
        tab = " "*4
        ostream.write("\n"+"="*max_str_length+"\n")

        # Header: model name on the left, time point padded to the right
        lead_str = f"{prefix}{model_type} : {self.name}"
        trail_str = f"Time: {time_point}"
        mid_str = " "*(max_str_length-len(lead_str)-len(trail_str))
        ostream.write(lead_str+mid_str+trail_str)

        if dof:
            ostream.write("\n"+"="*max_str_length+"\n")
            ostream.write(f"{prefix}{tab}Local Degrees of Freedom: {dof_stat}")
            ostream.write('\n')
            ostream.write(f"{prefix}{tab}Total Variables: {nv}{tab}"
                          f"Activated Constraints: {nc}{tab}"
                          f"Activated Blocks: {nb}")

        if performance is not None:
            ostream.write("\n"+"-"*max_str_length+"\n")
            ostream.write(f"{prefix}{tab}Unit Performance")
            ostream.write("\n"*2)

            if "vars" in performance and len(performance["vars"]) > 0:
                ostream.write(f"{prefix}{tab}Variables: \n\n")
                tabular_writer(
                    ostream,
                    prefix+tab,
                    ((k, v) for k, v in performance["vars"].items()),
                    ("Value", "Fixed", "Bounds"),
                    lambda k, v: [
                        "{:#.5g}".format(value(v)),
                        v.fixed,
                        v.bounds])

            if "exprs" in performance and len(performance["exprs"]) > 0:
                ostream.write("\n")
                ostream.write(f"{prefix}{tab}Expressions: \n\n")
                tabular_writer(
                    ostream,
                    prefix+tab,
                    ((k, v) for k, v in performance["exprs"].items()),
                    ("Value",),
                    lambda k, v: [
                        "{:#.5g}".format(value(v))])

            if "params" in performance and len(performance["params"]) > 0:
                ostream.write("\n")
                ostream.write(f"{prefix}{tab}Parameters: \n\n")
                tabular_writer(
                    ostream,
                    prefix+tab,
                    ((k, v) for k, v in performance["params"].items()),
                    ("Value", "Mutable"),
                    lambda k, v: [value(v),
                                  not v.is_constant()])

        if stream_table is not None:
            ostream.write("\n"+"-"*max_str_length+"\n")
            ostream.write(f"{prefix}{tab}Stream Table")
            ostream.write('\n')
            ostream.write(
                textwrap.indent(
                    stream_table_dataframe_to_string(stream_table),
                    prefix+tab))
        ostream.write("\n"+"="*max_str_length+"\n")

    def _get_performance_contents(self, time_point):
        """
        Return the contents of the performance section of report().

        The base implementation returns None, which makes report() omit the
        section. report() reads the optional keys "vars", "exprs" and
        "params" from a non-None result, each mapping display names to
        components.
        """
        return None

    def _get_stream_table_contents(self, time_point):
        """
        Return the stream table shown by report().

        The base implementation returns None, which makes report() omit the
        section. A non-None result is passed to
        stream_table_dataframe_to_string.
        """
        return None

    def _setup_dynamics(self):
        """
        This method automates the setting of the dynamic flag and time domain
        for unit models.

        Performs the following:
         1) Determines if this is a top level flowsheet
         2) Gets dynamic flag from parent if not top level, or checks validity
            of argument provided
         3) Checks has_holdup flag if present and dynamic = True

        Args:
            None

        Returns:
            None

        Raises:
            DynamicError: if the dynamic flag cannot be obtained from a
                parent, or a dynamic model is declared inside a steady-state
                parent
            ConfigurationError: if dynamic is True but has_holdup is False
        """
        # Get parent object
        if hasattr(self.parent_block(), "config"):
            # Parent block has a config block, so use this
            parent = self.parent_block()
        else:
            # Use parent flowsheet
            no_parent_msg = ('{} has no parent flowsheet from which to '
                             'get dynamic argument. Please provide a '
                             'value for this argument when constructing '
                             'the unit.'
                             .format(self.name))
            try:
                parent = self.flowsheet()
            except ConfigurationError:
                raise DynamicError(no_parent_msg)
            # flowsheet() signals "no parent flowsheet" by returning None
            # rather than raising, so that case must be checked explicitly
            # when the flag has to be inherited.
            if parent is None and self.config.dynamic == useDefault:
                raise DynamicError(no_parent_msg)

        # Check the dynamic flag, and retrieve if necessary
        if self.config.dynamic == useDefault:
            # Get flag from parent flowsheet
            try:
                self.config.dynamic = parent.config.dynamic
            except AttributeError:
                # No flowsheet, raise exception
                raise DynamicError('{} parent flowsheet has no dynamic '
                                   'argument. Please provide a '
                                   'value for this argument when constructing '
                                   'the unit.'
                                   .format(self.name))

        # Check for case when dynamic=True, but parent dynamic=False
        # NOTE(review): with an explicit dynamic=True and no parent (or a
        # parent config without a dynamic entry) this lookup still raises a
        # raw AttributeError - confirm the intended behaviour for that case.
        if (self.config.dynamic and not parent.config.dynamic):
            raise DynamicError('{} trying to declare a dynamic model within '
                               'a steady-state flowsheet. This is not '
                               'supported by the IDAES framework. Try '
                               'creating a dynamic flowsheet instead, and '
                               'declaring some models as steady-state.'
                               .format(self.name))

        # Set and validate has_holdup argument
        if self.config.has_holdup == useDefault:
            # Default to same value as dynamic flag
            self.config.has_holdup = self.config.dynamic
        elif self.config.has_holdup is False:
            if self.config.dynamic is True:
                # Dynamic model must have has_holdup = True
                raise ConfigurationError(
                    "{} invalid arguments for dynamic and has_holdup. "
                    "If dynamic = True, has_holdup must also be True "
                    "(was False)".format(self.name))

    def _get_property_package(self):
        """
        This method gathers the necessary information about the property
        package to be used in the control volume block.

        If a property package has not been provided by the user, the method
        first tries the parent block's property package (filling it in from
        the flowsheet default if the parent has none set), and otherwise
        falls back to the default property package found by
        _get_default_prop_pack.

        The method also gathers any default construction arguments specified
        for the property package and combines these with any arguments
        specified by the user for the control volume block (user specified
        arguments take priority over defaults).

        Args:
            None

        Returns:
            None
        """
        # Get property_package block if not provided in arguments
        parent = self.parent_block()
        if self.config.property_package == useDefault:
            # Try to get property_package from parent
            try:
                if parent.config.property_package in [None, useDefault]:
                    # Note: this also stores the default on the parent
                    parent.config.property_package = \
                        self._get_default_prop_pack()
                self.config.property_package = parent.config.property_package
            except AttributeError:
                self.config.property_package = self._get_default_prop_pack()

        # Check for any flowsheet level build arguments
        for k in self.config.property_package.config.default_arguments:
            if k not in self.config.property_package_args:
                self.config.property_package_args[k] = \
                    self.config.property_package.config.default_arguments[k]

    def _get_default_prop_pack(self):
        """
        This method is used to find a default property package defined at the
        flowsheet level if a package is not provided as an argument when
        instantiating the control volume block.

        Args:
            None

        Returns:
            The default_property_package of the nearest parent flowsheet
            which defines one

        Raises:
            ConfigurationError: if no parent flowsheet defines a default
                property package
        """
        parent = self.flowsheet()
        # Walk up through nested flowsheets until one supplies a default
        while parent is not None:
            package = parent.config.default_property_package
            if package is not None:
                _log.info('%s Using default property package', self.name)
                return package
            parent = parent.flowsheet()

        raise ConfigurationError(
            '{} no property package provided and '
            'no default defined by parent flowsheet(s).'
            .format(self.name))

    def _get_indexing_sets(self):
        """
        This method checks that the property parameter block provides the
        indexing sets required by the control volume block, namely
        phase_list and component_list.

        Args:
            None

        Returns:
            None

        Raises:
            PropertyPackageError: if the property package has no phase_list
                or no component_list
        """
        # Check for phase list(s)
        if not hasattr(self.config.property_package, "phase_list"):
            raise PropertyPackageError(
                '{} property_package provided does not '
                'contain a phase_list. '
                'Please contact the developer of the property package.'
                .format(self.name))

        # Check for component list(s)
        if not hasattr(self.config.property_package, "component_list"):
            raise PropertyPackageError(
                '{} property_package provided does not '
                'contain a component_list. '
                'Please contact the developer of the property package.'
                .format(self.name))

    def _get_reaction_package(self):
        """
        This method gathers the necessary information about the reaction
        package to be used in the control volume block (if required).

        If a reaction package has been provided by the user, the method
        gathers any default construction arguments specified
        for the reaction package and combines these with any arguments
        specified by the user for the control volume block (user specified
        arguments take priority over defaults).

        Args:
            None

        Returns:
            None
        """
        if self.config.reaction_package is not None:
            # Check for any flowsheet level build arguments
            for k in self.config.reaction_package.config.default_arguments:
                if k not in self.config.reaction_package_args:
                    self.config.reaction_package_args[k] = \
                        self.config.reaction_package.config.default_arguments[k]

    def _get_phase_comp_list(self):
        """
        Method to collect phase-component list from property package.
        If property package does not define a phase-component list, then it is
        assumed that all components are present in all phases.

        Args:
            None

        Returns:
            phase_component_list
        """
        property_package = self.config.property_package

        # Get phase component list(s)
        if hasattr(property_package, "phase_component_list"):
            return property_package.phase_component_list

        # Otherwise assume all components in all phases
        return {p: property_package.component_list
                for p in property_package.phase_list}