MyCache
本文最后更新于:1 分钟前
使用Map开发一个类似于redis的缓存工具
实现固定缓存
相关接口
为了方便后续扩展首先定义接口ICache
继承于Map
public interface ICache<K, V> extends Map<K, V> {
}
因为缓存的大小有限,所以不能无限制的像cache(也就是Map集合)中添加元素 ,当到达容量上限时,需要从cache中淘汰元素,所以需要添加驱逐策略。
public interface ICacheEvict<K, V> {
/**
* 驱除策略
*
* @param context 上下文
* @since 0.0.2
* @return 被移除的明细,没有时返回 null
*/
evict(final ICacheEvictContext<K, V> context);
}
其中ICacheEvictContext
,包含了缓存对象,缓存大小限制,以及需要添加到缓存中的key
/**
* 缓存驱逐上下文
* @param <K>
* @param <V>
*/
public interface ICacheEvictContext<K, V> {
/**
* 待缓存的key
* @return
*/
K key();
/**
* 缓存对象
* @return
*/
ICache<K, V > cache();
/**
* 缓存大小
* @return
*/
int size();
}
代码实现
CacheEvictContext
该类的实现比较简单,实现对单个成员变量的初始化,并返回该对象本身,便于实现fluent流式编程
public class CacheEvictContext<K,V> implements ICacheEvictContext<K, V> {
/**
* 新加的 key
*/
private K key;
/**
* cache 实现
*/
private ICache<K,V> cache;
/**
* 最大的大小
*/
private int size;
@Override
public K key() {
return key;
}
public CacheEvictContext<K, V> key(K key) {
this.key = key;
return this;
}
@Override
public ICache<K, V> cache() {
return cache;
}
public CacheEvictContext<K, V> cache(ICache<K, V> cache) {
this.cache = cache;
return this;
}
@Override
public int size() {
return size;
}
public CacheEvictContext<K, V> size(int size) {
this.size = size;
return this;
}
}
淘汰策略实现cacheEvict.evict(context);
如下
CacheEvictFIFO
淘汰策略可以有多种,比如 LRU/LFU/FIFO 等等,我们此处实现一个最基本的 FIFO。
拥有FIFO特性的数据结构就是queue的,那么就用java中LinkedList
集合实现该方法。
public class CacheEvictFIFO<K,V> implements ICacheEvict<K,V> {
/**
* queue按添加顺序保存key信息
*/
private Queue<K> queue = new LinkedList<>();
@Override
public void evict(ICacheEvictContext<K, V> context) {
final ICache<K,V> cache = context.cache();
// 超过限制,执行移除
if(cache.size() >= context.size()) {
K evictKey = queue.remove();
// 移除最开始的元素
cache.remove(evictKey);
}
// 将新加的元素放入队尾
final K key = context.key();
queue.add(key);
}
}
cache
核心的cache类,向其中添加一些关键属性
public class Cache<K,V> implements ICache<K,V> {
/**
* map信息
*/
private Map<K,V> map;
/**
* 缓存大小限制
*/
private int sizeLimit;
/**
* 驱除策略
*/
private ICacheEvict<K,V> evict;
public Cache(Map<K, V> map, int sizeLimit, ICacheEvict<K, V> evict) {
this.map = map;
this.sizeLimit = sizeLimit;
this.evict = evict;
}
// Override Map method
}
一些方法的重写没有在此贴出,使用快捷键补全后,调用map参数对应的方法即可。
对于put方法的改动比较大
- 驱逐旧元素
- 添加新元素
@Override
public V put(K key, V value) {
//1.1 尝试驱除
CacheEvictContext<K,V> context = new CacheEvictContext<>();
context.key(key).size(sizeLimit).cache(this);
cacheEvict.evict(context);
//2. 判断驱除后的信息
if(isSizeLimit()) {
throw new CacheRuntimeException("当前队列已满,数据添加失败!");
}
//3. 执行添加
return map.put(key, value);
}
private boolean isSizeLimit() {
final int currentSize = this.size();
return currentSize >= this.sizeLimit;
}
可以让用户动态指定大小,但是指定大小肯就要有对应的淘汰策略。否则,固定大小的 map 肯定无法放入元素。
CacheBs
为了方便用户创建客户端,可以创建一个引导类,这里使用到了fluent流式写法。
/**
* 缓存引导类
*/
public final class CacheBs<K,V> {
private CacheBs(){}
/**
* 创建对象实例
* @param <K> key
* @param <V> value
* @return this
*/
public static <K,V> CacheBs<K,V> newInstance() {
return new CacheBs<>();
}
/**
* map 实现
*/
private Map<K,V> map = new HashMap<>();
/**
* 大小限制
*/
private int size = Integer.MAX_VALUE;
/**
* 驱除策略
*/
private ICacheEvict<K,V> evict = CacheEvicts.fifo();
/**
* map 实现
* @param map map
* @return this
*/
public CacheBs<K, V> map(Map<K, V> map) {
ArgUtil.notNull(map, "map");
this.map = map;
return this;
}
/**
* 设置 size 信息
* @param size size
* @return this
*/
public CacheBs<K, V> size(int size) {
ArgUtil.notNegative(size, "size");
this.size = size;
return this;
}
/**
* 设置驱除策略
* @param evict 驱除策略
* @return this
*/
public CacheBs<K, V> evict(ICacheEvict<K, V> evict) {
this.evict = evict;
return this;
}
/**
* 构建缓存信息
* @return 缓存信息
*/
public ICache<K,V> build() {
CacheContext<K,V> context = new CacheContext<>();
context.cacheEvict(evict);
context.map(map);
context.size(size);
return new Cache<>(context);
}
}
测试
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(2)
.build();
cache.put("1", "1");
cache.put("2", "2");
cache.put("3", "3");
cache.put("4", "4");
System.out.println(cache.keySet());
默认为先进先出的策略,此时输出 keys,内容如下:
[3, 4]
实现key过期
redis中可以设置key的过期时间,这是一个非常有用的功能,比如将短信的验证码设置5分钟的过期时间;登录凭证设置一天有效等,这些场景都需要使用。
相关接口
ICache
首先在ICache中添加两个方法
expire()
多久后过期expireAt()
在什么时间过期
public interface ICache<K, V> extends Map<K, V> {
/**
* 设置过期时间
* (1)如果 key 不存在,则什么都不做。
* (2)暂时不提供新建 key 指定过期时间的方式,会破坏原来的方法。
*
* 会做什么:
* 类似于 redis
* (1)惰性删除。
* 在执行下面的方法时,如果过期则进行删除。
* {@link ICache#get(Object)} 获取
* {@link ICache#values()} 获取所有值
* {@link ICache#entrySet()} 获取所有明细
*
* 【数据的不一致性】
* 调用其他方法,可能得到的不是使用者的预期结果,因为此时的 expire 信息可能没有被及时更新。
* 比如
* {@link ICache#isEmpty()} 是否为空
* {@link ICache#size()} 当前大小
* 同时会导致以 size() 作为过期条件的问题。
*
* 解决方案:考虑添加 refresh 等方法,暂时不做一致性的考虑。
* 对于实际的使用,我们更关心 K/V 的信息。
*
* (2)定时删除
* 启动一个定时任务。每次随机选择指定大小的 key 进行是否过期判断。
* 类似于 redis,为了简化,可以考虑设定超时时间,频率与超时时间成反比。
*
* 其他拓展性考虑:
* 后期考虑提供原子性操作,保证事务性。暂时不做考虑。
* 此处默认使用 TTL 作为比较的基准,暂时不想支持 LastAccessTime 的淘汰策略。会增加复杂度。
* 如果增加 lastAccessTime 过期,本方法可以不做修改。
*
* @param key key
* @param timeInMills 毫秒时间之后过期
* @return this
*/
ICache<K, V> expire(final K key, final long timeInMills);
/**
* 在指定的时间过期
* @param key key
* @param timeInMills 时间戳
* @return this
*/
ICache<K, V> expireAt(final K key, final long timeInMills);
}
ICacheExpire
定义缓存过期的处理结构
public interface ICacheExpire<K,V> {
/**
* 指定过期信息
* @param key key
* @param expireAt 什么时候过期
*/
void expire(final K key, final long expireAt);
/**
* 惰性删除中需要处理的 keys
* @param keyList keys
*/
void refreshExpire(final Collection<K> keyList);
}
代码实现
CacheExpire
实现定期删除过期key的思路,开一个线程定时执行,从cache种删除过期的key
- 需要创建一个集合保存key于expiretime之间的关系
- 创建一个线程用于清除过期的key
private final Map<K, Long> expireMap = new HashMap<>();
@Override
public void expire(K key, long expireAt) {
expireMap.put(key, expireAt);
}
为了防止每次清理的时间占用过长时间,这里限制每次最多清理100
/**
* 单次清空的数量限制
*/
private static final int LIMIT = 100;
/**
* 缓存实现
*/
private final ICache<K,V> cache;
/**
* 线程执行类
*/
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
public CacheExpire(ICache<K, V> cache) {
this.cache = cache;
this.init();
}
/**
* 初始化任务
*/
private void init() {
EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThread(), 100, 100, TimeUnit.MILLISECONDS);
}
创建清理过期key的线程类ExpireThread
此类作为CacheExpire的内部了
private class ExpireThread implements Runnable {
@Override
public void run() {
//1.判断是否为空
if(MapUtil.isEmpty(expireMap)) {
return;
}
//2. 获取 key 进行处理
int count = 0;
for(Map.Entry<K, Long> entry : expireMap.entrySet()) {
if(count >= LIMIT) {
return;
}
expireKey(entry);
count++;
}
}
}
执行清除的函数
/**
* 执行过期操作
* @param entry 明细
*/
private void expireKey(Map.Entry<K, Long> entry) {
final K key = entry.getKey();
final Long expireAt = entry.getValue();
// 删除的逻辑处理
long currentTime = System.currentTimeMillis();
if(currentTime >= expireAt) {
expireMap.remove(key);
// 再移除缓存,后续可以通过惰性删除做补偿
cache.remove(key);
}
}
执行流程
- 当
CacheExpire
被创建时,会启动定时清除的子线程 - 当调用cache.expire()和cache.expireAt()方法时,会将key与expiretime保存到expireMap
- 子线程不断执行,检查expireMap是否为空
- 为空,则结束本次执行
- 不为空,检查清除操作
优化
如果过期的应用场景不多,那么经常轮训的意义实际不大。
比如我们的任务 99% 都是在凌晨清空数据,白天无论怎么轮询,纯粹是浪费资源。
那有没有什么方法,可以快速的判断有没有需要处理的过期元素呢?
答案是有的,那就是排序的 MAP。
我们换一种思路,让过期的时间做 key,相同时间的需要过期的信息放在一个列表中,作为 value。
然后对过期时间排序,轮询的时候就可以快速判断出是否有过期的信息了。
CacheExpireSort
public class CacheExpireSort<K,V> implements ICacheExpire<K,V> {
/**
* 单次清空的数量限制
*/
private static final int LIMIT = 100;
/**
* 排序缓存存储
* 使用按照时间排序的缓存处理。
*/
private final Map<Long, List<K>> sortMap = new TreeMap<>(new Comparator<Long>() {
@Override
public int compare(Long o1, Long o2) {
return (int) (o1-o2);
}
});
/**
* 过期 map
*
* 空间换时间
*/
private final Map<K, Long> expireMap = new HashMap<>();
/**
* 缓存实现
*/
private final ICache<K,V> cache;
/**
* 线程执行类
*/
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
public CacheExpireSort(ICache<K, V> cache) {
this.cache = cache;
this.init();
}
/**
* 初始化任务
*/
private void init() {
EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThread(), 1, 1, TimeUnit.SECONDS);
}
/**
* 定时执行任务
*/
private class ExpireThread implements Runnable {
@Override
public void run() {
//1.判断是否为空
if(MapUtil.isEmpty(sortMap)) {
return;
}
//2. 获取 key 进行处理
int count = 0;
for(Map.Entry<Long, List<K>> entry : sortMap.entrySet()) {
final Long expireAt = entry.getKey();
List<K> expireKeys = entry.getValue();
// 判断队列是否为空
if(CollectionUtil.isEmpty(expireKeys)) {
sortMap.remove(expireAt);
continue;
}
if(count >= LIMIT) {
return;
}
// 删除的逻辑处理
long currentTime = System.currentTimeMillis();
if(currentTime >= expireAt) {
Iterator<K> iterator = expireKeys.iterator();
while (iterator.hasNext()) {
K key = iterator.next();
// 先移除本身
iterator.remove();
expireMap.remove(key);
// 再移除缓存,后续可以通过惰性删除做补偿
cache.remove(key);
count++;
}
} else {
// 直接跳过,没有过期的信息
return;
}
}
}
}
@Override
public void expire(K key, long expireAt) {
List<K> keys = sortMap.get(expireAt);
if(keys == null) {
keys = new ArrayList<>();
}
keys.add(key);
// 设置对应的信息
sortMap.put(expireAt, keys);
expireMap.put(key, expireAt);
}
}
Cache
添加成员变量
private ICacheExpire<K,V> expire;
构造方法
public Cache(Map<K, V> map, int sizeLimit, ICacheEvict<K, V> evict) {
this.map = map;
this.sizeLimit = sizeLimit;
this.evict = evict;
// 修改过期策略时,修改此处即可
this.expire = new CacheExpire<>(this);
}
方法实现,为了方便起见,将两种过期方式转换为一种,多长时间后过期
@Override
public ICache<K, V> expire(K key, long timeInMills) {
long expireTime = System.currentTimeMillis() + timeInMills;
return this.expireAt(key, expireTime);
}
@Override
public ICache<K, V> expireAt(K key, long timeInMills) {
this.cacheExpire.expire(key, timeInMills);
return this;
}
惰性删除
类似于 redis,我们采用定时删除的方案,就有一个问题:可能数据清理的不及时。那当我们查询时,可能获取到到是脏数据。
当我们关心某些数据时,才对数据做对应的删除判断操作,这样压力会小很多。
需要惰性删除的方法:各种数据查询的方法
@Override
@SuppressWarnings("unchecked")
public V get(Object key) {
//1. 刷新所有过期信息
K genericKey = (K) key;
this.cacheExpire.refreshExpire(Collections.singletonList(genericKey));
return map.get(key);
}
在获取数据之前对数据进行刷新,就是refreshExpire()
函数
public void refreshExpire(Collection<K> keyList) {
if(CollectionUtil.isEmpty(keyList)) {
return;
}
// 判断大小。一般都是过期的 keys 比较小。
if(keyList.size() <= expireMap.size()) {
for(K key : keyList) {
expireKey(key);
}
} else {
for(Map.Entry<K, Long> entry : expireMap.entrySet()) {
this.expireKey(entry);
}
}
}
随机删除
Redis内部维护一个定时任务,默认每秒运行10次(通过配置hz控制)。
定时任务中删除过期键逻辑采用了自适应算法,根据键的过期比例、使用快慢两种速率模式回收键
1)定时任务在每个数据库空间随机检查20个键,当发现过期时删除对应的键。
2)如果超过检查数25%的键过期,循环执行回收逻辑直到不足25%或运行超时为止,慢模式下超时时间为25毫秒。
3)如果之前回收键逻辑超时,则在Redis触发内部事件之前再次以快模式运行回收过期键任务,快模式下超时时间为1毫秒且2秒内只能运行1次。
4)快慢两种模式内部删除逻辑相同,只是执行的超时时间不同。
ps: 这里的快慢模式设计的也比较巧妙,根据过期信息的比例,调整对应的任务超时时间。
这里的随机也非常重要,可以比较客观的清理掉过期信息,而不是从头遍历,导致后面的数据无法被访问
基本属性
public class CacheExpireRandom<K,V> implements ICacheExpire<K,V> {
private static final Log log = LogFactory.getLog(CacheExpireRandom.class);
/**
* 单次清空的数量限制
* @since 0.0.16
*/
private static final int COUNT_LIMIT = 100;
/**
* 过期 map
*
* 空间换时间
* @since 0.0.16
*/
private final Map<K, Long> expireMap = new HashMap<>();
/**
* 缓存实现
* @since 0.0.16
*/
private final ICache<K,V> cache;
/**
* 是否启用快模式
* @since 0.0.16
*/
private volatile boolean fastMode = false;
/**
* 线程执行类
* @since 0.0.16
*/
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
public CacheExpireRandom(ICache<K, V> cache) {
this.cache = cache;
this.init();
}
/**
* 初始化任务
* @since 0.0.16
*/
private void init() {
EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThreadRandom(), 10, 10, TimeUnit.SECONDS);
}
}
定时任务
这里与redis保持一致,支持fastMode
实际上fastMode和慢模式的逻辑一样,只是超时的时间不同
/**
* 定时执行任务
* @since 0.0.16
*/
private class ExpireThreadRandom implements Runnable {
@Override
public void run() {
//1.判断是否为空
if(MapUtil.isEmpty(expireMap)) {
log.info("expireMap 信息为空,直接跳过本次处理。");
return;
}
//2. 是否启用快模式
if(fastMode) {
expireKeys(10L);
}
//3. 缓慢模式
expireKeys(100L);
}
}
过期信息核心实现
执行过期时,首先会记录超时时间,用于超时时之间中断执行
默认回复fastMode = false,当执行超时的时候设置fastMode=true
/**
* 过期信息
* @param timeoutMills 超时时间
* @since 0.0.16
*/
private void expireKeys(final long timeoutMills) {
// 设置超时时间 100ms
final long timeLimit = System.currentTimeMillis() + timeoutMills;
// 恢复 fastMode
this.fastMode = false;
//2. 获取 key 进行处理
int count = 0;
while (true) {
//2.1 返回判断
if(count >= COUNT_LIMIT) {
log.info("过期淘汰次数已经达到最大次数: {},完成本次执行。", COUNT_LIMIT);
return;
}
if(System.currentTimeMillis() >= timeLimit) {
this.fastMode = true;
log.info("过期淘汰已经达到限制时间,中断本次执行,设置 fastMode=true;");
return;
}
//2.2 随机过期
K key = getRandomKey();
Long expireAt = expireMap.get(key);
boolean expireFlag = expireKey(key, expireAt);
log.debug("key: {} 过期执行结果 {}", key, expireFlag);
//2.3 信息更新
count++;
}
}
随机获取过期key
/**
* 随机获取一个 key 信息
* @return 随机返回的 keys
* @since 0.0.16
*/
private K getRandomKey() {
Random random = ThreadLocalRandom.current();
Set<K> keySet = expireMap.keySet();
List<K> list = new ArrayList<>(keySet);
int randomIndex = random.nextInt(list.size());
return list.get(randomIndex);
}
直接将所有的keys转换为list然后通过random获取一个元素
但是存在一个缺点,getRandomKey()
方法为了获取一个随机的信息,代价还是太大了。如果 keys 的数量非常大,那么我们要创建一个 list,这个本身就是非常耗时的,而且空间复杂度直接翻倍。
优化思路
1.避免空间浪费
最简单的思路应该是避免list的创建
我们所要的只是基于size的随机值,我们可以遍历获取
private K getRandomKey2() {
Random random = ThreadLocalRandom.current();
int randomIndex = random.nextInt(expireMap.size());
// 遍历 keys
Iterator<K> iterator = expireMap.keySet().iterator();
int count = 0;
while (iterator.hasNext()) {
K key = iterator.next();
if(count == randomIndex) {
return key;
}
count++;
}
// 正常逻辑不会到这里
throw new CacheRuntimeException("对应信息不存在");
}
2.批量操作
上述的方法避免了 list 的创建,同时也符合随机的条件。
但是从头遍历到随机的 size 数值,这也是一个比较慢的过程(O(N) 时间复杂度)。
如果我们取 100 次,悲观的话就是 100 * O(N)。
我们可以运用批量的思想,比如一次取 100 个,降低时间复杂度:
/**
* 批量获取多个 key 信息
* @param sizeLimit 大小限制
* @return 随机返回的 keys
* @since 0.0.16
*/
private Set<K> getRandomKeyBatch(final int sizeLimit) {
Random random = ThreadLocalRandom.current();
int randomIndex = random.nextInt(expireMap.size());
// 遍历 keys
Iterator<K> iterator = expireMap.keySet().iterator();
int count = 0;
Set<K> keySet = new HashSet<>();
while (iterator.hasNext()) {
// 判断列表大小
if(keySet.size() >= sizeLimit) {
return keySet;
}
K key = iterator.next();
// index 向后的位置,全部放进来。
if(count >= randomIndex) {
keySet.add(key);
}
count++;
}
// 正常逻辑不会到这里
throw new CacheRuntimeException("对应信息不存在");
}
创建代理对象
相关接口
ICacheProxy
因为后面要实现几种类型的代理对象,所以先抽象出一个接口,方便使用
public interface ICacheProxy {
/**
* 获取代理实现
* @return 代理
*/
Object proxy();
}
ICacheProxyBsContext
/**
* 拦截器信息
* @return 拦截器
*/
CacheInterceptor interceptor();
/**
* 获取代理对象信息
* @return 代理
*/
ICache target();
/**
* 目标对象
* @param target 对象
* @return 结果
*/
ICacheProxyBsContext target(final ICache target);
/**
* 参数信息
* @return 参数信息
*/
Object[] params();
/**
* 方法信息
* @return 方法信息
*/
Method method();
/**
* 方法执行
* @return 执行
* @throws Throwable 异常信息
*/
Object process() throws Throwable;
接口实现
CacheProxyBsContext
public class CacheProxyBsContext implements ICacheProxyBsContext {
/**
* 代理目标对象
*/
private ICache target;
/**
* 方法执行的参数
*/
private Object[] params;
/**
* 方法
* @since 0.0.4
*/
private Method method;
/**
* 新建对象
* @return 对象
*/
public static CacheProxyBsContext newInstance(){
return new CacheProxyBsContext();
}
@Override
public CacheInterceptor interceptor() {
return interceptor;
}
@Override
public ICache target() {
return target;
}
@Override
public ICacheProxyBsContext target(ICache target) {
this.target = target;
return this;
}
@Override
public Object[] params() {
return params;
}
public CacheProxyBsContext params(Object[] params) {
this.params = params;
return this;
}
@Override
public Method method() {
return method;
}
public CacheProxyBsContext method(Method method) {
this.method = method;
return this;
}
@Override
public Object process() throws Throwable {
//通过反射调用方法 也就是目标代理对象要执行的方法
return this.method.invoke(target, params);
}
}
引导类代理对象
创建
public class CacheProxyBs {
private CacheProxyBs(){}
/**
* 代理上下文
*/
private ICacheProxyBsContext context;
public static CacheProxyBs newInstance(){
return new CacheProxyBs();
}
public CacheProxyBs context(ICacheProxyBsContext context) {
this.context = context;
return this;
}
/**
* 该方法在每个方法执行之前会获取其执行的具体信息
* 包括执方法名,参数,执行结果,执行结果
* 在执行前会打印执行时间
* 然后记录执行时间
* 打印最后的执行时间
* @return
* @throws Throwable
*/
@SuppressWarnings("all")
public Object execute() throws Throwable {
long startMills = System.currentTimeMillis();
final ICache cache = context.target();
Object result = context.process();
return result;
}
}
代理对象
创建代理对象的目的是为了对Map的原方法进行一些增强,比如统计某次调用的执行时间,可以在调用前记录下时间后,再调用目标对象的方法,之后再用当前时间减去调用前的时间,即可获取本次调用花费的时间。这个可以用于统计慢操作日志,后续会进行开发。
此外结合自定义注解,可以在一些需要加监听器的方法上添加对应注解,即可在调用此方法时,进行一些操作,比如刷新操作。
jdk动态代理
在java动态代理机制中,InvocationHandler
接口和 Proxy
类是核心。
Proxy 类中使用频率最高的方法是:newProxyInstance() ,这个方法主要用来生成一个代理对象。
@CallerSensitive
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h)
这个方法一共有 3 个参数:
- loader :类加载器,用于加载代理对象。
- interfaces : 被代理类实现的一些接口;
- h : 实现了 InvocationHandler 接口的对象;
要实现动态代理的话,还必须需要实现InvocationHandler 来自定义处理逻辑。 当我们的动态代理对象调用一个方法时候,这个方法的调用就会被转发到实现InvocationHandler 接口类的 invoke 方法来调用。
public interface InvocationHandler {
/**
* 当你使用代理对象调用方法的时候实际会调用到这个方法
*/
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable;
}
invoke()
有三个参数
- proxy :动态生成的代理类
- method : 与代理类对象调用的方法相对应
- args : 当前 method 方法的参数
也就是说:通过Proxy 类的 newProxyInstance() 创建的代理对象在调用方法的时候,实际会调用到实现InvocationHandler 接口的类的 invoke()方法。 你可以在 invoke() 方法中自定义处理逻辑,比如在方法执行前后做什么事情。
JDK 动态代理类使用步骤
- 定义一个接口及其实现类;
- 自定义
InvocationHandler
并重写invoke
方法,在invoke
方法中我们会调用原生方法(被代理类的方法)并自定义一些处理逻辑; - 通过
Proxy.newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h)
方法创建代理对象;
public class DynamicProxy implements InvocationHandler, ICacheProxy {
/**
* 被代理的目标对象
*/
private final ICache target;
public DynamicProxy(ICache target) {
this.target = target;
}
/**
* 这种方式虽然实现了异步执行,但是存在一个缺陷:
* 强制用户返回值为 Future 的子类。
*
* 如何实现不影响原来的值,要怎么实现呢?
* @param proxy 原始对象
* @param method 方法
* @param args 入参
* @return 结果
* @throws Throwable 异常
*/
@Override
@SuppressWarnings("all")
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
ICacheProxyBsContext context = CacheProxyBsContext.newInstance()
.method(method).params(args).target(target);
return CacheProxyBs.newInstance().context(context).execute();
// execute()
/*
public Object execute() throws Throwable {
long startMills = System.currentTimeMillis();
final ICache cache = context.target();
Object result = context.process();
return result;
*/
// process() 就是通过反射的方式调用目标对象的方法
// this.method.invoke(target, params);
}
}
@Override
public Object proxy() {
// 我们要代理哪个真实对象,就将该对象传进去,最后是通过该真实对象来调用其方法的
InvocationHandler handler = new DynamicProxy(target);
return Proxy.newProxyInstance(handler.getClass().getClassLoader(),
target.getClass().getInterfaces(), handler);
}
}
cglib动态代理
JDK 动态代理有一个最致命的问题是其只能代理实现了接口的类。
CGLIB(Code Generation Library)是一个基于ASM的字节码生成库,它允许我们在运行时对字节码进行修改和动态生成。CGLIB 通过继承方式实现代理。很多知名的开源框架都使用到了CGLIB, 例如 Spring 中的 AOP 模块中:如果目标对象实现了接口,则默认采用 JDK 动态代理,否则采用 CGLIB 动态代理。
在CGLIB动态代理机制中MethodInterceptor
接口和Enhancer
类的核心
所以要自定义自定义 MethodInterceptor
并重写intercept
方法,intercept
用于拦截增强被代理类的方法。
public interface MethodInterceptor
extends Callback{
// 拦截被代理类中的方法
public Object intercept(Object obj, java.lang.reflect.Method method, Object[] args,
MethodProxy proxy) throws Throwable;
}
- obj :被代理的对象(需要增强的对象)
- method :被拦截的方法(需要增强的方法)
- args :方法入参
- methodProxy :用于调用原始方法
你可以通过 Enhancer类来动态获取被代理类,当代理类调用方法的时候,实际调用的是 MethodInterceptor 中的 intercept 方法。
CGLIB 动态代理类使用步骤
- 定义一个类;
- 自定义
MethodInterceptor
并重写intercept
方法,intercept 用于拦截增强被代理类的方法,和 JDK 动态代理中的 invoke 方法类似; - 通过 Enhancer 类的 create()创建代理类;
添加maven依赖
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib</artifactId>
<version>3.3.0</version>
</dependency>
代理实现
public class CglibProxy implements MethodInterceptor, ICacheProxy {
/**
* 被代理的对象
*/
private final ICache target;
public CglibProxy(ICache target) {
this.target = target;
}
@Override
public Object intercept(Object o, Method method, Object[] params, MethodProxy methodProxy) throws Throwable {
ICacheProxyBsContext context = CacheProxyBsContext.newInstance()
.method(method).params(params).target(target);
return CacheProxyBs.newInstance().context(context).execute();
}
@Override
public Object proxy() {
Enhancer enhancer = new Enhancer();
//目标对象类
enhancer.setSuperclass(target.getClass());
enhancer.setCallback(this);
//通过字节码技术创建目标对象类的子类实例作为代理
return enhancer.create();
}
}
获取代理对象
创建一个获取代理对象的工厂类
public final class CacheProxy {
private CacheProxy(){}
/**
* 获取对象代理
* @param <K> 泛型 key
* @param <V> 泛型 value
* @param cache 对象代理
* @return 代理信息
*/
@SuppressWarnings("all")
public static <K,V> ICache<K,V> getProxy(final ICache<K,V> cache) {
if(ObjectUtil.isNull(cache)) {
return (ICache<K,V>) new NoneProxy(cache).proxy();
}
final Class clazz = cache.getClass();
// 如果targetClass本身是个接口或者targetClass是JDK Proxy生成的,则使用JDK动态代理。
// 参考 spring 的 AOP 判断
if (clazz.isInterface() || Proxy.isProxyClass(clazz)) {
return (ICache<K,V>) new DynamicProxy(cache).proxy();
}
return (ICache<K,V>) new CglibProxy(cache).proxy();
}
}
实现监听器
注解
该注解的作用域是方法
/**
* 缓存拦截器注解
*/
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheInterceptor {
/**
* 通用拦截器
*
* 1. 耗时统计
* 2. 慢日志统计
*
* etc.
* @return 默认开启
*/
boolean common() default true;
}
那么如何使用注解呢?
创建一个测试类,添加两个公用方法
public class test {
@CacheInterceptor
public void test1(){
System.out.println("test1");
}
public void test2() {
System.out.println("test2");
}
}
那么如何判断方法是否加了注解
public static void main(String[] args) {
Method[] methods = test.class.getMethods();
for (Method method : methods) {
System.out.println(method.getName());
System.out.println(method.getAnnotation(CacheInterceptor.class));
}
}
如果方法上加了@CacheInterceptor
注解,那么就能看到一下的打印信息
test2
null
test1
@com.sunzy.annotation.CacheInterceptor(common=true)
所以用这种方法判断,在哪些方法执行时,需要进行相应的处理。
相关接口
ICacheInterceptorContext
为监听器执行时提供相关参数
public interface ICacheInterceptorContext<K, V>{
/**
* 缓存信息
* @return 缓存信息
*/
ICache<K,V> cache();
/**
* 执行的方法信息
* @return 方法
*/
Method method();
/**
* 执行的参数
* @return 参数
*/
Object[] params();
/**
* 方法执行的结果
* @return 结果
*/
Object result();
/**
* 开始时间
* @return 时间
*/
long startMills();
/**
* 结束时间
* @return 时间
*/
long endMills();
}
ICacheInterceptor
在执行方法前后需要执行的操作
public interface ICacheInterceptor<K,V> {
/**
* 方法执行之前
* @param context 上下文
*/
void before(ICacheInterceptorContext<K,V> context);
/**
* 方法执行之后
* @param context 上下文
*/
void after(ICacheInterceptorContext<K,V> context);
}
接口实现类
CacheInterceptorCost
执行耗时统计监听器,再添加一个慢日志阈值即可实现判断是否为慢操作
public class CacheInterceptorCost<K, V> implements ICacheInterceptor<K, V> {
private static final Log log = LogFactory.getLog(CacheInterceptorCost.class);
@Override
public void before(ICacheInterceptorContext<K, V> context) {
log.debug("Cost start, method: {}", context.method().getName());
}
@Override
public void after(ICacheInterceptorContext<K, V> context) {
long costMills = context.endMills() - context.startMills();
String methodName = context.method().getName();
log.debug("Cost end, method: {}, cost: {}ms", methodName, costMills);
// 添加慢操作日志
List<ICacheSlowListener> slowListenerList = context.cache().slowListeners();
if (CollectionUtil.isNotEmpty(slowListenerList)){
CacheSlowListenerContext listenerContext = CacheSlowListenerContext.newInstance()
.startTimeMills(context.startMills())
.endTimeMills(context.endMills())
.costTimeMills(costMills)
.methodName(methodName)
.params(context.params())
.result(context.result());
for (ICacheSlowListener slowListener : slowListenerList) {
// 超过慢日志的阈值 则认定为满操作
if(costMills >= slowListener.slowerThanMills()){
slowListener.listen(listenerContext);
}
}
}
}
}
CacheInterceptorContext
监听器执行的上下文
public class CacheInterceptorContext<K,V> implements ICacheInterceptorContext<K,V> {
private ICache<K,V> cache;
/**
* 执行的方法信息
*/
private Method method;
/**
* 执行的参数
*/
private Object[] params;
/**
* 方法执行的结果
*/
private Object result;
/**
* 开始时间
*/
private long startMills;
/**
* 结束时间
*/
private long endMills;
public static <K,V> CacheInterceptorContext<K,V> newInstance() {
return new CacheInterceptorContext<>();
}
@Override
public ICache<K, V> cache() {
return cache;
}
public CacheInterceptorContext<K, V> cache(ICache<K, V> cache) {
this.cache = cache;
return this;
}
@Override
public Method method() {
return method;
}
public CacheInterceptorContext<K, V> method(Method method) {
this.method = method;
return this;
}
@Override
public Object[] params() {
return params;
}
public CacheInterceptorContext<K, V> params(Object[] params) {
this.params = params;
return this;
}
@Override
public Object result() {
return result;
}
public CacheInterceptorContext<K, V> result(Object result) {
this.result = result;
return this;
}
@Override
public long startMills() {
return startMills;
}
public CacheInterceptorContext<K, V> startMills(long startMills) {
this.startMills = startMills;
return this;
}
@Override
public long endMills() {
return endMills;
}
public CacheInterceptorContext<K, V> endMills(long endMills) {
this.endMills = endMills;
return this;
}
}
CacheProxyBs
在CacheProxyBs添加相关监听器
public class CacheProxyBs {
private CacheProxyBs(){}
/**
* 代理上下文
*/
private ICacheProxyBsContext context;
/**
* 默认通用拦截器
*
* JDK 的泛型擦除导致这里不能使用泛型
*/
@SuppressWarnings("all")
private final List<ICacheInterceptor> commonInterceptors = CacheInterceptors.defaultCommonList();
public static CacheProxyBs newInstance(){
return new CacheProxyBs();
}
public CacheProxyBs context(ICacheProxyBsContext context) {
this.context = context;
return this;
}
/**
* 该方法在每个方法执行之前会获取其执行的具体信息
* 包括执方法名,参数,执行结果,执行结果
* 在执行前会打印执行时间
* 然后记录执行时间
* 打印最后的执行时间
* @return
* @throws Throwable
*/
@SuppressWarnings("all")
public Object execute() throws Throwable {
long startMills = System.currentTimeMillis();
final ICache cache = context.target();
Object result = context.process();
return result;
}
}
通过注解判断,需要执行什么监听器
private void interceptorHandler(CacheInterceptor cacheInterceptor,
CacheInterceptorContext interceptorContext,
ICache cache,
boolean before) {
if(cacheInterceptor != null) {
//1. 通用
if(cacheInterceptor.common()) {
for(ICacheInterceptor interceptor : commonInterceptors) {
if(before) {
interceptor.before(interceptorContext);
} else {
interceptor.after(interceptorContext);
}
}
}
}
}
修改代理对象调用的方法execute()
public Object execute() throws Throwable {
//1. 开始的时间
final long startMills = System.currentTimeMillis();
final ICache cache = context.target();
CacheInterceptorContext interceptorContext = CacheInterceptorContext.newInstance()
.startMills(startMills)
.method(context.method())
.params(context.params())
.cache(context.target());
//1. 获取刷新注解信息
CacheInterceptor cacheInterceptor = context.interceptor();
this.interceptorHandler(cacheInterceptor, interceptorContext, cache, true);
//2. 正常执行
Object result = context.process();
final long endMills = System.currentTimeMillis();
interceptorContext.endMills(endMills).result(result);
// 方法执行完成
this.interceptorHandler(cacheInterceptor, interceptorContext, cache, false);
return result;
}
CacheBs
修改引导类对象中的build()
方法
public ICache<K,V> build() {
Cache<K,V> cache = new Cache<>();
cache.map(map);
cache.evict(evict);
cache.sizeLimit(size);
// 初始化
cache.init();
// 创建代理对象
return CacheProxy.getProxy(cache);
}
测试
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(3)
.build();
cache.put("1", "1");
cache.put("2", "2");
cache.expire("1", 10);
TimeUnit.MILLISECONDS.sleep(50);
System.out.println(cache.keySet());
持久化
redis的持久化有两种RDB和AOF,由于项目中这两种方案的实现比较简陋,就不在此多介绍了
CachePresist
实现一个基于json的持久化,创建一个CachePresistDbJson实现ICachePresist接口,该类中需要有一个成员变量用于获取保存json文件的地址
public class CachePersistDbJson<K,V> implements ICachePersist<K,V> {
/**
* 数据库路径
* @since 0.0.8
*/
private final String dbPath;
public CachePersistDbJson(String dbPath) {
this.dbPath = dbPath;
}
/**
* 持久化
* key长度 key+value
* 第一个空格,获取 key 的长度,然后截取
* @param cache 缓存
*/
@Override
public void persist(ICache<K, V> cache) {
Set<Map.Entry<K,V>> entrySet = cache.entrySet();
// 创建文件
FileUtil.createFile(dbPath);
// 清空文件
FileUtil.truncate(dbPath);
for(Map.Entry<K,V> entry : entrySet) {
K key = entry.getKey();
Long expireTime = cache.expire().expireTime(key);
PersistEntry<K,V> persistEntry = new PersistEntry<>();
persistEntry.setKey(key);
persistEntry.setValue(entry.getValue());
persistEntry.setExpire(expireTime);
String line = JSON.toJSONString(persistEntry);
FileUtil.write(dbPath, line, StandardOpenOption.APPEND);
}
}
}
定时执行
有了持久化策略,需要有触发方式,这里采用定时执行的方式
public class InnerCachePersist<K,V> {
private static final Log log = LogFactory.getLog(InnerCachePersist.class);
/**
* 缓存信息
* @since 0.0.8
*/
private final ICache<K,V> cache;
/**
* 缓存持久化策略
* @since 0.0.8
*/
private final ICachePersist<K,V> persist;
/**
* 线程执行类
* @since 0.0.3
*/
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
public InnerCachePersist(ICache<K, V> cache, ICachePersist<K, V> persist) {
this.cache = cache;
this.persist = persist;
// 初始化
this.init();
}
/**
* 初始化
* @since 0.0.8
*/
private void init() {
EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
log.info("开始持久化缓存信息");
persist.persist(cache);
log.info("完成持久化缓存信息");
} catch (Exception exception) {
log.error("文件持久化异常", exception);
}
}
}, 0, 1, TimeUnit.MINUTES);
}
}
这里需要注意是定时任务的实现方式,每隔一分钟执行一次
本地数据加载
有了持久化策略,还要有加载策略。将持久化的json文件内容解析并存入cache中。实现过程也比较简单
public class CacheLoadJSON<K, V> implements ICacheLoad<K, V> {
private static final Log log = LogFactory.getLog(CacheLoadJSON.class);
private String dbPath;
public CacheLoadJSON(String dbPath) {
this.dbPath = dbPath;
}
@Override
public void load(ICache<K, V> cache) {
List<String> lines = FileUtil.readAllLines(dbPath);
log.info("[load] 开始处理 path: {}", dbPath);
if(CollectionUtil.isEmpty(lines)){
log.info("[load] path: {} 文件内容为空,直接返回", dbPath);
return;
}
for (String line : lines) {
if(StringUtil.isEmpty(line)){
continue;
}
// 反序列化 将文件中的数据保存到cache中
PersistEntry<K, V> persistEntry = JSON.parseObject(line, PersistEntry.class);
K key = persistEntry.getKey();
V value = persistEntry.getValue();
Long expireTime = persistEntry.getExpireTime();
cache.put(key, value);
if(!ObjectUtil.isEmpty(expireTime)){
cache.expireAt(key, expireTime);
}
}
}
}
这里只完成了类似于redis的rdb持久化,其中可优化的点还有很多,比如rdb文件的压缩、格式的定义 、CRC校验等
数据删除监听器
当 有数据被从cache中删除时,在控制台中打印出详细信息,这就涉及到两种删除操作
- 当cache容量已满,数据被淘汰
- 设置过期时间的key,到达过期时间
所以需要针对以上两种添加监听器
相关接口
ICacheRemoveListener
将监听器抽象成一个接口,方便后续添加不同的监听器
public interface ICacheRemoveListener<K,V> {
/**
* 监听
* @param context 上下文
*/
void listen(final ICacheRemoveListenerContext<K,V> context);
}
ICacheRemoveListenerContext
监听器执行时需要的参数
public interface ICacheRemoveListenerContext<K,V> {
/**
* 清空的 key
* @return key
*/
K key();
/**
* 值
* @return 值
*/
V value();
/**
* 删除类型
* @return 类型
*/
String type();
}
接口实现
CacheRemoveListener
移除操作的实现比较简单,当有key被删除时,在控制台中打印出本次操作的key,value以及移除的类型(evict,expire),所以还需要一个枚举类,来表示操作的类型。
public class CacheRemoveListener<K,V> implements ICacheRemoveListener<K,V> {
private static final Log log = LogFactory.getLog(CacheRemoveListener.class);
@Override
public void listen(ICacheRemoveListenerContext<K, V> context) {
log.debug("Remove key: {}, value: {}, type: {}",
context.key(), context.value(), context.type());
}
}
CacheRemoveListenerContext
CacheRemoveListenerContext该类的实现比较简单,就是给成员变量赋值即可,并且用到了单例模式
public class CacheRemoveListenerContext<K,V> implements ICacheRemoveListenerContext<K,V> {
/**
* key
*/
private K key;
/**
* 值
*/
private V value;
/**
* 删除类型
*/
private String type;
/**
* 新建实例
* @param <K> key
* @param <V> value
* @since 0.0.6
*/
public static <K,V> CacheRemoveListenerContext<K,V> newInstance() {
return new CacheRemoveListenerContext<>();
}
@Override
public K key() {
return key;
}
public CacheRemoveListenerContext<K, V> key(K key) {
this.key = key;
return this;
}
@Override
public V value() {
return value;
}
public CacheRemoveListenerContext<K, V> value(V value) {
this.value = value;
return this;
}
@Override
public String type() {
return type;
}
public CacheRemoveListenerContext<K, V> type(String type) {
this.type = type;
return this;
}
}
CacheRemoveType
表示移除类型的枚举类
public enum CacheRemoveType {
EXPIRE("expire", "过期"),
EVICT("evict", "淘汰"),
;
private final String code;
private final String desc;
CacheRemoveType(String code, String desc) {
this.code = code;
this.desc = desc;
}
public String code() {
return code;
}
public String desc() {
return desc;
}
@Override
public String toString() {
return "CacheRemoveType{" +
"code='" + code + '\'' +
", desc='" + desc + '\'' +
'}';
}
}
因为同类型的监听器可能有多种,所以需要创建一个获取该类型监听器的工厂类
CacheRemoveListeners
public class CacheRemoveListeners {
private CacheRemoveListeners(){}
/**
* 默认监听类
* @return 监听类列表
* @param <K> key
* @param <V> value
*/
@SuppressWarnings("all")
public static <K,V> List<ICacheRemoveListener<K,V>> defaults() {
List<ICacheRemoveListener<K,V>> listeners = new ArrayList<>();
listeners.add(new CacheRemoveListener());
return listeners;
}
}
所有准备工作都做完后,接下来就是将监听器加入到cache中
在cache和cacheBs中添加成员变量
cache中代码
/**
* 删除监听类
*/
private List<ICacheRemoveListener<K,V>> removeListeners;
@Override
public List<ICacheRemoveListener<K, V>> removeListeners() {
return removeListeners;
}
public Cache<K, V> removeListeners(List<ICacheRemoveListener<K, V>> removeListeners) {
this.removeListeners = removeListeners;
return this;
}
cacheBs中的代码
/**
* 删除监听类
*/
private final List<ICacheRemoveListener<K,V>> removeListeners = CacheRemoveListeners.defaults();
// 同时添加对应的构造方法,这样用户可以自定义对应的监听器
public CacheBs<K, V> addRemoveListener(ICacheRemoveListener<K,V> removeListener) {
ArgUtil.notNull(removeListener, "removeListener");
this.removeListeners.add(removeListener);
return this;
}
public ICache<K,V> build() {
Cache<K,V> cache = new Cache<>();
cache.map(map);
cache.evict(evict);
cache.sizeLimit(size);
cache.removeListeners(removeListeners);
// 初始化
cache.init();
return CacheProxy.getProxy(cache);
}
执行监听器
有两个位置需要执行删除监听器
- key过期
- key淘汰
key过期
首先修改key过期的代码,上述的CacheExpire和CacheExpireSort中是执行过期淘汰的类,其中expireKey()
方法执行真正的淘汰的操作,所以就是在这里加入监听器
private void expireKey(final K key, final Long expireAt) {
if(expireAt == null) {
return;
}
long currentTime = System.currentTimeMillis();
if(currentTime >= expireAt) {
expireMap.remove(key);
// 再移除缓存,后续可以通过惰性删除做补偿
V removeValue = cache.remove(key);
// 执行淘汰监听器
ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(removeValue).type(CacheRemoveType.EXPIRE.code());
for(ICacheRemoveListener<K,V> listener : cache.removeListeners()) {
listener.listen(removeListenerContext);
}
}
}
key淘汰
对于第二种情况,也可以采用相同的方法,在CacheEvictFIFO
中的evict()
方法中执行监听器
@Override
public void evict(ICacheEvictContext<K, V> context) {
final ICache<K,V> cache = context.cache();
// 超过限制,执行移除
if(cache.size() >= context.size()) {
K evictKey = queue.remove();
// 移除最开始的元素
cache.remove(evictKey);
// 执行淘汰监听器
ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(removeValue).type(CacheRemoveType.EVICT.code());
for(ICacheRemoveListener<K,V> listener : cache.removeListeners()) {
listener.listen(removeListenerContext);
}
}
// 将新加的元素放入队尾
final K key = context.key();
queue.add(key);
}
此外还有一种方法,修改cache
中的put()
方法,因为在添加元素之前,都会执行一次淘汰操作就是evict()
,那么可以将被淘汰的元素作为返回值返回,在put()
方法中执行监听器。所以需要创建一个接收淘汰元素的类
public class CacheEntry<K,V> implements ICacheEntry<K,V> {
/**
* key
*/
private final K key;
/**
* value
*/
private final V value;
/**
* 新建元素
* @param key key
* @param value value
* @param <K> 泛型
* @param <V> 泛型
* @return 结果
*/
public static <K,V> CacheEntry<K,V> of(final K key,
final V value) {
return new CacheEntry<>(key, value);
}
public CacheEntry(K key, V value) {
this.key = key;
this.value = value;
}
@Override
public K key() {
return key;
}
@Override
public V value() {
return value;
}
@Override
public String toString() {
return "EvictEntry{" +
"key=" + key +
", value=" + value +
'}';
}
}
在put()
中接收被淘汰的元素信息
public V put(K key, V value) {
//1.1 尝试驱除
CacheEvictContext<K,V> context = new CacheEvictContext<>();
context.key(key).size(sizeLimit).cache(this);
ICacheEntry<K,V> evictEntry = evict.evict(context);
// 添加拦截器调用
if(ObjectUtil.isNotNull(evictEntry)) {
// 执行淘汰监听器
ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(evictEntry.key())
.value(evictEntry.value())
.type(CacheRemoveType.EVICT.code());
for(ICacheRemoveListener<K,V> listener : context.cache().removeListeners()) {
listener.listen(removeListenerContext);
}
}
//2. 判断驱除后的信息
if(isSizeLimit()) {
throw new CacheRuntimeException("当前队列已满,数据添加失败!");
}
//3. 执行添加
return map.put(key, value);
}
同样也能实现对删除操作的监听,这样做的好处是,后续添加不同的淘汰策略时,不用在每一种策略中添加执行监听器的代码,减少了代码的冗余。
测试
public class MyRemoveListener<K,V> implements ICacheRemoveListener<K,V> {
@Override
public void listen(ICacheRemoveListenerContext<K, V> context) {
System.out.println("【删除提示】:" + context.key());
}
}
@Test
public void cacheRemoveListenerTest() {
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(1)
.addRemoveListener(new MyRemoveListener<String, String>()) // 可以根据需要添加不同的监听器
.build();
cache.put("1", "1");
cache.put("2", "2");
}
[DEBUG] [2023-05-26 11:27:03.186] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.before] - Cost start, method: put
[DEBUG] [2023-05-26 11:27:03.208] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.after] - Cost end, method: put, cost: 34ms
[DEBUG] [2023-05-26 11:27:03.208] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.before] - Cost start, method: put
0
[DEBUG] [2023-05-26 11:27:03.250] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 1, value: 1, type: evict
【删除提示】:1
[DEBUG] [2023-05-26 11:27:03.250] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.after] - Cost end, method: put, cost: 42ms
LRU 缓存淘汰策略
LRU 是由 Least Recently Used 的首字母组成,表示最近最少使用的含义,一般使用在对象淘汰算法上。
也是比较常见的一种淘汰算法。
其核心思想是如果数据最近被访问过,那么将来被访问的几率也更高。
以上的理论是基于连续性准则。
时间连续性
对于信息的访问,最近被访问过,再次被访问的可能性会很高。LRU就是基于这个理念
空间连续性
对于磁盘信息的访问,将很有访问连续的空间信息。所有会有page预取来提升性能
java的实现步骤
- 新数据插入到链表头部
- 每当缓存命中(即缓存数据被访问),就将数据移到链表的头部
- 当链表满的时候,将链表尾部的数据丢弃
思考
1.如何判断是新数据?
每当缓存命中(即缓存数据被访问),则将数据移到链表头部;
2.什么是缓存命中
- put(key, value)的情况,就是新元素。如果已经有这个元素,可以先删除,再加入
- get(key)的情况,对于元素的访问,删除已有的元素,将新元素再放到链表的头部
- remove(key)移除一个元素,直接删除已有的元素
- keySet() valueSet() entrySet() 这些属于无差别访问,我们不对队列做调整。
3.移除
当链表满的时候,将链表尾部的数据丢弃。
链表满只有一种场景,那就是添加元素的时候,也就是执行 put(key, value) 的时候。
直接删除对应的 key 即可。
相关接口
和FIFO的接口保持一致,调用地方不变
为了后续的LRU实现,新增remove/update两个方法
public interface ICacheEvict<K, V> {
/**
* 驱除策略
*
* @param context 上下文
* @return 是否执行驱除
*/
boolean evict(final ICacheEvictContext<K, V> context);
/**
* 更新 key 信息
* @param key key
*/
void update(final K key);
/**
* 删除 key 信息
* @param key key
*/
void remove(final K key);
}
代码实现
CacheEvictLRU
直接基于 LinkedList 实现:
/**
* 丢弃策略-LRU 最近最少使用
*/
public class CacheEvictLRU<K,V> implements ICacheEvict<K,V> {
private static final Log log = LogFactory.getLog(CacheEvictLRU.class);
/**
* list 信息
*/
private final List<K> list = new LinkedList<>();
@Override
public boolean evict(ICacheEvictContext<K, V> context) {
boolean result = false;
final ICache<K,V> cache = context.cache();
// 超过限制,移除队尾的元素
if(cache.size() >= context.size()) {
K evictKey = list.get(list.size()-1);
// 移除对应的元素
cache.remove(evictKey);
result = true;
}
return result;
}
/**
* 放入元素
* (1)删除已经存在的
* (2)新元素放到元素头部
*
* @param key 元素
*/
@Override
public void update(final K key) {
this.list.remove(key);
this.list.add(0, key);
}
/**
* 移除元素
* @param key 元素
*/
@Override
public void remove(final K key) {
this.list.remove(key);
}
}
实现的比较简单,相对FiFO多了三个方法
- update(): 做了一点简化,认为只要是访问就是删除,然后插入到队首
- remove():删除就是直接从链表中删除
缺点:
插入数据或者获取数据时需要扫描整个链表,每次确认元素是否存在,都要消耗O(n)的时间复杂度查询。
CacheEvictLruDoubleListMap
基于双向链表和Hash表改进需要扫描链表的缺陷,配合hash表,将数据和链表的节点形成映射,将插入操作和读取操作的时间复杂度从O(n)降至O(1)
- 双向链表保持不变
- Map中key对应的值放双向链表的节点信息
双向链表的节点定义
public class DoubleListNode<K, V>{
private K key;
private V value;
private DoubleListNode<K, V> pre;
private DoubleListNode<K, V> next;
public K key() {
return key;
}
public DoubleListNode<K, V> key(K key) {
this.key = key;
return this;
}
public V value() {
return value;
}
public DoubleListNode<K, V> value(V value) {
this.value = value;
return this;
}
public DoubleListNode<K, V> pre() {
return pre;
}
public DoubleListNode<K, V> pre(DoubleListNode<K, V> pre) {
this.pre = pre;
return this;
}
public DoubleListNode<K, V> next() {
return next;
}
public DoubleListNode<K, V> next(DoubleListNode<K, V> next) {
this.next = next;
return this;
}
}
LRU缓存实现
public class CacheEvictLruDoubleListMap<K, V> extends AbstractCacheEvict<K, V>{
private static final Log log = LogFactory.getLog(CacheEvictLruDoubleListMap.class);
/**
* 头节点
*/
private DoubleListNode<K, V> head;
/**
* 尾节点
*/
private DoubleListNode<K, V> tail;
/**
* map 信息
*
* key: 元素信息
* value: 元素在 list 中对应的节点信息
*/
private Map<K, DoubleListNode<K, V>> indexMap;
public CacheEvictLruDoubleListMap() {
this.indexMap = new HashMap<>();
this.head = new DoubleListNode<>();
this.tail = new DoubleListNode<>();
this.head.next(this.tail);
this.tail.pre(this.head);
}
@Override
protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
ICacheEntry<K, V> result = null;
ICache<K, V> cache = context.cache();
// 超出限制 移除队尾元素
if(cache.size() >= context.size()){
// 获取队尾元素的前一个元素
DoubleListNode<K, V> tailPre = this.tail.pre();
if(tailPre == this.head){
log.error("当前列表为空,无法进行删除");
throw new RuntimeException("不可删除头结点");
}
K evictKey = tailPre.key();
V removeValue = cache.remove(evictKey);
// 同时删除双向链表中的元素
removeKey(evictKey);
result = new CacheEntry<>(evictKey, removeValue);
}
return result;
}
/**
* 放入元素
* (1) 删除已经存在的元素
* (2) 新元素放到头部
* @param key
*/
@Override
public void updateKey(K key) {
// 先删除元素
this.removeKey(key);
// 给添加的key创建一个节点
DoubleListNode<K, V> newNode = new DoubleListNode<>();
newNode.key(key);
// 获取head的next节点
DoubleListNode<K, V> next = this.head.next();
// head的next指向新节点
this.head.next(newNode);
// 新节点的pre指向head
newNode.pre(this.head);
// next的pre指向新节点
next.pre(newNode);
// 新的节点的next指向next
newNode.next(next);
//将节点信息插入到map中
indexMap.put(key, newNode);
}
/**
* 移除元素
*
* 1. 获取 map 中的元素
* 2. 不存在直接返回,存在执行以下步骤:
* 2.1 删除双向链表中的元素
* 2.2 删除 map 中的元素
*
* @param key
*/
@Override
public void removeKey(K key) {
DoubleListNode<K, V> node = indexMap.get(key);
if(ObjectUtil.isEmpty(node)){
return;
}
DoubleListNode<K, V> pre = node.pre();
DoubleListNode<K, V> next = node.next();
// 从链表中摘除该节点
pre.next(next);
next.pre(pre);
// 删除map中node节点
this.indexMap.remove(key);
}
}
这样就实现了获取节点的时间复杂度为O(1)
基于LinkedHashMap
LinkedHashMap 本身就是对于 list 和 hashMap 的一种结合的数据结构,我们可以直接使用 jdk 中 LinkedHashMap 去实现
public class LRUCache extends LinkedHashMap {
private int capacity;
public LRUCache(int capacity) {
// 注意这里将LinkedHashMap的accessOrder设为true
super(16, 0.75f, true);
this.capacity = capacity;
}
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return super.size() >= capacity;
}
}
项目中的实现稍微复杂一些
public class CacheEvictLruLinkedHashMap<K, V> extends LinkedHashMap<K,V> implements ICacheEvict<K,V> {
private static final Log log = LogFactory.getLog(CacheEvictLruLinkedHashMap.class);
/**
* 是否移除标识
*/
private volatile boolean removeFlag = false;
/**
* 最旧的一个元素
*/
private transient Map.Entry<K, V> eldest = null;
public CacheEvictLruLinkedHashMap() {
super(16, 0.75f, true);
}
@Override
public ICacheEntry<K, V> evict(ICacheEvictContext<K, V> context) {
ICacheEntry<K, V> result = null;
final ICache<K,V> cache = context.cache();
// 超过限制,移除队尾的元素
if(cache.size() >= context.size()) {
removeFlag = true;
// 执行 put 操作
super.put(context.key(), null);
// 构建淘汰的元素
K evictKey = eldest.getKey();
// 从缓存中移除
V evictValue = cache.remove(evictKey);
result = new CacheEntry<>(evictKey, evictValue);
} else {
removeFlag = false;
}
return result;
}
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
this.eldest = eldest;
return removeFlag;
}
@Override
public void updateKey(K key) {
super.put(key, null);
}
@Override
public void removeKey(K key) {
super.remove(key);
}
}
LRU-2
当存在热点数据时,LRU的效率很好,但是批量操作会导致LRU的命中率急剧下降,缓存污染的情况比较严重
LRU-K中的K代表最近使用的次数,因此LRU可以认为是LRU-1。
LRU-K的主要目的是为了解决LRU算法“缓存污染”的问题,其核心思想是将“最近使用过1次”的判断标准扩展为“最近使用过K次”
相比LRU,LRU-K需要多维护一个队列,用于记录所有缓存数据被访问的历史。只有当数据的访问次数达到K次的时候,才将数据放入缓存。
当需要淘汰数据时,LRU-K会淘汰第K次访问时间距当前时间最大的数据。
数据第一次被访问时,加入到历史访问列表,如果数据在访问历史列表中没有达到K次访问,则按照一定的规则(FIFO,LRU)淘汰;
当访问历史队列中的数据访问次数达到K次后,将数据索引从历史队列中删除,将数据移到缓存队列中,并缓存数据,缓存队列重新按照时间排序;
缓存数据队列中被再次访问后,重新排序,需要淘汰数据时,淘汰缓存队列中排在末尾的数据,即“淘汰倒数K次访问离现在最久的数据”。
LRU-K具有LRU的优点,同时还能避免LRU的缺点,实际应用中LRU-2是综合最优的选择
public class CacheEvictLru2<K, V> extends AbstractCacheEvict<K, V> {
private static Log log = LogFactory.getLog(CacheEvictLru2.class);
/**
* 访问一次的key
*/
private LruMapDoubleList<K, V> firstLruMap;
/**
* 访问一次以上的key
*/
private LruMapDoubleList<K, V> moreLruMap;
public CacheEvictLru2() {
this.firstLruMap = new LruMapDoubleList<>();
this.moreLruMap = new LruMapDoubleList<>();
}
@Override
protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
ICacheEntry<K, V> result = null;
ICache<K, V> cache = context.cache();
if(cache.size() >= context.size()){
ICacheEntry<K, V> evictEntry = null;
if(!firstLruMap.isEmpty()){
evictEntry = firstLruMap.removeEldest();
log.debug("从 firstLruMap 中淘汰数据:{}", evictEntry.key());
} else {
evictEntry = moreLruMap.removeEldest();
log.debug("从 moreLruMap 中淘汰数据:{}", evictEntry.key());
}
// 缓存移除
final K evictKey = evictEntry.key();
V evictValue = cache.remove(evictKey);
result = new CacheEntry<>(evictKey, evictValue);
}
return result;
}
@Override
public void updateKey(K key) {
if(moreLruMap.contains(key) || firstLruMap.contains(key)) {
this.removeKey(key);
moreLruMap.updateKey(key);
log.debug("key: {} 多次访问,加入到 moreLruMap 中", key);
} else {
firstLruMap.updateKey(key);
log.debug("key: {} 第一次访问,加入到 firstLruMap 中", key);
}
}
@Override
public void removeKey(K key) {
if(moreLruMap.contains(key)){
moreLruMap.removeKey(key);
log.debug("key: {} 从 moreLruMap 中移除", key);
} else {
firstLruMap.removeKey(key);
log.debug("key: {} 从 firstLruMap 中移除", key);
}
}
}
在doEvict中,当需要淘汰数据时,优先从firstLruMap中删除数据,如果为空则从moreLruMap中删除数据
在removeKey()中,如果key在moreLruMap中,则从moreLruMap中删除,否则从firstLruMap中删除
在UpdateKey()中
- 如果moreLruMap中已经存在key,则先删除key,再插入key
- 如果firstLruMap中已经存在,则处理firstLruMap队列,先从firstLruMap中删除,插入到moreLruMap队列中
- 如果key不在firstLruMap和moreLruMap中,说明是新元素,直接插入到firstLruMap的开始即可
小结
对于LRU算法的改进主要有两点
- 性能的改进,从O(N)优化到了O(1)
- 批量操作的改进,避免了缓存污染
LFU
LFU是最近最不常用,是一种基于访问频次的一种算法,所以需要额外的空间存储数据访问的次数。
核心思想
如果一个数据在最近一段时间内使用次数很少,那么在将来一段时间被使用的可能性也很小
O(N) 的删除
为了能够淘汰最少使用的数据,个人第一直觉就是直接一个 HashMap<String, Interger>
, String 对应 key 信息,Integer 对应次数。
每次访问到就去+1,设置和读取的时间复杂度都是 O(1);不过删除就比较麻烦了,需要全部遍历对比,时间复杂度为 O(n);
O(logn) 的删除
另外还有一种实现思路就是利用小顶堆+hashmap,小顶堆插入、删除操作都能达到O(logn)时间复杂度,因此效率相比第一种实现方法更加高效。比如 TreeMap。
O(1) 的删除
CacheInterceptor
方法已经写好,但是什么方法需要调用,如何调用?
这里还是基于注解的实现
/**
* 是否执行驱除更新
*
* 主要用于 LRU/LFU 等驱除策略
* @return 是否
*/
boolean evict() default false;
cache
上面思考中,什么是缓存命中,说明了哪些方法需要添加注释
- get()
- containsKey()
- put()
- remove()
@Override
@CacheInterceptor(evict = true)
public boolean containsKey(Object key) {
return map.containsKey(key);
}
@Override
@CacheInterceptor(evict = true)
@SuppressWarnings("unchecked")
public V get(Object key) {
//1. 刷新所有过期信息
K genericKey = (K) key;
this.expire.refreshExpire(Collections.singletonList(genericKey));
return map.get(key);
}
@Override
@CacheInterceptor(aof = true, evict = true)
public V put(K key, V value) {
//...
}
@Override
@CacheInterceptor(aof = true, evict = true)
public V remove(Object key) {
return map.remove(key);
}
注解驱逐拦截器实现
执行顺序:放在方法之后更新,不然每次当前操作的 key 都会被放在最前面。
public class CacheInterceptorEvict<K,V> implements ICacheInterceptor<K, V> {
private static final Log log = LogFactory.getLog(CacheInterceptorEvict.class);
@Override
public void before(ICacheInterceptorContext<K,V> context) {
}
@Override
@SuppressWarnings("all")
public void after(ICacheInterceptorContext<K,V> context) {
ICacheEvict<K,V> evict = context.cache().evict();
Method method = context.method();
final K key = (K) context.params()[0];
if("remove".equals(method.getName())) {
evict.remove(key);
} else {
evict.update(key);
}
}
}
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!