Source code for insolver.wrappers.gbm

from base64 import b64encode

from numpy import cumsum, diff, exp, true_divide, add, append, nan, concatenate, array, abs as npabs
from pandas import DataFrame, Series

from sklearn.metrics import mean_squared_error
from sklearn.metrics._scorer import _SCORERS
from xgboost import XGBClassifier, XGBRegressor
from lightgbm import LGBMClassifier, LGBMRegressor
from catboost import CatBoostClassifier, CatBoostRegressor
from shap import TreeExplainer, summary_plot

from plotly.graph_objects import Figure, Waterfall
from plotly.io import to_image

from .base import InsolverBaseWrapper
from .extensions import InsolverCVHPExtension, InsolverPDPExtension
from .extensions.cvnhp import AUTO_SPACE_CONFIG


class InsolverGBMWrapper(InsolverBaseWrapper, InsolverCVHPExtension, InsolverPDPExtension):
    """Insolver wrapper for Gradient Boosting Machines.

    Parameters:
        backend (str): Framework for building GBM, 'xgboost', 'lightgbm' and 'catboost' are supported.
        task (str): Task that GBM should solve: Classification or Regression. Values 'reg' and 'class' are supported.
        n_estimators (int, optional): Number of boosting rounds. Equals 100 by default.
        objective (str, callable): Objective function for GBM to optimize.
        load_path (str, optional): Path to GBM model to load from disk.
        **kwargs: Parameters for GBM estimators except `n_estimators` and `objective`. Will not be changed in hyperopt.

    Raises:
        NotImplementedError: If `backend` is not supported, or if no model is loaded and `task` is not
            one of 'class' / 'reg'.
    """

    def __init__(self, backend, task=None, objective=None, n_estimators=100, load_path=None, **kwargs):
        super(InsolverGBMWrapper, self).__init__(backend)
        # vars() is taken before any other local is bound, so only the call arguments are passed on.
        self.init_args = self._get_init_args(vars())
        self.algo, self._backends = 'gbm', ['xgboost', 'lightgbm', 'catboost']
        self._tasks = ['class', 'reg']
        self._back_load_dict = {
            'xgboost': self._pickle_load,
            'lightgbm': self._pickle_load,
            'catboost': self._pickle_load,
        }
        self._back_save_dict = {
            'xgboost': self._pickle_save,
            'lightgbm': self._pickle_save,
            'catboost': self._pickle_save,
        }
        self.n_estimators, self.objective, self.params = n_estimators, objective, None

        if backend not in self._backends:
            raise NotImplementedError(f'Error with the backend choice. Supported backends: {self._backends}')

        if load_path is not None:
            self.load_model(load_path)
        else:
            if task in self._tasks:
                gbm_init = {
                    'class': {'xgboost': XGBClassifier, 'lightgbm': LGBMClassifier, 'catboost': CatBoostClassifier},
                    'reg': {'xgboost': XGBRegressor, 'lightgbm': LGBMRegressor, 'catboost': CatBoostRegressor},
                }

                # Backend-specific spelling of the generic objective aliases.
                objectives = {
                    'regression': {'xgboost': 'reg:squarederror', 'lightgbm': 'regression', 'catboost': 'RMSE'},
                    'binary': {'xgboost': 'binary:logistic', 'lightgbm': 'binary', 'catboost': 'Logloss'},
                    'multiclass': {'xgboost': 'multi:softmax', 'lightgbm': 'multiclass', 'catboost': 'MultiClass'},
                    'poisson': {'xgboost': 'count:poisson', 'lightgbm': 'poisson', 'catboost': 'Poisson'},
                    'gamma': {
                        'xgboost': 'reg:gamma',
                        'lightgbm': 'gamma',
                        'catboost': 'Tweedie:variance_power=1.9999999',
                    },
                }
                # Anything that is not a known alias (custom string, callable, None) is passed through untouched.
                self.objective_ = (
                    objectives[self.objective][self.backend] if self.objective in objectives else self.objective
                )
                kwargs.update({'objective': self.objective_, 'n_estimators': self.n_estimators})
                self.model, self.params = gbm_init[task][self.backend](**kwargs), kwargs

                def __params_gbm(**params):
                    # The parameters fixed at construction always override the ones supplied by the caller.
                    params.update(self.params)
                    return gbm_init[task][self.backend](**params)

                self.object = __params_gbm
            else:
                raise NotImplementedError(f'Task parameter supports values in {self._tasks}.')
        self._update_meta()

    def fit(self, X, y, report=None, **kwargs):
        """Fit a Gradient Boosting Machine.

        Args:
            X (pd.DataFrame, pd.Series): Training data.
            y (pd.DataFrame, pd.Series): Training target values.
            report (list, tuple, optional): A list of metrics to report after model fitting, optional.
                Each metric is called as ``metric(y, prediction)`` and printed under its ``__name__``.
            **kwargs: Other parameters passed to Scikit-learn API .fit().
        """
        self.model.fit(X, y, **kwargs)
        # Remember the training columns when the backend does not expose them itself; predict() relies on it.
        if not hasattr(self.model, 'feature_name_'):
            self.model.feature_name_ = X.columns if isinstance(X, DataFrame) else [X.name]
        self._update_meta()

        # Any other type of `report` (including None) is silently ignored.
        if isinstance(report, (list, tuple)):
            prediction = self.model.predict(X)
            rows = [[metric.__name__, metric(y, prediction)] for metric in report]
            print(DataFrame(rows).rename({0: 'Metrics', 1: 'Value'}, axis=1).set_index('Metrics'))

    def predict(self, X, **kwargs):
        """Predict using GBM with feature matrix X.

        Args:
            X (pd.DataFrame, pd.Series): Samples.
            **kwargs: Other parameters passed to Scikit-learn API .predict().

        Returns:
            array: Returns predicted values.
        """
        # Restrict and reorder X to the stored feature names whenever the model has them.
        features = X if not hasattr(self.model, 'feature_name_') else X[self.model.feature_name_]
        return self.model.predict(features, **kwargs)

    def shap(self, X, show=False, plot_type='bar'):
        """Method for shap values calculation and corresponding plot of feature importances.

        Args:
            X (pd.DataFrame, pd.Series): Data for shap values calculation.
            show (boolean, optional): Whether to plot a graph.
            plot_type (str, optional): Type of feature importance graph, takes value in ['dot', 'bar'].

        Returns:
            dict: Mean absolute shap value for every column of X, keyed by column name.
        """
        explainer = TreeExplainer(self.model)
        # A single observation passed as a Series becomes a one-row frame.
        X = DataFrame(X).T if isinstance(X, Series) else X
        shap_values = explainer.shap_values(X)
        # A two-element list is reduced to its first element.
        shap_values = shap_values[0] if isinstance(shap_values, list) and (len(shap_values) == 2) else shap_values
        variables = list(X.columns)
        mean_shap = npabs(shap_values).mean(axis=0).tolist()
        if show:
            summary_plot(shap_values, X, plot_type=plot_type, feature_names=variables)
        return {name: mean_shap[position] for position, name in enumerate(variables)}

    def shap_explain(self, data, index=None, link=None, show=True, layout_dict=None):
        """Method for plotting a waterfall graph or return corresponding JSON if show=False.

        Args:
            data (pd.DataFrame, pd.Series): Data for shap values calculation. Exactly one observation must
                remain after `index` is applied.
            index (int, optional): Index of the observation of interest, if data is pd.DataFrame.
            link (callable, optional): A function for transforming shap values into predictions. Unnecessary if
                self.objective is present and it takes values in ['binary', 'poisson', 'gamma'].
            show (boolean, optional): Whether to plot a graph or return a json.
            layout_dict (dict, optional): Dictionary containing the parameters of plotly figure layout.

        Returns:
            None or dict: Waterfall graph or corresponding JSON.

        Raises:
            NotImplementedError: If the underlying model is not an XGBoost, LightGBM or CatBoost estimator.
        """

        def logit(x):
            # NOTE(review): this evaluates 1 / (1 + exp(x)), i.e. the sigmoid of -x; confirm the sign
            # convention against the class whose shap values are selected below.
            return true_divide(1, add(1, exp(x)))

        explainer = TreeExplainer(self.model)
        if isinstance(self.model, (XGBClassifier, XGBRegressor)):
            feature_names = self.model.get_booster().feature_names
        elif isinstance(self.model, (LGBMClassifier, LGBMRegressor)):
            feature_names = self.model.feature_name_
        elif isinstance(self.model, (CatBoostClassifier, CatBoostRegressor)):
            feature_names = self.model.feature_names_
        else:
            raise NotImplementedError(f'Error with the backend choice. Supported backends: {self._backends}')

        # `index` is only meaningful for DataFrame input.
        index = index if (isinstance(data, DataFrame)) and (index is not None) else None
        data = DataFrame(data).T[feature_names] if isinstance(data, Series) else data[feature_names]
        data = data if index is None else data.loc[[index], :]

        shap_values = explainer.shap_values(data)
        cond_bool = isinstance(shap_values, list) and (len(shap_values) == 2)
        shap_values = shap_values[0] if cond_bool else shap_values
        expected_value = explainer.expected_value[0] if cond_bool else explainer.expected_value

        observed = data.values.reshape(-1)
        prediction = DataFrame(
            [expected_value] + shap_values.reshape(-1).tolist(),
            index=['Intercept'] + list(feature_names),
            columns=['SHAP Value'],
        )
        prediction['CumSum'] = cumsum(prediction['SHAP Value'])
        prediction['Value'] = append(nan, observed)

        # Derive the link from the objective only when the caller did not provide one.
        if (self.objective is not None) and (link is None):
            link = exp if self.objective in ['poisson', 'gamma'] else logit if self.objective == 'binary' else None

        if link is not None:
            prediction['Link'] = link(prediction['CumSum'])
            prediction['Contribution'] = [link(expected_value)] + list(diff(prediction['Link']))
            final_column = 'Link'
        else:
            prediction['Contribution'] = [expected_value] + list(diff(prediction['CumSum']))
            # Without a link there is no 'Link' column; the raw cumulative sum is the model output.
            final_column = 'CumSum'

        labels = [prediction.index[0]] + [
            f'{name}={value}' for name, value in zip(prediction.index[1:], observed)
        ]
        fig = Figure(
            Waterfall(
                name=f'Prediction {index}',
                orientation='h',
                measure=['relative'] * len(prediction),
                y=labels,
                x=prediction['Contribution'],
            )
        )
        fig.update_layout(**(layout_dict if layout_dict is not None else {}))

        if show:
            fig.show()
        else:
            json_ = prediction[['Value', 'SHAP Value', 'Contribution']].T.to_dict()
            fig_base64 = b64encode(to_image(fig, format='jpeg')).decode('ascii')
            json_.update(
                {
                    # Single observation: take its only index label explicitly.
                    'id': int(data.index.values[0]),
                    # Positional access; the Series is labelled by feature names.
                    'predict': prediction[final_column].iloc[-1],
                    "ShapValuesPlot": fig_base64,
                }
            )
            return json_

    def cross_val(self, X, y, scoring=None, cv=None, **kwargs):
        """Method for performing cross-validation given the hyperparameters of initialized or fitted model.

        Args:
            X (pd.DataFrame, pd.Series): Training data.
            y (pd.DataFrame, pd.Series): Training target values.
            scoring (callable, list, tuple, str, optional): Metric callable, a list/tuple of metric callables,
                or the name of a scikit-learn scorer. Mean squared error is used by default.
            cv (int, cross-validation generator or an iterable, optional): Cross-validation strategy from
                sklearn. Performs 5-fold cv by default.
            **kwargs: Other parameters passed to sklearn.model_selection.cross_validate.

        Returns:
            pd.DataFrame, pd.DataFrame: DataFrame with metrics on folds, DataFrame with shap values on folds.

        Raises:
            ValueError: If `scoring` is a string that is not a known scikit-learn scorer.
            NotImplementedError: If `scoring` is of an unsupported type.
        """
        scoring = mean_squared_error if scoring is None else scoring
        models, metrics = self._cross_val(X, y, scoring=scoring, cv=cv, **kwargs)

        # "Overall" score of the current model on the full data, one entry per metric.
        if callable(scoring):
            scorers = {scoring.__name__.replace('_', ' '): array([scoring(y, self.model.predict(X))])}
        elif isinstance(scoring, (tuple, list)):
            overall_prediction = self.model.predict(X)
            scorers = {
                scorer.__name__.replace('_', ' '): array([scorer(y, overall_prediction)]) for scorer in scoring
            }
        elif isinstance(scoring, str):
            if scoring in _SCORERS:
                scorers = {scoring.replace('_', ' '): array([_SCORERS[scoring](self.model, X=X, y=y)])}
            else:
                raise ValueError(f'Scorer {scoring} is not supported.')
        else:
            raise NotImplementedError(f'Scoring of type {scoring} is not supported')

        metrics = DataFrame({key: concatenate((scorers[key], metrics[key])) for key in scorers}).T
        metrics.columns = [f'Fold {i}' if i != 0 else 'Overall' for i in range(metrics.shape[1])]

        def shap_column(model):
            # Intercept followed by the mean shap value of every feature for one model.
            explainer = TreeExplainer(model)
            # shap values are computed before the intercept is read; the intercept may only be
            # available afterwards for some explainers (TODO confirm per shap version).
            shap_values = explainer.shap_values(X)
            expected_value = explainer.expected_value
            if isinstance(shap_values, list) and (len(shap_values) == 2):
                # Same two-element handling as in shap() / shap_explain().
                shap_values, expected_value = shap_values[0], expected_value[0]
            # reshape(-1) turns None, scalars and 1-d arrays alike into a flat list.
            return array(expected_value).reshape(-1).tolist() + shap_values.mean(axis=0).tolist()

        shap_coefs = [shap_column(self.model)] + [shap_column(model) for model in models]
        shapdf = DataFrame(
            array(shap_coefs).T,
            columns=['Overall'] + [f'Fold {x}' for x in range(1, len(models) + 1)],
            index=['Intercept'] + X.columns.tolist(),
        )
        return metrics, shapdf

    def auto(self, x_train, y_train, metric, offset=None, selection='shap', selection_thresh=0.05):
        """Tune hyperparameters and optionally re-tune on a shap-selected subset of features.

        Runs `hyperopt_cv` with the backend's AUTO_SPACE_CONFIG search space for 50 evaluations. When
        `selection` is truthy, features whose share of the total mean absolute shap value is at least
        `selection_thresh` are kept and the search is repeated on those columns only.

        Args:
            x_train (pd.DataFrame): Training data.
            y_train (pd.DataFrame, pd.Series): Training target values.
            metric: Value passed to `hyperopt_cv` as the 'scoring' entry of `fn_params`.
            offset (optional): Value passed to the fit step as `sample_weight`.
            selection (str, optional): Any truthy value enables the shap-based feature selection.
            selection_thresh (float, optional): Minimal normalized shap share for a feature to be kept.
        """

        def tune(features):
            # Single place for the search settings shared by both passes.
            self.hyperopt_cv(
                features,
                y_train,
                AUTO_SPACE_CONFIG[self.backend],
                max_evals=50,
                fn_params={'scoring': metric, 'fit_params': {'sample_weight': offset}},
            )

        tune(x_train)
        if selection:
            importance = DataFrame.from_dict({'shap': self.shap(x_train)}).abs().sort_values('shap', ascending=False)
            importance = importance / importance.sum()
            columns = importance[importance['shap'] >= selection_thresh].index.tolist()
            tune(x_train[columns])