# Source code for idaes.surrogate.pysmo.polynomial_regression

#################################################################################
# The Institute for the Design of Advanced Energy Systems Integrated Platform
# Framework (IDAES IP) was produced under the DOE Institute for the
# Design of Advanced Energy Systems (IDAES), and is copyright (c) 2018-2021
# by the software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory,  National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia University
#
#################################################################################

# Imports from the python standard library
from __future__ import division

# from builtins import int, str
import os.path
import pprint
import random
import warnings

# Imports from third parties
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import pickle
from pyomo.environ import *
from pyomo.core.expr.visitor import replace_expressions
import scipy.optimize as opt
from scipy.special import comb as comb
from six import string_types

# Imports from IDAES namespace
from idaes.surrogate.pysmo.utils import NumpyEvaluator

__author__ = "Oluwamayowa Amusat"

"""
The purpose of this file is to perform polynomial regression in Pyomo.
This will be done in two stages. First, a sampling plan will
be used to select samples for generating a surrogate model.
In the second stage, the surrogate model is constructed by fitting to
different order polynomials. Long term, an iterative adaptive sampling
approach will be incorporated for model improvement.
Cross-validation is used to select the best model.

FeatureScaling:
Simple script for scaling and un-scaling the input data

Polynomial Regression:
Three approaches are implemented for evaluating polynomial coefficients -
a. Moore-Penrose maximum likelihood method (Forrester et al.)
b. Optimization using the BFGS algorithm.
c. Optimization with Pyomo
The Pyomo optimization approach is enabled as the default at this time.
"""

class FeatureScaling:
    """
    A helper class for min-max scaling and un-scaling of data.

    It exposes two static methods: ``data_scaling`` and ``data_unscaling``.
    """

    def __init__(self):
        pass

    @staticmethod
    def data_scaling(data):
        """
        Perform column-wise min-max scaling on the input dataset.

        Args:
            data : The input data set to be scaled. Must be a numpy array
                or pandas DataFrame.

        Returns:
            (tuple): tuple containing:

                - **scaled_data** : the scaled data, with every value in
                  [0, 1]. Returned as a DataFrame when the input was a
                  DataFrame, otherwise as a 2-D numpy array.
                - **data_minimum** : 2-D row vector of the column-wise
                  minimums of the input data.
                - **data_maximum** : 2-D row vector of the column-wise
                  maximums of the input data.

        Raises:
            TypeError: Raised when the input data is not a numpy array
                or DataFrame.
        """
        # Accept either a numpy array or a DataFrame; remember the headers
        # so DataFrame inputs yield DataFrame outputs.
        if isinstance(data, np.ndarray):
            raw_values = data
            column_names = []
        elif isinstance(data, pd.DataFrame):
            raw_values = data.values
            column_names = data.columns.values.tolist()
        else:
            raise TypeError(
                "original_data_input: Pandas dataframe or numpy array required."
            )

        # Promote 1-D input to a single-column 2-D array.
        if raw_values.ndim == 1:
            raw_values = raw_values.reshape(len(raw_values), 1)

        col_min = np.min(raw_values, axis=0)
        col_max = np.max(raw_values, axis=0)
        col_range = col_max - col_min
        # Constant columns would divide by zero; scale them by 1 so the
        # scaled value becomes 0 instead.
        col_range[col_range == 0.0] = 1.0
        scaled_values = (raw_values - col_min) / col_range

        col_min = col_min.reshape(1, col_min.shape[0])
        col_max = col_max.reshape(1, col_max.shape[0])
        if len(column_names) > 0:
            scaled_values = pd.DataFrame(scaled_values, columns=column_names)
        return scaled_values, col_min, col_max

    @staticmethod
    def data_unscaling(x_scaled, x_min, x_max):
        """
        Undo column-wise min-max scaling on a previously scaled dataset.

        Args:
            x_scaled (NumPy Array) : Data to be un-scaled. Values are
                expected to lie between 0 and 1.
            x_min (NumPy vector) : Actual minimum value for each column.
                Must contain one element per column of x_scaled.
            x_max (NumPy vector) : Actual maximum value for each column.
                Must contain one element per column of x_scaled.

        Returns:
            NumPy Array : 2-D array of the un-scaled data, computed as
            x_min + x_scaled * (x_max - x_min).

        Raises:
            IndexError: Raised when the array dimensions are inconsistent.
        """
        # Promote 1-D input to 2-D before validating the dimensions.
        if x_scaled.ndim == 1:
            x_scaled = x_scaled.reshape(len(x_scaled), 1)
        if (x_scaled.shape[1] != x_min.size) or (x_scaled.shape[1] != x_max.size):
            raise IndexError("Dimensionality problems with data for un-scaling.")
        return x_min + x_scaled * (x_max - x_min)
[docs]class PolynomialRegression: """ The PolynomialRegression class performs polynomial regression on a training data set. The class must first be initialized by calling PolynomialRegression. Regression is then carried out by calling training. For a given dataset with :math:n features :math:x_{1},x_{2},\ldots,x_{n}, Polyregression is able to consider three types of basis functions: (a) Mononomial terms (:math:x_{i}^{p},p \leq 10) for all individual features. The maximum degree to be considered can be set by the user (**maximum_polynomial_order**) (b) All first order interaction terms :math:x_{1}x_{2}, :math:x_{1}x_{3} etc. This can be turned on or off by the user (set **multinomials**) (c) User defined input features, e.g. :math:\sin(x_{1}). These must be Pyomo functions and should be provided as a list by the user calling set_additional_terms method before the polynomial training is done. **Example:** .. code-block:: python # Initialize the class and set additional terms >>> d = PolynomialRegression(full_data, training_data, maximum_polynomial_order=2, max_iter=20, multinomials=1, solution_method='pyomo') >>> p = d.get_feature_vector() >>> d.set_additional_terms([...extra terms...]) # Train polynomial model and predict output for an test data x_test >>> d.training() >>> predictions = d.predict_output(x_test) Args: regression_data_input(NumPy Array of Pandas Dataframe) : The dataset for regression training. It is expected to contain the features (X) and output (Y) data, with the output values (Y) in the last column. original_data_input(NumPy Array of Pandas Dataframe) : If **regression_data_input** was drawn from a larger dataset by some sampling approach, the larger dataset may be provided here. When additional data is not available, the same data supplied for training_data can be supplied - this tells the algorithm not to carry out adaptive sampling. maximum_polynomial_order(int): The maximum polynomial order to be considered. 
Further details about the optional inputs may be found under the __init__ method. """
[docs] def get_feature_vector(self): """ The get_feature_vector method generates the list of regression features from the column headers of the input dataset. Returns: Pyomo IndexedParam : An indexed parameter list of the variables supplied in the original data **Example:** .. code-block:: python # Create a small dataframe with three columns ('one', 'two', 'three') and two rows (A, B) >>> xy_data = pd.DataFrame.from_items([('A', [1, 2, 3]), ('B', [4, 5, 6])], orient='index', columns=['one', 'two', 'three']) # Initialize the **PolynomialRegression** class and print the column headers for the variables >>> f = PolynomialRegression(xy_data, xy_data, maximum_polynomial_order=1, multinomials=True, training_split=0.8) >>> p = f.get_feature_vector() >>> for i in p.keys(): >>> print(i) one two """ p = Param(self.regression_data_columns, mutable=True, initialize=0) p.index_set().construct() p.construct() self.feature_list = p return p
[docs] def set_additional_terms(self, term_list): """ set_additional_terms accepts additional user-defined features for consideration during regression. Args: term_list (list) : List of additional terms to be considered as regression features. Each term in the list must be a Pyomo-supported intrinsic function. **Example:** .. code-block:: python # To add the sine and cosine of a variable with header 'X1' in the dataset as additional regression features: >>> xy_data = pd.DataFrame.from_items([('A', [1, 2, 3]), ('B', [4, 5, 6])], orient='index', columns=['X1', 'X2', 'Y']) >>> A = PolynomialRegression(xy_data, xy_data, maximum_polynomial_order=5) >>> p = A.get_feature_vector() >>> A.set_additional_terms([ pyo.sin(p['X1']) , pyo.cos(p['X1']) ]) """ self.additional_term_expressions = term_list
[docs] def training(self): """ The training method trains a polynomial model to an input dataset. It calls the core method which is called in the PolynomialRegression class (polynomial_regression_fitting). It accepts no user input, inheriting the information passed in class initialization. Returns: tuple : Python Object (**results**) containing the results of the polynomial regression process including: - the polynomial order (**self.final_polynomial_order**) - polynomial coefficients (**self.optimal_weights_array**), and - MAE and MSE errors as well as the :math:R^{2} (**results.errors**). """ cMap = ComponentMap() for i, col in enumerate(self.regression_data_columns): cMap[self.feature_list[col]] = self.regression_data[:, i] npe = NumpyEvaluator(cMap) additional_data = list( npe.walk_expression(term) for term in self.additional_term_expressions ) return self.polynomial_regression_fitting(additional_data)
[docs] def generate_expression(self, variable_list): """ The generate_expression method returns the Pyomo expression for the polynomial model trained. The expression is constructed based on a supplied list of variables **variable_list** and the output of training. Args: variable_list(list) : List of input variables to be used in generating expression. This can be the a list generated from the results of get_feature_vector. The user can also choose to supply a new list of the appropriate length. Returns: Pyomo Expression : Pyomo expression of the polynomial model based on the variables provided in **variable_list**. """ terms = PolynomialRegression.polygeneration( self.final_polynomial_order, self.multinomials, np.array([variable_list]) ).transpose() n = len(terms) ans = sum( w * t for w, t in zip( np.nditer(self.optimal_weights_array), np.nditer(terms, flags=["refs_ok"]), ) ) user_term_map = dict( (id(a), b) for a, b in zip( self.extra_terms_feature_vector, variable_list, ) ) if len(self.additional_term_expressions) > 0: for w, expr in zip( np.nditer(self.optimal_weights_array[n:]), self.additional_term_expressions, ): ans += float(w) * replace_expressions(expr, user_term_map) return ans
[docs] def predict_output(self, x_data): """ The predict_output method generates output predictions for input data x_data based a previously generated polynomial fitting. Args: x_data : Numpy array of designs for which the output is to be evaluated/predicted. Returns: Numpy Array : Output variable predictions based on the polynomial fit. """ nf = x_data.shape[1] x_list = [i for i in range(0, nf)] import pyomo.environ as aml m = aml.ConcreteModel() i = aml.Set(initialize=x_list) m.xx = aml.Var(i) m.o2 = aml.Objective(expr=self.generate_expression([m.xx[i] for i in x_list])) y_eq = np.zeros((x_data.shape[0], 1)) for j in range(0, x_data.shape[0]): for i in x_list: m.xx[i] = x_data[j, i] y_eq[j, 0] = aml.value(m.o2([m.xx[i] for i in x_list])) return y_eq
    def confint_regression(self, confidence=0.95):
        """
        Print and return the confidence intervals for the regression
        parameters.

        Args:
            confidence : Required confidence interval level, default = 0.95 (95%)

        Returns:
            pandas.DataFrame : One row per regression coefficient with
            columns for the coefficient value, its standard error, and
            the lower/upper confidence-interval bounds. The table is also
            printed to stdout.
        """
        from scipy.stats import t

        data = self.final_training_data
        y_pred = self.predict_output(data[:, :-1])
        # NOTE(review): dof = n_samples - n_params + 1; the usual residual
        # degrees of freedom would be n - p — confirm the +1 is intended.
        dof = (
            data.shape[0] - len(self.optimal_weights_array) + 1
        )  # be careful when there are additional features
        # Residual sum of squares and the resulting variance estimate.
        ssr = np.sum((data[:, -1] - y_pred[:, 0]) ** 2)
        sig_sq = ssr / dof
        # Rebuild the design matrix used in training so the parameter
        # covariance can be estimated.
        if (self.additional_features_data is None) or (
            len(self.additional_features_data) == 0
        ):
            x_exp = self.polygeneration(
                self.final_polynomial_order, self.multinomials, data[:, :-1]
            )  # will not account for additional features
        else:
            x_exp = self.polygeneration(
                self.final_polynomial_order,
                self.multinomials,
                data[:, :-1],
                additional_x_training_data=self.additional_features_data,
            )
        # Parameter covariance: sigma^2 * (X'X)^-1, via pseudo-inverse for
        # robustness to rank deficiency.
        covar = sig_sq * np.linalg.pinv(x_exp.transpose() @ x_exp)
        ss_reg_params = np.sqrt(
            np.diag(covar)
        )  # standard error for each regression parameter
        # Two-sided critical value of the Student t distribution.
        t_dist = t.ppf(
            (1 + confidence) / 2, dof
        )  # alternatively, t_dist_data = st.t.interval(0.99, 8)
        # Evaluate confidence intervals, Tabulate and print results
        c_data = np.zeros((self.optimal_weights_array.shape[0], 4))
        c_data[:, 0] = self.optimal_weights_array[:, 0]
        c_data[:, 1] = ss_reg_params[
            :,
        ]
        c_data[:, 2] = (
            self.optimal_weights_array[:, 0]
            - t_dist
            * ss_reg_params[
                :,
            ]
        )
        c_data[:, 3] = (
            self.optimal_weights_array[:, 0]
            + t_dist
            * ss_reg_params[
                :,
            ]
        )
        headers = [
            "Regression coeff.",
            "Std. error",
            "Conf. int. lower",
            "Conf. int. upper",
        ]
        c_data_df = pd.DataFrame(data=c_data, columns=headers)
        print(c_data_df)
        return c_data_df