MyCache

本文最后更新于:1 分钟前

使用Map开发一个类似于redis的缓存工具

实现固定缓存

相关接口

为了方便后续扩展首先定义接口ICache继承于Map

public interface ICache<K, V> extends Map<K, V> {
}

因为缓存的大小有限,所以不能无限制的像cache(也就是Map集合)中添加元素 ,当到达容量上限时,需要从cache中淘汰元素,所以需要添加驱逐策略。

public interface ICacheEvict<K, V> {
        /**
     * 驱除策略
     *
     * @param context 上下文
     * @since 0.0.2
     * @return 被移除的明细,没有时返回 null
     */
   evict(final ICacheEvictContext<K, V> context);
}

其中ICacheEvictContext,包含了缓存对象,缓存大小限制,以及需要添加到缓存中的key

/**
 * 缓存驱逐上下文
 * @param <K>
 * @param <V>
 */
public interface ICacheEvictContext<K, V> {

    /**
     * 待缓存的key
     * @return
     */
    K key();

    /**
     * 缓存对象
     * @return
     */
    ICache<K, V > cache();

    /**
     * 缓存大小
     * @return
     */
    int size();
}

代码实现

CacheEvictContext

该类的实现比较简单,实现对单个成员变量的初始化,并返回该对象本身,便于实现fluent流式编程

public class CacheEvictContext<K,V> implements ICacheEvictContext<K, V> {


    /**
     * 新加的 key
     */
    private K key;

    /**
     * cache 实现
     */
    private ICache<K,V> cache;

    /**
     * 最大的大小
     */
    private int size;


    @Override
    public K key() {
        return key;
    }

    public CacheEvictContext<K, V> key(K key) {
        this.key = key;
        return this;
    }

    @Override
    public ICache<K, V> cache() {
        return cache;
    }

    public CacheEvictContext<K, V> cache(ICache<K, V> cache) {
        this.cache = cache;
        return this;
    }

    @Override
    public int size() {
        return size;
    }

    public CacheEvictContext<K, V> size(int size) {
        this.size = size;
        return this;
    }
}

淘汰策略实现cacheEvict.evict(context);如下

CacheEvictFIFO

淘汰策略可以有多种,比如 LRU/LFU/FIFO 等等,我们此处实现一个最基本的 FIFO。

拥有FIFO特性的数据结构就是queue的,那么就用java中LinkedList集合实现该方法。

public class CacheEvictFIFO<K,V> implements ICacheEvict<K,V> {

    /**
     * queue按添加顺序保存key信息
     */
    private Queue<K> queue = new LinkedList<>();

    @Override
    public void evict(ICacheEvictContext<K, V> context) {
        final ICache<K,V> cache = context.cache();
        // 超过限制,执行移除
        if(cache.size() >= context.size()) {
            K evictKey = queue.remove();
            // 移除最开始的元素
            cache.remove(evictKey);
        }

        // 将新加的元素放入队尾
        final K key = context.key();
        queue.add(key);
    }
}

cache

核心的cache类,向其中添加一些关键属性

public class Cache<K,V> implements ICache<K,V> {

    /**
     * map信息
     */
    private Map<K,V> map;

    /**
     * 缓存大小限制
     */
    private int sizeLimit;


    /**
     * 驱除策略
     */
    private ICacheEvict<K,V> evict;
  
    public Cache(Map<K, V> map, int sizeLimit, ICacheEvict<K, V> evict) {
        this.map = map;
        this.sizeLimit = sizeLimit;
        this.evict = evict;
    }
    
    // Override Map method
}

一些方法的重写没有在此贴出,使用快捷键补全后,调用map参数对应的方法即可。

对于put方法的改动比较大

  • 驱逐旧元素
  • 添加新元素
@Override
public V put(K key, V value) {
    //1.1 尝试驱除
    CacheEvictContext<K,V> context = new CacheEvictContext<>();
    context.key(key).size(sizeLimit).cache(this);
    cacheEvict.evict(context);
    //2. 判断驱除后的信息
    if(isSizeLimit()) {
        throw new CacheRuntimeException("当前队列已满,数据添加失败!");
    }
    //3. 执行添加
    return map.put(key, value);
}

private boolean isSizeLimit() {
    final int currentSize = this.size();
    return currentSize >= this.sizeLimit;
}

可以让用户动态指定大小,但是指定大小肯就要有对应的淘汰策略。否则,固定大小的 map 肯定无法放入元素。

CacheBs

为了方便用户创建客户端,可以创建一个引导类,这里使用到了fluent流式写法。

/**
 * 缓存引导类
 */
public final class CacheBs<K,V> {

    private CacheBs(){}

    /**
     * 创建对象实例
     * @param <K> key
     * @param <V> value
     * @return this
     */
    public static <K,V> CacheBs<K,V> newInstance() {
        return new CacheBs<>();
    }

    /**
     * map 实现
     */
    private Map<K,V> map = new HashMap<>();

    /**
     * 大小限制
     */
    private int size = Integer.MAX_VALUE;

    /**
     * 驱除策略
     */
    private ICacheEvict<K,V> evict = CacheEvicts.fifo();

    /**
     * map 实现
     * @param map map
     * @return this
     */
    public CacheBs<K, V> map(Map<K, V> map) {
        ArgUtil.notNull(map, "map");

        this.map = map;
        return this;
    }

    /**
     * 设置 size 信息
     * @param size size
     * @return this
     */
    public CacheBs<K, V> size(int size) {
        ArgUtil.notNegative(size, "size");

        this.size = size;
        return this;
    }

    /**
     * 设置驱除策略
     * @param evict 驱除策略
     * @return this
     */
    public CacheBs<K, V> evict(ICacheEvict<K, V> evict) {
        this.evict = evict;
        return this;
    }

    /**
     * 构建缓存信息
     * @return 缓存信息
     */
    public ICache<K,V> build() {
        CacheContext<K,V> context = new CacheContext<>();
        context.cacheEvict(evict);
        context.map(map);
        context.size(size);
        return new Cache<>(context);
    }

}

测试

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .size(2)
        .build();
cache.put("1", "1");
cache.put("2", "2");
cache.put("3", "3");
cache.put("4", "4");
System.out.println(cache.keySet());

默认为先进先出的策略,此时输出 keys,内容如下:

[3, 4]

实现key过期

redis中可以设置key的过期时间,这是一个非常有用的功能,比如将短信的验证码设置5分钟的过期时间;登录凭证设置一天有效等,这些场景都需要使用。

相关接口

ICache

首先在ICache中添加两个方法

  • expire()多久后过期

  • expireAt()在什么时间过期

public interface ICache<K, V> extends Map<K, V> {

    /**
     * 设置过期时间
     * (1)如果 key 不存在,则什么都不做。
     * (2)暂时不提供新建 key 指定过期时间的方式,会破坏原来的方法。
     *
     * 会做什么:
     * 类似于 redis
     * (1)惰性删除。
     * 在执行下面的方法时,如果过期则进行删除。
     * {@link ICache#get(Object)} 获取
     * {@link ICache#values()} 获取所有值
     * {@link ICache#entrySet()} 获取所有明细
     *
     * 【数据的不一致性】
     * 调用其他方法,可能得到的不是使用者的预期结果,因为此时的 expire 信息可能没有被及时更新。
     * 比如
     * {@link ICache#isEmpty()} 是否为空
     * {@link ICache#size()} 当前大小
     * 同时会导致以 size() 作为过期条件的问题。
     *
     * 解决方案:考虑添加 refresh 等方法,暂时不做一致性的考虑。
     * 对于实际的使用,我们更关心 K/V 的信息。
     *
     * (2)定时删除
     * 启动一个定时任务。每次随机选择指定大小的 key 进行是否过期判断。
     * 类似于 redis,为了简化,可以考虑设定超时时间,频率与超时时间成反比。
     *
     * 其他拓展性考虑:
     * 后期考虑提供原子性操作,保证事务性。暂时不做考虑。
     * 此处默认使用 TTL 作为比较的基准,暂时不想支持 LastAccessTime 的淘汰策略。会增加复杂度。
     * 如果增加 lastAccessTime 过期,本方法可以不做修改。
     *
     * @param key         key
     * @param timeInMills 毫秒时间之后过期
     * @return this
     */
    ICache<K, V> expire(final K key, final long timeInMills);

    /**
     * 在指定的时间过期
     * @param key key
     * @param timeInMills 时间戳
     * @return this
     */
    ICache<K, V> expireAt(final K key, final long timeInMills);

}

ICacheExpire

定义缓存过期的处理结构

public interface ICacheExpire<K,V> {

    /**
     * 指定过期信息
     * @param key key
     * @param expireAt 什么时候过期
     */
    void expire(final K key, final long expireAt);

    /**
     * 惰性删除中需要处理的 keys
     * @param keyList keys
     */
    void refreshExpire(final Collection<K> keyList);
}

代码实现

CacheExpire

实现定期删除过期key的思路,开一个线程定时执行,从cache种删除过期的key

  • 需要创建一个集合保存key于expiretime之间的关系
  • 创建一个线程用于清除过期的key
private final Map<K, Long> expireMap = new HashMap<>();

@Override
public void expire(K key, long expireAt) {
    expireMap.put(key, expireAt);
}

为了防止每次清理的时间占用过长时间,这里限制每次最多清理100

/**
 * 单次清空的数量限制
 */
private static final int LIMIT = 100;

/**
 * 缓存实现
 */
private final ICache<K,V> cache;
/**
 * 线程执行类
 */
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
public CacheExpire(ICache<K, V> cache) {
    this.cache = cache;
    this.init();
}
/**
 * 初始化任务
 */
private void init() {
    EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThread(), 100, 100, TimeUnit.MILLISECONDS);
}

创建清理过期key的线程类ExpireThread

此类作为CacheExpire的内部了

private class ExpireThread implements Runnable {
    @Override
    public void run() {
        //1.判断是否为空
        if(MapUtil.isEmpty(expireMap)) {
            return;
        }
        //2. 获取 key 进行处理
        int count = 0;
        for(Map.Entry<K, Long> entry : expireMap.entrySet()) {
            if(count >= LIMIT) {
                return;
            }
            expireKey(entry);
            count++;
        }
    }
}

执行清除的函数

/**
 * 执行过期操作
 * @param entry 明细
 */
private void expireKey(Map.Entry<K, Long> entry) {
    final K key = entry.getKey();
    final Long expireAt = entry.getValue();
    // 删除的逻辑处理
    long currentTime = System.currentTimeMillis();
    if(currentTime >= expireAt) {
        expireMap.remove(key);
        // 再移除缓存,后续可以通过惰性删除做补偿
        cache.remove(key);
    }
}

执行流程

  • CacheExpire被创建时,会启动定时清除的子线程
  • 当调用cache.expire()和cache.expireAt()方法时,会将key与expiretime保存到expireMap
  • 子线程不断执行,检查expireMap是否为空
    • 为空,则结束本次执行
    • 不为空,检查清除操作

优化

如果过期的应用场景不多,那么经常轮训的意义实际不大。

比如我们的任务 99% 都是在凌晨清空数据,白天无论怎么轮询,纯粹是浪费资源。

那有没有什么方法,可以快速的判断有没有需要处理的过期元素呢?

答案是有的,那就是排序的 MAP。

我们换一种思路,让过期的时间做 key,相同时间的需要过期的信息放在一个列表中,作为 value。

然后对过期时间排序,轮询的时候就可以快速判断出是否有过期的信息了。

CacheExpireSort

public class CacheExpireSort<K,V> implements ICacheExpire<K,V> {

    /**
     * 单次清空的数量限制
     */
    private static final int LIMIT = 100;

    /**
     * 排序缓存存储
     * 使用按照时间排序的缓存处理。
     */
    private final Map<Long, List<K>> sortMap = new TreeMap<>(new Comparator<Long>() {
        @Override
        public int compare(Long o1, Long o2) {
            return (int) (o1-o2);
        }
    });

    /**
     * 过期 map
     *
     * 空间换时间
     */
    private final Map<K, Long> expireMap = new HashMap<>();

    /**
     * 缓存实现
     */
    private final ICache<K,V> cache;

    /**
     * 线程执行类
     */
    private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();

    public CacheExpireSort(ICache<K, V> cache) {
        this.cache = cache;
        this.init();
    }

    /**
     * 初始化任务
     */
    private void init() {
        EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThread(), 1, 1, TimeUnit.SECONDS);
    }

    /**
     * 定时执行任务
     */
    private class ExpireThread implements Runnable {
        @Override
        public void run() {
            //1.判断是否为空
            if(MapUtil.isEmpty(sortMap)) {
                return;
            }

            //2. 获取 key 进行处理
            int count = 0;
            for(Map.Entry<Long, List<K>> entry : sortMap.entrySet()) {
                final Long expireAt = entry.getKey();
                List<K> expireKeys = entry.getValue();

                // 判断队列是否为空
                if(CollectionUtil.isEmpty(expireKeys)) {
                    sortMap.remove(expireAt);
                    continue;
                }
                if(count >= LIMIT) {
                    return;
                }

                // 删除的逻辑处理
                long currentTime = System.currentTimeMillis();
                if(currentTime >= expireAt) {
                    Iterator<K> iterator = expireKeys.iterator();
                    while (iterator.hasNext()) {
                        K key = iterator.next();
                        // 先移除本身
                        iterator.remove();
                        expireMap.remove(key);

                        // 再移除缓存,后续可以通过惰性删除做补偿
                        cache.remove(key);

                        count++;
                    }
                } else {
                    // 直接跳过,没有过期的信息
                    return;
                }
            }
        }
    }

    @Override
    public void expire(K key, long expireAt) {
        List<K> keys = sortMap.get(expireAt);
        if(keys == null) {
            keys = new ArrayList<>();
        }
        keys.add(key);

        // 设置对应的信息
        sortMap.put(expireAt, keys);
        expireMap.put(key, expireAt);
    }
}

Cache

添加成员变量

private ICacheExpire<K,V> expire;

构造方法

public Cache(Map<K, V> map, int sizeLimit, ICacheEvict<K, V> evict) {
    this.map = map;
    this.sizeLimit = sizeLimit;
    this.evict = evict;
    // 修改过期策略时,修改此处即可
    this.expire = new CacheExpire<>(this);
}

方法实现,为了方便起见,将两种过期方式转换为一种,多长时间后过期

@Override
public ICache<K, V> expire(K key, long timeInMills) {
    long expireTime = System.currentTimeMillis() + timeInMills;
    return this.expireAt(key, expireTime);
}

@Override
public ICache<K, V> expireAt(K key, long timeInMills) {
    this.cacheExpire.expire(key, timeInMills);
    return this;
}

惰性删除

类似于 redis,我们采用定时删除的方案,就有一个问题:可能数据清理的不及时。那当我们查询时,可能获取到到是脏数据。

当我们关心某些数据时,才对数据做对应的删除判断操作,这样压力会小很多。

需要惰性删除的方法:各种数据查询的方法

@Override
@SuppressWarnings("unchecked")
public V get(Object key) {
    //1. 刷新所有过期信息
    K genericKey = (K) key;
    this.cacheExpire.refreshExpire(Collections.singletonList(genericKey));
    return map.get(key);
}

在获取数据之前对数据进行刷新,就是refreshExpire()函数

public void refreshExpire(Collection<K> keyList) {
    if(CollectionUtil.isEmpty(keyList)) {
        return;
    }
    // 判断大小。一般都是过期的 keys 比较小。
    if(keyList.size() <= expireMap.size()) {
        for(K key : keyList) {
            expireKey(key);
        }
    } else {
        for(Map.Entry<K, Long> entry : expireMap.entrySet()) {
            this.expireKey(entry);
        }
    }
}

随机删除

Redis内部维护一个定时任务,默认每秒运行10次(通过配置hz控制)。

定时任务中删除过期键逻辑采用了自适应算法,根据键的过期比例、使用快慢两种速率模式回收键

1)定时任务在每个数据库空间随机检查20个键,当发现过期时删除对应的键。

2)如果超过检查数25%的键过期,循环执行回收逻辑直到不足25%或运行超时为止,慢模式下超时时间为25毫秒。

3)如果之前回收键逻辑超时,则在Redis触发内部事件之前再次以快模式运行回收过期键任务,快模式下超时时间为1毫秒且2秒内只能运行1次。

4)快慢两种模式内部删除逻辑相同,只是执行的超时时间不同。

ps: 这里的快慢模式设计的也比较巧妙,根据过期信息的比例,调整对应的任务超时时间。

这里的随机也非常重要,可以比较客观的清理掉过期信息,而不是从头遍历,导致后面的数据无法被访问

基本属性

public class CacheExpireRandom<K,V> implements ICacheExpire<K,V> {

    private static final Log log = LogFactory.getLog(CacheExpireRandom.class);

    /**
     * 单次清空的数量限制
     * @since 0.0.16
     */
    private static final int COUNT_LIMIT = 100;

    /**
     * 过期 map
     *
     * 空间换时间
     * @since 0.0.16
     */
    private final Map<K, Long> expireMap = new HashMap<>();

    /**
     * 缓存实现
     * @since 0.0.16
     */
    private final ICache<K,V> cache;

    /**
     * 是否启用快模式
     * @since 0.0.16
     */
    private volatile boolean fastMode = false;

    /**
     * 线程执行类
     * @since 0.0.16
     */
    private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();

    public CacheExpireRandom(ICache<K, V> cache) {
        this.cache = cache;
        this.init();
    }

    /**
     * 初始化任务
     * @since 0.0.16
     */
    private void init() {
        EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThreadRandom(), 10, 10, TimeUnit.SECONDS);
    }

}

定时任务

这里与redis保持一致,支持fastMode

实际上fastMode和慢模式的逻辑一样,只是超时的时间不同

/**
 * 定时执行任务
 * @since 0.0.16
 */
private class ExpireThreadRandom implements Runnable {
    @Override
    public void run() {
        //1.判断是否为空
        if(MapUtil.isEmpty(expireMap)) {
            log.info("expireMap 信息为空,直接跳过本次处理。");
            return;
        }
        //2. 是否启用快模式
        if(fastMode) {
            expireKeys(10L);
        }
        //3. 缓慢模式
        expireKeys(100L);
    }
}

过期信息核心实现

执行过期时,首先会记录超时时间,用于超时时之间中断执行

默认回复fastMode = false,当执行超时的时候设置fastMode=true

/**
 * 过期信息
 * @param timeoutMills 超时时间
 * @since 0.0.16
 */
private void expireKeys(final long timeoutMills) {
    // 设置超时时间 100ms
    final long timeLimit = System.currentTimeMillis() + timeoutMills;
    // 恢复 fastMode
    this.fastMode = false;
    //2. 获取 key 进行处理
    int count = 0;
    while (true) {
        //2.1 返回判断
        if(count >= COUNT_LIMIT) {
            log.info("过期淘汰次数已经达到最大次数: {},完成本次执行。", COUNT_LIMIT);
            return;
        }
        if(System.currentTimeMillis() >= timeLimit) {
            this.fastMode = true;
            log.info("过期淘汰已经达到限制时间,中断本次执行,设置 fastMode=true;");
            return;
        }
        //2.2 随机过期
        K key = getRandomKey();
        Long expireAt = expireMap.get(key);
        boolean expireFlag = expireKey(key, expireAt);
        log.debug("key: {} 过期执行结果 {}", key, expireFlag);
        //2.3 信息更新
        count++;
    }
}

随机获取过期key

/**
 * 随机获取一个 key 信息
 * @return 随机返回的 keys
 * @since 0.0.16
 */
private K getRandomKey() {
    Random random = ThreadLocalRandom.current();
    Set<K> keySet = expireMap.keySet();
    List<K> list = new ArrayList<>(keySet);
    int randomIndex = random.nextInt(list.size());
    return list.get(randomIndex);
}

直接将所有的keys转换为list然后通过random获取一个元素

但是存在一个缺点,getRandomKey() 方法为了获取一个随机的信息,代价还是太大了。如果 keys 的数量非常大,那么我们要创建一个 list,这个本身就是非常耗时的,而且空间复杂度直接翻倍。

优化思路

1.避免空间浪费

最简单的思路应该是避免list的创建

我们所要的只是基于size的随机值,我们可以遍历获取

private K getRandomKey2() {
    Random random = ThreadLocalRandom.current();
    int randomIndex = random.nextInt(expireMap.size());
    // 遍历 keys
    Iterator<K> iterator = expireMap.keySet().iterator();
    int count = 0;
    while (iterator.hasNext()) {
        K key = iterator.next();
        if(count == randomIndex) {
            return key;
        }
        count++;
    }
    // 正常逻辑不会到这里
    throw new CacheRuntimeException("对应信息不存在");
}

2.批量操作

上述的方法避免了 list 的创建,同时也符合随机的条件。

但是从头遍历到随机的 size 数值,这也是一个比较慢的过程(O(N) 时间复杂度)。

如果我们取 100 次,悲观的话就是 100 * O(N)。

我们可以运用批量的思想,比如一次取 100 个,降低时间复杂度:

/**
 * 批量获取多个 key 信息
 * @param sizeLimit 大小限制
 * @return 随机返回的 keys
 * @since 0.0.16
 */
private Set<K> getRandomKeyBatch(final int sizeLimit) {
    Random random = ThreadLocalRandom.current();
    int randomIndex = random.nextInt(expireMap.size());
    // 遍历 keys
    Iterator<K> iterator = expireMap.keySet().iterator();
    int count = 0;
    Set<K> keySet = new HashSet<>();
    while (iterator.hasNext()) {
        // 判断列表大小
        if(keySet.size() >= sizeLimit) {
            return keySet;
        }
        K key = iterator.next();
        // index 向后的位置,全部放进来。
        if(count >= randomIndex) {
            keySet.add(key);
        }
        count++;
    }
    // 正常逻辑不会到这里
    throw new CacheRuntimeException("对应信息不存在");
}

创建代理对象

相关接口

ICacheProxy

因为后面要实现几种类型的代理对象,所以先抽象出一个接口,方便使用

public interface ICacheProxy {

    /**
     * 获取代理实现
     * @return 代理
     */
    Object proxy();

}

ICacheProxyBsContext

  /**
   * 拦截器信息
   * @return 拦截器
   */
  CacheInterceptor interceptor();

  /**
   * 获取代理对象信息
   * @return 代理
   */
  ICache target();

  /**
   * 目标对象
   * @param target 对象
   * @return 结果
   */
  ICacheProxyBsContext target(final ICache target);

  /**
   * 参数信息
   * @return 参数信息
   */
  Object[] params();

  /**
   * 方法信息
   * @return 方法信息
   */
  Method method();

  /**
   * 方法执行
   * @return 执行
   * @throws Throwable 异常信息
   */
Object process() throws Throwable;

接口实现

CacheProxyBsContext

public class CacheProxyBsContext implements ICacheProxyBsContext {

    /**
     * 代理目标对象
     */
    private ICache target;

    /**
     * 方法执行的参数
     */
    private Object[] params;

    /**
     * 方法
     * @since 0.0.4
     */
    private Method method;


    /**
     * 新建对象
     * @return 对象
     */
    public static CacheProxyBsContext newInstance(){
        return new CacheProxyBsContext();
    }

    @Override
    public CacheInterceptor interceptor() {
        return interceptor;
    }

    @Override
    public ICache target() {
        return target;
    }

    @Override
    public ICacheProxyBsContext target(ICache target) {
        this.target = target;
        return this;
    }

    @Override
    public Object[] params() {
        return  params;
    }

    public CacheProxyBsContext params(Object[] params) {
        this.params = params;
        return this;
    }

    @Override
    public Method method() {
        return method;
    }

    public CacheProxyBsContext method(Method method) {
        this.method = method;
        return this;
    }

    @Override
    public Object process() throws Throwable {
        //通过反射调用方法 也就是目标代理对象要执行的方法
        return this.method.invoke(target, params);
    }
}

引导类代理对象

创建

public class CacheProxyBs {

    private CacheProxyBs(){}
    /**
     * 代理上下文
     */
    private ICacheProxyBsContext context;


    public static CacheProxyBs newInstance(){
        return new CacheProxyBs();
    }

    public CacheProxyBs context(ICacheProxyBsContext context) {
        this.context = context;
        return this;
    }

    /**
     * 该方法在每个方法执行之前会获取其执行的具体信息
     * 包括执方法名,参数,执行结果,执行结果
     * 在执行前会打印执行时间
     * 然后记录执行时间
     * 打印最后的执行时间
     * @return
     * @throws Throwable
     */
    @SuppressWarnings("all")
    public Object execute() throws Throwable {
        long startMills = System.currentTimeMillis();
        final ICache cache = context.target();
        Object result = context.process();
        return result;
    }
}

代理对象

创建代理对象的目的是为了对Map的原方法进行一些增强,比如统计某次调用的执行时间,可以在调用前记录下时间后,再调用目标对象的方法,之后再用当前时间减去调用前的时间,即可获取本次调用花费的时间。这个可以用于统计慢操作日志,后续会进行开发。

此外结合自定义注解,可以在一些需要加监听器的方法上添加对应注解,即可在调用此方法时,进行一些操作,比如刷新操作。

jdk动态代理

在java动态代理机制中,InvocationHandler 接口和 Proxy 类是核心。

Proxy 类中使用频率最高的方法是:newProxyInstance() ,这个方法主要用来生成一个代理对象。

@CallerSensitive
public static Object newProxyInstance(ClassLoader loader,
                                      Class<?>[] interfaces,
                                      InvocationHandler h)

这个方法一共有 3 个参数:

  • loader :类加载器,用于加载代理对象。
  • interfaces : 被代理类实现的一些接口;
  • h : 实现了 InvocationHandler 接口的对象;

要实现动态代理的话,还必须需要实现InvocationHandler 来自定义处理逻辑。 当我们的动态代理对象调用一个方法时候,这个方法的调用就会被转发到实现InvocationHandler 接口类的 invoke 方法来调用。

public interface InvocationHandler {

    /**
     * 当你使用代理对象调用方法的时候实际会调用到这个方法
     */
    public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable;
}

invoke()有三个参数

  • proxy :动态生成的代理类
  • method : 与代理类对象调用的方法相对应
  • args : 当前 method 方法的参数

也就是说:通过Proxy 类的 newProxyInstance() 创建的代理对象在调用方法的时候,实际会调用到实现InvocationHandler 接口的类的 invoke()方法。 你可以在 invoke() 方法中自定义处理逻辑,比如在方法执行前后做什么事情。

JDK 动态代理类使用步骤

  • 定义一个接口及其实现类;
  • 自定义InvocationHandler并重写invoke方法,在 invoke 方法中我们会调用原生方法(被代理类的方法)并自定义一些处理逻辑;
  • 通过 Proxy.newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h) 方法创建代理对象;
public class DynamicProxy implements InvocationHandler, ICacheProxy {

    /**
     * 被代理的目标对象
     */
    private final ICache target;

    public DynamicProxy(ICache target) {
        this.target = target;
    }

    /**
     * 这种方式虽然实现了异步执行,但是存在一个缺陷:
     * 强制用户返回值为 Future 的子类。
     *
     * 如何实现不影响原来的值,要怎么实现呢?
     * @param proxy 原始对象
     * @param method 方法
     * @param args 入参
     * @return 结果
     * @throws Throwable 异常
     */
    @Override
    @SuppressWarnings("all")
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ICacheProxyBsContext context = CacheProxyBsContext.newInstance()
                .method(method).params(args).target(target);
        return CacheProxyBs.newInstance().context(context).execute();
        // execute()
        /*
        public Object execute() throws Throwable {
        long startMills = System.currentTimeMillis();
        final ICache cache = context.target();
        Object result = context.process();
        return result;
        */
        
        // process() 就是通过反射的方式调用目标对象的方法
        // this.method.invoke(target, params);
    }
        
    }

    @Override
    public Object proxy() {
        // 我们要代理哪个真实对象,就将该对象传进去,最后是通过该真实对象来调用其方法的
        InvocationHandler handler = new DynamicProxy(target);

        return Proxy.newProxyInstance(handler.getClass().getClassLoader(),
                target.getClass().getInterfaces(), handler);
    }
}

cglib动态代理

JDK 动态代理有一个最致命的问题是其只能代理实现了接口的类。

CGLIB(Code Generation Library)是一个基于ASM的字节码生成库,它允许我们在运行时对字节码进行修改和动态生成。CGLIB 通过继承方式实现代理。很多知名的开源框架都使用到了CGLIB, 例如 Spring 中的 AOP 模块中:如果目标对象实现了接口,则默认采用 JDK 动态代理,否则采用 CGLIB 动态代理。

在CGLIB动态代理机制中MethodInterceptor接口和Enhancer类的核心

所以要自定义自定义 MethodInterceptor 并重写intercept方法,intercept 用于拦截增强被代理类的方法。

public interface MethodInterceptor
extends Callback{
    // 拦截被代理类中的方法
    public Object intercept(Object obj, java.lang.reflect.Method method, Object[] args,
                               MethodProxy proxy) throws Throwable;
}
  • obj :被代理的对象(需要增强的对象)
  • method :被拦截的方法(需要增强的方法)
  • args :方法入参
  • methodProxy :用于调用原始方法

你可以通过 Enhancer类来动态获取被代理类,当代理类调用方法的时候,实际调用的是 MethodInterceptor 中的 intercept 方法。

CGLIB 动态代理类使用步骤

  • 定义一个类;
  • 自定义 MethodInterceptor 并重写intercept方法,intercept 用于拦截增强被代理类的方法,和 JDK 动态代理中的 invoke 方法类似;
  • 通过 Enhancer 类的 create()创建代理类;

添加maven依赖

<dependency>
  <groupId>cglib</groupId>
  <artifactId>cglib</artifactId>
  <version>3.3.0</version>
</dependency>

代理实现

public class CglibProxy implements MethodInterceptor, ICacheProxy {

    /**
     * 被代理的对象
     */
    private final ICache target;

    public CglibProxy(ICache target) {
        this.target = target;
    }

    @Override
    public Object intercept(Object o, Method method, Object[] params, MethodProxy methodProxy) throws Throwable {
        ICacheProxyBsContext context = CacheProxyBsContext.newInstance()
                .method(method).params(params).target(target);

        return CacheProxyBs.newInstance().context(context).execute();
    }

    @Override
    public Object proxy() {
        Enhancer enhancer = new Enhancer();
        //目标对象类
        enhancer.setSuperclass(target.getClass());
        enhancer.setCallback(this);
        //通过字节码技术创建目标对象类的子类实例作为代理
        return enhancer.create();
    }

}

获取代理对象

创建一个获取代理对象的工厂类

public final class CacheProxy {

    private CacheProxy(){}

    /**
     * 获取对象代理
     * @param <K> 泛型 key
     * @param <V> 泛型 value
     * @param cache 对象代理
     * @return 代理信息
     */
    @SuppressWarnings("all")
    public static <K,V> ICache<K,V> getProxy(final ICache<K,V> cache) {
        if(ObjectUtil.isNull(cache)) {
            return (ICache<K,V>) new NoneProxy(cache).proxy();
        }

        final Class clazz = cache.getClass();

        // 如果targetClass本身是个接口或者targetClass是JDK Proxy生成的,则使用JDK动态代理。
        // 参考 spring 的 AOP 判断
        if (clazz.isInterface() || Proxy.isProxyClass(clazz)) {
            return (ICache<K,V>) new DynamicProxy(cache).proxy();
        }

        return (ICache<K,V>) new CglibProxy(cache).proxy();
    }

}

实现监听器

注解

该注解的作用域是方法

/**
 * 缓存拦截器注解
 */
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheInterceptor {
    /**
     * 通用拦截器
     *
     * 1. 耗时统计
     * 2. 慢日志统计
     *
     * etc.
     * @return 默认开启
     */
    boolean common() default true;
}

那么如何使用注解呢?

创建一个测试类,添加两个公用方法

public class test {

    @CacheInterceptor
    public void test1(){
        System.out.println("test1");
    }

    public void test2() {
        System.out.println("test2");
    }
}

那么如何判断方法是否加了注解

public static void main(String[] args) {
    Method[] methods = test.class.getMethods();
    for (Method method : methods) {
        System.out.println(method.getName());
        System.out.println(method.getAnnotation(CacheInterceptor.class));
    }
}

如果方法上加了@CacheInterceptor注解,那么就能看到一下的打印信息

test2
null
test1
@com.sunzy.annotation.CacheInterceptor(common=true)

所以用这种方法判断,在哪些方法执行时,需要进行相应的处理。

相关接口

ICacheInterceptorContext

为监听器执行时提供相关参数

public interface ICacheInterceptorContext<K, V>{

    /**
     * 缓存信息
     * @return 缓存信息
     */
    ICache<K,V> cache();

    /**
     * 执行的方法信息
     * @return 方法
     */
    Method method();

    /**
     * 执行的参数
     * @return 参数
     */
    Object[] params();

    /**
     * 方法执行的结果
     * @return 结果
     */
    Object result();

    /**
     * 开始时间
     * @return 时间
     */
    long startMills();

    /**
     * 结束时间
     * @return 时间
     */
    long endMills();
}

ICacheInterceptor

在执行方法前后需要执行的操作

public interface ICacheInterceptor<K,V> {

    /**
     * 方法执行之前
     * @param context 上下文
     */
    void before(ICacheInterceptorContext<K,V> context);

    /**
     * 方法执行之后
     * @param context 上下文
     */
    void after(ICacheInterceptorContext<K,V> context);

}

接口实现类

CacheInterceptorCost

执行耗时统计监听器,再添加一个慢日志阈值即可实现判断是否为慢操作

public class CacheInterceptorCost<K, V> implements ICacheInterceptor<K, V> {
    private static final Log log = LogFactory.getLog(CacheInterceptorCost.class);

    @Override
    public void before(ICacheInterceptorContext<K, V> context) {
        log.debug("Cost start, method: {}", context.method().getName());
    }

    @Override
    public void after(ICacheInterceptorContext<K, V> context) {
        long costMills = context.endMills() - context.startMills();
        String methodName = context.method().getName();
        log.debug("Cost end, method: {}, cost: {}ms", methodName, costMills);

        // 添加慢操作日志
        List<ICacheSlowListener> slowListenerList = context.cache().slowListeners();
        if (CollectionUtil.isNotEmpty(slowListenerList)){
            CacheSlowListenerContext listenerContext = CacheSlowListenerContext.newInstance()
                    .startTimeMills(context.startMills())
                    .endTimeMills(context.endMills())
                    .costTimeMills(costMills)
                    .methodName(methodName)
                    .params(context.params())
                    .result(context.result());
            for (ICacheSlowListener slowListener : slowListenerList) {
                // 超过慢日志的阈值 则认定为满操作
                if(costMills >= slowListener.slowerThanMills()){
                    slowListener.listen(listenerContext);
                }
            }
        }

    }
}

CacheInterceptorContext

监听器执行的上下文

public class CacheInterceptorContext<K,V> implements ICacheInterceptorContext<K,V> {

    private ICache<K,V> cache;

    /**
     * 执行的方法信息
     */
    private Method method;

    /**
     * 执行的参数
     */
    private Object[] params;

    /**
     * 方法执行的结果
     */
    private Object result;

    /**
     * 开始时间
     */
    private long startMills;

    /**
     * 结束时间
     */
    private long endMills;

    public static <K,V> CacheInterceptorContext<K,V> newInstance() {
        return new CacheInterceptorContext<>();
    }

    @Override
    public ICache<K, V> cache() {
        return cache;
    }

    public CacheInterceptorContext<K, V> cache(ICache<K, V> cache) {
        this.cache = cache;
        return this;
    }

    @Override
    public Method method() {
        return method;
    }

    public CacheInterceptorContext<K, V> method(Method method) {
        this.method = method;
        return this;
    }

    @Override
    public Object[] params() {
        return params;
    }

    public CacheInterceptorContext<K, V> params(Object[] params) {
        this.params = params;
        return this;
    }

    @Override
    public Object result() {
        return result;
    }

    public CacheInterceptorContext<K, V> result(Object result) {
        this.result = result;
        return this;
    }

    @Override
    public long startMills() {
        return startMills;
    }

    public CacheInterceptorContext<K, V> startMills(long startMills) {
        this.startMills = startMills;
        return this;
    }

    @Override
    public long endMills() {
        return endMills;
    }

    public CacheInterceptorContext<K, V> endMills(long endMills) {
        this.endMills = endMills;
        return this;
    }
}

CacheProxyBs

在CacheProxyBs添加相关监听器

public class CacheProxyBs {

    private CacheProxyBs(){}
    /**
     * 代理上下文
     */
    private ICacheProxyBsContext context;
    
    /**
     * 默认通用拦截器
     *
     * JDK 的泛型擦除导致这里不能使用泛型
     */
    @SuppressWarnings("all")
    private final List<ICacheInterceptor> commonInterceptors = CacheInterceptors.defaultCommonList();


    public static CacheProxyBs newInstance(){
        return new CacheProxyBs();
    }

    public CacheProxyBs context(ICacheProxyBsContext context) {
        this.context = context;
        return this;
    }

    /**
     * 该方法在每个方法执行之前会获取其执行的具体信息
     * 包括执方法名,参数,执行结果,执行结果
     * 在执行前会打印执行时间
     * 然后记录执行时间
     * 打印最后的执行时间
     * @return
     * @throws Throwable
     */
    @SuppressWarnings("all")
    public Object execute() throws Throwable {
        long startMills = System.currentTimeMillis();
        final ICache cache = context.target();
        Object result = context.process();
        return result;
    }
}

通过注解判断,需要执行什么监听器

 private void interceptorHandler(CacheInterceptor cacheInterceptor,
                                    CacheInterceptorContext interceptorContext,
                                    ICache cache,
                                    boolean before) {
     if(cacheInterceptor != null) {
         //1. 通用
         if(cacheInterceptor.common()) {
             for(ICacheInterceptor interceptor : commonInterceptors) {
                 if(before) {
                     interceptor.before(interceptorContext);
                 } else {
                     interceptor.after(interceptorContext);
                 }
             }
         }
     }
}

修改代理对象调用的方法execute()

public Object execute() throws Throwable {
    //1. 开始的时间
    final long startMills = System.currentTimeMillis();
    final ICache cache = context.target();
    CacheInterceptorContext interceptorContext = CacheInterceptorContext.newInstance()
        .startMills(startMills)
        .method(context.method())
        .params(context.params())
        .cache(context.target());

    //1. 获取刷新注解信息
    CacheInterceptor cacheInterceptor = context.interceptor();
    this.interceptorHandler(cacheInterceptor, interceptorContext, cache, true);

    //2. 正常执行
    Object result = context.process();

    final long endMills = System.currentTimeMillis();
    interceptorContext.endMills(endMills).result(result);

    // 方法执行完成
    this.interceptorHandler(cacheInterceptor, interceptorContext, cache, false);
    return result;
}

CacheBs

修改引导类对象中的build()方法

public ICache<K,V> build() {
    Cache<K,V> cache = new Cache<>();
    cache.map(map);
    cache.evict(evict);
    cache.sizeLimit(size);

    // 初始化
    cache.init();
    // 创建代理对象
    return CacheProxy.getProxy(cache);
}

测试

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .size(3)
        .build();
cache.put("1", "1");
cache.put("2", "2");

cache.expire("1", 10);
TimeUnit.MILLISECONDS.sleep(50);

System.out.println(cache.keySet());

持久化

redis的持久化有两种RDB和AOF,由于项目中这两种方案的实现比较简陋,就不在此多介绍了

CachePresist

实现一个基于json的持久化,创建一个CachePresistDbJson实现ICachePresist接口,该类中需要有一个成员变量用于获取保存json文件的地址

public class CachePersistDbJson<K,V> implements ICachePersist<K,V> {  
  
    /**  
     * 数据库路径  
     * @since 0.0.8  
     */  
    private final String dbPath;  
  
    public CachePersistDbJson(String dbPath) {  
        this.dbPath = dbPath;  
    }  
  
    /**  
     * 持久化  
     * key长度 key+value  
     * 第一个空格,获取 key 的长度,然后截取  
     * @param cache 缓存  
     */  
    @Override  
    public void persist(ICache<K, V> cache) {  
        Set<Map.Entry<K,V>> entrySet = cache.entrySet();  
  
        // 创建文件  
        FileUtil.createFile(dbPath);  
        // 清空文件  
        FileUtil.truncate(dbPath);  
  
        for(Map.Entry<K,V> entry : entrySet) {  
            K key = entry.getKey();  
            Long expireTime = cache.expire().expireTime(key);  
            PersistEntry<K,V> persistEntry = new PersistEntry<>();  
            persistEntry.setKey(key);  
            persistEntry.setValue(entry.getValue());  
            persistEntry.setExpire(expireTime);  
  
            String line = JSON.toJSONString(persistEntry);  
            FileUtil.write(dbPath, line, StandardOpenOption.APPEND);  
        }  
    }  
  
}

定时执行

有了持久化策略,需要有触发方式,这里采用定时执行的方式

public class InnerCachePersist<K,V> {  
  
    private static final Log log = LogFactory.getLog(InnerCachePersist.class);  
  
    /**  
     * 缓存信息  
     * @since 0.0.8  
     */  
    private final ICache<K,V> cache;  
  
    /**  
     * 缓存持久化策略  
     * @since 0.0.8  
     */  
    private final ICachePersist<K,V> persist;  
  
    /**  
     * 线程执行类  
     * @since 0.0.3  
     */  
    private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();  
  
    public InnerCachePersist(ICache<K, V> cache, ICachePersist<K, V> persist) {  
        this.cache = cache;  
        this.persist = persist;  
  
        // 初始化  
        this.init();  
    }  
  
    /**  
     * 初始化  
     * @since 0.0.8  
     */  
    private void init() {  
        EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {  
            @Override  
            public void run() {  
                try {  
                    log.info("开始持久化缓存信息");  
                    persist.persist(cache);  
                    log.info("完成持久化缓存信息");  
                } catch (Exception exception) {  
                    log.error("文件持久化异常", exception);  
                }  
            }  
        }, 0, 1, TimeUnit.MINUTES);  
    }  
  
}

这里需要注意是定时任务的实现方式,每隔一分钟执行一次

本地数据加载

有了持久化策略,还要有加载策略。将持久化的json文件内容解析并存入cache中。实现过程也比较简单

public class CacheLoadJSON<K, V> implements ICacheLoad<K, V> {  
  
    private static final Log log = LogFactory.getLog(CacheLoadJSON.class);  
  
    private String dbPath;  
  
    public CacheLoadJSON(String dbPath) {  
        this.dbPath = dbPath;  
    }  
  
    @Override  
    public void load(ICache<K, V> cache) {  
  
        List<String> lines = FileUtil.readAllLines(dbPath);  
        log.info("[load] 开始处理 path: {}", dbPath);  
        if(CollectionUtil.isEmpty(lines)){  
            log.info("[load] path: {} 文件内容为空,直接返回", dbPath);  
            return;  
        }  
  
        for (String line : lines) {  
            if(StringUtil.isEmpty(line)){  
                continue;  
            }  
  
            // 反序列化 将文件中的数据保存到cache中  
            PersistEntry<K, V> persistEntry = JSON.parseObject(line, PersistEntry.class);  
            K key = persistEntry.getKey();  
            V value = persistEntry.getValue();  
            Long expireTime = persistEntry.getExpireTime();  
  
            cache.put(key, value);  
            if(!ObjectUtil.isEmpty(expireTime)){  
                cache.expireAt(key, expireTime);  
            }  
        }  
    }  
}

这里只完成了类似于redis的rdb持久化,其中可优化的点还有很多,比如rdb文件的压缩、格式的定义 、CRC校验等

数据删除监听器

当 有数据被从cache中删除时,在控制台中打印出详细信息,这就涉及到两种删除操作

  • 当cache容量已满,数据被淘汰
  • 设置过期时间的key,到达过期时间

所以需要针对以上两种添加监听器

相关接口

ICacheRemoveListener

将监听器抽象成一个接口,方便后续添加不同的监听器

public interface ICacheRemoveListener<K,V> {

    /**
     * 监听
     * @param context 上下文
     */
    void listen(final ICacheRemoveListenerContext<K,V> context);

}

ICacheRemoveListenerContext

监听器执行时需要的参数

public interface ICacheRemoveListenerContext<K,V> {

    /**
     * 清空的 key
     * @return key
     */
    K key();

    /**
     * 值
     * @return
     */
    V value();

    /**
     * 删除类型
     * @return 类型
     */
    String type();

}

接口实现

CacheRemoveListener

移除操作的实现比较简单,当有key被删除时,在控制台中打印出本次操作的key,value以及移除的类型(evict,expire),所以还需要一个枚举类,来表示操作的类型。

public class CacheRemoveListener<K,V> implements ICacheRemoveListener<K,V> {

    private static final Log log = LogFactory.getLog(CacheRemoveListener.class);

    @Override
    public void listen(ICacheRemoveListenerContext<K, V> context) {
        log.debug("Remove key: {}, value: {}, type: {}",
                context.key(), context.value(), context.type());
    }

}

CacheRemoveListenerContext

CacheRemoveListenerContext该类的实现比较简单,就是给成员变量赋值即可,并且用到了单例模式

public class CacheRemoveListenerContext<K,V> implements ICacheRemoveListenerContext<K,V> {

    /**
     * key
     */
    private K key;

    /**
     * 值
     */
    private V value;

    /**
     * 删除类型
     */
    private String type;

    /**
     * 新建实例
     * @param <K> key
     * @param <V> value
     * @since 0.0.6
     */
    public static <K,V> CacheRemoveListenerContext<K,V> newInstance() {
        return new CacheRemoveListenerContext<>();
    }

    @Override
    public K key() {
        return key;
    }

    public CacheRemoveListenerContext<K, V> key(K key) {
        this.key = key;
        return this;
    }

    @Override
    public V value() {
        return value;
    }

    public CacheRemoveListenerContext<K, V> value(V value) {
        this.value = value;
        return this;
    }

    @Override
    public String type() {
        return type;
    }

    public CacheRemoveListenerContext<K, V> type(String type) {
        this.type = type;
        return this;
    }
}

CacheRemoveType

表示移除类型的枚举类

public enum CacheRemoveType {
    EXPIRE("expire", "过期"),
    EVICT("evict", "淘汰"),
    ;

    private final String code;

    private final String desc;


    CacheRemoveType(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    public String code() {
        return code;
    }

    public String desc() {
        return desc;
    }

    @Override
    public String toString() {
        return "CacheRemoveType{" +
                "code='" + code + '\'' +
                ", desc='" + desc + '\'' +
                '}';
    }

}

因为同类型的监听器可能有多种,所以需要创建一个获取该类型监听器的工厂类

CacheRemoveListeners

public class CacheRemoveListeners {

    private CacheRemoveListeners(){}

    /**
     * 默认监听类
     * @return 监听类列表
     * @param <K> key
     * @param <V> value
     */
    @SuppressWarnings("all")
    public static <K,V> List<ICacheRemoveListener<K,V>> defaults() {
        List<ICacheRemoveListener<K,V>> listeners = new ArrayList<>();
        listeners.add(new CacheRemoveListener());
        return listeners;
    }

}

所有准备工作都做完后,接下来就是将监听器加入到cache中

在cache和cacheBs中添加成员变量

cache中代码

/**
 * 删除监听类
 */
private List<ICacheRemoveListener<K,V>> removeListeners;

@Override
public List<ICacheRemoveListener<K, V>> removeListeners() {
    return removeListeners;
}

public Cache<K, V> removeListeners(List<ICacheRemoveListener<K, V>> removeListeners) {
    this.removeListeners = removeListeners;
    return this;
}

cacheBs中的代码

/**
* 删除监听类
*/
private final List<ICacheRemoveListener<K,V>> removeListeners = CacheRemoveListeners.defaults();

// 同时添加对应的构造方法,这样用户可以自定义对应的监听器
public CacheBs<K, V> addRemoveListener(ICacheRemoveListener<K,V> removeListener) {
    ArgUtil.notNull(removeListener, "removeListener");

    this.removeListeners.add(removeListener);
    return this;
}
public ICache<K,V> build() {
    Cache<K,V> cache = new Cache<>();
    cache.map(map);
    cache.evict(evict);
    cache.sizeLimit(size);
    cache.removeListeners(removeListeners);
    // 初始化
    cache.init();
    return CacheProxy.getProxy(cache);
}

执行监听器

有两个位置需要执行删除监听器

  • key过期
  • key淘汰

key过期

首先修改key过期的代码,上述的CacheExpire和CacheExpireSort中是执行过期淘汰的类,其中expireKey()方法执行真正的淘汰的操作,所以就是在这里加入监听器

private void expireKey(final K key, final Long expireAt) {
    if(expireAt == null) {
        return;
    }

    long currentTime = System.currentTimeMillis();
    if(currentTime >= expireAt) {
        expireMap.remove(key);
        // 再移除缓存,后续可以通过惰性删除做补偿
        V removeValue = cache.remove(key);

        // 执行淘汰监听器
        ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(removeValue).type(CacheRemoveType.EXPIRE.code());
        for(ICacheRemoveListener<K,V> listener : cache.removeListeners()) {
            listener.listen(removeListenerContext);
        }
    }
}

key淘汰

对于第二种情况,也可以采用相同的方法,在CacheEvictFIFO中的evict()方法中执行监听器

@Override
public void evict(ICacheEvictContext<K, V> context) {
    final ICache<K,V> cache = context.cache();
    // 超过限制,执行移除
    if(cache.size() >= context.size()) {
        K evictKey = queue.remove();
        // 移除最开始的元素
        cache.remove(evictKey);
        // 执行淘汰监听器
        ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(removeValue).type(CacheRemoveType.EVICT.code());
        for(ICacheRemoveListener<K,V> listener : cache.removeListeners()) {
            listener.listen(removeListenerContext);
        }
    }

    // 将新加的元素放入队尾
    final K key = context.key();
    queue.add(key);
}

此外还有一种方法,修改cache中的put()方法,因为在添加元素之前,都会执行一次淘汰操作就是evict(),那么可以将被淘汰的元素作为返回值返回,在put()方法中执行监听器。所以需要创建一个接收淘汰元素的类

public class CacheEntry<K,V> implements ICacheEntry<K,V> {

    /**
     * key
     */
    private final K key;

    /**
     * value
     */
    private final V value;

    /**
     * 新建元素
     * @param key key
     * @param value value
     * @param <K> 泛型
     * @param <V> 泛型
     * @return 结果
     */
    public static <K,V> CacheEntry<K,V> of(final K key,
                                           final V value) {
        return new CacheEntry<>(key, value);
    }

    public CacheEntry(K key, V value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public K key() {
        return key;
    }

    @Override
    public V value() {
        return value;
    }

    @Override
    public String toString() {
        return "EvictEntry{" +
                "key=" + key +
                ", value=" + value +
                '}';
    }

}

put()中接收被淘汰的元素信息

public V put(K key, V value) {
    //1.1 尝试驱除
    CacheEvictContext<K,V> context = new CacheEvictContext<>();
    context.key(key).size(sizeLimit).cache(this);

    ICacheEntry<K,V> evictEntry = evict.evict(context);

    // 添加拦截器调用
    if(ObjectUtil.isNotNull(evictEntry)) {
        // 执行淘汰监听器
        ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(evictEntry.key())
            .value(evictEntry.value())
            .type(CacheRemoveType.EVICT.code());
        for(ICacheRemoveListener<K,V> listener : context.cache().removeListeners()) {
            listener.listen(removeListenerContext);
        }
    }

    //2. 判断驱除后的信息
    if(isSizeLimit()) {
        throw new CacheRuntimeException("当前队列已满,数据添加失败!");
    }

    //3. 执行添加
    return map.put(key, value);
}

同样也能实现对删除操作的监听,这样做的好处是,后续添加不同的淘汰策略时,不用在每一种策略中添加执行监听器的代码,减少了代码的冗余。

测试

public class MyRemoveListener<K,V> implements ICacheRemoveListener<K,V> {

    @Override
    public void listen(ICacheRemoveListenerContext<K, V> context) {
        System.out.println("【删除提示】:" + context.key());
    }

}



@Test
public void cacheRemoveListenerTest() {
    ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .size(1)
        .addRemoveListener(new MyRemoveListener<String, String>()) // 可以根据需要添加不同的监听器
        .build();

    cache.put("1", "1");
    cache.put("2", "2");
}
[DEBUG] [2023-05-26 11:27:03.186] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.before] - Cost start, method: put
[DEBUG] [2023-05-26 11:27:03.208] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.after] - Cost end, method: put, cost: 34ms
[DEBUG] [2023-05-26 11:27:03.208] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.before] - Cost start, method: put
0
[DEBUG] [2023-05-26 11:27:03.250] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 1, value: 1, type: evict
【删除提示】:1
[DEBUG] [2023-05-26 11:27:03.250] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.after] - Cost end, method: put, cost: 42ms

LRU 缓存淘汰策略

LRU 是由 Least Recently Used 的首字母组成,表示最近最少使用的含义,一般使用在对象淘汰算法上。

也是比较常见的一种淘汰算法。

其核心思想是如果数据最近被访问过,那么将来被访问的几率也更高

以上的理论是基于连续性准则。

  • 时间连续性

    对于信息的访问,最近被访问过,再次被访问的可能性会很高。LRU就是基于这个理念

  • 空间连续性

    对于磁盘信息的访问,将很有访问连续的空间信息。所有会有page预取来提升性能

java的实现步骤

  • 新数据插入到链表头部
  • 每当缓存命中(即缓存数据被访问),就将数据移到链表的头部
  • 当链表满的时候,将链表尾部的数据丢弃

思考

1.如何判断是新数据?

每当缓存命中(即缓存数据被访问),则将数据移到链表头部;

2.什么是缓存命中

  • put(key, value)的情况,就是新元素。如果已经有这个元素,可以先删除,再加入
  • get(key)的情况,对于元素的访问,删除已有的元素,将新元素再放到链表的头部
  • remove(key)移除一个元素,直接删除已有的元素
  • keySet() valueSet() entrySet() 这些属于无差别访问,我们不对队列做调整。

3.移除

当链表满的时候,将链表尾部的数据丢弃。

链表满只有一种场景,那就是添加元素的时候,也就是执行 put(key, value) 的时候。

直接删除对应的 key 即可。

相关接口

和FIFO的接口保持一致,调用地方不变

为了后续的LRU实现,新增remove/update两个方法

public interface ICacheEvict<K, V> {

    /**
     * 驱除策略
     *
     * @param context 上下文
     * @return 是否执行驱除
     */
    boolean evict(final ICacheEvictContext<K, V> context);

    /**
     * 更新 key 信息
     * @param key key
     */
    void update(final K key);

    /**
     * 删除 key 信息
     * @param key key
     */
    void remove(final K key);

}

代码实现

CacheEvictLRU

直接基于 LinkedList 实现:

/**
 * 丢弃策略-LRU 最近最少使用
 */
public class CacheEvictLRU<K,V> implements ICacheEvict<K,V> {

    private static final Log log = LogFactory.getLog(CacheEvictLRU.class);

    /**
     * list 信息
     */
    private final List<K> list = new LinkedList<>();

    @Override
    public boolean evict(ICacheEvictContext<K, V> context) {
        boolean result = false;
        final ICache<K,V> cache = context.cache();
        // 超过限制,移除队尾的元素
        if(cache.size() >= context.size()) {
            K evictKey = list.get(list.size()-1);
            // 移除对应的元素
            cache.remove(evictKey);
            result = true;
        }
        return result;
    }


    /**
     * 放入元素
     * (1)删除已经存在的
     * (2)新元素放到元素头部
     *
     * @param key 元素
     */
    @Override
    public void update(final K key) {
        this.list.remove(key);
        this.list.add(0, key);
    }

    /**
     * 移除元素
     * @param key 元素
     */
    @Override
    public void remove(final K key) {
        this.list.remove(key);
    }

}

实现的比较简单,相对FiFO多了三个方法

  • update(): 做了一点简化,认为只要是访问就是删除,然后插入到队首
  • remove():删除就是直接从链表中删除

缺点:

插入数据或者获取数据时需要扫描整个链表,每次确认元素是否存在,都要消耗O(n)的时间复杂度查询。

CacheEvictLruDoubleListMap

基于双向链表和Hash表改进需要扫描链表的缺陷,配合hash表,将数据和链表的节点形成映射,将插入操作和读取操作的时间复杂度从O(n)降至O(1)

  • 双向链表保持不变
  • Map中key对应的值放双向链表的节点信息

双向链表的节点定义

public class DoubleListNode<K, V>{

    private K key;

    private V value;

    private DoubleListNode<K, V> pre;

    private DoubleListNode<K, V> next;

    public K key() {
        return key;
    }

    public DoubleListNode<K, V> key(K key) {
        this.key = key;
        return this;
    }

    public V value() {
        return value;
    }

    public DoubleListNode<K, V> value(V value) {
        this.value = value;
        return this;
    }

    public DoubleListNode<K, V> pre() {
        return pre;
    }

    public DoubleListNode<K, V> pre(DoubleListNode<K, V> pre) {
        this.pre = pre;
        return this;
    }

    public DoubleListNode<K, V> next() {
        return next;
    }

    public DoubleListNode<K, V> next(DoubleListNode<K, V> next) {
        this.next = next;
        return this;
    }
}

LRU缓存实现

public class CacheEvictLruDoubleListMap<K, V> extends AbstractCacheEvict<K, V>{

    private static final Log log = LogFactory.getLog(CacheEvictLruDoubleListMap.class);

    /**
     * 头节点
     */
    private DoubleListNode<K, V> head;

    /**
     * 尾节点
     */
    private DoubleListNode<K, V> tail;

    /**
     * map 信息
     *
     * key: 元素信息
     * value: 元素在 list 中对应的节点信息
     */
    private Map<K, DoubleListNode<K, V>> indexMap;

    public CacheEvictLruDoubleListMap() {
        this.indexMap = new HashMap<>();
        this.head = new DoubleListNode<>();
        this.tail = new DoubleListNode<>();

        this.head.next(this.tail);
        this.tail.pre(this.head);
    }

    @Override
    protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
        ICacheEntry<K, V> result = null;
        ICache<K, V> cache = context.cache();
        // 超出限制 移除队尾元素
        if(cache.size() >= context.size()){
            // 获取队尾元素的前一个元素
            DoubleListNode<K, V> tailPre = this.tail.pre();
            if(tailPre == this.head){
                log.error("当前列表为空,无法进行删除");
                throw new RuntimeException("不可删除头结点");
            }

            K evictKey = tailPre.key();
            V removeValue = cache.remove(evictKey);
            // 同时删除双向链表中的元素
            removeKey(evictKey);
            result = new CacheEntry<>(evictKey, removeValue);
        }

        return result;
    }

    /**
     * 放入元素
     *  (1) 删除已经存在的元素
     *  (2) 新元素放到头部
     * @param key
     */
    @Override
    public void updateKey(K key) {
        // 先删除元素
        this.removeKey(key);
        // 给添加的key创建一个节点
        DoubleListNode<K, V> newNode = new DoubleListNode<>();
        newNode.key(key);

        // 获取head的next节点
        DoubleListNode<K, V> next = this.head.next();
        // head的next指向新节点
        this.head.next(newNode);
        // 新节点的pre指向head
        newNode.pre(this.head);
        // next的pre指向新节点
        next.pre(newNode);
        // 新的节点的next指向next
        newNode.next(next);

        //将节点信息插入到map中
        indexMap.put(key, newNode);

    }

    /**
     * 移除元素
     *
     * 1. 获取 map 中的元素
     * 2. 不存在直接返回,存在执行以下步骤:
     * 2.1 删除双向链表中的元素
     * 2.2 删除 map 中的元素
     *
     * @param key
     */
    @Override
    public void removeKey(K key) {
        DoubleListNode<K, V> node = indexMap.get(key);
        if(ObjectUtil.isEmpty(node)){
            return;
        }

        DoubleListNode<K, V> pre = node.pre();
        DoubleListNode<K, V> next = node.next();

        // 从链表中摘除该节点
        pre.next(next);
        next.pre(pre);

        // 删除map中node节点
        this.indexMap.remove(key);
    }
}

这样就实现了获取节点的时间复杂度为O(1)

基于LinkedHashMap

LinkedHashMap 本身就是对于 list 和 hashMap 的一种结合的数据结构,我们可以直接使用 jdk 中 LinkedHashMap 去实现

public class LRUCache extends LinkedHashMap {

    private int capacity;

    public LRUCache(int capacity) {
        // 注意这里将LinkedHashMap的accessOrder设为true
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry eldest) {
        return super.size() >= capacity;
    }
}

项目中的实现稍微复杂一些

public class CacheEvictLruLinkedHashMap<K, V> extends LinkedHashMap<K,V> implements ICacheEvict<K,V> {

    private static final Log log = LogFactory.getLog(CacheEvictLruLinkedHashMap.class);


    /**
     * 是否移除标识
     */
    private volatile boolean removeFlag = false;

    /**
     * 最旧的一个元素
     */
    private transient Map.Entry<K, V> eldest = null;

    public CacheEvictLruLinkedHashMap() {
        super(16, 0.75f, true);
    }

    @Override
    public ICacheEntry<K, V> evict(ICacheEvictContext<K, V> context) {
        ICacheEntry<K, V> result = null;
        final ICache<K,V> cache = context.cache();
        // 超过限制,移除队尾的元素
        if(cache.size() >= context.size()) {
            removeFlag = true;

            // 执行 put 操作
            super.put(context.key(), null);

            // 构建淘汰的元素
            K evictKey = eldest.getKey();
            // 从缓存中移除
            V evictValue = cache.remove(evictKey);
            result = new CacheEntry<>(evictKey, evictValue);
        } else {
            removeFlag = false;
        }
        return result;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        this.eldest = eldest;
        return removeFlag;
    }

    @Override
    public void updateKey(K key) {
        super.put(key, null);
    }


    @Override
    public void removeKey(K key) {
        super.remove(key);
    }
}

LRU-2

当存在热点数据时,LRU的效率很好,但是批量操作会导致LRU的命中率急剧下降,缓存污染的情况比较严重

LRU-K中的K代表最近使用的次数,因此LRU可以认为是LRU-1。

LRU-K的主要目的是为了解决LRU算法“缓存污染”的问题,其核心思想是将“最近使用过1次”的判断标准扩展为“最近使用过K次”

相比LRU,LRU-K需要多维护一个队列,用于记录所有缓存数据被访问的历史。只有当数据的访问次数达到K次的时候,才将数据放入缓存。

当需要淘汰数据时,LRU-K会淘汰第K次访问时间距当前时间最大的数据。

数据第一次被访问时,加入到历史访问列表,如果数据在访问历史列表中没有达到K次访问,则按照一定的规则(FIFO,LRU)淘汰;

当访问历史队列中的数据访问次数达到K次后,将数据索引从历史队列中删除,将数据移到缓存队列中,并缓存数据,缓存队列重新按照时间排序;

缓存数据队列中被再次访问后,重新排序,需要淘汰数据时,淘汰缓存队列中排在末尾的数据,即“淘汰倒数K次访问离现在最久的数据”。

LRU-K具有LRU的优点,同时还能避免LRU的缺点,实际应用中LRU-2是综合最优的选择

public class CacheEvictLru2<K, V> extends AbstractCacheEvict<K, V> {

    private static Log log = LogFactory.getLog(CacheEvictLru2.class);

    /**
     * 访问一次的key
     */
    private LruMapDoubleList<K, V> firstLruMap;

    /**
     * 访问一次以上的key
     */
    private LruMapDoubleList<K, V> moreLruMap;

    public CacheEvictLru2() {
        this.firstLruMap = new LruMapDoubleList<>();
        this.moreLruMap = new LruMapDoubleList<>();
    }


    @Override
    protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
        ICacheEntry<K, V> result = null;
        ICache<K, V> cache = context.cache();
        if(cache.size() >= context.size()){
            ICacheEntry<K, V> evictEntry = null;
            if(!firstLruMap.isEmpty()){
                evictEntry = firstLruMap.removeEldest();
                log.debug("从 firstLruMap 中淘汰数据:{}", evictEntry.key());
            } else {
                evictEntry = moreLruMap.removeEldest();
                log.debug("从 moreLruMap 中淘汰数据:{}", evictEntry.key());
            }
            // 缓存移除
            final K evictKey = evictEntry.key();
            V evictValue =  cache.remove(evictKey);
            result = new CacheEntry<>(evictKey, evictValue);
        }
        return result;
    }

    @Override
    public void updateKey(K key) {
        if(moreLruMap.contains(key) || firstLruMap.contains(key)) {
            this.removeKey(key);
            moreLruMap.updateKey(key);
            log.debug("key: {} 多次访问,加入到 moreLruMap 中", key);
        } else {
            firstLruMap.updateKey(key);
            log.debug("key: {} 第一次访问,加入到 firstLruMap 中", key);
        }
    }

    @Override
    public void removeKey(K key) {
        if(moreLruMap.contains(key)){
            moreLruMap.removeKey(key);
            log.debug("key: {} 从 moreLruMap 中移除", key);
        } else {
            firstLruMap.removeKey(key);
            log.debug("key: {} 从 firstLruMap 中移除", key);
        }
    }
}

在doEvict中,当需要淘汰数据时,优先从firstLruMap中删除数据,如果为空则从moreLruMap中删除数据

在removeKey()中,如果key在moreLruMap中,则从moreLruMap中删除,否则从firstLruMap中删除

在UpdateKey()中

  • 如果moreLruMap中已经存在key,则先删除key,再插入key
  • 如果firstLruMap中已经存在,则处理firstLruMap队列,先从firstLruMap中删除,插入到moreLruMap队列中
  • 如果key不在firstLruMap和moreLruMap中,说明是新元素,直接插入到firstLruMap的开始即可

小结

对于LRU算法的改进主要有两点

  • 性能的改进,从O(N)优化到了O(1)
  • 批量操作的改进,避免了缓存污染

LFU

LFU是最近最不常用,是一种基于访问频次的一种算法,所以需要额外的空间存储数据访问的次数。

核心思想

如果一个数据在最近一段时间内使用次数很少,那么在将来一段时间被使用的可能性也很小

O(N) 的删除

为了能够淘汰最少使用的数据,个人第一直觉就是直接一个 HashMap<String, Interger>, String 对应 key 信息,Integer 对应次数。

每次访问到就去+1,设置和读取的时间复杂度都是 O(1);不过删除就比较麻烦了,需要全部遍历对比,时间复杂度为 O(n);

O(logn) 的删除

另外还有一种实现思路就是利用小顶堆+hashmap,小顶堆插入、删除操作都能达到O(logn)时间复杂度,因此效率相比第一种实现方法更加高效。比如 TreeMap。

O(1) 的删除

CacheInterceptor

方法已经写好,但是什么方法需要调用,如何调用?

这里还是基于注解的实现

/**
 * 是否执行驱除更新
 *
 * 主要用于 LRU/LFU 等驱除策略
 * @return 是否
 */
boolean evict() default false;

cache

上面思考中,什么是缓存命中,说明了哪些方法需要添加注释

  • get()
  • containsKey()
  • put()
  • remove()
@Override
@CacheInterceptor(evict = true)
public boolean containsKey(Object key) {
    return map.containsKey(key);
}

@Override
@CacheInterceptor(evict = true)
@SuppressWarnings("unchecked")
public V get(Object key) {
    //1. 刷新所有过期信息
    K genericKey = (K) key;
    this.expire.refreshExpire(Collections.singletonList(genericKey));
    return map.get(key);
}

@Override
@CacheInterceptor(aof = true, evict = true)
public V put(K key, V value) {
    //...
}

@Override
@CacheInterceptor(aof = true, evict = true)
public V remove(Object key) {
    return map.remove(key);
}

注解驱逐拦截器实现

执行顺序:放在方法之后更新,不然每次当前操作的 key 都会被放在最前面。

public class CacheInterceptorEvict<K,V> implements ICacheInterceptor<K, V> {

    private static final Log log = LogFactory.getLog(CacheInterceptorEvict.class);

    @Override
    public void before(ICacheInterceptorContext<K,V> context) {
    }

    @Override
    @SuppressWarnings("all")
    public void after(ICacheInterceptorContext<K,V> context) {
        ICacheEvict<K,V> evict = context.cache().evict();

        Method method = context.method();
        final K key = (K) context.params()[0];
        if("remove".equals(method.getName())) {
            evict.remove(key);
        } else {
            evict.update(key);
        }
    }

}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

 目录

Copyright © 2020 my blog
载入天数... 载入时分秒...