Source code for idaes.apps.grid_integration.pricetaker.price_taker_model

#################################################################################
# The Institute for the Design of Advanced Energy Systems Integrated Platform
# Framework (IDAES IP) was produced under the DOE Institute for the
# Design of Advanced Energy Systems (IDAES).
#
# Copyright (c) 2018-2023 by the software owners: The Regents of the
# University of California, through Lawrence Berkeley National Laboratory,
# National Technology & Engineering Solutions of Sandia, LLC, Carnegie Mellon
# University, West Virginia University Research Corporation, et al.
# All rights reserved.  Please see the files COPYRIGHT.md and LICENSE.md
# for full copyright and license information.
#################################################################################

from typing import Optional, Union, Callable, List
import pandas as pd
from pyomo.environ import (
    ConcreteModel,
    Block,
    Var,
    Param,
    RangeSet,
    Objective,
    Constraint,
    NonNegativeReals,
    Expression,
    maximize,
)
from pyomo.common.config import (
    ConfigDict,
    ConfigValue,
    NonNegativeInt,
    NonNegativeFloat,
    ListOf,
    PositiveInt,
)

from idaes.apps.grid_integration.pricetaker.design_and_operation_models import (
    DesignModelData,
)
from idaes.apps.grid_integration.pricetaker.clustering import (
    generate_daily_data,
    cluster_lmp_data,
    get_optimal_num_clusters,
)
from idaes.apps.grid_integration.pricetaker.unit_commitment import (
    UnitCommitmentData,
    capacity_limits,
    ramping_limits,
    startup_shutdown_constraints,
)
from idaes.core.util.config import ConfigurationError, is_in_range
import idaes.logger as idaeslog

_logger = idaeslog.getLogger(__name__)


# Defining a ConfigDict to simplify the domain validation of
# arguments needed for all the methods of the PriceTakerModel class
CONFIG = ConfigDict()

# List of arguments needed for the `append_lmp_data` method
CONFIG.declare(
    "seed",
    ConfigValue(
        default=20,
        domain=NonNegativeInt,
        doc="Seed value for clustering techniques",
    ),
)
CONFIG.declare(
    "horizon_length",
    ConfigValue(
        domain=PositiveInt,
        doc="Length of each representative day/period",
    ),
)
CONFIG.declare(
    "num_clusters_range",
    ConfigValue(
        default=(5, 30),
        domain=ListOf(int, PositiveInt),
        doc="Range of number of clusters for generating elbow plot",
    ),
)
CONFIG.declare(
    "num_clusters",
    ConfigValue(
        domain=PositiveInt,
        doc="Number of clusters/representaive days/periods",
    ),
)
CONFIG.declare(
    "lmp_data",
    ConfigValue(
        domain=ListOf(float),
        doc="List containing locational marginal prices (LMP)",
    ),
)

# List of arguments needed for adding startup/shutdown constraints
CONFIG.declare(
    "up_time",
    ConfigValue(
        domain=PositiveInt,
        doc="Minimum uptime [in hours]",
    ),
)
CONFIG.declare(
    "down_time",
    ConfigValue(
        domain=PositiveInt,
        doc="Minimum downtime [in hours]",
    ),
)

# List of arguments for NPV calculation
CONFIG.declare(
    "lifetime",
    ConfigValue(
        domain=PositiveInt,
        doc="Total lifetime of the system [in years]",
    ),
)
CONFIG.declare(
    "discount_rate",
    ConfigValue(
        domain=is_in_range(0, 1),
        doc="Discount rate for annualization [fraction]",
    ),
)
CONFIG.declare(
    "corporate_tax_rate",
    ConfigValue(
        domain=is_in_range(0, 1),
        doc="Effective corporate tax rate [fraction]",
    ),
)
CONFIG.declare(
    "annualization_factor",
    ConfigValue(
        domain=is_in_range(0, 1),
        doc="Capital cost annualization factor [fraction]",
    ),
)
CONFIG.declare(
    "cash_inflow_scale_factor",
    ConfigValue(
        domain=NonNegativeFloat,
        doc="Scaling factor for net cash inflow calculations",
    ),
)


# pylint: disable = too-many-ancestors, too-many-instance-attributes
[docs] class PriceTakerModel(ConcreteModel): """Builds a price-taker model for a given system""" def __init__(self, *args, **kwds): super().__init__(*args, **kwds) self._config = CONFIG() self._has_hourly_cashflows = False self._has_overall_cashflows = False self._op_blk_uc_data = {} # Save UC data for op. blocks self._op_blk_uptime_downtime = {} self._linking_constraint_counter = 1 @property def num_representative_days(self): """Returns the number of representative days""" return self._config.num_clusters @num_representative_days.setter def num_representative_days(self, value): """Setter for the num_representative_days property""" if self._config.num_clusters is not None: raise ConfigurationError( f"num_representative_days is already defined as " f"{self.num_representative_days} and it cannot be overwritten." f"\n\tInstantiate a new PriceTakerModel object." ) self._config.num_clusters = value @property def horizon_length(self): """Returns the length of each representative day""" if self._config.horizon_length is not None: return self._config.horizon_length _logger.warning( "Attribute horizon_length is not specified. Using 24 hours " "as the horizon length." ) return 24 @horizon_length.setter def horizon_length(self, value): """Setter for the horizon_length property""" if self._config.horizon_length is not None: raise ConfigurationError( f"horizon_length is already defined as {self.horizon_length} and " f"it cannot be overwritten.\n\tInstantiate a new PriceTakerModel object." ) self._config.horizon_length = value def _assert_lmp_data_exists(self): """Raise an error if LMP data does not exist""" if self._config.lmp_data is None: raise ConfigurationError( "LMP data is missing. Please append the LMP data using the " "`append_lmp_data` method." )
[docs] def append_lmp_data( self, lmp_data: Union[list, pd.DataFrame, pd.Series], num_representative_days: Optional[int] = None, horizon_length: Optional[int] = None, seed: Optional[int] = 42, ): """ Appends the locational marginal price (LMP) data to the PriceTakerModel object. If desired, the method can cluster the data into representative periods before appending the data. Args: lmp_data: Union[list, tuple, pd.DataFrame] List of locational marginal prices num_representative_days: Optional[int], default=None number of clusters or representative periods. horizon_length: Optional[int], default=None Length of each representative period seed: Optional[int], default=42 Seed value for k-means clustering technique """ # Perform domain validation (ConfigDict performs the validation) if self._config.lmp_data is None: assert len(lmp_data) >= 2 # Do not remove this check! self._config.lmp_data = lmp_data self._config.num_clusters = num_representative_days self._config.horizon_length = horizon_length self._config.seed = seed else: # We do not allow modification of lmp data after it is defined raise ConfigurationError( "Attempted to overwrite the LMP data. Instantiate a " "new PriceTakerModel object to change the LMP data." )
[docs] def get_optimal_representative_days( self, kmin: int = 4, kmax: int = 30, method: str = "silhouette", generate_elbow_plot: bool = True, ): """ Returns the optimal number of representative days for the given price signal. """ self._assert_lmp_data_exists() # Domain validation of kmin and kmax self._config.num_clusters_range = (kmin, kmax) daily_data = generate_daily_data(self._config.lmp_data, self.horizon_length) return get_optimal_num_clusters( daily_data, kmin, kmax, method, generate_elbow_plot, self._config.seed )
def _assert_mp_model_exists(self): """Raise an error if the multiperiod model does not exist""" if not hasattr(self, "period"): raise ConfigurationError( "Unable to find the multiperiod model. Please use the " "build_multiperiod_model method to construct one." )
[docs] def build_multiperiod_model( self, flowsheet_func: Callable, flowsheet_options: Optional[dict] = None, ): """ Builds the multiperiod model. Args: flowsheet_func : Callable A function that returns an instance of the flowsheet flowsheet_options : dict, Optional arguments needed for `flowsheet_func` """ self._assert_lmp_data_exists() if hasattr(self, "period"): # Object may contain a multiperiod model. so raise an error raise ConfigurationError( "A multiperiod model might already exist, as the object has " "`period` attribute." ) lmp_data = self._config.lmp_data # pylint: disable=attribute-defined-outside-init if self.num_representative_days is not None: # Need to use representative days rep_days_lmp, rep_days_weights = cluster_lmp_data( raw_data=lmp_data, horizon_length=self.horizon_length, n_clusters=self.num_representative_days, seed=self._config.seed, ) else: # Use full year price signal self.num_representative_days = 1 self.horizon_length = len(lmp_data) rep_days_lmp = {1: {t + 1: val for t, val in enumerate(lmp_data)}} rep_days_weights = {1: 1} self.set_days = RangeSet(self.num_representative_days) self.set_time = RangeSet(self.horizon_length) self.rep_days_lmp = rep_days_lmp self.rep_days_weights = rep_days_weights flowsheet_blk = ConcreteModel() if flowsheet_options is None: flowsheet_options = {} flowsheet_func(flowsheet_blk, **flowsheet_options) # Based on some earlier testing, it is faster to clone the model # than to build it from scratch. So, instead of using the `rule` # argument, we are cloning the model and then transferring the # attributes to the `period` block. self.period = Block(self.set_days, self.set_time) for d, t in self.period: self.period[d, t].transfer_attributes_from(flowsheet_blk.clone()) # If the flowsheet model has LMP defined, update it. if hasattr(self.period[d, t], "LMP"): self.period[d, t].LMP = self.rep_days_lmp[d][t] # Iterate through model to append LMP data if it's been defined for blk in self.period[d, t].component_data_objects(Block): if hasattr(blk, "LMP"): blk.LMP = self.rep_days_lmp[d][t]
def _get_operation_blocks( self, blk_name: str, attribute_list: list, ): """ Returns a dictionary of operational blocks named 'blk_name'. In addition, it also checks the existence of the operational blocks, and the existence of specified attributes. """ # Ensure that the multiperiod model exists self._assert_mp_model_exists() # pylint: disable=not-an-iterable op_blocks = { d: {t: self.period[d, t].find_component(blk_name) for t in self.set_time} for d in self.set_days } # NOTE: It is sufficient to perform checks only for one block, because # the rest of them are clones. blk = op_blocks[1][1] # This object always exists # First, check for the existence of the operational block if blk is None: raise AttributeError(f"Operational block {blk_name} does not exist.") # Next, check for the existence of attributes. for attribute in attribute_list: if not hasattr(blk, attribute): raise AttributeError( f"Required attribute {attribute} is not found in " f"the operational block {blk_name}." ) return op_blocks def _get_operation_vars(self, var_name: str): """ Returns a dictionary of pointers to the var_name variable located in each flowsheet instance. If the variable is not present, then an error is raised. """ # Ensure that the multiperiod model exists self._assert_mp_model_exists() # pylint: disable=not-an-iterable op_vars = { d: {t: self.period[d, t].find_component(var_name) for t in self.set_time} for d in self.set_days } # NOTE: It is sufficient to perform checks only for one variable if op_vars[1][1] is None: raise AttributeError( f"Variable {var_name} does not exist in the multiperiod model." ) return op_vars
[docs] def add_linking_constraints(self, previous_time_var: str, current_time_var: str): """ Adds constraints to relate variables across two consecutive time periods. This method is usually needed if the system has storage. Using this method, the holdup at the end of the previous time period can be equated to the holdup at the beginning of the current time period. Args: previous_time_var : str, Name of the operational variable at the end of the previous time step current_time_var : str, Name of the operational variable at the beginning of the current time step """ old_time_var = self._get_operation_vars(previous_time_var) new_time_var = self._get_operation_vars(current_time_var) def _rule_linking_constraints(_, d, t): if t == 1: return Constraint.Skip return old_time_var[d][t - 1] == new_time_var[d][t] setattr( self, "variable_linking_constraints_" + str(self._linking_constraint_counter), Constraint(self.set_days, self.set_time, rule=_rule_linking_constraints), ) _logger.info( f"Linking constraints are added to the model at " f"variable_linking_constraints_{self._linking_constraint_counter}" ) self._linking_constraint_counter += 1 # Increase the counter for the new pair
def _retrieve_uc_data(self, op_blk: str, commodity: str, **kwargs): """ Retrieves unit commitment data for the given operation block if it exists. Otherwise, it creates a new object and returns the object containing the unit commitment data """ if (op_blk, commodity) in self._op_blk_uc_data: self._op_blk_uc_data[op_blk, commodity].update(**kwargs) else: self._op_blk_uc_data[op_blk, commodity] = UnitCommitmentData( blk_name=op_blk, commodity_name=commodity, **kwargs ) return self._op_blk_uc_data[op_blk, commodity]
[docs] def add_capacity_limits( self, op_block_name: str, commodity: str, capacity: Union[float, Param, Var, Expression], op_range_lb: float, ): """ Adds capacity limit constraints of the form: op_range_lb * capacity * op_mode(t) <= commodity(t) <= capacity * op_mode(t) ex: 0.3 * P_max * op_mode(t) <= P(t) <= P_max * op_mode(t), where P(t) is power at time t and op_mode(t) = 1 if the system is operating at time t; and op_mode(t) = 0, otherwise. Args: op_block_name: str, Name of the operation model block, ex: ("fs.ngcc") commodity: str, Name of the commodity on the model the capacity constraints will be applied to, ex: ("total_power") capacity: float, or Pyomo Var, Param, or Expression Maximum capacity on the commodity, ex: (650.0, or m.min_power_capacity) op_range_lb: float, Ratio of the capacity at minimum stable operation to the maximum capacity. Must be a number in the interval [0, 1] """ # _get_operation_blocks method ensures that the operation block exists, and # the commodity variable also exists. op_blocks = self._get_operation_blocks( blk_name=op_block_name, attribute_list=["op_mode", commodity] ) # This method performs all the necessary data checks uc_data = self._retrieve_uc_data( op_blk=op_block_name, commodity=commodity, capacity=capacity, op_range_lb=op_range_lb, ) # Create a block for storing capacity limit constraints cap_limit_blk_name = op_block_name.split(".")[-1] + f"_{commodity}_limits" if hasattr(self, cap_limit_blk_name): raise ConfigurationError( f"Attempting to overwrite capacity limits for {commodity} in {op_block_name}." ) setattr(self, cap_limit_blk_name, Block(self.set_days)) cap_limit_blk = getattr(self, cap_limit_blk_name) # pylint: disable = not-an-iterable for d in self.set_days: capacity_limits( blk=cap_limit_blk[d], op_blocks=op_blocks[d], uc_data=uc_data, set_time=self.set_time, ) # Logger info for where constraint is located on the model _logger.info( f"Created capacity limit constraints for commodity {commodity} in " f"operation block {op_block_name} at {cap_limit_blk.name}" )
[docs] def add_ramping_limits( self, op_block_name: str, commodity: str, capacity: Union[float, Param, Var, Expression], startup_rate: float, shutdown_rate: float, rampup_rate: float, rampdown_rate: float, ): """ Adds ramping constraints of the form: ramping_var[t] - ramping_var[t-1] <= startup_rate * capacity * startup[t] + rampup_rate * capacity * op_mode[t-1]; ramping_var[t-1] - ramping_var[t] <= shutdown_rate * capacity * shutdown[t] + rampdown_rate * capacity * op_mode[t] Args: op_block_name: str, Name of the operation model block, e.g., ("fs.ngcc") commodity: str, Name of the variable that the ramping constraints will be applied to, e.g., "power" capacity: float, or Pyomo Var, or Param, or Expression String of the name of the entity on the model the ramping constraints will be applied to, ex: ("total_power") startup_rate: float, Fraction of the maximum capacity that variable ramping_var can increase during startup (between 0 and 1) shutdown_rate: float, Fraction of the maximum capacity that variable ramping_var can decrease during shutdown (between 0 and 1) rampup_rate: float, Fraction of the maximum capacity that variable ramping_var can increase during operation (between 0 and 1) rampdown_rate: float, Fraction of the maximum capacity that variable ramping_var can decrease during operation (between 0 and 1) """ # Get operational blocks op_blocks = self._get_operation_blocks( blk_name=op_block_name, attribute_list=["op_mode", "startup", "shutdown", commodity], ) # Perform all the necessary datachecks uc_data = self._retrieve_uc_data( op_blk=op_block_name, commodity=commodity, capacity=capacity, startup_rate=startup_rate, shutdown_rate=shutdown_rate, rampup_rate=rampup_rate, rampdown_rate=rampdown_rate, ) uc_data.assert_ramping_args_present() # Creating the pyomo block ramp_blk_name = op_block_name.split(".")[-1] + f"_{commodity}_ramping" if hasattr(self, ramp_blk_name): raise ConfigurationError( f"Attempting to overwrite ramping limits for {commodity} in {op_block_name}." ) setattr(self, ramp_blk_name, Block(self.set_days)) ramp_blk = getattr(self, ramp_blk_name) # pylint: disable=not-an-iterable for d in self.set_days: ramping_limits( blk=ramp_blk[d], op_blocks=op_blocks[d], uc_data=uc_data, set_time=self.set_time, ) # Logger info for where constraint is located on the model _logger.info( f"Created ramping constraints for variable {commodity} " f"on operational block {op_block_name} at {ramp_blk.name}" )
[docs] def add_startup_shutdown( self, op_block_name: str, des_block_name: Optional[str] = None, up_time: int = 1, down_time: int = 1, ): """ Adds minimum uptime/downtime constraints for a given unit/process Args: op_block_name: str, Name of the operation model block, e.g., "fs.ngcc" des_block_name: str, default=None, Name of the design model block for the operation block op_block_name. This argument is specified if the design is being optimized simultaneously, e.g., "ngcc_design" up_time: int, default=1, Total uptime (must be >= 1), e.g., 4 time periods Uptime must include the minimum uptime and the time required for shutdown. down_time: int, default=1, Total downtime (must be >= 1), e.g., 4 time periods Downtime must include the minimum downtime and the time required for startup """ # Check up_time and down_time for validity self.config.up_time = up_time self.config.down_time = down_time op_blocks = self._get_operation_blocks( blk_name=op_block_name, attribute_list=["op_mode", "startup", "shutdown"], ) if des_block_name is not None: install_unit = self.find_component(des_block_name + ".install_unit") if install_unit is None: raise AttributeError( f"Binary variable associated with unit installation is not found " f"in {des_block_name}. \n\tDo not specify des_block_name argument if " f"installation of the unit is not a decision variable." ) else: install_unit = 1 start_shut_blk_name = op_block_name.split(".")[-1] + "_startup_shutdown" if hasattr(self, start_shut_blk_name): raise ConfigurationError( f"Attempting to overwrite startup/shutdown constraints " f"for operation block {op_block_name}." ) setattr(self, start_shut_blk_name, Block(self.set_days)) start_shut_blk = getattr(self, start_shut_blk_name) # pylint: disable=not-an-iterable for d in self.set_days: startup_shutdown_constraints( blk=start_shut_blk[d], op_blocks=op_blocks[d], install_unit=install_unit, up_time=up_time, down_time=down_time, set_time=self.set_time, ) # Save the uptime and downtime data for reference self._op_blk_uptime_downtime[op_block_name] = { "up_time": up_time, "down_time": down_time, } # Logger info for where constraint is located on the model _logger.info( f"Created startup/shutdown constraints for operation model " f" {op_block_name} at {start_shut_blk.name}." )
[docs] def add_hourly_cashflows( self, revenue_streams: Optional[List] = None, operational_costs: Optional[List] = None, ): """ Adds an expression for the net cash inflow for each flowsheet instance. Operational costs in each flowsheet instance may include 'non_fuel_vom' (non-fuel variable operating costs), 'fuel_cost' (cost of fuel), and 'carbon_price' (cost associated with producing carbon; i.e., a carbon tax). Revenue streams in each flowsheet instance may include the revenue from the wholesale electricity market, revenue from alternative products (e.g., hydrogen), etc. The net cash inflow is calculated as: Sum(revenue streams) - Sum(costs) for each time period. Args: revenue_streams: List of strings representing the names of the revenue streams, default: None Example: :: ['elec_revenue', 'H2_revenue', ] costs: List of strings representing the names of the costs associated with operating at a time period. default: None Example: :: ['hourly_fixed_cost', 'electricity_cost',] """ # Ensure that multiperiod model exists self._assert_mp_model_exists() if operational_costs is None: _logger.warning( "Argument operational_costs is not specified, so the total " "operational cost will be set to 0." ) operational_costs = [] if revenue_streams is None: _logger.warning( "Argument revenue_streams is not specified, so the total " "revenue will be set to 0." ) revenue_streams = [] for p in self.period: # Set net profit contribution expressions to 0 total_cost_expr = 0 total_revenue_expr = 0 for blk in self.period[p].component_data_objects(Block): # Add costs for each block. If more than one block, may have # costs that exist on one block and not on another. (i.e., coproduction) for cost in operational_costs: curr_cost = blk.find_component(cost) total_cost_expr += curr_cost if curr_cost is not None else 0 # Add revenue streams for each block. If more than one block, may have # revenues that exist on one block and not on another. (i.e., coproduction) for revenue in revenue_streams: curr_rev = blk.find_component(revenue) total_revenue_expr += curr_rev if curr_rev is not None else 0 # Account for flowsheet-level operational_costs for cost in operational_costs: curr_cost = self.period[p].find_component(cost) total_cost_expr += curr_cost if curr_cost is not None else 0 # Account for flowsheet-level revenue_streams for revenue in revenue_streams: curr_rev = self.period[p].find_component(revenue) total_revenue_expr += curr_rev if curr_rev is not None else 0 # Set total cost expression self.period[p].total_hourly_cost = Expression(expr=total_cost_expr) # Set total revenue expression self.period[p].total_hourly_revenue = Expression(expr=total_revenue_expr) # Set net cash inflow expression self.period[p].net_hourly_cash_inflow = Expression( expr=self.period[p].total_hourly_revenue - self.period[p].total_hourly_cost ) # Logger info for where constraint is located on the model self._has_hourly_cashflows = True _logger.info( "Created hourly cashflow expressions at:\n\t period[d, t].total_hourly_cost, " "period[d, t].total_hourly_revenue, period[d, t].net_hourly_cash_inflow." )
[docs] def add_overall_cashflows( self, lifetime: int = 30, discount_rate: float = 0.08, corporate_tax_rate: float = 0.2, annualization_factor: Optional[float] = None, cash_inflow_scale_factor: Optional[float] = 1.0, ): """ Builds overall cashflow expressions. Args: lifetime: int, default=30, Lifetime of the unit/process [in years] discount_rate: float, default=0.08, Discount rate [fraction] for calculating the current value of the cashflow. It is also used to compute the annualization factor. Must be between 0 and 1. corporate_tax_rate: float, default=0.2, Fractional value of corporate tax used in NPV calculations. Must be between 0 and 1. annualization_factor: float, default=None, Annualization factor cash_inflow_scale_factor: float, default=1.0, Scaling factor for the sum of net hourly cashflows. If the specified price signal is for the full year, then the value of this argument must be 1.0. However, if the specified price signal is for a short period, say 1 month, then an appropriate scaling factor can be specified to compute the value for a year (12, for the case of 1 month's price signal). """ # Ensure that multiperiod model exists self._assert_mp_model_exists() # Domain validation of input arguments self.config.lifetime = lifetime self.config.discount_rate = discount_rate self.config.corporate_tax = corporate_tax_rate self.config.annualization_factor = annualization_factor self.config.cash_inflow_scale_factor = cash_inflow_scale_factor if not self._has_hourly_cashflows: raise ConfigurationError( "Hourly cashflows are not added to the model. Please run " "add_hourly_cashflows method before calling the " "add_overall_cashflows method." ) capex_expr = 0 fom_expr = 0 count_des_blks = 0 for blk in self.component_data_objects(Block): if isinstance(blk, DesignModelData): count_des_blks += 1 capex_expr += blk.capex fom_expr += blk.fom if count_des_blks == 0: _logger.warning( "No design blocks were found, so the overall capital cost " "(capex) and the fixed O&M cost (fom) are set to 0." ) # pylint: disable=attribute-defined-outside-init self.cashflows = Block() cf = self.cashflows cf.capex = Var(within=NonNegativeReals, doc="Total CAPEX") cf.capex_calculation = Constraint(expr=cf.capex == capex_expr) cf.fom = Var(within=NonNegativeReals, doc="Yearly Fixed O&M Cost") cf.fom_calculation = Constraint(expr=cf.fom == fom_expr) cf.depreciation = Var(within=NonNegativeReals, doc="Yearly depreciation") cf.depreciation_calculation = Constraint( expr=cf.depreciation == cf.capex / lifetime ) cf.net_cash_inflow = Var(doc="Net cash inflow") cf.net_cash_inflow_calculation = Constraint( expr=cf.net_cash_inflow == cash_inflow_scale_factor * sum( self.rep_days_weights[d] * self.period[d, t].net_hourly_cash_inflow for d, t in self.period ) ) cf.corporate_tax = Var(within=NonNegativeReals, doc="Corporate tax") cf.corporate_tax_calculation = Constraint( expr=cf.corporate_tax >= corporate_tax_rate * (cf.net_cash_inflow - cf.fom - cf.depreciation) ) cf.net_profit = Var(doc="Net profit after taxes") cf.net_profit_calculation = Constraint( expr=cf.net_profit == cf.net_cash_inflow - cf.fom - cf.corporate_tax ) if annualization_factor is None: # If the annualization factor is not specified annualization_factor = discount_rate / ( 1 - (1 + discount_rate) ** (-lifetime) ) cf.lifetime_npv = Expression( expr=(1 / annualization_factor) * cf.net_profit - cf.capex ) cf.npv = Expression( expr=cf.net_profit - annualization_factor * cf.capex, ) self._has_overall_cashflows = True _logger.info(f"Overall cashflows are added to the block {cf.name}")
[docs] def add_objective_function(self, objective_type="npv"): """ Appends the objective function to the model. Args: objective_type: str, default="npv", Supported objective functions are annualized net present value ("npv"), net profit ("net_profit"), and lifetime net present value ("lifetime_npv"). """ # pylint: disable = attribute-defined-outside-init if not self._has_overall_cashflows: raise ConfigurationError( "Overall cashflows are not appended. Please run the " "add_overall_cashflows method." ) try: self.obj = Objective( expr=getattr(self.cashflows, objective_type), sense=maximize, ) except AttributeError as msg: raise ConfigurationError( f"{objective_type} is not a supported objective function." f"Please specify either npv, or lifetime_npv, or net_profit " f"as the objective_type." ) from msg