import numpy as np
from sklearn.preprocessing import (
MinMaxScaler,
PowerTransformer,
RobustScaler,
StandardScaler,
)
def _build_scaler(normalizer):
    """Return a new, unfitted scaler for the given ``normalizer`` name."""
    kind = normalizer.lower()
    if kind == "min_max":
        return MinMaxScaler()
    if kind == "standard":
        return StandardScaler()
    if kind == "robust":
        return RobustScaler()
    if kind == "power":
        return PowerTransformer()
    raise ValueError("unknown normalize type...")


def _add_derived_features(frames, dense_col, transformer):
    """Add log/sqrt/square columns to every frame and return the new names."""
    derived = []
    if not frames:
        return derived
    transforms = (("log", np.log1p), ("sqrt", np.sqrt), ("square", np.square))
    for col in dense_col:
        # Decide once per column across *all* frames. Checking each frame
        # separately can give the training frame a derived column that an
        # evaluation frame lacks (or vice versa), so the returned column
        # list would not match every frame.
        if any(frame[col].min() < 0.0 for frame in frames):
            print("can't transform negative values...")
            continue
        if transformer is None:
            continue
        for key, func in transforms:
            if key not in transformer:
                continue
            name = col + "_" + key
            for frame in frames:
                frame[name] = func(frame[col])
            derived.append(name)
    return derived


def process_data(
    data, dense_col=None, normalizer="min_max", transformer=("log", "sqrt", "square")
):
    """Scale dense columns and append element-wise transformed copies of them.

    The frames are modified in place. When several frames are passed, the
    scaler is fitted on the first one (assumed to be the training data) and
    only applied to the others.

    Parameters
    ----------
    data : pandas.DataFrame or list/tuple of pandas.DataFrame
        Data to process. Frames are assumed to be pandas DataFrames, based on
        the column indexing used here.
    dense_col : list of str
        Names of the dense columns to scale. Must be a list.
    normalizer : str, default: "min_max"
        One of "min_max", "standard", "robust" or "power" (case-insensitive).
    transformer : container of str or None, default: ("log", "sqrt", "square")
        Which derived columns to add for each dense column. The new columns are
        named ``<col>_log`` (``log1p``), ``<col>_sqrt`` and ``<col>_square``.
        ``None`` adds no derived columns.

    Returns
    -------
    data : same object as the input
        The input frame(s), modified in place.
    dense_col_transformed : list of str
        ``dense_col`` followed by the names of all derived columns.

    Raises
    ------
    ValueError
        If ``dense_col`` is not a list or ``normalizer`` is unknown.

    Notes
    -----
    A column is skipped (and a message printed) when its scaled values contain
    a negative number in *any* of the given frames, so that every frame ends
    up with the same set of derived columns.
    Only the list/tuple path casts the scaled values to ``float32``; the
    single-frame path keeps the dtype produced by the scaler.
    """
    if not isinstance(dense_col, list):
        raise ValueError("dense_col must be a list...")
    scaler = _build_scaler(normalizer)
    dense_col_transformed = dense_col.copy()

    if isinstance(data, (list, tuple)):
        frames = list(data)
        for i, frame in enumerate(frames):
            if i == 0:  # assume train_data is the first one
                scaled = scaler.fit_transform(frame[dense_col])
            else:
                scaled = scaler.transform(frame[dense_col])
            frame[dense_col] = scaled.astype(np.float32)
    else:
        frames = [data]
        data[dense_col] = scaler.fit_transform(data[dense_col])

    dense_col_transformed.extend(
        _add_derived_features(frames, dense_col, transformer)
    )
    return data, dense_col_transformed
def split_multi_value(
    data,
    multi_value_col,
    sep,
    max_len=None,
    pad_val="missing",
    user_col=None,
    item_col=None,
):
    """Transform multi-valued features to the divided sub-features.

    Each multi-value column is cleaned (leading/trailing separators and
    spaces stripped, all whitespace removed, lower-cased), split on ``sep``
    and expanded into columns named ``<col>_1``, ``<col>_2``, ...  The
    original multi-value columns are dropped from the result. The input
    frame itself is left untouched.

    Parameters
    ----------
    data : pandas.DataFrame
        Original data.
    multi_value_col : list of str
        Multi-value columns names.
    sep : str
        Delimiter to use.
    max_len : list or tuple of int or None, default: None
        The maximum number of sub-features after transformation.
        If it is None, the maximum category length of all the samples will be used.
        If not None, it should be a list or tuple,
        because there are possibly many ``multi_value`` features.
    pad_val : Any or list of Any, default: "missing"
        The padding value used for missing features. A single value is
        applied to every multi-value column.
    user_col : list of str or None, default: None
        User column names.
    item_col : list of str or None, default: None
        Item column names.

    Returns
    -------
    data : pandas.DataFrame
        Transformed data. Missing values in the remaining original columns
        are filled with the first padding value. If ``multi_value_col`` is
        empty the input frame is returned unchanged.
    multi_sparse_col : list of list of str
        Transformed multi-sparse column names, one inner list per
        multi-value column.
    user_sparse_col : list of str
        Transformed user columns.
    item_sparse_col : list of str
        Transformed item columns.

    Raises
    ------
    AssertionError
        If ``max_len`` is not list or tuple.
    AssertionError
        If ``max_len`` size != ``multi_value_col`` size.
    AssertionError
        If ``pad_val`` is a list or tuple whose size != ``multi_value_col`` size.
    """
    if max_len is not None:
        assert isinstance(max_len, (list, tuple)), "`max_len` must be list or tuple"
        assert len(max_len) == len(multi_value_col), (
            "`max_len` must have same length as `multi_value_col`"
        )  # fmt: skip
    if not isinstance(pad_val, (list, tuple)):
        pad_val = [pad_val] * len(multi_value_col)
    assert len(multi_value_col) == len(pad_val), (
        "length of `multi_sparse_col` and `pad_val` doesn't match"
    )  # fmt: skip

    user_sparse_col, item_sparse_col, multi_sparse_col = [], [], []
    if not multi_value_col:
        # Nothing to split; this also avoids indexing an empty ``pad_val`` below.
        return data, multi_sparse_col, user_sparse_col, item_sparse_col

    # Sub-feature columns are collected here and attached to the *returned*
    # frame only, so the caller's frame is never modified.
    new_columns = {}
    for j, col in enumerate(multi_value_col):
        sparse_col = []
        cleaned = (
            data[col]
            .str.strip(sep + " ")
            .str.replace("\\s+", "", regex=True)
            .str.lower()
        )
        # Entries that are empty after cleaning count as missing.
        cleaned.loc[cleaned == ""] = pad_val[j]
        split_col = cleaned.str.split(sep)
        col_len = int(split_col.str.len().max()) if max_len is None else max_len[j]
        for i in range(col_len):
            new_col_name = col + f"_{i + 1}"
            sparse_col.append(new_col_name)
            # Rows with fewer than ``i + 1`` values yield NaN; pad them.
            new_columns[new_col_name] = split_col.str.get(i).fillna(pad_val[j])
        multi_sparse_col.append(sparse_col)
        if user_col is not None and col in user_col:
            user_sparse_col.extend(sparse_col)
        elif item_col is not None and col in item_col:
            item_sparse_col.extend(sparse_col)

    # ``drop`` and ``fillna`` both return new frames, so ``result`` is ours
    # to extend in place.
    result = data.drop(multi_value_col, axis=1).fillna(pad_val[0])
    for new_col_name, values in new_columns.items():
        result[new_col_name] = values
    return result, multi_sparse_col, user_sparse_col, item_sparse_col