Redis6——入门介绍

x33g5p2x  于2021-10-25 转载在 Redis  
字(50.2k)|赞(0)|评价(0)|浏览(495)

本文章笔记整理来自黑马尚硅谷视频https://www.bilibili.com/video/BV1Rv41177Af,相关资料可以在评论区进行获取。

1.NoSQL数据库介绍

1.1.NoSQL数据库概述

(1)NoSQL(Not Only SQL),即“不仅仅是SQL”,它泛指非关系型的数据库。
(2)NoSQL 不依赖业务逻辑方式存储,而以简单的key-value模式存储。因此大大的增加了数据库的扩展能力。
(3)NoSQL特点:不遵循SQL标准、不支持ACID、远超于SQL的性能。

1.2.NoSQL适用场景

(1)对数据高并发的读写
(2)海量数据的读写
(3)对数据高可扩展性的

1.3.NoSQL不适用场景

(1)需要事务支持
(2)基于sql的结构化查询存储,处理复杂的关系,需要即席查询
(3)用不着sql的和用了sql也不行的情况

1.4.常见的NoSQL数据库

1.4.1.Memcache

(1)很早就出现的NoSql数据库
(2)数据都在内存中,一般不持久化
(3)支持简单的key-value模式,支持类型单一
(4)一般是作为缓存数据库辅助持久化的数据库

1.4.2.Redis

(1)几乎覆盖了Memcached的绝大部分功能
(2)数据都在内存中,支持持久化,主要用作备份恢复
(3)除了支持简单的key-value模式,还支持多种数据结构的存储,比如 list、set、hash、zset等。
(4)一般是作为缓存数据库辅助持久化的数据库

1.4.3.MongoDB

(1)高性能、开源、模式自由(schema free)的文档型数据库
(2)数据都在内存中, 如果内存不足,把不常用的数据保存到硬盘
(3)虽然是key-value模式,但是对value(尤其是json)提供了丰富的查询功能
(4)支持二进制数据及大型对象
(5)可以根据数据的特点替代RDBMS ,成为独立的数据库。或者配合RDBMS,存储特定的数据。

2.Redis介绍

2.1.Redis概述

(1)Redis是一个开源的key-value存储系统,其功能和Memcached类似,但是它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型等)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,Redis支持各种不同方式的排序。
(2)Redis与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是Redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步
(3)Redis是单线程+多路I/O复用技术
多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)。

2.2.Redis应用场景

2.2.1.配合关系型数据库做高速缓存

(1)高频次,热门访问的数据,降低数据库I/O操作;
(2)分布式架构,做session共享;

2.1.2.多样的数据结构存储持久化数据

2.3.Redis安装

2.3.1.下载Redis安装包

(1)Redis官方网站:https://redis.io/
(2)Redis中文官方网站:http://www.redis.cn/
(3)下载安装包(下面演示安装的版本为6.2.1 for Linux)

2.3.2.安装步骤

(1)在Linux的/opt目录下准备好Redis的安装包(此处使用的是CentOS7.7)

(2)安装 C 语言的编译环境,即下载安装最新版的gcc编译器(如果需要卸载旧版本的gcc,可以参考这篇文章)

# 查看gcc版本命令
gcc --version
# 安装命令
yun install gcc

安装过程中出现的y/n选择,一律输入y即可。
(3)解压安装包

# 解压命令
tar -zxvf redis-6.2.1.tar.gz

(4)解压完成后进入目录redis-6.2.1,并使用make命令进行编译

cd redis-6.2.1/
make

(5)使用命令make install进行安装

make install

(6)验证Redis是否安装成功

# /usr/local/bin为安装目录
cd /usr/local/bin
ll

redis-benchmark性能测试工具
redis-check-aof修复有问题的AOF文件
redis-check-dump修复有问题的dump.rdb文件
redis-sentinelRedis集群使用
redis-serverRedis服务器启动命令
redis-cli客户端操作入口

2.4.Redis启动

2.4.1.前台启动(不推荐)

# 前台启动命令,其缺点在于命令行窗口不能关闭,否则服务器停止
redis-server

2.4.2.后台启动(推荐)

(1)备份redis.conf,拷贝一份/opt/redis-6.2.1/目录下的redis.conf到其他目录

(2)将后台启动设置daemonize no 改成 yes

(3)启动Redis

cd /usr/local/bin
# 启动Redis
redis-server /etc/redis.conf
# 查看Redis运行信息
ps -ef | grep redis

(4)用客户端访问Redis

redis-cli
# 测试验证
ping

(5)关闭Redis
① 使用命令shutdown进行关闭

shutdown

② 通过杀死Redis进程来关闭Redis

ps -ef | grep redis
# Redis进程ID需要先查出来
kill -9 9498

3.常用的五大数据类型

在了解Redis常用的五大数据类型之前,需要先知道一些于Redis键相关的命令,此外,想查看Redis更多的命令,可以参考http://doc.redisfans.com/

# 查看当前库所有key((也可以使用模糊查询,例如keys k*,即查询以字母k开头的键)
keys *

# 添加键值对
set <key> <value>

# 判断某个key是否存在
exists <key>

# 查看key的类型
type <key>

# 删除指定的key数据
del <key>

# 根据value选择非阻塞删除(仅将keys从keyspace元数据中删除,真正的删除会在后续异步操作)
unlink <key>

# 为给定的key设置过期时间,单位为秒
expire <key> <sec>

# 查看key还有多少秒过期,-1表示永不过期,-2表示已过期
ttl <key>

# 切换数据库,Redis默认有16个数据库,类似于数组其下标从0开始,初始默认使用0号库
select <dbid>

# 查看当前数据库的key的数量
dbsize

# 清空当前库
flushdb

# 清空所有库
flushall

3.1.Redis字符串(String)

3.1.1.介绍

(1)String是Redis最基本的类型,可以理解成与Memcached一模一样的类型,一个key对应一个value。
(2)String类型是二进制安全的,这意味着Redis的string可以包含任何数据,比如jpg图片或者序列化的对象。
(3)String类型是Redis最基本的数据类型,一个Redis中字符串value的大小最大可以是512M。

3.1.2.常用命令

# 添加键值对(如果key存在,则覆盖原来的value,如果key不存在,则添加该键值对)
set <key> <value>

# 查询对应键值
get <key>

# 将给定的<value>追加到原值的末尾
append  <key> <value>

# 只有key不存在时,才能设置key的值
setnx  <key> <value>

# 获得key对应value值的长度
strlen  <key>

# 将key中储存的数字值增1(只能对数字值操作,如果为空,新增值为1)
incr <key>

# 将key中储存的数字值减1(只能对数字值操作,如果为空,新增值为-1)
decr <key>

#将key中储存的数字值增减,自定义步长
incrby / decrby <key> <步长>

# 同时设置一个或多个key-value对
mset  <key1> <value1> <key2> <value2> ......

# 同时获取一个或多个 value
mget <key1> <key2> <key3> .....。

# 同时设置一个或多个 key-value对,当且仅当所有给定key都不存在时才能设置成功(原子性,有一个失败则都失败)
msetnx <key1> <value1> <key2> <value2> ......

# 获得值的范围
getrange  <key> <起始位置> <结束位置>

# 用<value> 覆写<key>所储存的字符串值,从<起始位置>开始(索引从0开始)
setrange  <key> <起始位置> <value>

# 设置键值的同时,设置过期时间,单位为秒
setex  <key> <过期时间> <value>

# 以新换旧,设置了新值同时获得旧值
getset <key> <value>

3.1.3.数据结构

(1)String的数据结构为简单动态字符串(Simple Dynamic String,缩写为SDS),是可以修改的字符串,内部结构实现上似于Java中的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配。

(2)如上图中所示,内部为当前字符串实际分配的空间capacity一般要高于实际字符串长度len。当字符串长度小于1M时,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M的空间。此外,需要注意的是字符串最大长度为512M。

3.2.Redis列表(List)

3.2.1.介绍

(1)Redis 列表是简单的字符串列表,按照插入顺序排序。可以添加一个元素到列表的头部(左边)或者尾部(右边),此外,Redis中的列表是单键多值的。
(2)列表的底层实际是个双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差。

3.2.2.常用命令

# 从左边/右边插入一个或多个值
lpush/rpush  <key> <value1> <value2> <value3> ......

# 从左边/右边弹出一个值
lpop/rpop  <key>

# 按照索引下标获得元素(从左到右),其顺序与插入时相反
lrange <key> <start> <stop>

# 从<key1>列表右边弹出一个值,插到<key2>列表左边
rpoplpush  <key1> <key2>

# 获取列表key中的所有值(0左边第一个,-1右边第一个,0-1表示所有)
lrange <key> 0 -1

# 按照索引下标获得元素(从左到右)
lindex <key> <index>

# 获得列表长度
llen <key>

# 在<value>的前面/后面插入<newvalue>
linsert <key> before/after <value> <newvalue>

# 从左边删除n个value(从左到右)
lrem <key> <n> <value>

# 将列表key下标为index的值替换成value
lset <key> <index> <value>

3.2.3.数据结构

(1)List的数据结构为快速链表quickList。
(2)首先在列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist,也即是压缩列表。它将所有的元素紧挨着一起存储,分配的是一块连续的内存。当数据量比较多的时候才会改成quicklist。因为普通的链表需要的附加指针空间太大,会比较浪费空间。比如这个列表里存的只是int类型的数据,结构上还需要两个额外的指针prev和next。

(3)Redis将链表和ziplist结合起来组成了quicklist。也就是将多个ziplist使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。

3.3.Redis集合(Set)

3.3.1.介绍

(1)Redis集合对外提供的功能与List类似,都是一个列表的功能,但它的特殊之处在于Set是可以自动排重的,当需要存储一个列表数据,又不希望出现重复数据时,Set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是List所不能提供的。
(2)Redis的Set是string类型的无序集合,它底层其实是一个value为null的hash表,所以添加、删除、查找的复杂度都是O(1)。

3.3.2.常用命令

# 将一个或多个元素加入到集合<key>中,已经存在的元素将被忽略
sadd <key> <value1> <value2> ......

# 取出集合<key>的所有值
smembers <key>

# 判断集合<key>是否为含有该<value>值,有返回1,没有则返回0
sismember <key> <value>

# 返该集合<key>中的元素个数
scard <key>

# 删除集合<key>中的某个或多个元素
srem <key> <value1> <value2> ......

# 随机从集合<key>中吐出一个值
spop <key>

# 随机从该集合<key>中取出n个值(不会从集合中删除)
srandmember <key> <n>

# 把集合中一个值从一个集合移动到另一个集合
smove <source> <destination> value

# 返回两个集合的交集元素
sinter <key1> <key2>

# 返回两个集合的并集元素
sunion <key1> <key2>

# 返回两个集合的差集元素(属于key1但不属于key2)
sdiff <key1> <key2>

3.3.3.数据结构

(1)Set数据结构是dict字典,字典是用哈希表实现的。
(2)Java中HashSet的内部实现使用的是HashMap,只不过所有的value都指向同一个对象。Redis的Set结构也是一样,它的内部也使用hash结构,所有的value都指向同一个内部值。

3.4.Redis哈希(Hash)

3.4.1.介绍

(1)Redis哈希是一个键值对集合。
(2)Redis哈希是一个string类型的field和value的映射表,hash特别适合用于存储对象,它类似Java里面的Map<String,Object>。
(3)用户ID为查找的key,存储的value用户对象包含姓名,年龄,生日等信息,如果用普通的key/value结构来存储,则主要有以下2种存储方式:
① 每次修改用户的某个属性需要,先反序列化改好后再序列化回去,开销较大。

② 用户ID数据冗余

(4)通过 key(用户ID) + field(属性标签) 就可以操作对应属性数据了,既不需要重复存储据,也不会带来序列化和并发修改控制的问题。

3.4.2.常用命令

# 给<key>集合中的<field>键赋值<value>,例如:hset user:1001 id 1
hset <key> <field> <value>

# 从<key>集合的<field>取出value 
hget <key> <field>

# 批量设置hash的值
hmset <key1> <field1> <value1> <field2> <value2>......

# 查看哈希表<key>中,给定域field是否存在
hexists <key> <field>

# 列出该hash集合的所有field
hkeys <key>

# 列出该hash集合的所有value
hvals <key>

# 为哈希表<key>中的域field的值加上增量1/-1
hincrby <key> <field> <increment>

# 将哈希表key中的域field的值设置为value(当且仅当域field不存在)
hsetnx <key> <field> <value>

3.4.3.数据结构

Hash类型对应的数据结构是两种:ziplist(压缩列表)和hashtable(哈希表)。当field-value长度较短且个数较少时,使用ziplist,否则使用hashtable。

3.5.Redis有序集合Zset(sorted set)

3.5.1.介绍

(1)Redis有序集合Zset与普通集合set非常相似,是一个没有重复元素的字符串集合。不同之处是有序集合的每个成员都关联了一个评分(score),这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。而集合的成员是唯一的,但是评分可以是重复了。
(2)因为元素是有序的,所以也可以很快的根据评分(score)或者次序(position)来获取一个范围的元素。访问有序集合的中间元素也是非常快的,因此能够使用有序集合作为一个没有重复成员的智能列表。

3.5.2.常用命令

# 将一个或多个元素及其score值加入到有序集<key>当中
zadd <key> <score1> <value1> <score2> <value2>......

# 返回有序集<key>中,下标在<start>~<stop>之间的元素(带WITHSCORES,可以让分数一起和值返回到结果集)
zrange <key> <start> <stop> [WITHSCORES]   

# 返回有序集<key>中,所有score值介于min和max之间(包括等于min或max)的成员,有序集成员按score值递增(从小到大)次序排列。
zrangebyscore <key> minmax [withscores] [limit offset count]

# 同上,改为从大到小排列。
zrevrangebyscore key maxmin [withscores] [limit offset count]

3.5.3.数据结构

(1)Zset是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map<String,Double>,可以给每一个元素value赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。
(2)Zset底层使用了两个数据结构:
① hash,其作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score值。
② 跳跃表,跳跃表的目的在于给元素value排序,根据score的范围获取元素列表。

4.Redis配置文件

之前自定义的配置文件目录为:/etc/redis.conf,下面将对部分配置进行介绍。

4.1.Units 单位

配置文件开头定义了一些基本的度量单位,Redis只支持bytes,不支持bit,并且对大小写不敏感

4.2.INCLUDES包含

类似于jsp中的include,多实例的情况可以把公用的配置文件提取出来。

4.3.网络相关配置

4.3.1.bind

默认配置为bind=127.0.0.1,这表示只能接受本机的访问请求。如果不写该配置,则表示无限制接受任何IP地址的访问。而在生产环境中肯定要写应用服务器的地址,且服务器是需要远程访问的,所以需要将其注释掉

4.3.2.protected-mode

protected-mode,即保护模式,如果开启了protected-mode,那么在没有设定bind ip且没有设密码的情况下,Redis只允许接受本机的响应,此处需要将protected-mode设置no。

4.3.3.Port

Redis默认的端口号为6379,不需要修改。

4.3.4.tcp-backlog

设置tcp的backlog,backlog是一个连接队列,backlog队列总和=未完成三次握手队列 + 已经完成三次握手队列。在高并发环境下需要一个高backlog值来避免慢客户端连接问题。注意Linux内核会将这个值减小到/proc/sys/net/core/somaxconn的值(128),所以需要确认增大/proc/sys/net/core/somaxconn和/proc/sys/net/ipv4/tcp_max_syn_backlog(128)两个值来达到想要的效果。

4.3.5.timeout

一个空闲的客户端维持多少秒会关闭,0表示关闭该功能,即永不关闭。

4.3.6.tcp-keepalive

对访问客户端的一种心跳检测,每个n秒检测一次,单位为秒,如果设置为0,则不会进行Keepalive检测,建议设置成60。

4.4.GENERAL 通用

4.4.1.daemonize

是否为后台进程,设置为yes(守护进程,后台启动)

4.4.2.pidfile

存放pid文件的位置,每个实例会产生一个不同的pid文件。

4.4.3.loglevel

指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为notice。

4.4.4.logfile

日志文件名称

4.4.5.databases

设定库的数量默认16,默认数据库为0,可以使用SELECT < dbid >命令在连接上指定数据库id。

5.Redis的发布和订阅

5.1.发布和订阅

(1)Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
(2)Redis 客户端可以订阅任意数量的频道。
客户端可以订阅频道如下图:

当给这个频道发布消息后,消息就会发送给订阅的客户端

5.2.发布订阅命令行实现

(1)打开一个客户端订阅channel1

SUBSCRIBE channel1

(2)打开另一个客户端,给channel1发布消息hello

publish channel1 hello

(3)打开第一个客户端可以看到发送的消息

6.Redis6新数据类型

6.1.Bitmaps

6.1.1.简介

现代计算机用二进制(位) 作为信息的基础单位, 1个字节等于8位, 例如“abc”字符串是由3个字节组成, 但实际在计算机存储时将其用二进制表示, “abc”分别对应的ASCII码分别是97、 98、 99, 对应的二进制分别是01100001、 01100010和01100011,如下图所示:

合理地使用操作位能够有效地提高内存使用率和开发效率,不过需要注意的是:
(1)Bitmaps本身不是一种数据类型, 实际上它就是字符串(key-value) ,但是它可以对字符串的位进行操作。
(2)Bitmaps单独提供了一套命令, 所以在Redis中使用Bitmaps和使用字符串的方法不太相同。 可以把Bitmaps想象成一个以位为单位的数组, 数组的每个单元只能存储0和1, 数组的下标在Bitmaps中叫做偏移量。

6.1.2.常用命令

6.1.2.1.setbit

(1)格式

# 设置Bitmaps中某个偏移量的值(0或1),且偏移量(offset)从0开始
setbit <key> <offset> <value>

(2)示例
每个独立用户是否访问过网站存放在Bitmaps中, 将访问的用户记做1, 没有访问的用户记做0, 用偏移量作为用户的id。设置键的第offset个位的值(从0算起) ,假设现在有20个用户,userid=1, 6, 11, 15, 19的用户对网站进行了访问, 那么当前Bitmaps初始化结果如图:

此外,需要注意以下几点:
① 很多应用的用户id以一个指定数字(例如10000) 开头, 直接将用户id和Bitmaps的偏移量对应势必会造成一定的浪费,通常的做法是每次做setbit操作时将用户id减去这个指定数字。
② 在第一次初始化Bitmaps时,假如偏移量非常大,那么整个初始化过程执行会比较慢,可能会造成Redis的阻塞。

6.1.2.2.getbit

(1)格式

# 获取Bitmaps中某个偏移量的值
getbit <key> <offset>

(2)实例
获取id=8的用户是否在2021-01-01这天访问过, 返回0说明没有访问过:

6.1.2.3.bitcount

(1)格式

# 统计字符串从start字节到end字节比特值为1的数量
bitcount <key> [start end]

(2)示例
① 计算2021-01-01这天的独立访问用户数量

② start和end代表起始和结束字节数,下面操作计算用户id在第1个字节到第3个字节之间的独立访问用户数,对应的用户id是11,15,19。

6.1.2.4.bitop

(1)格式

# bitop是一个复合操作,它可以做多个Bitmaps的and(交集)、or(并集)、not(非)、xor(异或)操作并将结果保存在destkey中
bitop  and(or/not/xor) <destkey> [key…]

(2)示例
2020-11-03 日访问网站的userid=0,1,4,9。

2020-11-04 日访问网站的userid=1,2,5,9。

① 计算出两天都访问过网站的用户数量

# unique:users:and:20201104_03为目标Bitmaps
bitop and unique:users:and:20201104_03 unique:users:20201103 unique:users:20201104

② 计算出任意一天都访问过网站的用户数量(例如月活跃就是类似这种),可以使用or求并集

# unique:users:or:20201104_03为目标Bitmaps
bitop or unique:users:or:20201104_03 unique:users:20201103 unique:users:20201104

6.1.3.Bitmaps与set对比

(1)假设网站有1亿用户,每天独立访问的用户有5千万,如果每天用集合类型和Bitmaps分别存储活跃用户可以得到表

数据类型每个用户id占用空间需要存储的用户量全部内存量
集合类型64位5000000064位/*50000000 = 400MB
Bitmaps1位1000000001位/*100000000 = 12.5MB

(2)很明显, 这种情况下使用Bitmaps能节省很多的内存空间, 尤其是随着时间推移节省的内存还是非常可观的。

数据类型一天一个月一年
集合类型400MB12GB144GB
Bitmaps12.5MB375MB4.5GB

(3)但Bitmaps并不是万金油, 假如该网站每天的独立访问用户很少, 例如只有10万(大量的僵尸用户) ,那么两者的对比如下表所示, 很显然这时候使用Bitmaps就不太合适了, 因为基本上大部分位都是0。

数据类型每个userid占用空间需要存储的用户量全部内存量
集合类型64位10000064位/*100000 = 800KB
Bitmaps1位1000000001位/*100000000 = 12.5MB

6.2.HyperLogLog

6.2.1.简介

(1)在工作当中,我们经常会遇到与统计相关的功能需求,比如统计网站PV(PageView页面访问量),可以使用Redis的incr、incrby轻松实现。但像UV(UniqueVisitor,独立访客)、独立IP数、搜索记录数等需要去重和计数的问题如何解决?这种求集合中不重复元素个数的问题称为基数问题。
(2)解决基数问题有很多种方案:
① 数据存储在MySQL表中,使用distinct count计算不重复个数
② 使用Redis提供的hash、set、bitmaps等数据结构来处理
以上的方案结果精确,但随着数据不断增加,导致占用空间越来越大,对于非常大的数据集是不切实际的。
(3)能否能够降低一定的精度来平衡存储空间?Redis推出了HyperLogLog。Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的
(4)在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
(5)什么是基数?比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8},基数(不重复元素)为5。基数估计就是在误差可接受的范围内,快速计算基数。

6.2.2.常用命令

6.2.2.1.pfadd

(1)格式

# 添加指定元素到HyperLogLog中
pfadd <key> <element> [element...]

(2)示例
将所有元素添加到指定HyperLogLog数据结构中,如果执行命令后HLL估计的近似基数发生变化,则返回1,否则返回0。

6.2.2.2.pfcount

(1)格式

# 计算HLL的近似基数,可以计算多个HLL,比如用HLL存储每天的UV,计算一周的UV可以使用7天的UV合并计算即可
pfcount <key> [key ...]

(2)示例

6.2.2.3.pfmerge

(1)格式

# 将一个或多个HLL合并后的结果存储在另一个HLL中,比如每月活跃用户可以使用每天的活跃用户来合并计算可得
pfmerge <destkey> <sourcekey> [sourcekey ...]

(2)示例

6.3.Geospatial

6.3.1.简介

Redis 3.2中增加了对GEO类型的支持。GEO即Geographic,是地理信息的缩写。该类型就是元素的二维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置查询、范围查询、距离查询、经纬度Hash等常见操作。

6.3.2.常用命令

6.3.2.1.geoadd

(1)格式

# 添加地理位置(经度,纬度,名称)
geoadd <key> <longitude> <latitude> <member> [longitude latitude member...]

(2)示例

此外,需要注意以下几点:
① 两极无法直接添加,一般会下载城市数据,直接通过 Java 程序一次性导入。
② 有效的经度从 -180 度到 180 度。有效的纬度从 -85.05112878 度到 85.05112878 度。
③ 当坐标位置超出指定范围时,该命令将会返回一个错误。
④ 已经添加的数据,是无法再次往里面添加的。

6.3.2.2.geopos

(1)格式

# 获得指定地区的坐标值
geopos <key> <member> [member...]

(2)示例

6.3.2.3.geodist

(1)格式

# 获取两个位置之间的直线距离
# 单位:
# m:表示单位为米[默认值]
# km:表示单位为千米
# mi:表示单位为英里
# ft:表示单位为英尺
如果用户没有显式地指定单位参数, 那么 GEODIST 默认使用米作为单位
geodist <key> <member1> <member2>  [m|km|ft|mi ]

(2)示例

6.3.2.4.georadius

(1)格式

# 以给定的经纬度为中心,找出某一半径内的元素
georadius <key> <longitude> <latitude> radius  m|km|ft|mi

(2)示例

7.Jedis

7.1.Jedis介绍

Jedis是Redis官方推荐的Java连接开发工具。

7.2.Jedis常用操作

7.2.1.连接测试

(1)在IDEA中创建一个名为jedis_redisdemo的maven工程

(2)在pom.xml中导入相关依赖

<dependencies>
	<!--Jedis-->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.2.0</version>
    </dependency>
    <!--单元测试-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

(3)测试代码如下:

package com.atguigu.jedis;

import redis.clients.jedis.Jedis;

public class JedisDemo1{
    public static void main(String[] args) {
        /* 创建Jedis对象 (1)."192.168.88.100":主机IP地址 (2).6379:Redis端口号 * */
        Jedis jedis = new Jedis("192.168.88.100",6379);
        //测试连接
        String value = jedis.ping();
        //输出PONG则表示连接成功
        System.out.println(value);
    }
}

(4)连接Redis的注意事项:
① 在Redis配置文件redis.conf中,要注释掉bind 127.0.0.1,并且将protected-mode设置为no。
② 如果Linux中的防火墙处于开启状态,也可能连接不成功

此时关闭防火墙,命令如下:

systemctl stop firewalld

7.2.2.操作相关数据类型

(1)Jedis-API:Key

@Test
public void keyDemo(){
    //创建Jedis对象
    Jedis jedis = new Jedis("192.168.88.100",6379);

    jedis.set("k1", "v1");
    jedis.set("k2", "v2");
    jedis.set("k3", "v3");
    Set<String> keys = jedis.keys("*");
    System.out.println(keys.size());
    for (String key : keys) {
        System.out.println(key);
    }
    
    System.out.println(jedis.exists("k1"));
    System.out.println(jedis.ttl("k1"));
    System.out.println(jedis.get("k1"));
    //关闭Jedis
    jedis.close();
}

(2)Jedis-API:String

@Test
public void stringDemo(){
    //创建Jedis对象
    Jedis jedis = new Jedis("192.168.88.100",6379);
    jedis.mset("str1","v1","str2","v2","str3","v3");
    System.out.println(jedis.mget("str1","str2","str3"));
    //关闭Jedis
    jedis.close();
}

(3)Jedis-API:List

@Test
public void listDemo(){
    //创建Jedis对象
    Jedis jedis = new Jedis("192.168.88.100",6379);
    jedis.lpush("mylist","v1","v2","v3","v4");
    List<String> list = jedis.lrange("mylist",0,-1);
    for (String element : list) {
        System.out.println(element);
    }
    //关闭Jedis
    jedis.close();
}

(4)Jedis-API:Set

@Test
public void setDemo(){
    //创建Jedis对象
    Jedis jedis = new Jedis("192.168.88.100",6379);
    jedis.sadd("orders", "order01");
    jedis.sadd("orders", "order02");
    jedis.sadd("orders", "order03");
    jedis.sadd("orders", "order04");
    Set<String> smembers = jedis.smembers("orders");
    jedis.srem("orders", "order02");
    for (String order : smembers) {
        System.out.println(order);
    }
    //关闭Jedis
    jedis.close();
}

(5)Jedis-API:Hash

@Test
public void hashDemo(){
    //创建Jedis对象
    Jedis jedis = new Jedis("192.168.88.100",6379);
    
    jedis.hset("hash1","userName","lisi");
    System.out.println(jedis.hget("hash1","userName"));
    Map<String,String> map = new HashMap<String,String>();
    map.put("telphone","13810169999");
    map.put("address","atguigu");
    map.put("email","abc@163.com");
    jedis.hmset("hash2",map);
    List<String> result = jedis.hmget("hash2", "telphone","email");
    for (String element : result) {
        System.out.println(element);
    }
    //关闭Jedis
    jedis.close();
}

(6)Jedis-API:Zset

@Test
public void zsetDemo(){
    //创建Jedis对象
    Jedis jedis = new Jedis("192.168.88.100",6379);
    jedis.zadd("zset01", 100d, "z3");
    jedis.zadd("zset01", 90d, "l4");
    jedis.zadd("zset01", 80d, "w5");
    jedis.zadd("zset01", 70d, "z6");

    Set<String> zrange = jedis.zrange("zset01", 0, -1);
    for (String e : zrange) {
        System.out.println(e);
    }
	//关闭Jedis
    jedis.close();
}

7.3.Jedis实例——手机验证码

7.3.1.功能需求

(1)输入手机号,点击发送后随机生成6位数字码,2分钟内有效;
(2)输入验证码,点击验证,返回成功或失败;
(3)每个手机号每天只能输入3次;

7.3.2.功能实现

package com.atguigu.jedis;

import org.junit.Test;
import redis.clients.jedis.Jedis;

import java.util.Random;

public class PhoneCode {
    
    //模拟验证码发送
    @Test
    public void testVerifyCode(){
        //模拟向手机号为139731795的手机发送验证码
        sendCode("139731795");
    }
    
    //模拟验证码校验
    @Test
    public void testSendCode(){
    	//当向手机号为139731795的手机发送验证码后,可在Redis中查询其验证码,并带回verifyCode()中进行验证
        verifyCode("139731795","609706");
    }
    
    //1.随机生成6位数字验证码
    public String getCode(){
        String code = "";
        Random random = new Random();
        for(int i=-0;i<6;i++){
            //nextInt(int num):随机返回一个值在[0,num)的int类型的整数
            int rand = random.nextInt(10);
            //将生成的数字拼接到code尾部
            code+=rand;
        }
        return code;
    }
    
    //2.每个手机每天只能发送三次,验证码放到redis中,设置过期时间120s
    public void sendCode(String phone) {
        //连接redis
        Jedis jedis = new Jedis("192.168.88.100",6379);
        
        //拼接key
        //手机发送次数key
        String countKey = "VerifyCode"+phone+":count";
        //验证码key
        String codeKey = "VerifyCode"+phone+":code";
        
        //每个手机每天只能发送三次
        String count = jedis.get(countKey);
        if(count == null) {
            //没有发送次数,第一次发送
            //设置发送次数是1
            jedis.setex(countKey,24*60*60,"1");
        } else if(Integer.parseInt(count)<=2) {
            //发送次数+1
            jedis.incr(countKey);
        } else if(Integer.parseInt(count)>2) {
            //发送三次,不能再发送
            System.out.println("今天发送次数已经超过三次");
            jedis.close();
            return;
        }
        
        //发送验证码放到redis里面
        String vcode = getCode();
        jedis.setex(codeKey,120,vcode);
        //关闭Jedis
        jedis.close();
    }
    
    //3.校验验证码
    public void verifyCode(String phone,String code) {
        //从redis获取验证码
        Jedis jedis = new Jedis("192.168.88.100",6379);
        //验证码key
        String codeKey = "VerifyCode"+phone+":code";
        String redisCode = jedis.get(codeKey);
        //判断
        if(redisCode.equals(code)) {
            System.out.println("成功");
        }else{
            System.out.println("失败");
        }
        //关闭Jedis
        jedis.close();
    }
}

8.Redis与SpringBoot整合

8.1.整合步骤

(1)在IDEA中创建一个SpringBoot工程

(2)在pom.xml中导入相关依赖

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- spring2.X集成redis所需common-pool2-->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.0</version>
</dependency>

(3)在SpringBoot全局配置文件application.properties中进行相关配置

# Redis服务器地址(根据实际情况进行配置)
spring.redis.host=192.168.88.100
# Redis服务器连接端口
spring.redis.port=6379
# Redis数据库索引(默认为0)
spring.redis.database= 0
# 连接超时时间(毫秒)
spring.redis.timeout=1800000
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
# 最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
# 连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0

(4)创建配置类RedisConfig(写法较为固定)

package com.atguigu.redis_springboot.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);
        //value序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //value hashmap序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        //解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置序列化(解决乱码的问题),过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
}

8.2.测试

(1)编写控制器方法进行测试

package com.atguigu.redis_springboot.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/redisTest")
public class RedisController {
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    @GetMapping()
    public String testRedis(){
        //向Redis中添加键值对
        redisTemplate.opsForValue().set("name","lucy");
        String name = (String)redisTemplate.opsForValue().get("name");
        return name;
    }
}

(2)在SpringBoot启动类中启动SpringBoot,然后在浏览器地址栏输入http://localhost:8080/redisTest进行测试,结果如下:

9.Redis事务

9.1.Redis事务简介

(1)Redis事务定义
Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。Redis事务的主要作用就是串联多个命令防止别的命令插队。
(2)Redis事务的三特性
① 单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
② 没有隔离级别的概念:队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行。
③ 不保证原子性:事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚操作。

9.2.Redis事务相关命令

(1)Redis事务相关命令有multiexecdiscard
(2)从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。组队的过程可以通过discard来放弃组队。

(3)下面看一下案例

9.3.事务的错误处理

Redis事务的错误处理一般分为以下两种情况:
(1)组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。

(2)执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。

9.4.事务冲突问题

9.4.1.例子

场景:有2个人有你的账户(账户中有10000元),你们3人同时去参加双十一抢购,并且每人分别准备花费8000元、5000元、1000元,如果对该过程不加控制,则会出现以下情况(假设):

9.4.2.悲观锁

悲观锁(Pessimistic Lock),顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

9.4.3.乐观锁

(1)乐观锁(Optimistic Lock),顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。

(2)下面演示乐观锁

# 对一个或多个key进行监视,如果在事务执行之前被监视的key被其他命令所修改,那么事务将被打断
watch key [key...]
# 取消watch命令对所有key的监视,如果在执行watch命令之后,exec命令或discard命令先被执行了的话,那么就不需要再执行unwatch命令了
unwatch

10.Redis事务——秒杀案例

10.1.简单秒杀

前端秒杀页面

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>Insert title here</title>
	</head>
<body>
	<h1>iPhone 13 Pro !!!  1元秒杀!!!</h1>

	<form id="msform" action="${pageContext.request.contextPath}/doseckill" enctype="application/x-www-form-urlencoded">
		<input type="hidden" id="prodid" name="prodid" value="0101">
		<input type="button" id="miaosha_btn" name="seckill_btn" value="秒杀点我"/>
	</form>

</body>
	<script type="text/javascript" src="${pageContext.request.contextPath}/script/jquery/jquery-3.1.0.js"></script>
	<script type="text/javascript"> $(function(){ $("#miaosha_btn").click(function(){ var url=$("#msform").attr("action"); $.post(url,$("#msform").serialize(),function(data){ if(data=="false"){ alert("抢光了" ); $("#miaosha_btn").attr("disabled",true); } } ); }) }) </script>
</html>

后端核心代码

package com.atguigu;

import java.io.IOException;

import redis.clients.jedis.Jedis;

public class SecKill_redis {
	//秒杀过程
	public static boolean doSecKill(String uid,String prodid) throws IOException {
		//1.uid和prodid非空判断
		if(uid == null || prodid == null) {
			return false;
		}

		//2.连接redis
		Jedis jedis = new Jedis("192.168.88.100",6379);
		
		//3.拼接key
		//3.1 库存key
		String kcKey = "sk:"+prodid+":qt";
		//3.2 秒杀成功用户key
		String userKey = "sk:"+prodid+":user";

		//4.获取库存,如果库存null,秒杀还没有开始
		String kc = jedis.get(kcKey);
		if(kc == null) {
			System.out.println("秒杀还没有开始,请等待");
			jedis.close();
			return false;
		}

		// 5.判断用户是否重复秒杀操作
		if(jedis.sismember(userKey, uid)) {
			System.out.println("已经秒杀成功了,不能重复秒杀");
			jedis.close();
			return false;
		}

		//6.判断如果商品数量,库存数量小于1,秒杀结束
		if(Integer.parseInt(kc)<=0) {
			System.out.println("秒杀已经结束了");
			jedis.close();
			return false;
		}

		//7.秒杀过程
		//7.1 库存-1
		jedis.decr(kcKey);
		//7.2 把秒杀成功用户添加清单里面
		jedis.sadd(userKey,uid);

		System.out.println("秒杀成功了..");
		jedis.close();
		return true;
	}
}

在Redis中将库存设置为10

set sk:0101:qt 10

测试结果如下:

10.2.秒杀并发模拟

10.2.1.安装工具ab

# 安装工具ab
yum install httpd-tools
# 查看帮助手册
ab --help

10.2.2.使用ab进行测试

# 测试命令(需要提前在/opt目录下准备好postfile文件,里面的内容是"prodid=0101&",模拟表单提交参数,以&符号结尾)
ab -n 1000 -c 100 -p ~/opt/postfile -T application/x-www-form-urlencoded http://192.168.88.1:8080/Seckill/doseckill

此次并发模拟至少出现了以下两种错误现象:

需要注意的是,在使用ab工具进行模拟时,可能会出现连接失败或超时的现象,其主要可能有以下几点:
① windows中的防火墙功能处于开启状态,可能会拦截Linux的请求;
② 模拟的请求数量过多,从而导致超时(配置Jedis连接池可以在一定程度上解决该问题);

10.2.3.配置Jedis连接池

(1)连接池工具类的代码如下:

package com.atguigu;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

//节省每次连接redis服务带来的消耗,把连接好的实例反复利用
public class JedisPoolUtil {
	private static volatile JedisPool jedisPool = null;

	private JedisPoolUtil() {
	}
	
	//采用单例模式
	public static JedisPool getJedisPoolInstance() {
		if (null == jedisPool) {
			synchronized (JedisPoolUtil.class) {
				if (null == jedisPool) {
					JedisPoolConfig poolConfig = new JedisPoolConfig();
					poolConfig.setMaxTotal(200);
					poolConfig.setMaxIdle(32);
					poolConfig.setMaxWaitMillis(100*1000);
					poolConfig.setBlockWhenExhausted(true);
					poolConfig.setTestOnBorrow(true);  // ping PONG
				 
					jedisPool = new JedisPool(poolConfig, "192.168.88.100", 6379, 60000 );
				}
			}
		}
		return jedisPool;
	}

	public static void release(JedisPool jedisPool, Jedis jedis) {
		if (null != jedis) {
			jedisPool.returnResource(jedis);
		}
	}
}

连接池参数如下:

MaxTotal控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了MaxTotal个jedis实例,则此时pool的状态为exhausted
maxIdle控制一个pool最多有多少个状态为idle(空闲)的jedis实例
MaxWaitMillis表示当borrow一个jedis实例时,最大的等待毫秒数,如果超过等待时间,则直接抛JedisConnectionException
testOnBorrow获得一个jedis实例的时候是否检查连接可用性(ping());如果为true,则得到的jedis实例均是可用的

(2)修改后端连接Redis的代码如下:

//2.连接redis
//Jedis jedis = new Jedis("192.168.88.100",6379);
//通过连接池得到jedis对象
JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedisPoolInstance.getResource();

10.3.解决超卖问题

(1)这里可以考虑使用前面讲到的乐观锁来解决超卖问题。

使用乐观锁后的代码如下:

package com.atguigu;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.LoggerFactory;

import ch.qos.logback.core.rolling.helper.IntegerTokenConverter;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.ShardedJedisPool;
import redis.clients.jedis.Transaction;

public class SecKill_redis {
	
	public static void main(String[] args) {
		Jedis jedis =new Jedis("192.168.44.168",6379);
		System.out.println(jedis.ping());
		jedis.close();
	}
	
	//秒杀过程
	public static boolean doSecKill(String uid,String prodid) throws IOException {
		//1.uid和prodid非空判断
		if(uid == null || prodid == null) {
			return false;
		}
		
		//2.连接redis
		//Jedis jedis = new Jedis("192.168.44.168",6379);
		//通过连接池得到jedis对象
		JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
		Jedis jedis = jedisPoolInstance.getResource();
		
		//3.拼接key
		//3.1.库存key
		String kcKey = "sk:"+prodid+":qt";
		//3.2.秒杀成功用户key
		String userKey = "sk:"+prodid+":user";
		
		//监视库存
		jedis.watch(kcKey);
		
		//4.获取库存,如果库存null,秒杀还没有开始
		String kc = jedis.get(kcKey);
		if(kc == null) {
			System.out.println("秒杀还没有开始,请等待");
			jedis.close();
			return false;
		}
		
		//5.判断用户是否重复秒杀操作
		if(jedis.sismember(userKey, uid)) {
			System.out.println("已经秒杀成功了,不能重复秒杀");
			jedis.close();
			return false;
		}
		
		//6.判断如果商品数量,库存数量小于1,秒杀结束
		if(Integer.parseInt(kc)<=0) {
			System.out.println("秒杀已经结束了");
			jedis.close();
			return false;
		}
		
		//7.秒杀过程
		//使用事务
		Transaction multi = jedis.multi();
		
		//组队操作
		multi.decr(kcKey);
		multi.sadd(userKey,uid);
		
		//执行
		List<Object> results = multi.exec();
		
		if(results == null || results.size()==0) {
			System.out.println("秒杀失败了....");
			jedis.close();
			return false;
		}
		
		//7.1 库存-1
		//jedis.decr(kcKey);
		//7.2 把秒杀成功用户添加清单里面
		//jedis.sadd(userKey,uid);
		
		System.out.println("秒杀成功了..");
		jedis.close();
		return true;
	}
}

10.4.解决库存遗留问题

10.4.1.问题演示

(1)之前的库存都是设置为10,现在将库存设置为500。

flushdb
set sk:0101:qt 500

(2)再次进行测试

# 测试命令
ab -n 2000 -c 300 -p ~/opt/postfile -T application/x-www-form-urlencoded http://192.168.88.1:8080/Seckill/doseckill

(3)库存遗留问题是由乐观锁导致的,因为

10.4.2.问题解决

(1)Lua脚本
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
(2)Lua脚本在Redis中的优势:
① 将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数,提升性能。
② LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。
③ 利用Lua脚本淘汰用户,解决超卖问题。
④ Redis2.6版本以后,通过Lua脚本解决争抢问题,实际上是Redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。

(3)将Lua对应脚本嵌入到java代码中:

package com.atguigu;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.LoggerFactory;

import ch.qos.logback.core.joran.conditional.ElseAction;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.ShardedJedisPool;
import redis.clients.jedis.Transaction;

public class SecKill_redisByScript {
	
	private static final  org.slf4j.Logger logger =LoggerFactory.getLogger(SecKill_redisByScript.class) ;

	public static void main(String[] args) {
		JedisPool jedispool =  JedisPoolUtil.getJedisPoolInstance();
 
		Jedis jedis=jedispool.getResource();
		System.out.println(jedis.ping());
		
		Set<HostAndPort> set=new HashSet<HostAndPort>();

	// doSecKill("201","sk:0101");
	}
	
	static String secKillScript ="local userid=KEYS[1];\r\n" + 
			"local prodid=KEYS[2];\r\n" + 
			"local qtkey='sk:'..prodid..\":qt\";\r\n" + 
			"local usersKey='sk:'..prodid..\":usr\";\r\n" + 
			"local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" + 
			"if tonumber(userExists)==1 then \r\n" + 
			" return 2;\r\n" + 
			"end\r\n" + 
			"local num= redis.call(\"get\" ,qtkey);\r\n" + 
			"if tonumber(num)<=0 then \r\n" + 
			" return 0;\r\n" + 
			"else \r\n" + 
			" redis.call(\"decr\",qtkey);\r\n" + 
			" redis.call(\"sadd\",usersKey,userid);\r\n" + 
			"end\r\n" + 
			"return 1" ;
			 
	static String secKillScript2 = 
			"local userExists=redis.call(\"sismember\",\"{sk}:0101:usr\",userid);\r\n" +
			" return 1";

	public static boolean doSecKill(String uid,String prodid) throws IOException {

		JedisPool jedispool =  JedisPoolUtil.getJedisPoolInstance();
		Jedis jedis=jedispool.getResource();

		 //String sha1= .secKillScript;
		String sha1=  jedis.scriptLoad(secKillScript);
		Object result= jedis.evalsha(sha1, 2, uid,prodid);

		  String reString=String.valueOf(result);
		if ("0".equals( reString )  ) {
			System.err.println("已抢空!!");
		}else if("1".equals( reString )  )  {
			System.out.println("抢购成功!!!!");
		}else if("2".equals( reString )  )  {
			System.err.println("该用户已抢过!!");
		}else{
			System.err.println("抢购异常!!");
		}
		jedis.close();
		return true;
	}
}

11.Redis持久化——RDB

11.1.RDB介绍

(1)RDB(Redis DataBase),即在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。其它的具体细节可以查看中文官网:http://www.redis.cn/topics/persistence.html

(2)Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何I/O操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。

(3)RDB的优点如下:
① 适合大规模的数据恢复
② 对数据完整性和一致性要求不高更适合使用
③ 节省磁盘空间
④ 恢复速度快

(4)RDB的缺点如下:
① fork()的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑
② 虽然Redis在fork()时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
③ 在备份周期在一定间隔时间做一次备份,所以如果Redis出现异常不能工作的话,就会丢失最后一次快照后的所有修改。

(5)RDB默认是开启的,动态停止RDB的命令为:

# save后给空值,表示禁用保存策略
redis-cli config set save ""

(6)小总结

11.2.RDB持久化流程

11.3.fork()

(1)fork()的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程。
(2)在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了“写时复制技术”。
(3)一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。

11.4.RDB相关配置

11.4.1.dump.rdb快照文件

(1)dump.rdb是由Redis服务器自动生成的,在默认情况下,每隔一段时间Redis服务器程序会自动对数据库做一次遍历,把内存快照写在一个叫做“dump.rdb”的文件里,该文件名称可以在Redis配置文件redis.conf中进行查看和配置。

(2)dump.rdb文件的保存路径默认为Redis启动时命令行所在的目录下。

11.4.2.快照配置

(1)快照保存策略

(2)stop-writes-on-bgsave-error
当Redis无法写入磁盘的话,直接关掉Redis的写操作,推荐设置为yes。

(3)rdbcompression——压缩文件
对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。如果不想消耗CPU来进行压缩的话,可以设置为关闭此功能,推荐设置为yes。

(4)dbchecksum——检查完整性
在存储快照后,还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能,推荐设置为yes。

11.4.3.快照配置命令

(1)save
只管保存,其它不管。当出现全部阻塞时,需要手动保存,不建议使用。
(2)gbsave
Redis会在后台异步进行快照操作,同时还可以响应客户端请求。
(3)lastsave
获取最后一次成功执行快照的时间。

12.Redis持久化——AOF

12.1.AOF介绍

(1)AOF(Append Only File),即以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,Redis启动之初会读取该文件重新构建数据,换言之,Redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。
(2)AOF默认不开启,需要手动开启(在配置文件redis.conf中将"appendonly no"中的"no"改为"yes"即可),此外AOF文件名默认为appendonly.aof,其保存路径与RDB文件的路径一致。
(3)当AOF和RDB同时开启时,系统默认取AOF的数据(数据不会存在丢失)

(4)AOF的优点如下:
① 备份机制更稳健,丢失数据概率更低。
② 可读的日志文本,通过操作AOF稳健,可以处理误操作。

(5)AOF的缺点如下:
① 比起RDB占用更多的磁盘空间。
② 恢复备份速度要慢。
③ 每次读写都同步的话,有一定的性能压力。
④ 存在个别不能Bug恢复的情况。

(6)小总结

12.2.AOF持久化流程

(1)客户端的请求写命令会被append追加到AOF缓冲区内;
(2)AOF缓冲区根据AOF持久化策略[always、everysec、no]将操作sync同步到磁盘的AOF文件中;
(3)AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;
(4)Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;

12.3.AOF恢复

(1)AOF的备份机制和性能虽然和RDB不同,但是备份和恢复的操作同RDB一样,都是拷贝备份文件,需要恢复时再拷贝到Redis工作目录下,启动系统即加载。
(2)正常恢复
① 将有数据的aof文件复制一份保存到对应目录(查看目录:config get dir)
② 重启Redis,然后重新加载即可正常恢复
(3)异常恢复
① 如果遇到AOF文件损坏,通过/usr/local/bin/redis-check-aof–fix appendonly.aof进行恢复
② 备份被写坏的AOF文件
③ 重启Redis,然后重新加载即可正常恢复

12.4.AOF与RDB的选择

(1)官方推荐两个都启用。
(2)如果对数据不敏感,可以选单独用RDB。
(3)不建议单独用 AOF,因为可能会出现Bug。
(4)如果只是做纯内存缓存,可以都不用。

13.Redis主从复制

13.1.概述

(1)主从复制:主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,其中,Master以写为主,Slave以读为主。
(2)功能:
① 读写分离,性能扩展;
② 容灾快速恢复;

13.2.实现

下面实现的是一主两从的情况,一主三从等其它情况可以参考下面的步骤自行实现。
(1)创建/myredis文件夹

cd /myredis

(2)将/etc目录下的Redis配置文件redis.con复制到/myredis目录下

cp /etc/redis.conf /myredis/redis.conf

(3)关闭/myredis/redis.conf中的AOF,即将 appendonly 设置为no

(4)配置一主两从,创建三个配置文件(redis6379.conf、redis6380.conf、redis6381.conf)

# 在/myredis目录下新建redis6379.conf
vim redis6379.conf

# 然后在redis6379.conf中写入以下内容
include /myredis/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb

# 为了方便起见,直接将redis6379.conf复制两份,并分别改名为redis6380.conf和redis6381.conf,并且修改其中的内容
include /myredis/redis.conf
pidfile /var/run/redis_6380.pid
port 6380
dbfilename dump6380.rdb

include /myredis/redis.conf
pidfile /var/run/redis_6381.pid
port 6381
dbfilename dump6381.rdb

(5)启动三台Redis服务器,并查看其运行情况

redis-server redis6379.conf 
redis-server redis6380.conf 
redis-server redis6381.conf

redis-cli -p 端口号
info replication

(6)配从(库)不配主(库)

# 当前服务器成为某个实例的从服务器
slaveof <ip> <port>

# 在6380和6381上执行下面命令(127.0.0.1为主服务器的ip,6379为其端口号)
slaveof 127.0.0.1 6379

再次查看主机运行情况,此时发现,端口号为6379的主机已经有了两台从机(6380和6381)

(7)测试
① 在主机上写数据,然后在从机上可以查看
② 在从机上不能写数据

13.3.复制原理

(1)主从复制过程
① 当从连接上主服务器之后,从服务器向主服务发送进行数据同步消息;
② 主服务器接到从服务器发送过来同步消息,把主服务器数据进行持久化,把新的rdb文件发送从服务器,从服务器拿到rdb进行读取;
③ 每次主服务器进行写操作之后,和从服务器进行数据同步。

(2)复制延迟
由于所有的写操作都是先在master上操作,然后同步更新到slave上,所以从master同步到slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,slave机器数量的增加也会使这个问题更加严重。

13.4.三种特殊情况

13.4.1.一主二仆

(1)主机如果宕机,重启后会自动恢复到之前的状态,不需要再做其它任何的修改,当再次写数据时,从机仍然可以读取到数据。
(2)从机如果宕机,再次重启后会断开其与之前主机的联系,自己成为一台独立的主机,此时需要使用 slaveof 命令再次声明所属主机,声明之后可以再次读取数据。

13.4.2.薪火相传

上一个slave可以是下一个slave的master,slave同样可以接收其他 slaves的连接和同步请求,那么该slave作为了链条中下一个的master,可以有效减轻master的写压力,去中心化降低风险。但缺点也是非常明显的,一旦某个从机宕机,后面的从机都无法备份。

13.4.3.反客为主

当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。使用命令 slaveof no one 可以将从机变为主机。

13.5.哨兵模式

13.5.1.介绍

哨兵模式可以理解为反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

13.5.2.使用步骤

(1)在/myredis目录下新建sentinel.conf文件(名字不能错)

cd /myreddis
vim sentinel.conf

# 在sentinel.conf中配置哨兵
sentinel monitor mymaster 127.0.0.1 6379 1
# 其中mymaster为监控对象起的服务器名称,1为至少有多少个哨兵同意迁移的数量

(2)启动哨兵

# 启动命令
redis-sentinel  /myredis/sentinel.conf

(3)测试
① 模拟主机(端口号为6379)宕机,即使用命令shutdown关闭主机。
② 稍等片刻,观察哨兵打印的信息:

③ 观察从机6381的运行信息,此时其主机从6379变为6380了

④ 当再次重启6379时,它会变成6380的从机

13.5.4.故障恢复

(1)优先级在配置文件redis.conf中默认为replica-priority 100,值越小优先级越高;
(2)偏移量是指对原主机数据同步多少的度量;
(3)每个redis实例启动后都会随机生成一个40位的runid;
此外,在Java代码中识别主从的示例如下:

private static JedisSentinelPool jedisSentinelPool=null;

public static  Jedis getJedisFromSentinel(){
	if(jedisSentinelPool==null){
	    Set<String> sentinelSet=new HashSet<>();
	    sentinelSet.add("192.168.11.103:26379");
	
	    JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
	    jedisPoolConfig.setMaxTotal(10); //最大可用连接数
		jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
		jedisPoolConfig.setMinIdle(5); //最小闲置连接数
		jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
		jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
		jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong
		
		jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig);
		return jedisSentinelPool.getResource();
	}else{
		return jedisSentinelPool.getResource();
    }
}

14.Redis集群

14.1.概述

(1)问题引入
当Redis容量不够时,如何进行扩容?对于并发写操作, Redis如何分摊?另外,主从模式,薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。之前通过代理主机来解决,但是redis3.0中提供了解决方案。就是无中心化集群配置
(2)集群定义
① Redis 集群就是实现了对 Redis 的水平扩容,即启动N个Redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。
② Redis 集群通过分区(partition)来提供一定程度的可用性(availability),即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。
(3)集群优缺点
优点: 实现扩容、分摊压力、无中心配置相对简单
缺点:
① 多键操作实现起来比较复杂;
② 多键的Redis事务是不被支持的;
③ Lua脚本也不被支持;
④ 由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大;

14.2.搭建集群

注:由于实际情况的限制,下面的搭建只是一个模拟效果!
(1)使用上面主从复制的目录/myredis,先删除该目录下所有的rdb文件(防止对后面的步骤产生干扰)

# 删除当前目录下所有以dump63开头的文件
rm -rf dump63*

(2)修改redis6379.conf中的内容

# 将redis6379.conf中的内容改成下面的(注释不用写)
include /myredis/redis.conf
pidfile "/var/run/redis_6379.pid"
port 6379
dbfilename "dump6379.rdb"

cluster-enabled yes   # 打开集群模式
cluster-config-file nodes-6379.conf  # 设定节点配置文件名
cluster-node-timeout 15000  # 设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换。

(3)删除redis6380.conf和redis6381.conf

rm -rf redis6380.conf
rm -rf redis6381.conf

(4)将redis6379.conf文件复制5份到当前目录,分别取名为redis6380.conf、redis6381.conf、redis6389.conf、redis6390.conf、redis6391.conf

(5)修改上面新复制的5份文件,这里以redis6380.conf举例,将该文件内容中的6379批量改为6380,具体命令为 %s/6379/6380 (其余的4份文件按照这个规则进行修改即可)

%s/6379/6380
%s/6379/6381
%s/6379/6389
%s/6379/6390
%s/6379/6391

(6)启动这6个redis服务

redis-server redis6379.conf
redis-server redis6380.conf
redis-server redis6381.conf
redis-server redis6389.conf
redis-server redis6390.conf
redis-server redis6391.conf

(7)将这6个节点合成一个集群
① 在组合之前,请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常。

② 进入到/opt/redis-6.2.1/src/目录

cd /opt/redis-6.2.1/src/

在/opt/redis-6.2.1/src/目录下执行以下命令

# --replicas 1:采用最简单的方式配置集群,一台主机,一台从机,正好三组。
# 192.168.88.100为本机的IP地址
redis-cli --cluster create --cluster-replicas 1 192.168.88.100:6379 192.168.88.100:6380 192.168.88.100:6381 192.168.88.100:6389 192.168.88.100:6390 192.168.88.100:6391

(8)测试

# 采用集群策略连接(-c),端口号可以是6379、6380、6381中的任何一个
redis-cli -c -p 6379
# 查看集群信息
cluster nodes

14.3.集群操作

(1)Redis Cluster 如何分配这六个节点?
一个集群至少要有三个主节点,选项 --cluster-replicas 1 表示希望为集群中的每个主节点创建一个从节点。分配原则尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在一个IP地址上。
(2)什么是slots?

① 一个 Redis 集群包含 16384 个插槽(hash slot),数据库中的每个键都属于这 16384 个插槽的其中一个。集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。
② 集群中的每个节点负责处理一部分插槽。 例如,上面搭建的集群有3个主节点, 那么节点 6379 负责处理 0 号至 5460 号插槽、节点 6380 负责处理 5461 号至 10922 号插槽、节点 6381 负责处理 10923 号至 16383 号插槽。

(3)在集群中写入值
① 一次写入一个key

② 一次写入多个key
由于不在一个slot下的键值,是不能直接使用mget、mset等多键操作。不过可以通过{ }来定义组的概念,从而使key中{ }内相同内容的键值对放到一个slot中去。

(4)查询集群中的值

# 返回count个slot槽中的键
cluster getkeysinslot <slot> <count>

14.4.故障恢复

(1)当三台主机中的某一台(例如主机A)挂掉后,其对应的从机(例如从机a)会顶替该主机的位置,成为一个新的主机,而当原来挂掉的主机A重启后,它会变成a的从机。
(2)如果某一段插槽的主从都挂掉,而配置文件redis.conf中的参数cluster-require-full-coverage为yes ,那么 整个集群都挂掉;如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为no ,那么该插槽数据全都不能使用,也无法存储。

14.5.集群的Jedis开发

即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。无中心化主从集群,即无论从哪台主机写的数据,其他主机上都能读到数据。

public class JedisClusterTest {
  public static void main(String[] args) { 
     Set<HostAndPort>set =new HashSet<HostAndPort>();
     set.add(new HostAndPort("192.168.88.100",6379));
     JedisCluster jedisCluster=new JedisCluster(set);
     jedisCluster.set("k1", "v1");
     System.out.println(jedisCluster.get("k1"));
  }
}

15.Redis应用问题解决

15.1.缓存穿透

15.1.1.问题描述

缓存穿透是指用户不断发起请求的数据在缓存和数据库中都没有,如发起为id为“-1”的数据或id为特别大不存在的数据。这时的用户很可能是攻击者,该攻击会导致数据库压力过大。

15.1.2.解决方案

(1)对空值缓存:
如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,一般最长不超过五分钟。
(2)设置可访问的名单(白名单):
使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
(3)采用布隆过滤器:
布隆过滤器(Bloom Filter)是1970年由布隆提出的,它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
(4)进行实时监控:
当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务。

15.2.缓存击穿

15.2.1.问题描述

缓存击穿是指缓存中没有但数据库中有的数据(一般是数据的缓存时间到期),这时由于并发请求特别多,同时读缓存没读到数据,又同时去数据库去取数据,造成数据库压力瞬间增大。

15.2.2.解决方案

(1)预先设置热门数据:
在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长。
(2)实时调整:
现场监控数据热门,实时调整key的过期时长。
(3)使用锁:
① 在缓存失效的时候(判断拿出来的值为空),不是立即去load db,而是先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key。
③ 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;
④ 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。

15.3.缓存雪崩

15.3.1.问题描述

缓存雪崩是指缓存中数据大批量地过期,而查询数据量巨大,引起数据库压力过大。缓存雪崩与缓存击穿的区别在于前者针对很多key缓存,后者则是某一个key。

15.3.2.解决方案

(1)构建多级缓存架构:
nginx缓存 + redis缓存 +其他缓存(ehcache等)
(2)使用锁或队列:
用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上(不适用高并发情况)。
(3)设置过期标志更新缓存:
记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
(4)将缓存失效时间分散开:
比如可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

15.4.分布式锁

15.4.1.问题描述

(1)随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。
(2)目前分布式锁主流的实现方案如下:
① 基于数据库实现分布式锁
② 基于缓存(例如Redis等)
③ 基于Zookeeper
其中,Redisd的性能最高,Zookeeper的可靠性最高。

15.4.2.使用Redis命令实现分布式锁

set <key> <value> NX|XX  EX|PX  second|millisecond
NX只在键不存在时,才对键进行设置操作,SET key value NX 效果等同于 SETNX key value
EX second设置键的过期时间为 second 秒,SET key value EX second 效果等同于 SETEX key second value
PX millisecond设置键的过期时间为 millisecond 毫秒,SET key value PX millisecond 效果等同于 PSETEX key millisecond value
XX只在键已经存在时,才对键进行设置操作

15.4.3.使用Java代码实现分布式锁

(1)具体代码如下:

@RestController
@RequestMapping("/redisTest")
public class RedisController {
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    @GetMapping("testLock")
    public void testLock(){
        //1获取锁,setne
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
        //2获取锁成功、查询num的值
        if(lock){
            Object value = redisTemplate.opsForValue().get("num");
            //2.1判断num为空return
            if(StringUtils.isEmpty(value)){
                return;
            }
            //2.2有值就转成成int
            int num = Integer.parseInt(value+"");
            //2.3把redis的num加1
            redisTemplate.opsForValue().set("num", ++num);
            //2.4释放锁,del
            redisTemplate.delete("lock");
            
        }else{
            //3获取锁失败、每隔0.1秒再获取
            try {
                Thread.sleep(100);
                testLock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

初始化num的值,使其为0。

使用ab工具进行压力测试,命令如下:

ab -n 1000 -c 100 http://192.168.88.1:8080/redisTest/testLock

再次查看num的值,结果为100。

(2)代码优化——使用UUID防止误删锁

@GetMapping("testLock")
public void testLock(){
    String uuid = UUID.randomUUID().toString();
    //1获取锁,setne
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);
    //2获取锁成功、查询num的值
    if(lock){
        Object value = redisTemplate.opsForValue().get("num");
        //2.1判断num为空return
        if(StringUtils.isEmpty(value)){
            return;
        }
        //2.2有值就转成成int
        int num = Integer.parseInt(value+"");
        //2.3把redis的num加1
        redisTemplate.opsForValue().set("num", ++num);
        //2.4释放锁,del
        redisTemplate.delete("lock");
        //判断比较uuid值是否一样
        String lockUuid = (String)redisTemplate.opsForValue().get("lock");
        if(lockUuid.equals(uuid)){
            redisTemplate.delete("lock");
        }
    }else{
        //3获取锁失败、每隔0.1秒再获取
        try {
            Thread.sleep(100);
            testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

(3)代码优化——LUA脚本保证删除的原子性

@RestController
@RequestMapping("/redisTest")
public class RedisTestController {

    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping("testLockLua")
    public void testLockLua() {
        //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
        String uuid = UUID.randomUUID().toString();
        //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
        String skuId = "25"; // 访问skuId 为25号的商品 100008348542
        String locKey = "lock:" + skuId; // 锁住的是每个商品的数据

        // 3 获取锁
        Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);

        // 第一种: lock 与过期时间中间不写任何的代码。
        // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
        // 如果true
        if (lock) {
            // 执行的业务逻辑开始
            // 获取缓存中的num 数据
            Object value = redisTemplate.opsForValue().get("num");
            // 如果是空直接返回
            if (StringUtils.isEmpty(value)) {
                return;
            }
            // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
            int num = Integer.parseInt(value + "");
            // 使num 每次+1 放入缓存
            redisTemplate.opsForValue().set("num", String.valueOf(++num));
            /*使用lua脚本来锁*/
            // 定义lua 脚本
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            // 使用redis执行lua执行
            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
            redisScript.setScriptText(script);
            // 设置一下返回值类型 为Long
            // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
            // 那么返回字符串与0 会有发生错误。
            redisScript.setResultType(Long.class);
            // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
            redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
        } else {
            // 其他线程等待
            try {
                // 睡眠
                Thread.sleep(1000);
                // 睡醒了之后,调用方法。
                testLockLua();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @GetMapping("testLock")
    public void testLock(){
        String uuid = UUID.randomUUID().toString();
        //1获取锁,setne
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);
        //2获取锁成功、查询num的值
        if(lock){
            Object value = redisTemplate.opsForValue().get("num");
            //2.1判断num为空return
            if(StringUtils.isEmpty(value)){
                return;
            }
            //2.2有值就转成成int
            int num = Integer.parseInt(value+"");
            //2.3把redis的num加1
            redisTemplate.opsForValue().set("num", ++num);
            //2.4释放锁,del
            //判断比较uuid值是否一样
            String lockUuid = (String)redisTemplate.opsForValue().get("lock");
            if(lockUuid.equals(uuid)) {
                redisTemplate.delete("lock");
            }
        }else{
            //3获取锁失败、每隔0.1秒再获取
            try {
                Thread.sleep(100);
                testLock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

15.4.4.总结

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
(1)互斥性。在任意时刻,只有一个客户端能持有锁。
(2)不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
(3)解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
(4)加锁和解锁必须具有原子性。

16.Redis6新功能

16.1.ACL

16.1.1.介绍

Redis ACL是Access Control List(访问控制列表)的缩写,该功能允许根据可以执行的命令和可以访问的键来限制某些连接。在Redis 5版本之前,Redis 安全规则只有密码控制,还有通过rename 来调整高危命令比如 flushdb、KEYS/*、 shutdown等。Redis6 则提供ACL的功能对用户进行更细粒度的权限控制 :
(1)接入权限:用户名和密码
(2)可以执行的命令
(3)可以操作的 KEY

16.1.2.相关命令

(1)acl list:展现用户权限列表

(2)acl cat
① 查看添加权限指令类别

② 加参数类型名可以查看类型下具体命令,例如:acl cat string

(3)acl whoami:查看当前用户

(4)acl setuser

① 创建新用户(默认权限)

acl setuser <username>

上面的示例没有指定任何规则,如果用户不存在,这将使用just created的默认属性来创建用户,如果用户已经存在,则上面的命令将不执行任何操作。
② 设置有用户名、密码、ACL权限、并启用的用户

acl setuser mary on >password ~cached:* +get

切换用户,验证权限

# 切换用户
auth mary password

16.2.I/O多线程

16.2.1.介绍

Redis6支撑多线程,但不是说Redis告别单线程。I/O多线程指的是客户端交互部分的网络I/O交互处理模块多线程,而非执行命令多线程,Redis6执行命令依然是单线程。

16.2.2.原理架构

(1)Redis6 加入多线程,但跟 Memcached 这种从 I/O处理到数据访问多线程的实现模式有些差异。Redis 的多线程部分只是用来处理网络数据的读写和协议解析,执行命令仍然是单线程。之所以这么设计是不想因为多线程而变得复杂,需要去控制 key、Lua、事务,LPUSH/LPOP 等等的并发问题。其整体的设计大体如下:

(2)此外,多线程I/O默认是不开启的,需要在redis.conf配置文件中进行手动配置

# 开启I/O多线程
io-threads-do-reads yes
# 多线程数量
io-threads 4

16.3.工具支持 Cluster

之前版本的Redis如果想要搭建集群,就需要单独安装ruby环境,Redis 5 将 redis-trib.rb 的功能集成到 redis-cli 。另外官方 redis-benchmark 工具开始支持 cluster 模式了,通过多线程的方式对多个分片进行压测。

16.4.Redis6其它新功能

(1)RESP3新的 Redis 通信协议:
优化服务端与客户端之间通信
(2)Client side caching客户端缓存:
基于 RESP3 协议实现的客户端缓存功能。为了进一步提升缓存的性能,将客户端经常访问的数据cache到客户端。减少TCP网络交互。
(3)Proxy集群代理模式:
Proxy 功能,让 Cluster 拥有像单实例一样的接入方式,降低开发者使用cluster的门槛。不过需要注意的是代理不改变 Cluster 的功能限制,不支持的命令还是不会支持,比如跨 slot 的多Key操作。
(4)Modules API
Redis 6中模块API开发进展非常大,因为Redis Labs为了开发复杂的功能,从一开始就用上Redis模块。Redis可以变成一个框架,利用Modules来构建不同系统,而不需要从头开始写然后还要BSD许可。Redis一开始就是一个向编写各种系统开放的平台。

相关文章

微信公众号

最新文章

更多

目录