"""CF model base class."""
import abc
import os
import random
from functools import partial
from itertools import islice, takewhile
from operator import itemgetter
import numpy as np
from scipy.sparse import issparse
from scipy.sparse import load_npz as load_sparse
from scipy.sparse import save_npz as save_sparse
from tqdm import tqdm
from .base import Base
from ..evaluation import print_metrics
from ..prediction.preprocess import convert_id
from ..recommendation import construct_rec, popular_recommendations
from ..recommendation.ranking import filter_items
from ..utils.misc import colorize, time_block
from ..utils.save_load import load_params, save_params
from ..utils.similarities import cosine_sim, jaccard_sim, pearson_sim
from ..utils.validate import check_fitting, check_unknown, check_unknown_user
[docs]class CfBase(Base):
"""Base class for neighborhood-based collaborative filtering (CF) models.

The class computes a user-user or item-item similarity matrix in :meth:`fit`
and offers shared helpers for prediction, ranking, and persistence.
Subclasses must implement :meth:`recommend_one`.

Parameters
----------
task : {'rating', 'ranking'}
    Recommendation task. See :ref:`Task`.
data_info : :class:`~libreco.data.DataInfo` object
    Object that contains useful information for training and inference.
cf_type : {'user_cf', 'item_cf'}
    Specific CF type. Selects whether similarities are computed between
    users or between items.
sim_type : {'cosine', 'pearson', 'jaccard'}, default: 'cosine'
    Similarity measure. Any other value raises ``ValueError`` in :meth:`fit`.
    A warning is printed for 'pearson' with the `ranking` task and for
    'jaccard' with the `rating` task.
k_sim : int, default: 20
    Maximum number of most similar neighbors (users for 'user_cf', items for
    'item_cf') to use.
store_top_k : bool, default: True
    Whether to compute and store the top `k_sim` neighbors of every user
    (or item) after training.
block_size : int or None, default: None
    Block size for computing similarity matrix. Large block size makes
    computation faster, but may cause memory issue.
num_threads : int, default: 1
    Number of threads to use when computing the similarity matrix.
min_common : int, default: 1
    Minimum number of common interactions to consider when computing
    similarities. Forwarded unchanged to the similarity function.
mode : {'forward', 'invert'}, default: 'invert'
    Whether to use forward index or invert index. Forwarded unchanged to
    the similarity function.
seed : int, default: 42
    Random seed. Passed to the evaluation step in :meth:`fit`.
lower_upper_bound : tuple or None, default: None
    Lower and upper score bound for `rating` task.

See Also
--------
~libreco.algorithms.UserCF
~libreco.algorithms.ItemCF
"""
def __init__(
    self,
    task,
    data_info,
    cf_type,
    sim_type="cosine",
    k_sim=20,
    store_top_k=True,
    block_size=None,
    num_threads=1,
    min_common=1,
    mode="invert",
    seed=42,
    lower_upper_bound=None,
):
    """Record the hyper-parameters and create empty slots for fitted state.

    ``task``, ``data_info`` and ``lower_upper_bound`` are handed to the parent
    initializer; every other argument is stored on the instance under the same
    name. ``cf_type`` must be ``'user_cf'`` or ``'item_cf'``, which is enforced
    with an ``assert``. Construction ends by printing a caution if the chosen
    similarity does not suit the task.
    """
    super().__init__(task, data_info, lower_upper_bound)
    assert cf_type in ("user_cf", "item_cf")

    # what is compared, how it is compared, and how many neighbors are kept
    self.cf_type, self.sim_type = cf_type, sim_type
    self.k_sim, self.min_common = k_sim, min_common
    self.store_top_k = store_top_k
    # settings forwarded to the similarity function during fitting
    self.block_size, self.num_threads, self.mode = block_size, num_threads, mode
    self.seed = seed

    # fitted state: stays None until the model is trained or loaded
    self.user_interaction = None  # sparse matrix, user as row and item as column
    self.item_interaction = None  # sparse matrix, item as row and user as column
    self.sim_matrix = None  # sparse similarity matrix
    self.topk_sim = None
    # counts fallback situations so the related warnings can be rate-limited
    self.print_count = 0

    self._caution_sim_type()
def _caution_sim_type(self):
if self.task == "ranking" and self.sim_type == "pearson":
caution_str = "Warning: pearson is not suitable for implicit data"
print(f"{colorize(caution_str, 'red')}")
if self.task == "rating" and self.sim_type == "jaccard":
caution_str = "Warning: jaccard is not suitable for explicit data"
print(f"{colorize(caution_str, 'red')}")
def fit(
    self,
    train_data,
    neg_sampling,
    verbose=1,
    eval_data=None,
    metrics=None,
    k=10,
    eval_batch_size=8192,
    eval_user_num=None,
):
    """Fit CF model on the training data.

    The interaction matrices are taken from ``train_data``, the similarity
    matrix is computed for users or items depending on ``cf_type``, and the
    top-k neighbors are optionally precomputed. Nothing is returned; results
    are stored on the instance.

    Parameters
    ----------
    train_data : :class:`~libreco.data.TransformedSet` object
        Data object used for training.
    neg_sampling : bool
        Whether to perform negative sampling for evaluating data.

        .. versionadded:: 1.1.0

    verbose : int, default: 1
        Print verbosity. If `eval_data` is provided, setting it to higher than 1
        will print evaluation metrics during training.
    eval_data : :class:`~libreco.data.TransformedSet` object, default: None
        Data object used for evaluating.
    metrics : list or None, default: None
        List of metrics for evaluating.
    k : int, default: 10
        Parameter of metrics, e.g. recall at k, ndcg at k
    eval_batch_size : int, default: 8192
        Batch size for evaluating.
    eval_user_num : int or None, default: None
        Number of users for evaluating. Setting it to a positive number will sample
        users randomly from eval data.

    Raises
    ------
    ValueError
        If ``sim_type`` is not one of 'cosine', 'pearson' or 'jaccard'.
    """
    check_fitting(self, train_data, eval_data, neg_sampling, k)
    self.show_start_time()
    self.user_interaction = train_data.sparse_interaction
    self.item_interaction = self.user_interaction.T.tocsr()
    is_user_cf = self.cf_type == "user_cf"

    with time_block("sim_matrix", verbose=1):
        # ordered lookup: the first name equal to `sim_type` wins
        known_sims = (
            ("cosine", cosine_sim),
            ("pearson", pearson_sim),
            ("jaccard", jaccard_sim),
        )
        chosen = next(
            (func for name, func in known_sims if self.sim_type == name), None
        )
        if chosen is None:
            raise ValueError(
                "sim_type must be one of (`cosine`, `pearson`, `jaccard`)"
            )
        sim_func = partial(
            chosen,
            block_size=self.block_size,
            num_threads=self.num_threads,
            min_common=self.min_common,
            mode=self.mode,
        )
        # the compared entity comes first, followed by its transposed view
        if is_user_cf:
            sim_args = (
                self.user_interaction,
                self.item_interaction,
                self.n_users,
                self.n_items,
            )
        else:
            sim_args = (
                self.item_interaction,
                self.user_interaction,
                self.n_items,
                self.n_users,
            )
        self.sim_matrix = sim_func(*sim_args)

    assert self.sim_matrix.has_sorted_indices
    if issparse(self.sim_matrix):
        n_elements = self.sim_matrix.getnnz()
        # the similarity matrix is square over the compared entity
        side = self.n_users if is_user_cf else self.n_items
        density_ratio = 100 * n_elements / (side * side)
        print(
            f"sim_matrix, shape: {self.sim_matrix.shape}, "
            f"num_elements: {n_elements}, "
            f"density: {density_ratio:5.4f} %"
        )

    if self.store_top_k:
        self.compute_top_k()

    if verbose > 1:
        print_metrics(
            model=self,
            neg_sampling=neg_sampling,
            eval_data=eval_data,
            metrics=metrics,
            eval_batch_size=eval_batch_size,
            k=k,
            sample_user_num=eval_user_num,
            seed=self.seed,
        )
        print("=" * 30)
def pre_predict_check(self, user, item, inner_id, cold_start):
    """Convert the given ids and reject unsupported cold-start strategies.

    The ids are passed through ``convert_id`` and then ``check_unknown``.
    If the latter reports any unknown entries while ``cold_start`` is not
    ``'popular'``, a ``ValueError`` is raised.

    Returns
    -------
    tuple
        ``(user_arr, item_arr)`` as returned by ``check_unknown``.
    """
    converted_users, converted_items = convert_id(self, user, item, inner_id)
    n_unknown, _, checked_users, checked_items = check_unknown(
        self, converted_users, converted_items
    )
    has_unknown = n_unknown > 0
    if has_unknown and cold_start != "popular":
        raise ValueError(f"{self.model_name} only supports popular strategy")
    return checked_users, checked_items
def compute_pred(self, user, item, common_size, common_sims, common_labels):
    """Predict a score for one (user, item) pair from its common neighbors.

    Parameters
    ----------
    user, item
        Ids of the pair being scored; only used in the warning message.
    common_size : int
        Number of common neighbors. Zero triggers the default prediction.
    common_sims : array_like
        Similarity of each common neighbor. Plain sequences are accepted and
        converted with ``np.asarray``.
    common_labels : array_like
        Label of each common neighbor, aligned with ``common_sims``.

    Returns
    -------
    float
        ``self.default_pred`` when there is no neighbor with a positive
        similarity. Otherwise, over the (at most) ``k_sim`` neighbors with the
        highest positive similarity: for the `rating` task the
        similarity-weighted average label clipped to
        ``[lower_bound, upper_bound]``; for the `ranking` task the mean
        similarity. Any other task value falls through and yields ``None``.
    """
    # Coerce once so that the element-wise comparison below also works for
    # plain Python sequences; this is a no-op for ndarray inputs.
    common_sims = np.asarray(common_sims)

    if common_size == 0 or np.all(common_sims <= 0.0):
        self.print_count += 1
        # only the first few fallbacks are reported to avoid flooding stdout
        if self.print_count < 7:
            no_str = (
                f"No common interaction or similar neighbor "
                f"for user {user} and item {item}, "
                f"proceed with default prediction"
            )
            print(f"{colorize(no_str, 'red')}")
        return self.default_pred

    # Stable descending sort by similarity, keep the positive prefix only,
    # then cap the number of neighbors at `k_sim`.
    ranked = sorted(
        zip(common_labels, common_sims), key=itemgetter(1), reverse=True
    )
    top_pairs = islice(takewhile(lambda pair: pair[1] > 0, ranked), self.k_sim)
    # explicit arrays instead of relying on tuple/NumPy-scalar coercion
    k_neighbor_labels, k_neighbor_sims = (
        np.asarray(column) for column in zip(*top_pairs)
    )

    if self.task == "rating":
        sim_weights = k_neighbor_sims / np.sum(k_neighbor_sims)
        weighted_pred = np.average(k_neighbor_labels, weights=sim_weights)
        return np.clip(weighted_pred, self.lower_bound, self.upper_bound)
    elif self.task == "ranking":
        return np.mean(k_neighbor_sims)
def recommend_user(
    self,
    user,
    n_rec,
    cold_start="popular",
    inner_id=False,
    filter_consumed=True,
    random_rec=False,
):
    """Recommend a list of items for given user(s).

    Unknown users receive popular recommendations; all other users are
    served one at a time through :meth:`recommend_one`.

    Parameters
    ----------
    user : int or str or array_like
        User id or batch of user ids to recommend.
    n_rec : int
        Number of recommendations to return.
    cold_start : {'popular'}, default: 'popular'
        Cold start strategy, CF models can only use 'popular' strategy.
    inner_id : bool, default: False
        Whether to use inner_id defined in `libreco`. For library users inner_id
        may never be used.
    filter_consumed : bool, default: True
        Whether to filter out items that a user has previously consumed.
        Forwarded to :meth:`recommend_one`.
    random_rec : bool, default: False
        Whether to pick the recommended items at random instead of taking the
        highest scored ones. Forwarded to :meth:`recommend_one`.

    Returns
    -------
    recommendation : dict[Union[int, str, array_like], numpy.ndarray]
        Recommendation result with user ids as keys
        and array_like recommended items as values.

    Raises
    ------
    ValueError
        If unknown users are present and `cold_start` is not 'popular'.
    """
    user_ids, unknown_users = check_unknown_user(self.data_info, user, inner_id)
    recommendations = {}

    if unknown_users:
        if cold_start != "popular":
            raise ValueError(
                f"{self.model_name} only supports `popular` cold start strategy"
            )
        # each unknown user gets its own popular-items result
        recommendations.update(
            (unknown, popular_recommendations(self.data_info, inner_id, n_rec))
            for unknown in unknown_users
        )

    if user_ids:
        per_user_items = [
            self.recommend_one(known, n_rec, filter_consumed, random_rec)
            for known in user_ids
        ]
        recommendations.update(
            construct_rec(self.data_info, user_ids, per_user_items, inner_id)
        )

    return recommendations
# Implementations must return inner ids: every item coming out of this
# method is treated as an inner_id by the caller.
@abc.abstractmethod
def recommend_one(self, user_id, n_rec, filter_consumed, random_rec):
    """Recommend `n_rec` items for a single known user.

    Abstract hook invoked by :meth:`recommend_user` for each known user with
    the caller's `n_rec`, `filter_consumed` and `random_rec` values. This base
    implementation does nothing and returns ``None``.
    """
def rank_recommendations(
    self,
    user,
    ids,
    preds,
    n_rec,
    consumed,
    filter_consumed,
    random_rec,
):
    """Turn scored candidates into the final recommendation array.

    Parameters
    ----------
    user
        User id, only used in the warning message.
    ids : numpy.ndarray
        Candidate item ids; must support indexing with an integer array.
    preds : array_like
        Score of each candidate, aligned with `ids`.
    n_rec : int
        Maximum number of items to return.
    consumed
        Passed to ``filter_items`` together with `ids` and `preds`
        (presumably the items the user already consumed - project helper,
        not visible here).
    filter_consumed : bool
        Whether to run ``filter_items`` before ranking.
    random_rec : bool
        If true and more than `n_rec` candidates remain, draw `n_rec` of them
        with ``random.sample`` (scores are ignored and the global ``random``
        state is used). Otherwise take the highest scored candidates.

    Returns
    -------
    numpy.ndarray
        The selected ids, or the result of ``popular_recommendations`` when
        filtering leaves no candidate.
    """
    if filter_consumed:
        ids, preds = filter_items(ids, preds, consumed)
        # all filtered out by consumed
        if len(ids) == 0:
            self.print_count += 1
            no_str = (
                f"no suitable recommendation for user {user}, "
                f"return default recommendation"
            )
            if self.print_count < 11:
                print(f"{colorize(no_str, 'red')}")
            return popular_recommendations(self.data_info, inner_id=True, n_rec=n_rec)

    # score-ordered selection unless a random draw is requested and possible
    if not random_rec or len(ids) <= n_rec:
        descending = np.argsort(preds)[::-1]
        return np.asarray(ids[descending][:n_rec])

    return np.asarray(random.sample(list(ids), k=n_rec))
def get_top_k_sims(self, ui_id):
    """Return the `k_sim` most similar neighbors of one user or item.

    Row ``ui_id`` of the CSR similarity matrix is read directly through
    ``indptr``/``indices``/``data``.

    Returns
    -------
    list of (int, float) or None
        ``(neighbor_id, similarity)`` pairs sorted by descending similarity
        and truncated to ``k_sim`` entries, or ``None`` when the row has no
        stored entry.
    """
    matrix = self.sim_matrix
    row_start, row_end = matrix.indptr[ui_id], matrix.indptr[ui_id + 1]
    if row_start == row_end:
        return None

    neighbor_ids = matrix.indices[row_start:row_end].tolist()
    neighbor_sims = matrix.data[row_start:row_end].tolist()
    neighbors = list(zip(neighbor_ids, neighbor_sims))
    # stable sort: equal similarities keep their stored order
    neighbors.sort(key=lambda pair: pair[1], reverse=True)
    return neighbors[: self.k_sim]
def compute_top_k(self):
    """Precompute the top neighbors of every user or item.

    Iterates over all user indices for ``'user_cf'`` and all item indices
    otherwise, showing a progress bar, and stores the resulting mapping
    ``index -> get_top_k_sims(index)`` in ``self.topk_sim``. Entries are
    ``None`` for indices without any stored similarity.
    """
    if self.cf_type == "user_cf":
        total = self.n_users
    else:
        total = self.n_items
    self.topk_sim = {
        idx: self.get_top_k_sims(idx) for idx in tqdm(range(total), desc="top_k")
    }
def save(self, path, model_name, **kwargs):
    """Save the hyper-parameters and the sparse matrices of the model.

    Parameters
    ----------
    path : str
        Target folder; created (with a printed notice) if it is not an
        existing directory.
    model_name : str
        Name used by ``save_params`` and as prefix of the matrix files
        ``<model_name>_sim_matrix``, ``<model_name>_user_inter`` and
        ``<model_name>_item_inter``.
    **kwargs
        Accepted for interface compatibility; not used here.

    Notes
    -----
    ``topk_sim`` is not written to disk.
    """
    folder_missing = not os.path.isdir(path)
    if folder_missing:
        print(f"file folder {path} doesn't exists, creating a new one...")
        os.makedirs(path)

    save_params(self, path, model_name)

    model_path = os.path.join(path, model_name)
    sparse_outputs = (
        (f"{model_path}_sim_matrix", self.sim_matrix),
        (f"{model_path}_user_inter", self.user_interaction),
        (f"{model_path}_item_inter", self.item_interaction),
    )
    for target, matrix in sparse_outputs:
        save_sparse(target, matrix)
@classmethod
def load(cls, path, model_name, data_info, **kwargs):
    """Rebuild a model from files written by :meth:`save`.

    Parameters
    ----------
    path : str
        Folder containing the saved files.
    model_name : str
        Name the model was saved under.
    data_info : :class:`~libreco.data.DataInfo` object
        Passed to ``load_params`` to restore the constructor arguments.
    **kwargs
        Accepted for interface compatibility; not used here.

    Returns
    -------
    model
        New instance of ``cls`` with ``sim_matrix``, ``user_interaction`` and
        ``item_interaction`` read from the ``.npz`` files.

    Notes
    -----
    ``topk_sim`` is not restored and keeps the value set by the constructor.
    """
    init_kwargs = load_params(path, data_info, model_name)
    restored = cls(**init_kwargs)

    model_path = os.path.join(path, model_name)
    sim_matrix = load_sparse(f"{model_path}_sim_matrix.npz")
    user_interaction = load_sparse(f"{model_path}_user_inter.npz")
    item_interaction = load_sparse(f"{model_path}_item_inter.npz")

    restored.sim_matrix = sim_matrix
    restored.user_interaction = user_interaction
    restored.item_interaction = item_interaction
    return restored