Redis是一款基于内存的NoSQL,本部分分析了Redis的部分源码
准备工作
- 阅读工具:Clion 与 CMakeList.txt
- Redis版本: 3.2.5
Redis的安装
Mac测试环境下进行如下安装
brew install redis
接着配置用户、监听、端口与鉴权,最后启动Redis-Server即可。具体操作在网上有很多教程,这里就不讲了。
注意在公网上一定要配置鉴权、白名单与端口,否则就要交比特币赔钱了。
断点方法
学习任何一个语言或者框架要做的三件事:编译、Log与断点。以前都是使用VC断点的,现在有了更先进的工具,使用方法如下
导入步骤
- 使用Clion打开redis源码,并复制替换刚刚的CMakeList
- 编译时,首先直接在Clion的终端(Terminal)运行
make
编译 - 调试时,配置Clion的
Run/Debug Configurations
,并选择Excutable为./bin/redis-server
即可,最后打上断点,点击“虫子图标“即可使用lldb/gdb调试分析(mac上建议使用lldb)。
Redis网络事件端到端流程流程
文章速读
下面为Redis在运行时处理Socket事件的伪代码,Select的本质上是模式匹配与高阶函数(在C中用函数指针进行模拟),全程单线程运行(BGSAVE需要fork除外)。
// Redis 中 Socket 事件的循环处理伪代码
while(true){
// 通过Epoll,Select等方法获取当前的可用的SocketDescriptor数组
aeApiPoll(eventLoop, ...);
// 对可用Socket依次进行调用函数
eventLoop.forEach(event -> {
switch(event.mask){
case AE_READABLE:
// 调用相应的函数指针,即业务代码...
rfileProc(event, ...);
break;
case AE_WRITABLE:
wfileProc(event, ...);
break;
}
})
}
IO multiplexing 与 Select系统调用
Redis在内部采用非阻塞同步IO作为底层库实现,看起来很高大上,实际上就是一个不断循环搜索可用I/O descriptor
的系统调用,它在OS上有select, epoll, kqueue等实现。在本篇中,将以Select为例介绍I/O多路复用。
Select是一个网络相关的POSIX调用,可以用 man 2 select 查看具体的介绍。
select() examines the I/O descriptor sets whose addresses are passed in readfds, writefds, and errorfds to see if some of their descriptors are ready for reading, are ready for writing, or have an exceptional condi-tion pending, respectively.
它内部通过轮询一个默认长度为1024的数组查询可读
或者 可写
的Socket descriptor(在C中用 fd_set 表示, a fixed size buffer),并返回给用户,比如下图就是一个查询可读Socket的Select流程。
通过Select可以一次性单线程通过内核遍历大量的Socket连接,再依次进行事件处理。
- 把大量IO管理的脏活交给内核了,可以轻松设计基于Event loop的模型,单线程就搞定IO,之后产生的事件再交给其他模块(比如执行线程池),业务代码更容易维护
- 相比于以前每次新来一个连接就开一个线程的做法,Select节约了线程创建、切换等资源损失
至于Select具体实现的源码,已经属于SystemCall层,可以翻下在线或者教学用的迷你Linux学习下源码,本文就不深挖了。此外,这里有一个对比阻塞与非阻塞的问答
Select在Redis中的封装
Redis实现了对底层的系统调用 Select, epoll, export, kqueue 的适配,以支持多个平台。接下来以Select为例,看一下Redis是如何封装Select的
// ae_select.c
// Redis 对 Select 的封装入口
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
// 遍历是否有可读可写的 Socket 描述符
// 1. eventLoop->maxfd+1: 文件描述符数量+1
// 2. &state->_rfds:可读的 Socket descriptor
// 3. &state->_wfds: 可写的 Socket descriptor
// 4. NULL: 错误的 Socket descriptor,NUll表示在Redis中没有监听
// 5. tvp: 超时配置 timeval {秒 + 毫秒}, 为0时不等待, 为null时阻塞等待
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
// 如果刚刚Select遍历搜索到的话
if (retval > 0) {
// 就再遍历一遍,并添加 Mask
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
//fire这里翻译为`触发事件`比较合理,为将要触发的事件放入fd与mask
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
通过将此函数放在一个while循环中,Redis就可以通过一个线程对IO事件进行高效处理,下文将详细讲。
Redis 套接字处理流程
我们首先从main函数开始
事件循环(Event loop)
可以看出这里是一个简单的事件循环,关键点是aeProcessEvents
函数
// server.c 的 main 方法中调用 ae.c 的事件循环函数 aeMain
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
事件处理
接着分析 aeProcessEvents,它用于处理Redis的所有事件
// ae.c 的 aeProcessEvents 函数,处理Redis事件
// 本文只分析Socket事件,为了版面,将删除与Socket无关的代码
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
// 此处略去定时相关代码...
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 设置 select 的超时事件
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
// tvp 为 0 时 不等待
tvp = &tv;
} else {
// tvp 为 null 时 阻塞等待
tvp = NULL;
}
// 调用对 Select 的包装,这个刚刚讲过了
numevents = aeApiPoll(eventLoop, tvp);
// 对可读与可写的事件进行处理
for (j = 0; j < numevents; j++) {
// 读取刚刚通过Select获得的Event
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
// 可读Socket处理
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
// 调用函数指针
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 可写Socket处理
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
// 此处略去定时相关代码...
return processed; /* return the number of processed file/time events */
}
调用业务( 调用函数指针)
接下来就进行具体的调用函数指针了。这部分进行反序列化操作并执行读写事务,C中的函数指针在一定程度上类似于高级语言中的闭包(Closure),比如这里的aeFileProc就有多个实现,下面是它的原型与某个实现
// 对函数的定义
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
// 对函数的实现
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);
// 业务代码...
writeToClient(fd,privdata,1);
}
可以在Clion使用
cmd
+shift
+f
进行全局文本搜索aeCreateFileEvent
,就可以找到所有的业务了。
附录
为什么NIO中Select调用操作有时会占用很多CPU?
说个NIO相关的吧,最近在用JVisualvm抽样CPU使用时,发现MQ的Select调用占用非常高的CPU时间。通过定位,发现:
- 有大量的连接通过MQ发出后无响应
- 自身由于积压导致可写的连接没被处理
- 进而引发FullGC,导致更加恶化
个人猜测,Select在某些恶劣环境下不擅长于长连接或者不稳定的连接而出现空转循环。我的建议:
- 配置Unix参数,减小栈内存的分配量,减少Socket的等待超时
- 切忌在此循环线程上执行耗时任务,建议将产生的事件交给线程池
- 将Select超时设计为阻塞等待,减少空转次数
参考文献
- http://origin.redisbook.com/
- Redis设计与实现 (数据库技术丛书)
- 深入浅出Node.js
- https://www.zhihu.com/question/28594409
- TCP/IP 应用程序的通信连接模式
本文目录
- 什么是写时复制
- 写时复制的应用场景
- 写时复制的实现
在并发编程中,如果需要实现对资源的冲突处理,一般采用互斥锁,队列、不可变来实现。上面的技术实现在很多书籍中都有,不过今天介绍的是一种新的方法--写时复制(Copy-on-write, COW)
。
关键词: COW, Copy on write, Redis
如果看的懂英文就直接看这里:
Copy on write (COW) is an optimization strategy that avoids copying large sized objects.
In a lot of real world programs, a value is copied to another variable and often is never written to. In most languages other than C++, all large sized objects are actually references. When you copy an object, all you copy is a pointer (shallow copy semantics). In such languages, COW is implemented at the language/runtime level, and not in the standard library.
In C++, copies are deep copies by default (value semantics), thus assigning large structures and strings are expensive, because the entire data is duplicated.
To avoid this, one can make a system where a copy is always shallow, but when you modify a copied object, the underlying object is duplicated, and then the changes are applied to the new copy.
总的来说,COW通过浅拷贝(shallow copy)只复制引用而避免复制值;当的确需要进行写入操作时,首先进行值拷贝,再对拷贝后的值执行写入操作,这样减少了无谓的复制耗时。
特点如下
- 读取安全(但是不保证缓存一致性),写入安全(代价是加了锁,而且需要全量复制)
- 不建议用于频繁读写场景下,全量复制很容易造成GC停顿,因此建议使用平时的ConcurrentXX包来实现。
- 适用于对象空间占用大,修改次数少,而且对数据实效性要求不高的场景。
这里的安全指在进行读取或者写入的过程中,数据不被修改。
写时复制的应用场景
写时复制最擅长的是并发读取场景,即多个线程/进程可以通过对一份相同快照,去处理实效性要求不是很高但是仍然要做的业务(比如实现FS\DB备份、BinLog、日志、分布式路由),举例如下。
1. Unix下的fork()系统调用
fork()是一个系统调用,用于创建新的进程(process)。
fork() creates a new process by duplicating the calling process. The new process, referred to as the child, is an exact duplicate of the calling process, referred to as the parent.
fork内部实际上是对clone()系统函数的调用,它的参数CLONE_FLAG
决定了需要共享哪些数据。在fork中,没有CLONE_VM
参数,也就意味着不会共享\竞争同一个内存,而是复制一个内存快照给子进程,这个内存在32位下是4G的大小,占用空间相当的大,如果通过类似memcpy进行内存复制的话,fork调用的耗时将相当显著,甚至阻塞业务,那么为什么在真正开发调用时却没有发生呢?因为内部也是通过COW机制实现的。
内核实现:
在内核侧,在进行了内存“复制”后,子进程与父进程指向同一个只读的Page分页。当子进程或者父进程发送修改内存请求后,由于是分页是只读的,OS此时才将内存进行复制为两份,并将这两份内存设置为可写权限,最后再处理刚刚发送的修改内存请求。通过上述策略,实现了延迟复制,进程的创建是不是变快了?
2. Redis的持久化
Redis是一个基于KV的MemCache框架,可以将数据全部存储在内存中,当你希望对数据进行全量Dump(bgsave)到文件中或者进行主从同步时,将进行下面的步骤。
- Redis forks. We now have a child and a parent process.
- The child starts to write the dataset to a temporary RDB file.
- When the child is done writing the new RDB file, it replaces the old one.
可以看出,Redis通过fork()系统调用实现了写时复制,而没有自己去造轮子
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
start = ustime();
//指向子线程的pid如果为0,表示fork成功,为正表示为parent线程
if ((childpid = fork()) == 0) {
int retval;
/* Child进程要执行的代码 */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
...
return C_OK;
}
return C_OK; /* unreached */
}
在rdbSave中(目前已经为子线程中),具体实现如下,代码太长就不贴了
- 创建了一个
temp-${getPid()}.rdb
的文件 - 调用
rioInitWithFile(rio *r, FILE *tmp)
,将r
初始化为rioBufferIO
- 对全局变量
server
进行forEach反序列化,并保持到缓存r中,并写入文件,注意这个Server指针已经与父进程无关了 - 进行fflush、fsync、fclose系统调用清除OS的FS缓存(这也是OS内部的COW优化)
- 进行
rename
系统调用,进行重命名
系统调用都是默认线程安全的,所以不用担心多次重命名等问题
可以看出,在Redis中没有memcpy等内存复制过程,而是直接使用server指针进行读取并写入文件,因为在fork时,已经duplicated了快照。
3. Docker Container
提高启动速度与节省空间
https://docs.docker.com/storage/storagedriver/#the-copy-on-write-cow-strategy
写时复制的实现
以Java为例,在CopyOnWriteArrayList中,写数据在锁的保护下,而读取可以任意进行,代码如下。
private transient volatile Object[] array;
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//类似于memcpy,构造一个新的对象
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
//重新设置引用
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
public E get(int index) {
//获取到的数据没有实效性
return get(getArray(), index);
}
final Object[] getArray() {
return array;
}
其它可能需要深入了解的技能
#####1. 如何实现String\Map的写时复制?
这个一般只在糟糕的面试题中出现,因为写时复制主要用于处理大的数据,而大型的字符串、Map却很少见到场景(如果说非要来一个场景的话,就是Zookeeper中读取服务时,可能需要一个Map<String,Class>来实现)。在C++中,写时复制的String已经被废弃,并且Redis中设计的字符串可以更加优雅地扩容,在Java中,各类并发库已经很成熟,写时复制主要用于实现安全迭代,而没有String或者Map的需求。
如果非要让你写,可以这样处理:
- 在构造函数、写入函数中实现深拷贝,并加锁,比如put中就再包装一道HashMap。
- 在getter函数,实现无锁直接获取。
#####2. ConcurrentHashXXX与CopyOnWriteXXX的对比?
一个适用于写入量大的场景,一个适用于读取量大的场景,它们的线程安全关系如下
Normal | Concurrent | COW | |
---|---|---|---|
Read | Unsafe | Safe | Safe, may dirtyData |
Write | Unsafe | Safe | Safe, may slowest |
Ref
- https://zh.wikipedia.org/wiki/%E5%AF%AB%E5%85%A5%E6%99%82%E8%A4%87%E8%A3%BD
- http://ifeve.com/java-copy-on-write/
- http://www.ibm.com/developerworks/tivoli/library/t-snaptsm1/
- http://blog.csdn.net/jason314/article/details/5640969
- https://www.reddit.com/r/compsci/comments/31szui/trying_to_understand_fork_and_copyonwrite_cow/
- http://stackoverflow.com/questions/1570589/is-the-volatile-keyword-required-for-fields-accessed-via-a-reentrantlock
- https://www.quora.com/What-is-Copy-on-Write-and-how-is-it-used-in-C++
The Implication of Incremental Rehashing in Redis
本文适合Java中高级、有HashMap的基础,略懂C基础的读者进行阅读。
关键词: incremental rehashing, Redis, HashMap
什么是Hash
散列函数(或散列算法,又称哈希函数,英语:Hash Function)是一种从任何一种数据中创建小的数字“指纹”的方法。散列函数把消息或数据压缩成摘要,使得数据量变小,将数据的格式固定下来。完美的散列算法可以均匀的真正随机输出。在Redis中,通过MurmurHash2算法(位于源码的dictGenHashFunction)返回的一个32位的unit;在Java中,HashCode由JVM的object.c 实现,同样返回uint32。
在本文我们只考虑理想情况,只需要知道上述Hash算法将返回一个[0,Integer.MAX_VALUE-1]之间均匀分布,且与输入值一一对应的正整数即可。
HashMap的扩容操作
在Java的HashMap源码中,当put
操作中,如果已经使用了0.75的空间,将进行扩容(resize)操作。在不考虑链地址与红黑树等特例的情况下,步骤如下:
- Expand: 新建一个为原数组
oldTab
两倍容量的新数组newTab
- Rehash: 对
oldTab
进行for循环,找出不为空的元素,并放入新数组中,伪代码如下
int newCap = oldCap * 2;
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
oldTab.forEach((Node e) -> {
newTab[e.hash & (newCap - 1)] = e;
})
可以看出,此操作为高耗时的O(N)操作,不仅要分配大内存,而且还要把旧数组进行for循环复制。如果HashMap的元素成千上万,在进行put操作时如果遇到了扩容,将造成极大的操作延迟。
Redis中的HashTable
在Redis中,由于它对实时性要求更高,因此使用了渐进式rehash
HashTable的构造
在Redis中,Hash表是这样定义的,可以看出,同样采用数组加链地址,redis与Java中的HashMap并没有什么不同
// dict.c and dict.h
// dict hashtable
typedef struct dictht {
// 指针的指针,可以看成是一个全是指针的数组, 用Java写是这样的<dictEntry*>[]
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 供链地址冲突使用,本文不考虑此特例
struct dictEntry *next;
} dictEntry;
正常情况下添加K-V Entry的实现
在没有冲突、没有替换、扩容的场景下,SET
调用栈如下
void setCommand(t_string.c);
void setGenericCommand(t_string.c);
void setKey(db.c);
void dbAdd(db.c);
static int dictAdd(dict.c)
dictEntry *dictAddRaw(dict.c)
具体代码就不放了,先为Entry分配内存,接着放入ht数组的index中,最后写入指针的k-v数据。
Redis扩容实现
当redis中所用容量达到1.0左右时,在SET
操作时将触发扩容操作;亦或在Redis启动时也会进行初始化扩容。扩容与Java一样,首先创建了一个两倍长度的数组,接着进行reHash将旧值放入新值中。
Expand
伪代码如下,ht就是上面的结构体。其中ht[0]
是旧的hash表,ht[1]
是新的hash表。可以看出通过zcalloc分配了一个双倍的内存。
// dict.c dictExpand
var realsize = 2 * ht[0].used;
// 创建一个新的 dictht,并分配内存
var ht[1] = {
size: realsize,
sizemask: realsize - 1,
table: zcalloc(realsize*sizeof(dictEntry*)),
used: 0
}
ReHash实现
由于reHash需要大量的数据,很难构造出断点,因此可以在Redis初始化时,在如下代码中打断点,这样就可以分析详细过程了。
// file:scr/dict.c
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}
首先上结论,如果正在进行ReHash:
- 当进行
SET
操作时,它将首先移动一个旧元素到新数组
,再把K-V放入新的数组中 - 当进行
GET
操作时,它将首先移动一个旧元素到新数组
,再依次从旧数组、新数组中搜索Value
上述操作每次只移动一个元素,类似于某个耐心的银行,取钱时扣一点,存钱时扣一点,最终总能扣完。这样就减少了一次性压垮导致双输的风险。
接下来开始分析移动一个旧元素到新数组
的过程,即上文dictRehash(d,1)
的过程。由于dictRehash
的第二个参数在SET
与GET
场景中始终为1,因此下文代码去掉了部分内容。
int empty_visits = n*10;
dictEntry *de, *nextde;
// 遍历旧表ht[0]的数组,从rehashidx 到最 rehashidx+10,如果连续10次都为空,放弃此次机会
// 当然计数器rehashidx也相应增长了10
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
// 老数组中刚刚找到的 dictEntry
de = d->ht[0].table[d->rehashidx];
// 移动元素到新数组,并处理链地址冲突
while(de) {
unsigned int h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
// rehash 全部完成,rehashidx置为 -1
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
太长不看的话,总结一下rehash的过程:
- rehashidx开始为0
- 遍历旧数组[rehashidx, rehashidx+10]共10个元素是否有entity,没有找到就等待下次机会,每处理一个值,rehashidx就加一。
- 如果找到了,就重新计算hash值,并放入新数组中,并处理链地址冲突;否则接着执行1
- rehash 全部完成后,rehashidx置为 -1
通过上述操作,每次在进行SET/GET
操作时,都会保证向前遍历旧数组1~10步,最终ht[0]将被遍历完,而ht[1]将越来越多。
总结
Redis的reHash本质上就是分而治之的方法,降低了一次压垮的风险。通过渐进式操作,分散了put/get的时间复杂度到每次操作中,当然一定程度上也增加了框架的复杂度。
参考
- 《Redis的设计与实现》
分布式锁
请注意Redis的分布式锁,就算是Redlock,也不能完全实现百分百高可用,因此仅建议在能够容忍失败的场景下使用。如果有更高精度的要求,建议使用事务,zk或者etcd等工具。
另外,尽可能不要用锁,共享数据的内容尽可能执行快一点,不要长期阻塞。业务逻辑也建议使用MQ/Akka等流式方案,避免分布式全局变量。
单机版
加锁
通过执行如下命令实现
# EX seconds -- Set the specified EXpire time, in seconds.
# NX -- Only set the key if it does Not already eXist.
set key random-token EX 30 NX
→ OK
这里的失效时间比较难配,而且random-token
也要是分布式唯一ID。
如果一定需要获取到锁才能执行,那么需要再外面包装一层轮询才够
网上有许多JedisLock的实现,我个人实现的Java代码如下
// 此处没用UUID(懒得维护全局变量),我这里注入了Eureka的InstanceId,详见我的Eureka相关文档
String uniqueId = "10.0.0.2:8080:node1-sz"
//阻塞获取锁方案
public boolean compareAndSetSync(String lock, int waitTime, int leaseTime){
while(waitMs>=0){
boolean ok = jedis.set(lock, uniqueId, "EX", leaseTime, "NX").equals("OK");
if(ok){
return true;
}
synchronized(JedisLock.this){
waitMs- = 100; //ms
try{
JedisLock.this.wait()
}catch(ignored){
}
}
}
}
相对网上开源方案的变更
- 将synchronized范围降低,将
sleep
改为wait
(wait释放时间片,此处代码可以参考我以前写的OkHttp连接池分析) - 支持阻塞等待指定时间获取锁
移除锁
通过EVAL单线程的Lua脚本实现合并两条Command为原子指令,有点类似CAS命令
# 1: paraments count is 1
EVAL "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 key random-token
→ OK
总体状态
理想情况,假设当某台机器访问某个锁时
动作 | 无锁/失效 | 自己的锁 | 他人的锁 |
---|---|---|---|
AcquireLock | OK | (nil) | (nil) |
ReleaseLock | 0 | OK | 0 |
从上面我们也看出很多缺点
- 轮询阻塞: 在真实场景中,一般会通过轮询查询锁。但是轮询for循环获取锁没有优先级调度,全靠网线质量
- 超时配置: 短了,需要业务侧维护看门狗不断延期,增加了复杂性;配置时间长了,如果当前业务挂掉,其它机器也要阻塞等待至expire。
- FailOver: 集群环境中(主从同步为异步),主机挂掉,而从机没有同步Lock信息时,锁失去唯一性;主机解锁后挂掉,而从机没解锁,所有人都要等到超时才行。
针对这些问题,在真实场景中,我们并没有用Redis去完成苛刻的任务,而主要用作全局Flag(比如防止两个耗时任务同时执行),而且Redis方案也有替代品,比如Quartz中的SELECT FOR UPDATE
就是更加完善的方案。
对开源方案的改进
集群版(redlock)
Redis with spring session
How session works with redis?
Instead of Tomcat’s Embedded HttpSession
stored in memory, Spring-Session reimplements HttpSesson
with Redis to support multiple sessions with same browser.
Try it up with debugger
Let's first create a demo project, go to starter page and create a demo project with following plugin:
- Redis
- Web
- Session
Add @EnableRedisHttpSession
to BootApplication
@SpringBootApplication
+ @EnableRedisHttpSession
public class DemoApplication {
...
}
And create a Hello Controller for debugging
@RestController
@RequestMapping("/")
public class Hello {
@GetMapping("hello")
public String hello(HttpServletRequest request, @RequestParam String value){
// add debugger here
request.getSession().setAttribute("key",value);
return value;
}
}
Add debugger with find usage
for getter/setter
org.springframework.session.web.http.SessionRepositoryFilter#CURRENT_SESSION_ATTR
Cookies resolver
org.springframework.session.web.http.CookieHttpSessionIdResolver#resolveSessionIds
And these are the mapping between cookies and redis
org.springframework.session.web.http.DefaultCookieSerializer#readCookieValues
org.springframework.session.web.http.DefaultCookieSerializer#writeCookieValue
then start a local redis(localhost:6379) and run springboot in debug mode.
Let's try with a curl
curl "http://localhost:8080/hello?value=1"
Finally, login to redis-cli, you will find sessions are already in Redis.
KEYS spring:session:sessions:*
How to reuse session-id?
By default, session ids are generated by UUID, they have no relation with browser or IP address.
org.springframework.session.MapSession#generateId
To reuse the session-id, every request from browser must have a valid session in the header(cookies or x-auth-token), I prefer header based session resolver (less payload and easier for non-browser clients).
@Bean
HttpSessionIdResolver headerResolver(){
return new HeaderHttpSessionIdResolver("miao-token");
}
Recurl and your will get a miao-token
header in response.
Content-Type: ...
Date: ...
miao-token: f15b28ea-15b6-4ce8-aad5-9ed0ab00d643
To keep the client's session up to date with server, just add the header in HttpInterceptor(eg: okhttp/ajax).
// angular HttpInterceptor, same as okhttp
export class AddHeaderInterceptor implements HttpInterceptor {
intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
const clonedRequest = req.clone({
headers: req.headers.set('miao-token', YOUR_GLOBAL_TOKEN)
});
return next.handle(clonedRequest);
}
}
In the end, here is the resuable process flow.
RESTful Request
|
HttpSessionIdResolver
|
(has token id)?
|
|-------(yes): get seesion by id from redis and reuse.
---------(no): jump to login page and create id by UUID
Appendix
What's cookie's scope?
See Scope of cookies, if you share two applications with same session, you need to customize the DefaultCookieSerializer
for wider scope(path/httpOnly attributes).
CSRF protection
CSRF(Cross-Site Request Forgery) attacks may inject malicious code into webpages , see more at Spring-Securiy for server-side protection and Angular for client-side protection.
Alternatives
If you prefer a lower level redis store implementation, just try tomcat-redis-session-manager
com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve
SSO
You can also use CAS, a SSO solution for your authentication and authorization, which have a redis plugin inside.