Source code for libreco.data.split

"""Utility Functions for Splitting Data."""
import math

import numpy as np
from sklearn.model_selection import train_test_split


def random_split(
    data,
    shuffle=True,
    test_size=None,
    multi_ratios=None,
    filter_unknown=True,
    pad_unknown=False,
    pad_val=None,
    seed=42,
):
    """Split the data randomly.

    Parameters
    ----------
    data : pandas.DataFrame
        The data to split.
    shuffle : bool, default: True
        Whether to shuffle data when splitting.
    test_size : float or None, default: None
        Test data ratio.
    multi_ratios : list of float, tuple of (float,) or None, default: None
        Ratios for splitting data in multiple parts. If ``test_size`` is not
        None, ``multi_ratios`` will be ignored. The passed sequence is never
        modified.
    filter_unknown : bool, default: True
        Whether to filter out users and items that don't appear in the train
        data from eval and test data. Since models can only recommend items
        in the train data.
    pad_unknown : bool, default: False
        Fill the unknown users/items with ``pad_val``. If ``filter_unknown``
        is True, this parameter will be ignored. Padding is also skipped when
        ``pad_val`` is None.
    pad_val : any, default: None
        Pad value used in ``pad_unknown``.
    seed : int, default: 42
        Random seed.

    Returns
    -------
    multiple data : list of pandas.DataFrame
        The split data, train part first.

    Raises
    ------
    ValueError
        If neither ``test_size`` nor ``multi_ratios`` is provided.

    Examples
    --------
    >>> train, test = random_split(data, test_size=0.2)
    >>> train_data, eval_data, test_data = random_split(data, multi_ratios=[0.8, 0.1, 0.1])
    """
    ratios, n_splits = _check_and_convert_ratio(test_size, multi_ratios)
    # Always work on a private copy: the helper may hand back the caller's own
    # ``multi_ratios`` object, and the loop below pops from this list.
    ratios = list(ratios)

    # To split data in multiple folds, peel off the last fold repeatedly and
    # re-normalize the remaining ratios so each ``test_size`` is relative to
    # the data that is still left.
    train_data = data.copy()
    peeled_folds = []
    for _ in range(n_splits - 1):
        size = ratios.pop()
        remaining_total = math.fsum(ratios)
        ratios = [r / remaining_total for r in ratios]
        train_data, split_data = train_test_split(
            train_data, test_size=size, shuffle=shuffle, random_state=seed
        )
        peeled_folds.append(split_data)

    # Folds were peeled from the back, so reverse them and put train first.
    split_data_all = [train_data, *reversed(peeled_folds)]

    if filter_unknown:
        split_data_all = _filter_unknown_user_item(split_data_all)
    elif pad_unknown and pad_val is not None:
        split_data_all = _pad_unknown_user_item(split_data_all, pad_val)
    return split_data_all
def _filter_unknown_user_item(data_list): train_data = data_list[0] unique_values = dict( user=set(train_data.user.tolist()), item=set(train_data.item.tolist()) ) split_data_all = [train_data] for test_data in data_list[1:]: # print(f"Non_train_data {i} size before filtering: {len(test_data)}") out_of_bounds_row_indices = set() for col in ["user", "item"]: for j, val in enumerate(test_data[col]): if val not in unique_values[col]: out_of_bounds_row_indices.add(j) mask = np.arange(len(test_data)) test_data_clean = test_data[~np.isin(mask, list(out_of_bounds_row_indices))] split_data_all.append(test_data_clean) # print(f"Non_train_data {i} size after filtering: " # f"{len(test_data_clean)}") return split_data_all def _pad_unknown_user_item(data_list, pad_val): train_data, test_data = data_list if isinstance(pad_val, (list, tuple)): user_pad_val, item_pad_val = pad_val else: user_pad_val = item_pad_val = pad_val unique_users = set(train_data.user.tolist()) unique_items = set(train_data.item.tolist()) split_data_all = [train_data] for test_data in data_list[1:]: test_data_copy = test_data.copy() test_data_copy.loc[~test_data.user.isin(unique_users), "user"] = user_pad_val test_data_copy.loc[~test_data.item.isin(unique_items), "item"] = item_pad_val test_data = test_data_copy split_data_all.append(test_data) return split_data_all
def split_by_ratio(
    data,
    order=True,
    shuffle=False,
    test_size=None,
    multi_ratios=None,
    filter_unknown=True,
    pad_unknown=False,
    pad_val=None,
    seed=42,
):
    """Assign certain ratio of items to test data for each user.

    .. NOTE::
        If a user has 3 or fewer interacted items, these items will all be
        assigned to train data.

    Parameters
    ----------
    data : pandas.DataFrame
        The data to split.
    order : bool, default: True
        Whether to preserve order for user's item sequence.
    shuffle : bool, default: False
        Whether to shuffle data after splitting.
    test_size : float or None, default: None
        Test data ratio.
    multi_ratios : list of float, tuple of (float,) or None, default: None
        Ratios for splitting data in multiple parts. If ``test_size`` is not
        None, ``multi_ratios`` will be ignored.
    filter_unknown : bool, default: True
        Whether to filter out users and items that don't appear in the train
        data from eval and test data. Since models can only recommend items
        in the train data.
    pad_unknown : bool, default: False
        Fill the unknown users/items with ``pad_val``. If ``filter_unknown``
        is True, this parameter will be ignored. Padding is also skipped when
        ``pad_val`` is None.
    pad_val : any, default: None
        Pad value used in ``pad_unknown``.
    seed : int, default: 42
        Random seed.

    Returns
    -------
    multiple data : list of pandas.DataFrame
        The split data, train part first.

    Raises
    ------
    ValueError
        If neither ``test_size`` nor ``multi_ratios`` is provided.
    AssertionError
        If ``data`` has no ``user`` column.

    See Also
    --------
    split_by_ratio_chrono
    """
    assert "user" in data.columns, "data must contains user column"
    ratios, n_splits = _check_and_convert_ratio(test_size, multi_ratios)
    user_split_indices = _groupby_user(data["user"].to_numpy(), order)

    # Cumulative ratios (without the trailing 1.0) are the relative cut points.
    cum_ratios = np.cumsum(ratios).tolist()[:-1]
    split_indices_all = [[] for _ in range(n_splits)]
    # Iterate the groups themselves so that every row position is assigned,
    # regardless of how the number of groups relates to ``nunique()``.
    for u_data in user_split_indices:
        u_data_len = len(u_data)
        if u_data_len <= 3:  # keep items of rare users in trainset
            split_indices_all[0].extend(u_data.tolist())
            continue
        cut_points = [round(cum * u_data_len) for cum in cum_ratios]
        for bucket, part in zip(split_indices_all, np.split(u_data, cut_points)):
            bucket.extend(part.tolist())

    # Force an integer dtype so that an empty split is still a valid indexer.
    index_arrays = [np.asarray(idx, dtype=np.intp) for idx in split_indices_all]
    if shuffle:
        np_rng = np.random.default_rng(seed)
        index_arrays = [np_rng.permutation(idx) for idx in index_arrays]
    split_data_all = [data.iloc[idx] for idx in index_arrays]

    if filter_unknown:
        split_data_all = _filter_unknown_user_item(split_data_all)
    elif pad_unknown and pad_val is not None:
        split_data_all = _pad_unknown_user_item(split_data_all, pad_val)
    return split_data_all
def split_by_num(
    data,
    order=True,
    shuffle=False,
    test_size=1,
    filter_unknown=True,
    pad_unknown=False,
    pad_val=None,
    seed=42,
):
    """Assign a certain number of items to test data for each user.

    .. NOTE::
        If a user has 3 or fewer interacted items, these items will all be
        assigned to train data.

    Parameters
    ----------
    data : pandas.DataFrame
        The data to split.
    order : bool, default: True
        Whether to preserve order for user's item sequence.
    shuffle : bool, default: False
        Whether to shuffle data after splitting.
    test_size : int, default: 1
        Number of items per user to put into test data. A user with more
        than 3 but no more than ``test_size`` items contributes only the
        last item to test data.
    filter_unknown : bool, default: True
        Whether to filter out users and items that don't appear in the train
        data from eval and test data. Since models can only recommend items
        in the train data.
    pad_unknown : bool, default: False
        Fill the unknown users/items with ``pad_val``. If ``filter_unknown``
        is True, this parameter will be ignored. Padding is also skipped when
        ``pad_val`` is None.
    pad_val : any, default: None
        Pad value used in ``pad_unknown``.
    seed : int, default: 42
        Random seed.

    Returns
    -------
    multiple data : list of pandas.DataFrame
        ``[train_data, test_data]``.

    Raises
    ------
    AssertionError
        If ``data`` has no ``user`` column, or ``test_size`` is not an
        integer in ``(0, len(data))``.

    See Also
    --------
    split_by_num_chrono
    """
    assert "user" in data.columns, "data must contains user column"
    assert isinstance(test_size, (int, np.integer)), "test_size must be int value"
    assert 0 < test_size < len(data), "test_size must be in (0, len(data))"
    user_split_indices = _groupby_user(data["user"].to_numpy(), order)

    train_indices = []
    test_indices = []
    # Iterate the groups themselves so that every row position is assigned,
    # regardless of how the number of groups relates to ``nunique()``.
    for u_data in user_split_indices:
        u_data_len = len(u_data)
        if u_data_len <= 3:  # keep items of rare users in trainset
            train_indices.extend(u_data.tolist())
            continue
        # Users with too few items for ``test_size`` still give one test item.
        k = 1 if u_data_len <= test_size else test_size
        train_indices.extend(u_data[: u_data_len - k].tolist())
        test_indices.extend(u_data[-k:].tolist())

    # Force an integer dtype so that an empty split is still a valid indexer.
    train_indices = np.asarray(train_indices, dtype=np.intp)
    test_indices = np.asarray(test_indices, dtype=np.intp)
    if shuffle:
        np_rng = np.random.default_rng(seed)
        train_indices = np_rng.permutation(train_indices)
        test_indices = np_rng.permutation(test_indices)

    split_data_all = [data.iloc[train_indices], data.iloc[test_indices]]
    if filter_unknown:
        split_data_all = _filter_unknown_user_item(split_data_all)
    elif pad_unknown and pad_val is not None:
        split_data_all = _pad_unknown_user_item(split_data_all, pad_val)
    return split_data_all
def split_by_ratio_chrono(
    data, order=True, shuffle=False, test_size=None, multi_ratios=None, seed=42
):
    """Split each user's items by ratio after sorting all rows by time.

    .. IMPORTANT::
        This function implies the data should contain a **time** column.

    The rows are sorted by the ``time`` column, the index is reset, and the
    result is handed to :func:`split_by_ratio` with its remaining options
    left at their defaults.

    Parameters
    ----------
    data : pandas.DataFrame
        The data to split.
    order : bool, default: True
        Whether to preserve order for user's item sequence.
    shuffle : bool, default: False
        Whether to shuffle data after splitting.
    test_size : float or None, default: None
        Test data ratio.
    multi_ratios : list of float, tuple of (float,) or None, default: None
        Ratios for splitting data in multiple parts. If ``test_size`` is not
        None, ``multi_ratios`` will be ignored.
    seed : int, default: 42
        Random seed.

    Returns
    -------
    multiple data : list of pandas.DataFrame
        The split data.

    Raises
    ------
    ValueError
        If neither ``test_size`` nor ``multi_ratios`` is provided.

    See Also
    --------
    split_by_ratio
    """
    required_columns = ("user", "time")
    has_required = all([col in data.columns for col in required_columns])
    assert has_required, "data must contains user and time column"

    chronological = data.sort_values(by=["time"])
    chronological = chronological.reset_index(drop=True)
    return split_by_ratio(
        chronological,
        order=order,
        shuffle=shuffle,
        test_size=test_size,
        multi_ratios=multi_ratios,
        seed=seed,
    )
def split_by_num_chrono(data, order=True, shuffle=False, test_size=1, seed=42):
    """Assign a certain number of items to test data for each user, where
    items are sorted by time first.

    .. IMPORTANT::
        This function implies the data should contain a **time** column.

    Parameters
    ----------
    data : pandas.DataFrame
        The data to split.
    order : bool, default: True
        Whether to preserve order for user's item sequence.
    shuffle : bool, default: False
        Whether to shuffle data after splitting.
    test_size : int, default: 1
        Number of items per user to put into test data.
    seed : int, default: 42
        Random seed.

    Returns
    -------
    multiple data : list of pandas.DataFrame
        The split data.

    Raises
    ------
    AssertionError
        If ``data`` lacks a ``user`` or ``time`` column, or ``test_size`` is
        rejected by :func:`split_by_num`.

    See Also
    --------
    split_by_num
    """
    assert all(
        ["user" in data.columns, "time" in data.columns]
    ), "data must contains user and time column"
    # Sort globally by time; the per-user grouping done downstream then sees
    # each user's rows in chronological order.
    data = data.sort_values(by=["time"]).reset_index(drop=True)
    return split_by_num(data, order, shuffle, test_size, seed=seed)
def _groupby_user(user_indices, order): sort_kind = "mergesort" if order else "quicksort" _, user_position, user_counts = np.unique( user_indices, return_inverse=True, return_counts=True ) user_split_indices = np.split( np.argsort(user_position, kind=sort_kind), np.cumsum(user_counts)[:-1] ) return user_split_indices def _check_and_convert_ratio(test_size, multi_ratios): if not test_size and not multi_ratios: raise ValueError("must provide either 'test_size' or 'multi_ratios'") elif test_size is not None: assert isinstance(test_size, float), "test_size must be float value" assert 0.0 < test_size < 1.0, "test_size must be in (0.0, 1.0)" ratios = [1 - test_size, test_size] return ratios, 2 elif isinstance(multi_ratios, (list, tuple)): assert len(multi_ratios) > 1, "multi_ratios must at least have two elements" assert all([r > 0.0 for r in multi_ratios]), "ratios should be positive values" if math.fsum(multi_ratios) != 1.0: ratios = [r / math.fsum(multi_ratios) for r in multi_ratios] else: ratios = multi_ratios return ratios, len(ratios) else: raise ValueError("multi_ratios should be list or tuple")