"""Implementation of Alternating Least Squares."""
import logging
import os
from functools import partial
import numpy as np
from ..bases import EmbedBase
from ..evaluation import print_metrics
from ..recommendation import recommend_from_embedding
from ..utils.initializers import truncated_normal
from ..utils.misc import time_block
from ..utils.save_load import save_default_recs, save_params
from ..utils.validate import check_fitting
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT)
[docs]class ALS(EmbedBase):
"""*Alternating Least Squares* algorithm.
One can use conjugate gradient optimization and set more `n_threads`
to accelerate training.
Parameters
----------
task : {'rating', 'ranking'}
Recommendation task. See :ref:`Task`.
data_info : :class:`~libreco.data.DataInfo` object
Object that contains useful information for training and inference.
embed_size : int, default: 16
Vector size of embeddings.
n_epochs : int, default: 10
Number of epochs for training.
reg : float or None, default: None
Regularization parameter, must be non-negative or None.
alpha : int, default: 10
Parameter used for increasing confidence level, only applied for `ranking` task.
use_cg : bool, default: True
Whether to use *conjugate gradient* optimization. See `reference <http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.379.6473&rep=rep1&type=pdf>`_.
n_threads : int, default: 1
Number of threads to use.
seed : int, default: 42
Random seed.
lower_upper_bound : tuple or None, default: None
Lower and upper score bound for `rating` task.
References
----------
[1] *Haoming Li et al.* `Matrix Completion via Alternating Least Square(ALS)
<https://stanford.edu/~rezab/classes/cme323/S15/notes/lec14.pdf>`_.
[2] *Yifan Hu et al.* `Collaborative Filtering for Implicit Feedback Datasets
<http://yifanhu.net/PUB/cf.pdf>`_.
[3] *Gábor Takács et al.* `Applications of the Conjugate Gradient Method for Implicit Feedback Collaborative Filtering
<http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.379.6473&rep=rep1&type=pdf>`_.
"""
def __init__(
self,
task,
data_info,
embed_size=16,
n_epochs=10,
reg=None,
alpha=10,
use_cg=True,
n_threads=1,
seed=42,
lower_upper_bound=None,
):
super().__init__(task, data_info, embed_size, lower_upper_bound)
self.all_args = locals()
self.n_epochs = n_epochs
self.reg = self._check_reg(reg)
self.alpha = alpha
self.use_cg = use_cg
self.n_threads = n_threads
self.seed = seed
def build_model(self):
np_rng = np.random.default_rng(self.seed)
self.user_embeds_np = truncated_normal(
np_rng, shape=[self.n_users, self.embed_size], mean=0.0, scale=0.03
)
self.item_embeds_np = truncated_normal(
np_rng, shape=[self.n_items, self.embed_size], mean=0.0, scale=0.03
)
[docs] def fit(
self,
train_data,
neg_sampling,
verbose=1,
shuffle=True,
eval_data=None,
metrics=None,
k=10,
eval_batch_size=8192,
eval_user_num=None,
**kwargs,
):
"""Fit ALS model on the training data.
Parameters
----------
train_data : :class:`~libreco.data.TransformedSet` object
Data object used for training.
neg_sampling : bool
Whether to perform negative sampling for evaluating data.
.. versionadded:: 1.1.0
verbose : int, default: 1
Print verbosity. If `eval_data` is provided, setting it to higher than 1
will print evaluation metrics during training.
shuffle : bool, default: True
Whether to shuffle the training data.
eval_data : :class:`~libreco.data.TransformedSet` object, default: None
Data object used for evaluating.
metrics : list or None, default: None
List of metrics for evaluating.
k : int, default: 10
Parameter of metrics, e.g. recall at k, ndcg at k
eval_batch_size : int, default: 8192
Batch size for evaluating.
eval_user_num : int or None, default: None
Number of users for evaluating. Setting it to a positive number will sample
users randomly from eval data.
"""
try:
from ._als import als_update
except (ImportError, ModuleNotFoundError):
logging.warning("Als cython version is not available")
raise
check_fitting(self, train_data, eval_data, neg_sampling, k)
self.show_start_time()
if not self.model_built:
self.build_model()
self.model_built = True
user_interaction = train_data.sparse_interaction # sparse.csr_matrix
item_interaction = user_interaction.T.tocsr()
if self.task == "ranking":
user_interaction.data = user_interaction.data * self.alpha + 1
item_interaction.data = item_interaction.data * self.alpha + 1
trainer = partial(als_update, task=self.task, use_cg=self.use_cg)
for epoch in range(1, self.n_epochs + 1):
with time_block(f"Epoch {epoch}", verbose):
trainer(
interaction=user_interaction,
X=self.user_embeds_np,
Y=self.item_embeds_np,
reg=self.reg,
num_threads=self.n_threads,
)
trainer(
interaction=item_interaction,
X=self.item_embeds_np,
Y=self.user_embeds_np,
reg=self.reg,
num_threads=self.n_threads,
)
if verbose > 1:
print_metrics(
model=self,
neg_sampling=neg_sampling,
eval_data=eval_data,
metrics=metrics,
eval_batch_size=eval_batch_size,
k=k,
sample_user_num=eval_user_num,
seed=self.seed,
)
print("=" * 30)
self.assign_embedding_oov()
self.default_recs = recommend_from_embedding(
model=self,
user_ids=[self.n_users],
n_rec=min(2000, self.n_items),
user_embeddings=self.user_embeds_np,
item_embeddings=self.item_embeds_np,
filter_consumed=False,
random_rec=False,
).flatten()
@staticmethod
def _check_reg(reg):
if not isinstance(reg, float) or reg <= 0.0:
raise ValueError(f"`reg` must be float and positive, got {reg}")
return reg
[docs] def save(self, path, model_name, **kwargs):
"""Save model for inference or retraining.
Parameters
----------
path : str
File folder path to save model.
model_name : str
Name of the saved model file.
"""
if not os.path.isdir(path):
print(f"file folder {path} doesn't exists, creating a new one...")
os.makedirs(path)
save_params(self, path, model_name)
save_default_recs(self, path, model_name)
variable_path = os.path.join(path, model_name)
np.savez_compressed(
variable_path,
user_embed=self.user_embeds_np,
item_embed=self.item_embeds_np,
)
def set_embeddings(self): # pragma: no cover
pass
[docs] def rebuild_model(self, path, model_name):
"""Reconstruct model for retraining.
Parameters
----------
path : str
File folder path for saved model.
model_name : str
Name of the saved model file.
"""
self.model_built = True
self.build_model()
variable_path = os.path.join(path, f"{model_name}.npz")
variables = np.load(variable_path)
# remove oov values
old_var = variables["user_embed"][:-1]
self.user_embeds_np[: len(old_var)] = old_var
old_var = variables["item_embed"][:-1]
self.item_embeds_np[: len(old_var)] = old_var
def least_squares(sparse_interaction, X, Y, reg, embed_size, num, mode):
"""Least squares optimization showcase for ALS."""
indices = sparse_interaction.indices
indptr = sparse_interaction.indptr
data = sparse_interaction.data
if mode == "explicit":
for m in range(num):
m_slice = slice(indptr[m], indptr[m + 1])
interacted = Y[indices[m_slice]]
labels = data[m_slice]
A = interacted.T @ interacted + reg * np.eye(embed_size)
b = interacted.T @ labels
X[m] = np.linalg.solve(A, b)
elif mode == "implicit":
init_A = Y.T @ Y + reg * np.eye(embed_size, dtype=np.float32)
for m in range(num):
A = init_A.copy()
b = np.zeros(embed_size, dtype=np.float32)
for i in range(indptr[m], indptr[m + 1]):
factor = Y[indices[i]]
confidence = data[i]
# If confidence = 1, r_ui = 0 means no interaction.
A += (confidence - 1) * np.outer(factor, factor)
b += confidence * factor
X[m] = np.linalg.solve(A, b)
else:
raise ValueError("mode must either be 'explicit' or 'implicit'")
# O(f^3) * m
def least_squares_cg(sparse_interaction, X, Y, reg, embed_size, num, mode, cg_steps=3):
"""Conjugate Gradient optimization showcase for ALS."""
indices = sparse_interaction.indices
indptr = sparse_interaction.indptr
data = sparse_interaction.data
if mode == "explicit":
for m in range(num):
m_slice = slice(indptr[m], indptr[m + 1])
interacted = Y[indices[m_slice]]
labels = data[m_slice]
A = interacted.T @ interacted + reg * np.eye(embed_size)
b = interacted.T @ labels
X[m] = np.linalg.solve(A, b)
elif mode == "implicit":
init_A = Y.T @ Y + reg * np.eye(embed_size, dtype=np.float32)
for m in range(num):
x = X[m]
r = -init_A @ x
# compute r = b - Ax
for i in range(indptr[m], indptr[m + 1]):
y = Y[indices[i]]
confidence = data[i]
r += (confidence - (confidence - 1) * (y @ x)) * y
p = r.copy()
rs_old = r @ r
if rs_old < 1e-10:
continue
for _ in range(cg_steps):
Ap = init_A @ p
for i in range(indptr[m], indptr[m + 1]):
y = Y[indices[i]]
confidence = data[i]
Ap += (confidence - 1) * (y @ p) * y
# standard CG update
ak = rs_old / (p @ Ap)
x += ak * p
r -= ak * Ap
rs_new = r @ r
if rs_new < 1e-10:
break
p = r + (rs_new / rs_old) * p
rs_old = rs_new
X[m] = x
else:
raise ValueError("mode must either be 'explicit' or 'implicit'")