import os
import traceback
from glob import glob
from numpy import min, max, mean, var, std, quantile, median
from pandas import DataFrame, concat
from sklearn.metrics import accuracy_score, f1_score, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from insolver.wrappers import InsolverGLMWrapper, InsolverGBMWrapper, InsolverRFWrapper, InsolverTrivialWrapper
[docs]
class ModelMetricsCompare:
"""Class for model comparison.
It will compute statistics and metrics for the regression task and metrics for the classification task.
You can compare created models with the `source` parameter or if `source` is `None` it use current working directory
as a source. If you want to create new models set the `create_models` parameter to True. If you already have source
parameter and set `create_models` parameter to `True`, new models will be added to the source list.
Parameters:
X (pd.DataFrame, pd.Series): Data for making predictions.
y (pd.DataFrame, pd.Series): Actual target values for X.
task (str, None): A task for models and metrics. If `task` new models will be created
Supports 'reg' and 'class'.
create_models (bool): If True, new models will be created and added to the comparison list.
source (str, list, tuple, None): List or tuple of insolver wrappers or path to the
folder with models. If `None`, taking current working directory as source.
metrics (list, tuple, callable, optional): Metrics or list of metrics to compute.
stats (list, tuple, callable, optional): Statistics or list of statistics to compute.
h2o_init_params (dict, optional): Parameters passed to `h2o.init()`, when `backend` == 'h2o'.
predict_params (list, optional): List of dictionaries containing parameters passed to predict methods
for each model.
features (list, optional): List of lists containing features for predict method for each model.
names (list, optional): List of model names.
"""
def __init__(
self,
X,
y,
task=None,
create_models=False,
source=None,
metrics=None,
stats=None,
h2o_init_params=None,
predict_params=None,
features=None,
names=None,
):
self.X = X
self.y = y
self.task = task
self.create_models = create_models
self.source = source
self.metrics = [] if metrics is None else metrics
self.stats = stats
self.h2o_init_params = h2o_init_params
self.predict_params = predict_params
self.features = features
self.names = names
self.stats_results, self.metrics_results = DataFrame(), DataFrame()
def __repr__(self):
stk = traceback.extract_stack()
if not ('IPython' in stk[-2][0] and 'info' == stk[-2][2]):
import IPython.display
if self.task == 'reg':
print('Model comparison statistics:')
IPython.display.display(self.stats_results)
print('\nModels comparison metrics:')
IPython.display.display(self.metrics_results)
else:
if self.task == 'reg':
print('Model comparison statistics:')
print(self.stats_results)
print('\nModels comparison metrics:')
print(self.metrics_results)
return ''
[docs]
def compare(self):
"""Compares models using initialized parameters.
If `self.create_models` == True, new models will be created and added to the source list.
Raises:
Exception: `task` parameter must be initialized and be `class` or `reg`.
"""
if self.task not in ['reg', 'class']:
raise Exception('Task must be "reg" or "class".')
if self.create_models:
self._init_new_models()
self._init_default_metrics()
self._init_source_models()
self._calc_metrics()
self.__repr__()
[docs]
def _init_new_models(self):
"""Initializes new models using the `task` parameter.
If `class` then Gradient Boosting model with the catboost backend and Random Forest with the sklearn backend
will be created.
If `reg` then Gradient Boosting model with the catboost backend, Random Forest with the sklearn backend and
Linear Model with the sklearn backend will be created.
This method uses train_test_split from sklearn.model_selection, fits models with train values and changes
`self.X`, `self.y` to test values. Thus, when calculating metrics, it will use test values.
"""
self.source = [] if self.source is None else self.source
models_dict = {}
if self.task == 'class':
models_dict = {
'new_gbm': InsolverGBMWrapper(backend='catboost', task='class', n_estimators=10),
'new_rf': InsolverRFWrapper(backend='sklearn', task='class'),
}
elif self.task == 'reg':
models_dict = {
'new_glm': InsolverGLMWrapper(backend='sklearn', family=0, standardize=True),
'new_gbm': InsolverGBMWrapper(backend='catboost', task='reg', n_estimators=10),
'new_rf': InsolverRFWrapper(backend='sklearn', task='reg'),
}
X_train, X_test, y_train, y_test = train_test_split(self.X, self.y)
for model_name in models_dict:
_model = models_dict[model_name]
_model.fit(X_train, y_train)
_model.algo = model_name
self.source.insert(0, _model)
self.X, self.y = X_test, y_test
[docs]
def _init_default_metrics(self):
"""Initializes default metrics and adds them to the metrics list.
If `class` then accuracy score and f1 score will be added.
If `reg` then mean absolute error and r2 score will be added.
If `self.metrics` is callable it will be changed to the list type.
"""
if callable(self.metrics):
self.metrics = [self.metrics]
if self.task == 'class':
self.metrics.insert(0, accuracy_score)
self.metrics.insert(1, f1_score)
elif self.task == 'reg':
self.metrics.insert(0, mean_absolute_error)
self.metrics.insert(1, r2_score)
[docs]
def _init_source_models(self):
"""Initializes source models.
if `source` is `None` it use current working directory as a source.
Raises:
Exception: Models with the insolver name format were not found in the current working directory.
TypeError: Source type is not supported.
"""
assert (
True if self.names is None else len(self.names) == len(self.source)
), 'Check length of list containing model names.'
wrappers = {'glm': InsolverGLMWrapper, 'gbm': InsolverGBMWrapper, 'rf': InsolverRFWrapper}
if (self.source is None) or isinstance(self.source, str):
self.source = os.getcwd() if self.source is None else self.source
files = glob(os.path.join(self.source, '*'))
files = [file for file in files if os.path.basename(file).split('_')[0] == 'insolver']
if files:
model_list = []
for file in files:
algo, backend = os.path.basename(file).split('_')[1:3]
model_list.append(
wrappers[algo](backend=backend, load_path=file)
if backend != 'h2o'
else wrappers[algo](backend=backend, load_path=file, h2o_init_params=self.h2o_init_params)
)
self.models = model_list
else:
raise Exception('No models with the insolver name format found.')
elif isinstance(self.source, (list, tuple)):
self.models = self.source
else:
raise TypeError(f'Source of type {type(self.source)} is not supported.')
[docs]
def _calc_metrics(self):
"""Computes metrics and statistics for the models.
Raises:
TypeError: Statistics type are not supported.
TypeError: Metrics type are not supported.
Returns:
Returns `None`, but results available in `self.stats`, `self.metrics`.
"""
stats_df, model_metrics = DataFrame(), DataFrame()
algos, backend = [], []
trivial = InsolverTrivialWrapper(task='reg', agg=lambda x: x)
trivial.fit(self.X, self.y)
models = [trivial] + self.models
features = [None] + self.features if self.features is not None else None
for model in models:
algos.append(model.algo.upper()) if hasattr(model, 'algo') else algos.append('-')
(
backend.append(model.backend.capitalize())
if hasattr(model, 'backend')
else backend.append(model.__class__.__name__)
)
p = model.predict(
(
self.X
if (features is None) or (features[models.index(model)] is None)
else self.X[features[models.index(model)]]
),
**(
{}
if (self.predict_params is None) or (self.predict_params[models.index(model)] is None)
else self.predict_params[models.index(model)]
),
)
stats_val = [mean(p), var(p), std(p), min(p), quantile(p, 0.25), median(p), quantile(p, 0.75), max(p)]
name_stats = ['Mean', 'Variance', 'St. Dev.', 'Min', 'Q1', 'Median', 'Q3', 'Max']
if self.stats is not None:
if isinstance(self.stats, (list, tuple)):
for stat in self.stats:
if callable(stat):
stats_val.append(stat(p))
name_stats.append(stat.__name__.replace('_', ' '))
else:
raise TypeError(f'Statistics with type {type(stat)} are not supported.')
elif callable(self.stats):
stats_val.append(self.stats(p))
name_stats.append(self.stats.__name__.replace('_', ' '))
else:
raise TypeError(f'Statistics with type {type(self.stats)} are not supported.')
stats_df = concat([stats_df, DataFrame([stats_val], columns=name_stats)])
if (self.metrics is not None) and not models.index(model) == 0:
if isinstance(self.metrics, (list, tuple)):
m_metrics = []
for metric in self.metrics:
if callable(metric):
m_metrics.append(metric(self.y, p))
else:
raise TypeError(f'Metrics with type {type(metric)} are not supported.')
metrics_names = [m.__name__.replace('_', ' ') for m in self.metrics]
model_metrics = concat([model_metrics, DataFrame(dict(zip(metrics_names, m_metrics), index=[0]))])
else:
raise TypeError(f'Metrics with type {type(self.metrics)} are not supported.')
model_metrics = model_metrics.reset_index(drop=True)
model_metrics = (
model_metrics if 'index' not in model_metrics.columns else model_metrics.drop(['index'], axis=1)
)
model_metrics.index = list(range(len(model_metrics))) if self.names is None else self.names
stats_df.index = ['Actual'] + model_metrics.index.tolist()
stats_df[['Algo', 'Backend']] = DataFrame(
{'Algo': ['-'] + algos[1:], 'Backend': ['-'] + backend[1:]}, index=stats_df.index
)
model_metrics[['Algo', 'Backend']] = DataFrame(
{'Algo': algos[1:], 'Backend': backend[1:]}, index=model_metrics.index
)
stats_df = stats_df[list(stats_df.columns[-2:]) + list(stats_df.columns[:-2])]
model_metrics = model_metrics[list(model_metrics.columns[-2:]) + list(model_metrics.columns[:-2])]
self.stats_results = concat([self.stats_results, stats_df])
self.metrics_results = concat([self.metrics_results, model_metrics])