# Source code for insolver.feature_engineering.sampling

from numpy import arange, repeat, delete, append
from numpy.random import choice
from pandas import DataFrame, concat


class Sampling:
    """
    Sampling class.

    It includes several different techniques: simple sampling, systematic sampling,
    cluster sampling, stratified sampling.

    Parameters:
        n (int): This parameter is used in chosen sampling method:
            for a `simple` sampling `n` is the number of values to keep;
            for a `systematic` sampling `n` is the step size;
            for a `cluster` sampling `n` is the number of clusters to keep;
            for a `stratified` sampling `n` is the number of values to keep in each cluster.
        n_clusters (int), default = 10: Number of clusters for the `cluster` and
            `stratified` sampling. Only used when `cluster_column` is not set.
        cluster_column (str): Column name of the data frame used as clusters.
        method (str): Sampling method, supported methods: `simple`, `systematic`,
            `cluster`, `stratified`.
    """

    def __init__(self, n, cluster_column=None, n_clusters=10, method='simple'):
        self.method = method
        self.n = n
        self.n_clusters = n_clusters
        self.cluster_column = cluster_column

    def sample_dataset(self, df):
        """
        A method for performing sampling with the dataset using selected method.

        Parameters:
            df (pandas.Dataframe): The dataframe.

        Raises:
            NotImplementedError: If `self.method` is not supported.

        Returns:
            New dataset with selected rows.
        """
        # dispatch table: method name -> bound sampling method
        sampling_dict = {
            'simple': self._simple_sampling,
            'systematic': self._systematic_sampling,
            'cluster': self._cluster_sampling,
            'stratified': self._stratified_sampling,
        }

        # raise error if the method is not supported
        if self.method not in sampling_dict:
            raise NotImplementedError(f'{self.method} method is not supported.')

        return sampling_dict[self.method](df)

    def _simple_sampling(self, df):
        """
        Simple sampling.

        Parameters:
            df (pandas.Dataframe): The dataframe.

        Returns:
            New dataset with `self.n` randomly selected rows.
        """
        # sample data with the DataFrame.sample() method
        return df.sample(n=self.n)

    def _systematic_sampling(self, df):
        """
        Systematic sampling.

        Parameters:
            df (pandas.Dataframe): The dataframe.

        Returns:
            New dataset with every `self.n`-th row, starting from the first one.
        """
        # positional indexes 0, n, 2n, ... below len(df)
        indexes = arange(0, len(df), step=self.n)
        return df.iloc[indexes]

    def _cluster_sampling(self, df):
        """
        Cluster sampling.

        Parameters:
            df (pandas.Dataframe): The dataframe.

        Raises:
            ValueError: If `self.n` is bigger than the number of distinct clusters.

        Returns:
            New dataset with all rows of `self.n` randomly chosen distinct clusters
            (including the `cluster_id` column). If `self.n` equals the number of
            clusters, the input dataframe is returned unchanged (without a
            `cluster_id` column).
        """
        # create clusters
        cluster_df = self._create_clusters(df)
        cluster_ids = cluster_df['cluster_id'].unique()
        # number of distinct clusters (a count, not a sum of the ids)
        clusters_count = len(cluster_ids)

        # if the selected number of clusters is bigger than the created number raise error
        if self.n > clusters_count:
            raise ValueError(f'{self.n} cannot be bigger then number of clusters.')

        # if the selected number of clusters equals the created number return df
        if self.n == clusters_count:
            return df

        # randomly choose distinct clusters to keep; sampling without replacement
        # guarantees that no cluster (and therefore no row) is selected twice
        clusters_to_keep = choice(cluster_ids, self.n, replace=False)
        return cluster_df[cluster_df['cluster_id'].isin(clusters_to_keep)]

    def _stratified_sampling(self, df):
        """
        Stratified sampling.

        Parameters:
            df (pandas.Dataframe): The dataframe.

        Returns:
            New dataset with `self.n` randomly selected rows from each cluster
            (including the `cluster_id` column).
        """
        # create clusters
        cluster_df = self._create_clusters(df)

        # get selected number of values from each cluster
        samples = [
            cluster_df[cluster_df['cluster_id'] == cluster].sample(n=self.n)
            for cluster in cluster_df['cluster_id'].unique()
        ]

        # no clusters at all (empty input): nothing to concatenate
        if not samples:
            return DataFrame()

        # concatenate once instead of growing a DataFrame inside the loop
        return concat(samples)

    def _create_clusters(self, df):
        """
        Creating dataframe with clusters.

        If self.cluster_column is defined, the clusters column is created using the
        dataframe column. Otherwise the clusters are formed according to the existing
        row order: ids 1..n_clusters are assigned in consecutive runs of
        round(len(df) / n_clusters) rows. If that does not cover the dataframe exactly,
        surplus ids are dropped from the end, or the remaining rows are assigned to the
        last cluster.

        Parameters:
            df (pandas.Dataframe): The dataframe.

        Raises:
            ValueError: Values in the column must be not null.
            ValueError: If clusters are built from row order and `n_clusters` < 1.

        Returns:
            Copy of the dataset with an added `cluster_id` column.
        """
        new_df = df.copy()

        # if a column that is used as a clusters is initialized
        if self.cluster_column:
            # check for null values
            if df[self.cluster_column].isnull().any():
                raise ValueError('All values in the column must be not null.')

            # create 'cluster_id' column as a copy of cluster_column
            new_df['cluster_id'] = df[self.cluster_column]
            return new_df

        if self.n_clusters < 1:
            raise ValueError('n_clusters must be a positive integer.')

        # get the cluster size as DataFrame length divided by the number of clusters
        cluster_size = round(len(df) / self.n_clusters)
        cluster_ids = repeat(arange(1, self.n_clusters + 1), cluster_size)

        # rounding may leave the id array shorter or longer than the dataframe
        missing = len(df) - len(cluster_ids)
        if missing > 0:
            # too few ids: put the remaining rows into the last cluster
            cluster_ids = append(cluster_ids, repeat(self.n_clusters, missing))
        else:
            # too many ids (or an exact fit): keep only the first len(df) ids
            cluster_ids = cluster_ids[:len(df)]

        new_df['cluster_id'] = cluster_ids
        return new_df