扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章将为大家详细讲解有关redis中如何实现限流策略,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
站在用户的角度思考问题,与客户深入沟通,找到阳西网站设计与阳西网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:网站设计、成都网站设计、企业官网、英文网站、手机端网站、网站推广、域名注册、雅安服务器托管、企业邮箱。业务覆盖阳西地区。
当系统处理能力有限,如何组织计划外的请求对系统施压。首先我们先看下一些简单的限流策略,防止暴力攻击。比如要对IP访问,没5s只能访问10次,超过进行拦截。
如上图,一般使用滑动窗口来统计区间时间内的访问次数。
使用 zset
记录 IP
访问次数,每个 IP
通过 key
保存下来,score
保存当前时间戳,value
唯一用时间戳或者UUID来实现
public class RedisLimiterTest { private Jedis jedis; public RedisLimiterTest(Jedis jedis) { this.jedis = jedis; } /** * @param ipAddress Ip地址 * @param period 特定的时间内,单位秒 * @param maxCount 最大允许的次数 * @return */ public boolean isIpLimit(String ipAddress, int period, int maxCount) { String key = String.format("ip:%s", ipAddress); // 毫秒时间戳 long currentTimeMillis = System.currentTimeMillis(); Pipeline pipe = jedis.pipelined(); // redis事务,保证原子性 pipe.multi(); // 存放数据,value 和 score 都使用毫秒时间戳 pipe.zadd(key, currentTimeMillis, "" + UUID.randomUUID()); // 移除窗口区间所有的元素 pipe.zremrangeByScore(key, 0, currentTimeMillis - period * 1000); // 获取时间窗口内的行为数量 Responsecount = pipe.zcard(key); // 设置 zset 过期时间,避免冷用户持续占用内存,这里宽限1s pipe.expire(key, period + 1); // 提交事务 pipe.exec(); pipe.close(); // 比较数量是否超标 return count.get() > maxCount; } public static void main(String[] args) { Jedis jedis = new Jedis("localhost", 6379); RedisLimiterTest limiter = new RedisLimiterTest(jedis); for (int i = 1; i <= 20; i++) { // 验证IP 10秒钟之内只能访问5次 boolean isLimit = limiter.isIpLimit("222.73.55.22", 10, 5); System.out.println("访问第" + i + "次, 结果:" + (isLimit ? "限制访问" : "允许访问")); } } }
执行结果
访问第1次, 结果:允许访问 访问第2次, 结果:允许访问 访问第3次, 结果:允许访问 访问第4次, 结果:允许访问 访问第5次, 结果:允许访问 访问第6次, 结果:限制访问 访问第7次, 结果:限制访问 ... ...
缺点:要记录时间窗口所有的行为记录,量很大,比如,限定60s内不能超过100万次这种场景,不太适合这样限流,因为会消耗大量的储存空间。
漏斗的容量是限定的,如果满了,就装不进去了。
如果将漏嘴放开,水就会往下流,流走一部分之后,就又可以继续往里面灌水。
如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。
如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。
public class FunnelLimiterTest { static class Funnel { int capacity; // 漏斗容量 float leakingRate; // 漏嘴流水速率 int leftQuota; // 漏斗剩余空间 long leakingTs; // 上一次漏水时间 public Funnel(int capacity, float leakingRate) { this.capacity = capacity; this.leakingRate = leakingRate; this.leftQuota = capacity; this.leakingTs = System.currentTimeMillis(); } void makeSpace() { long nowTs = System.currentTimeMillis(); long deltaTs = nowTs - leakingTs; // 距离上一次漏水过去了多久 int deltaQuota = (int) (deltaTs * leakingRate); // 腾出的空间 = 时间*漏水速率 if (deltaQuota < 0) { // 间隔时间太长,整数数字过大溢出 this.leftQuota = capacity; this.leakingTs = nowTs; return; } if (deltaQuota < 1) { // 腾出空间太小 就等下次,最小单位是1 return; } this.leftQuota += deltaQuota; // 漏斗剩余空间 = 漏斗剩余空间 + 腾出的空间 this.leakingTs = nowTs; if (this.leftQuota > this.capacity) { // 剩余空间不得高于容量 this.leftQuota = this.capacity; } } boolean watering(int quota) { makeSpace(); if (this.leftQuota >= quota) { // 判断剩余空间是否足够 this.leftQuota -= quota; return true; } return false; } } // 所有的漏斗 private Mapfunnels = new HashMap<>(); /** * @param capacity 漏斗容量 * @param leakingRate 漏嘴流水速率 quota/s */ public boolean isIpLimit(String ipAddress, int capacity, float leakingRate) { String key = String.format("ip:%s", ipAddress); Funnel funnel = funnels.get(key); if (funnel == null) { funnel = new Funnel(capacity, leakingRate); funnels.put(key, funnel); } return !funnel.watering(1); // 需要1个quota } public static void main(String[] args) throws Exception{ FunnelLimiterTest limiter = new FunnelLimiterTest(); for (int i = 1; i <= 50; i++) { // 每1s执行一次 Thread.sleep(1000); // 漏斗容量是2 ,漏嘴流水速率是0.5每秒, boolean isLimit = limiter.isIpLimit("222.73.55.22", 2, (float)0.5/1000); System.out.println("访问第" + i + "次, 结果:" + (isLimit ? "限制访问" : "允许访问")); } } }
执行结果
访问第1次, 结果:允许访问 # 第1次,容量剩余2,执行后1 访问第2次, 结果:允许访问 # 第2次,容量剩余1,执行后0 访问第3次, 结果:允许访问 # 第3次,由于过了2s, 漏斗流水剩余1个空间,所以容量剩余1,执行后0 访问第4次, 结果:限制访问 # 第4次,过了1s, 剩余空间小于1, 容量剩余0 访问第5次, 结果:允许访问 # 第5次,由于过了2s, 漏斗流水剩余1个空间,所以容量剩余1,执行后0 访问第6次, 结果:限制访问 # 以此类推... 访问第7次, 结果:允许访问 访问第8次, 结果:限制访问 访问第9次, 结果:允许访问 访问第10次, 结果:限制访问
我们观察 Funnel
对象的几个字段,我们发现可以将 Funnel
对象的内容按字段存储到一个 hash
结构中,灌水的时候将 hash
结构的字段取出来进行逻辑运算后,再将新值回填到 hash
结构中就完成了一次行为频度的检测。
但是有个问题,我们无法保证整个过程的原子性。从 hash
结构中取值,然后在内存里运算,再回填到 hash
结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。
如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码的复杂度也跟着升高很多。这真是个艰难的选择,我们该如何解决这个问题呢?Redis-Cell
救星来了!
Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell
。该模块也使用了漏斗算法,并提供了原子的限流指令。
该模块只有1条指令cl.throttle
,它的参数和返回值都略显复杂,接下来让我们来看看这个指令具体该如何使用。
> cl.throttle key:xxx 15 30 60 1
15
: 15 capacity 这是漏斗容量
30 60
: 30 operations / 60 seconds 这是漏水速率
1
: need 1 quota (可选参数,默认值也是1)
> cl.throttle laoqian:reply 15 30 60 1) (integer) 0 # 0 表示允许,1表示拒绝 2) (integer) 15 # 漏斗容量capacity 3) (integer) 14 # 漏斗剩余空间left_quota 4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒) 5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)
在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle
指令考虑的非常周到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep
即可,如果不想阻塞线程,也可以异步定时任务来重试。
关于“Redis中如何实现限流策略”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流