Source code for libserving.serialization.redis

import contextlib
import os

import redis
import ujson


@contextlib.contextmanager
def redis_connection(host: str, port: int, db: int):
    """Yield a redis client for the given server and close it on exit.

    The client is created with ``decode_responses=True``. Exceptions raised
    while creating or using the client propagate to the caller unchanged;
    once the client exists it is closed whether or not the body succeeded.

    Parameters
    ----------
    host : str
        Redis host.
    port : int
        Redis port.
    db : int
        Redis db number.
    """
    # Constructing the client outside the ``try`` means there is nothing to
    # close if construction itself fails.
    client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    try:
        yield client
    finally:
        client.close()


def knn2redis(path: str, host: str = "localhost", port: int = 6379, db: int = 0):
    """Save KNN model to redis.

    Over a single connection, writes the model name, the id mappings, the
    user consumed items and the similarity data, in that order.

    Parameters
    ----------
    path : str
        Model saving path.
    host : str, default: "localhost"
        Redis host.
    port : int, default: 6379
        Redis port.
    db : int, default: 0
        Redis db number.
    """
    writers = (model_name2redis, id_mapping2redis, user_consumed2redis, sim2redis)
    with redis_connection(host, port, db) as conn:
        for write in writers:
            write(path, conn)
def embed2redis(path: str, host: str = "localhost", port: int = 6379, db: int = 0):
    """Save Embed model to redis.

    Over a single connection, writes the model name, the id mappings, the
    user consumed items and the user embeddings, in that order.

    Parameters
    ----------
    path : str
        Model saving path.
    host : str, default: "localhost"
        Redis host.
    port : int, default: 6379
        Redis port.
    db : int, default: 0
        Redis db number.
    """
    writers = (
        model_name2redis,
        id_mapping2redis,
        user_consumed2redis,
        user_embed2redis,
    )
    with redis_connection(host, port, db) as conn:
        for write in writers:
            write(path, conn)
def tf2redis(path: str, host: str = "localhost", port: int = 6379, db: int = 0):
    """Save TF model to redis.

    Over a single connection, writes the model name, the id mappings, the
    user consumed items and the feature data, in that order.

    Parameters
    ----------
    path : str
        Model saving path.
    host : str, default: "localhost"
        Redis host.
    port : int, default: 6379
        Redis port.
    db : int, default: 0
        Redis db number.
    """
    writers = (
        model_name2redis,
        id_mapping2redis,
        user_consumed2redis,
        features2redis,
    )
    with redis_connection(host, port, db) as conn:
        for write in writers:
            write(path, conn)
def online2redis(path: str, host: str = "localhost", port: int = 6379, db: int = 0):
    """Save online computing model to redis.

    Over a single connection, writes the model name, the id mappings, the
    user consumed items, the feature data, and the user sparse and user
    dense field data, in that order.

    Parameters
    ----------
    path : str
        Model saving path.
    host : str, default: "localhost"
        Redis host.
    port : int, default: 6379
        Redis port.
    db : int, default: 0
        Redis db number.
    """
    writers = (
        model_name2redis,
        id_mapping2redis,
        user_consumed2redis,
        features2redis,
        user_sparse2redis,
        user_dense2redis,
    )
    with redis_connection(host, port, db) as conn:
        for write in writers:
            write(path, conn)
def _load_json(path: str, filename: str):
    """Parse and return the JSON document stored in ``filename`` under ``path``."""
    with open(os.path.join(path, filename)) as f:
        return ujson.load(f)


def _load_json_if_exists(path: str, filename: str):
    """Like ``_load_json`` but return ``None`` when the file does not exist."""
    if not os.path.exists(os.path.join(path, filename)):
        return None
    return _load_json(path, filename)


def _hset_json_values(r: redis.Redis, name: str, items):
    """Set hash fields in ``name`` from ``(field, value)`` pairs.

    Each value is JSON-encoded before being stored. The writes are queued on
    one pipeline and sent together by a single ``execute()`` call.
    """
    pipe = r.pipeline()
    for field, value in items:
        pipe.hset(name, field, ujson.dumps(value))
    pipe.execute()


def _rpush_json_values(r: redis.Redis, name: str, values):
    """Replace the list ``name`` with the JSON-encoded ``values``, in order.

    RPUSH appends to whatever the list already holds, so the key is deleted
    first; otherwise exporting a model twice would leave duplicated rows in
    the list. The delete is queued on the same pipeline as the pushes.
    """
    pipe = r.pipeline()
    pipe.delete(name)
    for value in values:
        pipe.rpush(name, ujson.dumps(value))
    pipe.execute()


def model_name2redis(path: str, r: redis.Redis):
    """Store the ``model_name`` entry of ``model_name.json`` under key ``model_name``.

    Parameters
    ----------
    path : str
        Model saving path.
    r : redis.Redis
        Redis client to write to.
    """
    m = _load_json(path, "model_name.json")
    r.set("model_name", m["model_name"])


def id_mapping2redis(path: str, r: redis.Redis):
    """Store the user/item id mappings as the hashes ``user2id``, ``id2item`` and ``item2id``.

    The three mappings are read from ``user2id.json``, ``id2item.json`` and
    ``item2id.json``. All files are read before anything is written.

    Parameters
    ----------
    path : str
        Model saving path.
    r : redis.Redis
        Redis client to write to.
    """
    user2id = _load_json(path, "user2id.json")
    id2item = _load_json(path, "id2item.json")
    item2id = _load_json(path, "item2id.json")
    r.hset("user2id", mapping=user2id)
    r.hset("id2item", mapping=id2item)
    r.hset("item2id", mapping=item2id)


def user_consumed2redis(path: str, r: redis.Redis):
    """Store ``user_consumed.json`` in the hash ``user_consumed``.

    Each top-level key becomes a hash field whose value is the JSON encoding
    of the corresponding entry.

    Parameters
    ----------
    path : str
        Model saving path.
    r : redis.Redis
        Redis client to write to.
    """
    user_consumed = _load_json(path, "user_consumed.json")
    _hset_json_values(r, "user_consumed", user_consumed.items())


def sim2redis(path: str, r: redis.Redis):
    """Store ``sim.json`` in the hash ``k_sims``.

    Each top-level key becomes a hash field whose value is the JSON encoding
    of the corresponding entry.

    Parameters
    ----------
    path : str
        Model saving path.
    r : redis.Redis
        Redis client to write to.
    """
    sim = _load_json(path, "sim.json")
    _hset_json_values(r, "k_sims", sim.items())


def user_embed2redis(path: str, r: redis.Redis):
    """Store ``user_embed.json`` in the hash ``user_embed``.

    Each top-level key becomes a hash field whose value is the JSON encoding
    of the corresponding entry.

    Parameters
    ----------
    path : str
        Model saving path.
    r : redis.Redis
        Redis client to write to.
    """
    user_embeds = _load_json(path, "user_embed.json")
    _hset_json_values(r, "user_embed", user_embeds.items())


def features2redis(path: str, r: redis.Redis):
    """Store the contents of ``features.json`` in redis.

    ``n_users`` and ``n_items`` are always written; ``max_seq_len`` only when
    present. For each of ``user_sparse``, ``item_sparse``, ``user_dense`` and
    ``item_dense`` whose ``<kind>_col_index`` entry is present:

    * field ``<kind>`` of the hash ``feature`` is set to 1,
    * ``<kind>_col_index`` is stored as a JSON string,
    * ``<kind>_values`` is stored row by row, JSON-encoded: user rows as a
      hash keyed by the row position, item rows as a list in file order.

    Item value lists are replaced rather than appended to, so running the
    export again does not duplicate rows.

    Parameters
    ----------
    path : str
        Model saving path.
    r : redis.Redis
        Redis client to write to.
    """
    feats = _load_json(path, "features.json")
    r.set("n_users", feats["n_users"])
    r.set("n_items", feats["n_items"])
    if "max_seq_len" in feats:
        r.set("max_seq_len", feats["max_seq_len"])

    # Keys built below: {user,item}_{sparse,dense}_col_index and
    # {user,item}_{sparse,dense}_values.
    for kind in ("user_sparse", "item_sparse", "user_dense", "item_dense"):
        index_key = f"{kind}_col_index"
        if index_key not in feats:
            continue
        r.hset("feature", kind, 1)
        r.set(index_key, ujson.dumps(feats[index_key]))

        values_key = f"{kind}_values"
        rows = feats[values_key]
        if kind.startswith("user_"):
            # User rows: hash field is the row position as a string.
            _hset_json_values(
                r, values_key, ((str(u), vals) for u, vals in enumerate(rows))
            )
        else:
            # Item rows: list, position in the list is the row position.
            _rpush_json_values(r, values_key, rows)


def user_sparse2redis(path: str, r: redis.Redis):
    """Store the optional user sparse field metadata in redis.

    If ``user_sparse_fields.json`` exists it is written to the hash
    ``user_sparse_fields``. If ``user_sparse_idx_mapping.json`` exists, each
    of its top-level entries ``col`` is written to its own hash named
    ``user_sparse_idx_mapping__<col>``. Missing files are skipped.

    Parameters
    ----------
    path : str
        Model saving path.
    r : redis.Redis
        Redis client to write to.
    """
    user_sparse_fields = _load_json_if_exists(path, "user_sparse_fields.json")
    if user_sparse_fields is not None:
        r.hset("user_sparse_fields", mapping=user_sparse_fields)

    user_sparse_idx_mapping = _load_json_if_exists(
        path, "user_sparse_idx_mapping.json"
    )
    if user_sparse_idx_mapping is not None:
        for col, idx_mapping in user_sparse_idx_mapping.items():
            col_name = f"user_sparse_idx_mapping__{col}"
            r.hset(col_name, mapping=idx_mapping)


def user_dense2redis(path: str, r: redis.Redis):
    """Store the optional ``user_dense_fields.json`` in the hash ``user_dense_fields``.

    Nothing is written when the file does not exist.

    Parameters
    ----------
    path : str
        Model saving path.
    r : redis.Redis
        Redis client to write to.
    """
    user_dense_fields = _load_json_if_exists(path, "user_dense_fields.json")
    if user_dense_fields is not None:
        r.hset("user_dense_fields", mapping=user_dense_fields)