# Source code for libreco.data.data_info

"""Classes for Storing Various Data Information."""
import inspect
import json
import pickle
from collections import namedtuple
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List

import numpy as np
import pandas as pd

from ..feature.update import (
    get_row_id_masks,
    update_new_dense_feats,
    update_new_sparse_feats,
)

# A pair of parallel sequences: the column names of one feature family and
# the positional index of each of those columns.
Feature = namedtuple(typename="Feature", field_names=("name", "index"))

# Placeholder handed out by ``DataInfo`` when a column family has no columns.
EmptyFeature = Feature([], [])


# noinspection PyUnresolvedReferences
@dataclass
class MultiSparseInfo:
    """Container describing how multi-sparse features are grouped into fields.

    Several multi-sparse columns that belong together form one "field";
    for example ("genre1", "genre2", "genre3") make up a "genre" field.
    Columns of the same field share a single out-of-vocabulary (oov) slot.

    This is a `dataclass <https://docs.python.org/3/library/dataclasses.html>`_
    with ``__slots__``, so instances carry exactly the four attributes below.

    Attributes
    ----------
    field_offset : list of int
        Offset of every multi-sparse field within all expanded sparse features.
    field_len : list of int
        Size of every multi-sparse field.
    feat_oov : numpy.ndarray
        Out-of-vocabulary value of every multi-sparse field.
    pad_val : dict of {str : Any}
        Padding value used in the multi-sparse columns.
    """

    __slots__ = ("field_offset", "field_len", "feat_oov", "pad_val")

    field_offset: Iterable[int]
    field_len: Iterable[int]
    feat_oov: np.ndarray
    pad_val: Dict[str, Any]
class DataInfo:
    """Object for storing and updating information of indices and features.

    Parameters
    ----------
    col_name_mapping : dict of {str : dict of {str : int}} or None, default: None
        Column name to index mapping, which has the format:
        ``{column_family_name: {column_name: index}}``.
        Families looked up by this class are ``sparse_col``, ``dense_col``,
        ``user_sparse_col``, ``user_dense_col``, ``item_sparse_col`` and
        ``item_dense_col``; a missing family is treated as empty.
    interaction_data : pandas.DataFrame or None, default: None
        Data contains ``user``, ``item`` and ``label`` columns
    user_sparse_unique : numpy.ndarray or None, default: None
        Unique sparse features for all users in train data.
    user_dense_unique : numpy.ndarray or None, default: None
        Unique dense features for all users in train data.
    item_sparse_unique : numpy.ndarray or None, default: None
        Unique sparse features for all items in train data.
    item_dense_unique : numpy.ndarray or None, default: None
        Unique dense features for all items in train data.
    user_consumed : dict of {int : list} or None, default: None
        All consumed items by each user.
    item_consumed : dict of {int : list} or None, default: None
        All consumed users by each item.
    user_unique_vals : numpy.ndarray or None, default: None
        All the unique users in train data.
    item_unique_vals : numpy.ndarray or None, default: None
        All the unique items in train data.
    sparse_unique_vals : dict of {str : numpy.ndarray} or None, default: None
        All sparse features' unique values.
    sparse_offset : numpy.ndarray or None, default: None
        Offset for each sparse feature in all sparse values.
        Often used in the ``embedding`` layer.
    sparse_oov : numpy.ndarray or None, default: None
        Out-of-vocabulary place for each sparse feature.
        Often used in cold-start.
    multi_sparse_unique_vals : dict of {str : numpy.ndarray} or None, default: None
        All multi-sparse features' unique values.
    multi_sparse_combine_info : MultiSparseInfo or None, default: None
        Multi-sparse field information.
    seed: int, default: 42
        Random seed.

    Attributes
    ----------
    col_name_mapping : dict of {str : dict of {str : int}} or None
        See Parameters
    user_consumed : dict of {int, list}
        Every users' consumed items in train data.
    item_consumed : dict of {int, list}
        Every items' consumed users in train data.

    See Also
    --------
    MultiSparseInfo
    """

    def __init__(
        self,
        col_name_mapping=None,
        interaction_data=None,
        user_sparse_unique=None,
        user_dense_unique=None,
        item_sparse_unique=None,
        item_dense_unique=None,
        user_consumed=None,
        item_consumed=None,
        user_unique_vals=None,
        item_unique_vals=None,
        sparse_unique_vals=None,
        sparse_offset=None,
        sparse_oov=None,
        multi_sparse_unique_vals=None,
        multi_sparse_combine_info=None,
        seed=42,
    ):
        self.col_name_mapping = col_name_mapping
        self.interaction_data = interaction_data
        self.user_sparse_unique = user_sparse_unique
        self.user_dense_unique = user_dense_unique
        self.item_sparse_unique = item_sparse_unique
        self.item_dense_unique = item_dense_unique
        self.user_consumed = user_consumed
        self.item_consumed = item_consumed
        self.user_unique_vals = user_unique_vals
        self.item_unique_vals = item_unique_vals
        self.sparse_unique_vals = sparse_unique_vals
        self.sparse_offset = sparse_offset
        self.sparse_oov = sparse_oov
        self.multi_sparse_unique_vals = multi_sparse_unique_vals
        self.multi_sparse_combine_info = multi_sparse_combine_info
        self.sparse_idx_mapping = DataInfo.map_sparse_vals(
            sparse_unique_vals, multi_sparse_unique_vals
        )
        # Numpy doc states that it is recommended to use new random API
        # https://numpy.org/doc/stable/reference/random/index.html
        self.np_rng = np.random.default_rng(seed)
        # Lazily computed caches backing the properties of the same name.
        self._n_users = None
        self._n_items = None
        self._user2id = None
        self._item2id = None
        self._id2user = None
        self._id2item = None
        self._data_size = None
        self._popular_items = None
        # store old info for rebuild models
        self.old_info = None
        # Snapshot of the constructor arguments exactly as passed in, taken
        # before `add_oovs` replaces the feature matrices. `save` serializes
        # from this snapshot rather than from the live attributes.
        self.all_args = locals()
        self.add_oovs()

    @staticmethod
    def map_sparse_vals(sparse_unique_vals, multi_sparse_unique_vals):
        """Map every sparse value to its position within its column.

        Parameters
        ----------
        sparse_unique_vals : dict of {str : sequence} or None
            Unique values per sparse column.
        multi_sparse_unique_vals : dict of {str : sequence} or None
            Unique values per multi-sparse column.

        Returns
        -------
        dict of {str : dict} or None
            ``{column: {value: position}}`` for all columns of both inputs,
            or None when both inputs are None.
        """
        if sparse_unique_vals is None and multi_sparse_unique_vals is None:
            return

        def _map_vals(unique_vals):
            mapping = {}
            if unique_vals is not None:
                for col, vals in unique_vals.items():
                    mapping[col] = dict(zip(vals, range(len(vals))))
            return mapping

        res = {}
        res.update(_map_vals(sparse_unique_vals))
        res.update(_map_vals(multi_sparse_unique_vals))
        assert len(res) > 0
        return res

    def _col_feature(self, family):
        """Return the ``Feature`` (names, indices) of one column family.

        ``EmptyFeature`` is returned when there is no mapping at all or the
        family is absent from it.
        """
        if not self.col_name_mapping or family not in self.col_name_mapping:
            return EmptyFeature
        columns = self.col_name_mapping[family]
        return Feature(name=list(columns.keys()), index=list(columns.values()))

    def _col_names(self, sparse_family, dense_family):
        """Return sparse column names followed by dense column names."""
        if not self.col_name_mapping:
            return []
        sparse_names, dense_names = [], []
        if sparse_family in self.col_name_mapping:
            sparse_names = list(self.col_name_mapping[sparse_family].keys())
        if dense_family in self.col_name_mapping:
            dense_names = list(self.col_name_mapping[dense_family].keys())
        # Sparse names come first, each group in the mapping's own order.
        return sparse_names + dense_names

    @property
    def global_mean(self):
        """Mean value of all labels in `rating` task."""
        return self.interaction_data.label.mean()

    @property
    def min_max_rating(self):
        """Min and max value of all labels in `rating` task."""
        return self.interaction_data.label.min(), self.interaction_data.label.max()

    @property
    def sparse_col(self):
        """Sparse column name to index mapping."""
        return self._col_feature("sparse_col")

    @property
    def dense_col(self):
        """Dense column name to index mapping."""
        return self._col_feature("dense_col")

    @property
    def user_sparse_col(self):
        """User sparse column name to index mapping."""
        return self._col_feature("user_sparse_col")

    @property
    def user_dense_col(self):
        """User dense column name to index mapping."""
        return self._col_feature("user_dense_col")

    @property
    def item_sparse_col(self):
        """Item sparse column name to index mapping."""
        return self._col_feature("item_sparse_col")

    @property
    def item_dense_col(self):
        """Item dense column name to index mapping."""
        return self._col_feature("item_dense_col")

    @property
    def user_col(self):
        """All the user column names, including sparse and dense."""
        return self._col_names("user_sparse_col", "user_dense_col")

    @property
    def item_col(self):
        """All the item column names, including sparse and dense."""
        return self._col_names("item_sparse_col", "item_dense_col")

    @property
    def n_users(self):
        """Number of users in train data."""
        if self._n_users is None:
            self._n_users = len(self.user_unique_vals)
        return self._n_users

    @property
    def n_items(self):
        """Number of items in train data."""
        if self._n_items is None:
            self._n_items = len(self.item_unique_vals)
        return self._n_items

    @property
    def user2id(self):
        """User original id to inner id mapping."""
        if self._user2id is None:
            self._user2id = dict(zip(self.user_unique_vals, range(self.n_users)))
        return self._user2id

    @property
    def item2id(self):
        """Item original id to inner id mapping."""
        if self._item2id is None:
            self._item2id = dict(zip(self.item_unique_vals, range(self.n_items)))
        return self._item2id

    @property
    def id2user(self):
        """User inner id to original id mapping."""
        if self._id2user is None:
            self._id2user = {j: user for user, j in self.user2id.items()}
        return self._id2user

    @property
    def id2item(self):
        """Item inner id to original id mapping."""
        if self._id2item is None:
            self._id2item = {j: item for item, j in self.item2id.items()}
        return self._id2item

    @property
    def data_size(self):
        """Train data size."""
        if self._data_size is None:
            self._data_size = len(self.interaction_data)
        return self._data_size

    def __repr__(self):
        """Output train data information: "n_users, n_items, data density"."""
        n_users = self.n_users
        n_items = self.n_items
        n_labels = len(self.interaction_data)
        n_cells = n_users * n_items
        # Report zero density for an empty user or item set instead of
        # raising ZeroDivisionError from inside repr().
        density = 100 * n_labels / n_cells if n_cells else 0.0
        return "n_users: %d, n_items: %d, data density: %.4f %%" % (
            n_users,
            n_items,
            density,
        )

    def assign_user_features(self, user_data):
        """Assign user features to this ``data_info`` object from ``user_data``.

        Rows are de-duplicated on ``user`` (the last occurrence wins) before
        the sparse and dense user feature matrices are updated.

        Parameters
        ----------
        user_data : pandas.DataFrame
            Data contains new user features.

        Raises
        ------
        AssertionError
            If ``user_data`` has no ``user`` column.
        """
        assert "user" in user_data.columns, "Data must contain `user` column."
        user_data = user_data.drop_duplicates(subset=["user"], keep="last")
        user_row_idx, user_id_mask = get_row_id_masks(
            user_data["user"], self.user_unique_vals
        )
        self.user_sparse_unique = update_new_sparse_feats(
            user_data,
            user_row_idx,
            user_id_mask,
            self.user_sparse_unique,
            self.sparse_unique_vals,
            self.multi_sparse_unique_vals,
            self.user_sparse_col,
            self.col_name_mapping,
            self.sparse_offset,
        )
        self.user_dense_unique = update_new_dense_feats(
            user_data,
            user_row_idx,
            user_id_mask,
            self.user_dense_unique,
            self.user_dense_col,
        )

    def assign_item_features(self, item_data):
        """Assign item features to this ``data_info`` object from ``item_data``.

        Rows are de-duplicated on ``item`` (the last occurrence wins) before
        the sparse and dense item feature matrices are updated.

        Parameters
        ----------
        item_data : pandas.DataFrame
            Data contains new item features.

        Raises
        ------
        AssertionError
            If ``item_data`` has no ``item`` column.
        """
        assert "item" in item_data.columns, "Data must contain `item` column."
        item_data = item_data.drop_duplicates(subset=["item"], keep="last")
        item_row_idx, item_id_mask = get_row_id_masks(
            item_data["item"], self.item_unique_vals
        )
        self.item_sparse_unique = update_new_sparse_feats(
            item_data,
            item_row_idx,
            item_id_mask,
            self.item_sparse_unique,
            self.sparse_unique_vals,
            self.multi_sparse_unique_vals,
            self.item_sparse_col,
            self.col_name_mapping,
            self.sparse_offset,
        )
        self.item_dense_unique = update_new_dense_feats(
            item_data,
            item_row_idx,
            item_id_mask,
            self.item_dense_unique,
            self.item_dense_col,
        )

    def add_oovs(self):
        """Append one out-of-vocabulary row to every feature matrix.

        For the sparse matrices the extra row is ``sparse_oov`` selected at
        the family's column indices. For the dense matrices (and whenever
        the index list is empty) it is the column-wise mean of the existing
        rows. Matrices that are None stay None.
        """

        def _concat_oov(uniques, cols=None):
            if uniques is None:
                return
            oov = self.sparse_oov[cols] if cols else np.mean(uniques, axis=0)
            return np.vstack([uniques, oov])

        self.user_sparse_unique = _concat_oov(
            self.user_sparse_unique, self.user_sparse_col.index
        )
        self.item_sparse_unique = _concat_oov(
            self.item_sparse_unique, self.item_sparse_col.index
        )
        self.user_dense_unique = _concat_oov(self.user_dense_unique)
        self.item_dense_unique = _concat_oov(self.item_dense_unique)

    @property
    def popular_items(self):
        """A number of popular items in train data which often used in cold-start."""
        if self._popular_items is None:
            self._popular_items = self._get_popular_items(100)
        return self._popular_items

    def _get_popular_items(self, num):
        """Return up to ``num`` items ordered by number of distinct users.

        When fewer than ``num`` items are found and ``old_info`` is set, the
        list is topped up from the head of ``old_info.popular_items``.
        """
        count_items = (
            self.interaction_data.drop_duplicates(subset=["user", "item"])
            .groupby("item")["user"]
            .count()
        )
        selected_items = count_items.sort_values(ascending=False).index.tolist()[:num]
        # if not enough items, add old populars
        if len(selected_items) < num and self.old_info is not None:
            diff = num - len(selected_items)
            selected_items.extend(self.old_info.popular_items[:diff])
        return selected_items

    def save(self, path, model_name):
        """Save :class:`DataInfo` Object.

        Up to four files are written into ``path``: the column name mapping
        as JSON, ``user_consumed`` and ``item_consumed`` as pickles, and all
        remaining non-None constructor arguments as a compressed ``.npz``.
        Values come from the constructor-argument snapshot ``all_args``.

        Parameters
        ----------
        path : str
            File folder path to save :class:`DataInfo`.
        model_name : str
            Name of the saved file.
        """
        path = Path(path)
        if not path.is_dir():
            print(f"file folder {path} doesn't exists, creating a new one...")
            # Create missing parent folders as well, so a nested target
            # such as "models/run1" works on a fresh checkout.
            path.mkdir(parents=True, exist_ok=True)
        if self.col_name_mapping is not None:
            with open(path / f"{model_name}_data_info_name_mapping.json", "w") as f:
                json.dump(
                    self.all_args["col_name_mapping"],
                    f,
                    separators=(",", ":"),
                    indent=4,
                )
        if self.user_consumed is not None:
            with open(path / f"{model_name}_user_consumed.pkl", "wb") as f:
                pickle.dump(self.user_consumed, f, protocol=pickle.HIGHEST_PROTOCOL)
        if self.item_consumed is not None:
            with open(path / f"{model_name}_item_consumed.pkl", "wb") as f:
                pickle.dump(self.item_consumed, f, protocol=pickle.HIGHEST_PROTOCOL)

        hparams = {}
        arg_names = inspect.signature(self.__init__).parameters.keys()
        for arg in arg_names:
            if (
                # These three are stored in their own files above.
                arg in ("col_name_mapping", "user_consumed", "item_consumed")
                or arg not in self.all_args
                or self.all_args[arg] is None
            ):
                continue
            if arg == "interaction_data":
                hparams[arg] = self.all_args[arg].to_numpy()
            elif arg == "sparse_unique_vals":
                # Dicts are flattened to one npz entry per column; the
                # prefix lets `load` rebuild the dict.
                for col, val in self.all_args[arg].items():
                    hparams["unique_" + str(col)] = np.asarray(val)
            elif arg == "multi_sparse_unique_vals":
                for col, val in self.all_args[arg].items():
                    hparams["munique_" + str(col)] = np.asarray(val)
            else:
                hparams[arg] = self.all_args[arg]

        np.savez_compressed(path / f"{model_name}_data_info", **hparams)

    @classmethod
    def load(cls, path, model_name):
        """Load saved :class:`DataInfo`.

        Parameters
        ----------
        path : str
            File folder path to save :class:`DataInfo`.
        model_name : str
            Name of the saved file.

        Returns
        -------
        DataInfo
            A new instance built from the saved constructor arguments.

        Raises
        ------
        OSError
            If ``path`` does not exist.
        """
        path = Path(path)
        if not path.exists():
            raise OSError(f"file folder {path} doesn't exists...")

        # NOTE(security): `pickle.load` and `np.load(allow_pickle=True)` can
        # execute arbitrary code; only load files from a trusted source.
        hparams = {}
        name_mapping_path = path / f"{model_name}_data_info_name_mapping.json"
        if name_mapping_path.exists():
            with open(name_mapping_path, "r") as f:
                hparams["col_name_mapping"] = json.load(f)

        user_consumed_path = path / f"{model_name}_user_consumed.pkl"
        if user_consumed_path.exists():
            with open(user_consumed_path, "rb") as f:
                hparams["user_consumed"] = pickle.load(f)

        item_consumed_path = path / f"{model_name}_item_consumed.pkl"
        if item_consumed_path.exists():
            with open(item_consumed_path, "rb") as f:
                hparams["item_consumed"] = pickle.load(f)

        # Materialize every array while the archive is open, then close it
        # so the underlying file handle is not leaked.
        with np.load(path / f"{model_name}_data_info.npz", allow_pickle=True) as npz:
            info = dict(npz.items())

        for arg, value in info.items():
            if arg == "interaction_data":
                hparams[arg] = pd.DataFrame(value, columns=["user", "item", "label"])
            elif arg in ("multi_sparse_combine_info", "seed"):
                # numpy can save MultiSparseInfo and seed in 0-d array.
                hparams[arg] = value.item()
            elif arg.startswith("unique_"):
                hparams.setdefault("sparse_unique_vals", {})[
                    arg[len("unique_"):]
                ] = value
            elif arg.startswith("munique_"):
                hparams.setdefault("multi_sparse_unique_vals", {})[
                    arg[len("munique_"):]
                ] = value
            else:
                hparams[arg] = value

        return cls(**hparams)
@dataclass
class OldInfo:
    """Snapshot of the sizes of a ``DataInfo``, kept for rebuilding models."""

    n_users: int
    n_items: int
    sparse_len: List[int]
    sparse_oov: List[int]
    popular_items: List[Any]


def store_old_info(data_info):
    """Capture the current sizes of ``data_info`` in an :class:`OldInfo`.

    Each sparse column, in ``data_info.sparse_col.name`` order, is handled
    as follows:

    * a column with its own unique values (plain or multi-sparse) adds the
      number of those values to ``sparse_len`` and its entry of
      ``data_info.sparse_oov`` to ``sparse_oov``;
    * a column that is only listed under ``col_name_mapping["multi_sparse"]``
      adds ``-1`` to ``sparse_len`` and nothing to ``sparse_oov``;
    * any other column is skipped.
    """
    plain_vals = data_info.sparse_unique_vals
    multi_vals = data_info.multi_sparse_unique_vals
    lengths, oovs = [], []

    for pos, name in enumerate(data_info.sparse_col.name):
        # Plain sparse columns take precedence over multi-sparse ones.
        owner = None
        if plain_vals is not None and name in plain_vals:
            owner = plain_vals
        elif multi_vals is not None and name in multi_vals:
            owner = multi_vals

        if owner is not None:
            lengths.append(len(owner[name]))
            oovs.append(data_info.sparse_oov[pos])
            continue

        if multi_vals is None:
            continue

        name_mapping = data_info.col_name_mapping
        if "multi_sparse" in name_mapping and name in name_mapping["multi_sparse"]:
            # multi_sparse case, second to last cols are redundant.
            # Used in `rebuild_tf_model`, `rebuild_torch_model`
            lengths.append(-1)

    return OldInfo(
        data_info.n_users,
        data_info.n_items,
        lengths,
        oovs,
        data_info.popular_items,
    )