本书介绍
Java体系
要求熟悉以下
- JVM实现: 最好看过C源码,具体有ClassLoader,线程池,锁,NIO、反射
- GC: 常见GC的算法与区别
- EE方向: JMX, HotDeploy
- 各种业务引擎: RuleEngine, TaskEngine, BPM
本文直接分析JVM源码
使用Clion(GDB)调试小型JVM源码
JVM与源码阅读工具的选择
初次学习JVM时,不建议去看Android Art、Hotspot等重量级JVM的实现,它内部的防御代码很多,还有android与libcore、bionic库紧密耦合,以及分层、内联甚至能把编译器的语义分析绕进去,因此找一个教学用的、嵌入式小型的JVM有利于节约自己的时间。因为以前折腾过OpenWrt,听过有大神推荐过jamvm,只有不到200个源文件,非常适合学习。
在工具的选择上,个人推荐SourceInsight。对比了好几个工具clion,vscode,sublime,只有sourceinsight对索引、符号表的解析最准确。
sourceInsight的配置
阅读C代码除了Clion外,还推荐使用sourceInsight,这个软件一般在H等硬件厂商用的多,虽然很丑但是速度很快,使用教程见这里
- 记得在View中打开Reference与ContextWindow
- 使用时建议关闭浏览器与各种套壳应用(否则Reference会很慢)
什么是MarkSwip
虽然以前在各种文档、面试题中看过这个算法(标记清除算法),但是它完整的定义是这样的:
标记清除-将所有活动对象都做上标记,接着将没有标记的(非活动)对象进行回收的过程。
在Java中,
伪代码如下
funtion(obj){
return
}
引用计数法(Refference Count)
在Python、iOS中,使用了引用计数法作为GC机制(当然iOS中的更高级,叫做ARC,是在编译时就搞定了自动控制内存)
如何分析full-gc
ClassLoader
ClassLoader也就是类的加载器
比如
- Spring与Quartz的ResourceLoaderClassLoadHelper
- Mybatis中的ClassLoaderWrapper
Java Class文件的结构
在*.class文件中,以Byte流的形式进行Class的存储,通过一系列Load,Parse后,Java代码实际上可以映射为下图的C结构体,这里可以用javap -v -p
命令或者IDE插件进行查看。
typedef struct {
u4 magic;/*0xCAFEBABE*/
u2 minor_version; /*网上有表可查*/
u2 major_version; /*网上有表可查*/
u2 constant_pool_count;
cp_info constant_pool[constant_pool_count-1];
u2 access_flags;
u2 this_class;
u2 super_class;
u2 interfaces_count;
u2 interfaces[interfaces_count];
//重要
u2 fields_count;
field_info fields[fields_count];
//重要
u2 methods_count;
method_info methods[methods_count];
u2 attributes_count;
attribute_info attributes[attributes_count];
}ClassBlock;
-
常量池(constant pool):类似于C中的DATA段与BSS段,提供常量、字符串、方法名等值或者符号(可以看作偏移定值的指针)的存放
-
access_flags: 对Class的flag修饰
typedef enum { ACC_PUBLIC = 0x0001, ACC_FINAL = 0x0010, ACC_SUPER = 0x0020, ACC_INTERFACE = 0x0200, ACC_ACSTRACT = 0x0400 }AccessFlag
-
this class/super class/interface: 一个长度为u2的指针,指向常量池中真正的地址,将在Link阶段进行符号解引。
-
filed: 字段信息,结构体如下
typedef struct fieldblock { char *name; char *type; char *signature; u2 access_flags; u2 constant; union { union { char data[8]; uintptr_t u; long long l; void *p; int i; } static_value; u4 offset; } u; } FieldBlock;
-
method: 提供descriptor, access_flags, Code等索引,并指向常量池:
它的结构体如下,详细在这里
method_info { u2 access_flags; u2 name_index; //the parameters that the method takes and the //value that it return u2 descriptor_index; u2 attributes_count; attribute_info attributes[attributes_count]; }
以上具体内容可以参考
Class与Object
typedef struct object Class;
typedef struct object {
uintptr_t lock;
Class *class;
} Object;
本文指的Class
指
前提: 已经获取到Class
结构体对应的指针
下面是经过删减与注释的代码(删去了状态判断、Lock与异常处理),并替换宏变量为字符串
// class.c
Class *initClass(Class *class) {
ClassBlock *cb = CLASS_CB(class);
ConstantPool *cp = &cb->constant_pool;
FieldBlock *fb = cb->fields;
MethodBlock *mb;
Object *excep;
int state, i;
linkClass(class);
cb->state = CLASS_INITING;
cb->initing_tid = threadSelf()->id;
if(!(cb->access_flags & ACC_INTERFACE) && cb->super
&& (CLASS_CB(cb->super)->state != CLASS_INITED)) {
initClass(cb->super);
}
/* Never used to bother with this as only static finals use it and
the constant value's copied at compile time. However, separate
compilation can result in a getstatic to a (now) constant field,
and the VM didn't initialise it... */
for(i = 0; i < cb->fields_count; i++,fb++)
if((fb->access_flags & ACC_STATIC) && fb->constant) {
if((*fb->type == 'J') || (*fb->type == 'D'))
fb->u.static_value.l = *(u8*)&(CP_INFO(cp, fb->constant));
else
fb->u.static_value.u = resolveSingleConstant(class, fb->constant);
}
if((mb = findMethod(class, "<clinit>", "()V")) != NULL)
executeStaticMethod(class, mb);
return class;
}
贴出上面主要是为了让你明白
- 调用
linkClass
进行连接 - 调用
super
的class中的initClass
- 调用
<clinit>
方法,也就是static代码段
上面部分实际上是对Class进行模式匹配(pattmatch)的遍历,伪代码如下
(define initClass
(lambda (exp)
(linkClass exp)
(match [(?isSuperInited superClass) (initClass superClass)])
(clinit exp)))
在平时开发中,只需要背住就可以了。
Class与依赖注入
很多人认为学习JVM是“高手”才去做的,平时写业务时没用,下面举一个Spring断点调试技巧。在分析Spring的依赖注入时,很多人看到复杂源码就无法接着分析了。其实可以这样,首先在Bean中加入static代码段,并打上断点,然后启动程序。
@Bean(name="SSR")
public class DemoBean{
static{
// 此处打上断点
System.out.println("class loaded");
}
....
}
等断点跳到这里时,可以发现是Class.newInstance()
方法被调用,进而调用<clinit>
,此时再向上分析Spring的代码堆栈,阅读源码与流程就轻而易举了。
附录
下面2个是常见面试题
求打印顺序
实例化AChildChild后,求输出顺序
public class AParent {
static {println("AParent clinit");}//1
public AParent() {System.out.println("AParent init");}//4
}
public class AChild extends AParent {
static {println("AChild clinit");}//2
public AChild() {println("AChild init");}// 5
}
public class AChildChild extends AChild {
static {println("AChildChild clinit");}//3
public AChildChild() {println("AChildChild init");}//6
}
AChildChild acc = new AChildChild();
其中1,2,3的顺序本文已经可以解释,4,5,6下次讲解init
时进行分析
JDBC的加载
下图是加载mysql的例子,当程序员调用Class.forName
时,static代码段就会执行
public class Driver extends NonRegisteringDriver implements java.sql.Driver {
static {
try {
java.sql.DriverManager.registerDriver(new Driver());
} catch (SQLException E) {
throw new RuntimeException("Can't register driver!");
}
}
public Driver() throws SQLException {}
}
JUC(java.util.current)、线程安全与并发的综述
本文先入一个门,方便学习并发
- 线程安全的实现
- 线程,线程池,JVM内部的实现
- CAS、Sync(以及区别)
- 平时遇到的并发问题
什么是并发(Concurrent),什么是并行(Parallels)?
这个概率比较混乱,在书上与StackOverFlow上答案各有各的说法。以下是一本专业书籍的引用
并发程序是指可以被同时发起执行的程序。而并行程序则是被设计成可以在并行的硬件上执行的并发程序。换句话说,并发程序代表了所有可以实现真正的或者可能的并发行为的程序。它是一个比较宽泛的概念。这其中包含了并行程序。并行程序是并发程序中的一种。 -《Go并发编程实战》
总的来说,就是并行需要硬件支持,并发同时可以用硬件或者时间片模拟。因此并行是并发的子类。
详细可以参考parallel-vs-concurrent
线程安全的实现
线程安全与并发似乎天天都遇到,
1. 无状态函数
无状态函数是指无副作用的函数,也就是函数式编程中所谓的“纯函数”,比如map,filter,它们在LISP语言中叫做Lambda表达式。因为不涉及共享变量,所以总是线程安全的。
2. Final不可变
3. 锁
3.1. CAS锁
3.2. 互斥锁
书籍推荐
- 七周七并发模型
- Java虚拟机并发编程
final变量
fina指read-only的意思,很容易理解
简单入门
class Student {
final String name
Student(String name) {
this.name = name
}
}
Student student = new Student("smith")
// 报错
student.name = "changed"
为什么String是final的
String通过final class 与 final array表示String即无法继承,也无法修改,让String在语义上保证不可变性
ThreadLocal
ThreadLocal是线程本地独有可见的变量(各管各的),一般用于上下文管理
简单入门
举个常见的反序列化例子
private static ThreadLocal<SimpleDateFormat> f = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd");
}
};
原理
threadLocal看起来像是在“栈”上,实际上它还是Thread的成员变量,还是在堆上的,每次实际上通过访问如下获得
//类型如下
//Thread
//ThreadLocal.ThreadLocalMap
//ThreadLocal.ThreadLocalMap.Entry[]
//T
Thread.currentThread().threadLocals.table[i].value
其中
- i: 通过ThreadLocal的全局成员变量nextHashCode进行自增,每多一个ThreadLocal实例,自增一次
- value: 通过initialValue传入的对象
在企业开发中的作用
ThreadLocal是线程本地独有可见的变量,一般用于上下文管理
比如
- 管理Spring-transaction: 数据库事务
- 管理BPM流程上下文: 这个主要是在企业级应用中使用,自己跳自己的伞,一个线程走到底
- 业务路由: Tomcat中的
getRequestDispatcher
进行重定向,比如资源静态化的Filter
在序列化的作用
在进行序列化与反序列化时,内部一定会通过一个状态机来维护上下文,如果它不是线程安全的,那么状态机一定会出问题
- SimpleDataFormat.parse(不是线程安全)
- BigDecimal.toString(线程安全)
- String
.<init>
(线程安全)
线程/进程区别
通过strace
分析两个C语言写的fork
与pthread_create
,最终可以发现是系统调用clone
的共享FLAG不用
a review for thread mode
本文首先介绍了一些线程基础,比如并发、并行、内存分配、系统调用、POSIX线程。接着通过strace分析了线程与进程的区别。最后以Android、Golang等线程模型进行了分析。
1. 基础
1.1. OS下如何进行内存分配?用户区与内核区有什么区别?
在32位的Linux操作系统中,当一个进程启动后,将被分配4G的虚拟内存。内存可以分为两个空间,一个是用户空间(0~3G),另一个是内核空间(3G~4G)。其中用户空间就是代码运行的空间,比如堆栈、BSS(未初始化数据段)、DATA(已经初始化数据段)、TEXT(代码二进制段);而在内核空间中,是OS内核的映射,只有在执行syscall系统调用时,才能进行重写。
在用户态中,执行用户代码,比如直接运行C程序、或者运行JVM虚拟机等。
在内核中,主要负责I/O(显示,层三以下的网络,FS),Memory(虚拟内存,页面替换/缓存), Process(信号、线程/进程管理,CPU调度)的管理,直接控制CPU、内存等硬件,权限(privilege)非常大;
1.2. 系统调用中断(SCI)是什么?
系统调用是用户与内核间的一个桩(stub),当在用户态执行高权限任务,需要通过系统调用切换入内核态去执行最底层任务。比如在C语言中调用getTime()
时,大致流程如下
1. app method(User Application)
|
|调用stdlibc标准库
|
2. systemcall_stub(std libc)
|
|系统调用,进入内核态
|
3. system_call_table[call_number](Kernel)
|
|通过查表调用硬件函数
|
4. hardware_call(Kernel)
- 在App层面,开发者不需要自己写系统调用,系统会提供相关C标准库的SDK供开发者使用,比如开发者调用
getTime()
时,实际是使用了标准库的time.h
头文件。 - 代码在执行时,OS自动加载标准库。比如在android的bionic库中,实际执行getTime的系统调用是这里的平台相关的汇编代码,将系统调用的ID、参数传入内核。
- 内核通过系统调用ID进行表的索引,寻找真正的硬件调用函数
- 进行硬件相关的调用
在Mac下打开ActivityManager或者在Terminal中运行top,就可以显示地看到用户与系统的CPU占用
1.3. POSIX线程模型
POSIX是IEEE P1003.1中的线程标准,目前所有的系统,甚至windows都支持POSIX。它提供了用户态下的线程编程接口,开发者在进行线程开发时,只用引用pthread.h
头文件调用即可。程序在运行时通过系统调用,在内核中进行线程的实现。它有很多函数,比如create, exit, join, yield等,具体可以去各个平台下的libc源码/sdk中去看Header文件中方法的定义,比如android中使用biolibc中pthread.h的代码在这里,这里的头文件是对内核线程的包装。
2. 线程与进程的区别
这是一道经典的面试题,大多数回答者都是回忆起当初学习操作系统课本中的知识。然而课本中太偏向于内核,从开始就学习内核底层,而脱离开发,笔者认为是不太明智的。因此通过设计一个系统调用栈的分析,让读者有更清晰的了解。
本文线程特指32位下使用glibc的Linux系统中的POSIX模型,即用户面线程,进程特指unstd.h
中的fork产生的进程。
本测试基于Ubuntu 14.04 i386
1. 测试代码设计
1.1. 线程测试代码
//modified from https://computing.llnl.gov/tutorials/pthreads/samples/hello.c
//todo run:
//clang -Wall -g pthread.c -o pthread.out -lpthread
//strace -Cfo ./pthread.strace.log ./pthread.out
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
void*
PrintHello(void *threadid)
{
long tid;
tid = (long)threadid;
printf("Hello World! It's me, thread #%ld!\n", tid);
pthread_exit(NULL);
}
int
main(int argc, char *argv[]){
pthread_t thread;
int rc = 0;
long t = 0;
printf("In main: creating thread %ld\n", t);
//注意这里是一个函数指针,不要傻眼了
rc = pthread_create(&thread, NULL, PrintHello, (void *)t);
if (rc){
exit(-1);
}
}
1.2. 进程测试代码
//todo run:
//clang -Wall -g fork.c -o fork.out
//strace -Cfo ./fork.strace.log ./fork.out
#include <unistd.h>
int
main(int argc, char *argv[])
{
pid_t pid;
pid = fork();
if(pid < 0){
return -1;
}
return 0;
}
2. 测试结果
在编译完成后,调用strace
命令后,结果如下
2.1. 进程的strace路线如下
19948 execve("./fork.out", ["./fork.out"], [/* 68 vars */]) = 0
19948 brk(0) = 0x9bc000
19948 open("/lib/x86_64-linux-gnu/libc.so.6", O_RDONLY|O_CLOEXEC) = 3
19948 read(3, "\177ELF\2\1\1\0\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\320\37\2\0\0\0\0\0"..., 832) = 832
.....
19948 clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f5adac4ca10) = 19949
....
19949 +++ exited with 0 +++
2.2. 线程的strace路线如下
21958 execve("./pthread.out", ["./pthread.out"], [/* 68 vars */]) = 0
21958 open("/lib/x86_64-linux-gnu/libpthread.so.0", O_RDONLY|O_CLOEXEC) = 3
....
21958 access("/etc/ld.so.nohwcap", F_OK) = -1 ENOENT (No such file or directory)
21958 open("/lib/x86_64-linux-gnu/libc.so.6", O_RDONLY|O_CLOEXEC) = 3
21958 read(3, "\177ELF\2\1\1\0\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\320\37\2\0\0\0\0\0"..., 832) = 832
21958 fstat(3, {st_mode=S_IFREG|0755, st_size=1845024, ...}) = 0
21958 mmap(NULL, 3953344, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x7f34229e4000
....
21958 clone(child_stack=0x7f34229e2fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f34229e39d0, tls=0x7f34229e3700, child_tidptr=0x7f34229e39d0) = 21959
....
21958 +++ exited with 0 +++
3. 测试结论
通过上述的调用栈分析,可以得知均是通过调用x86_64-linux-gnu
下的libc库,接着通过systemcall函数clone()
实现对内核Process的控制,主要区别在函数参数中clone_flag上的不同,clone_flag指定了可以共享的资源,下图显示了进程与线程的flag
//clone flag between thread and process
//⚠️: 省略了`CLONE_`前缀
//进程的FLAG参数
flags=CHILD_CLEARTID|CHILD_SETTID|SIGCHLD
//线程的FLAG参数
flags=VM|FS|FILES|SIGHAND|THREAD|SYSVSEM|SETTLS|PARENT_SETTID|CHILD_CLEARTID
通过对clone
进行man查询,解释如下
进程的参数解释:
CLONE_CHILD_CLEARTID
: Erase child thread ID at location ctid in child memory when the child exits, and do a wakeup on the futex at that address。CLONE_SETTLS
: thread local storage (TLS) area,注意这个不可移植CLONE_SIGHAND
: 共享signal handlers
线程的一些参数解释:
CLONE_VM
: the calling process and the child process run in the same memory space. (注意这里说的是memory space
,指通过mmap()分配的内存。再多说一点,线程中的栈内存由pthread_attr_t
属性中的pthread_attr_setstacksize()
函数实现,默认可能为8MB(你可以运行ulimit -a
查看最大值),当然在实际中我们使用栈内存大多都是几KB而已;堆内存是共享的,这里不讨论)CLONE_FS
: 共享文件系统,如下函数chroot(2), chdir(2), or umask(2)会被影响。CLONE_FILES
: 共享file descriptor tableCLONE_SIGHAND
: 共享signal handlersCLONE_THREAD
: 共享thread group,即有相同的PID,独立的TID;CLONE_SYSVSEM
: 共享System V semaphore undo values列表,俺表示目前还不懂。CLONE_SETTLS
: thread local storage (TLS) area,注意这个不可移植CLONE_PARENT_SETTID
: Store child thread ID at location ptid in parent and child memory.CLONE_CHILD_CLEARTID
: Erase child thread ID at location ctid in child memory when the child exits, and do a wakeup on the futex at that address。
接着结合一些教科书,可以得知最终结论
进程 | 线程 | |
---|---|---|
用户层函数 | fork() | pthread_create() |
内核实现 | clone() | clone() |
内存 | 新复制的内存(Copy-on-Write),独立4G(1G+3G) | 共享4G内存:其中8M左右的栈内存是私有的,可以通过参数决定;共享堆内存 |
创建耗时 | 复制的flag少,所以耗时多 | 低 |
上下文切换耗时 | 主要是切换内存空间 | 几乎只有进出内核的损失 |
内部通信 | IPC(Socket, Pipe, ASM...) | 共享的数据段(比如说DATA段的全局变量,更简单) |
举例 | Redis备份,运行含全局锁的脚本语言... | I/O Select 的消息处理、线程池... |
高级语言对内核线程的封装实现
除了通过POSIX标准外,高级语言也可以自己通过系统调用对内核的线程进行实现,主要有如下三种。
1. 纯内核线程实现(1:1)
此线程模型将内核线程与App线程一一对应,可以看作为一种简单的映射关系,这里的代表有POSIX线程模型(pthread),以及依赖pThread标准库的Java与Ruby(1.9+)线程模型。
以在Android/ARTJvm下创建线程为例,具体实现调用栈如下
java.lang.Thread
|
POSIX thread(user mode){
0. art.runtime.Thread::CreateNativeThread(cpp, in jvm)
1. pthread_create(pthread.h,标准库头文件)
2. bionic标准库下的so文件,进行SystemCall(libc)
3. 用户态陷入内核态
}
|
Kernal thread(kernal mode)
可以看出,在JVM下的实现主要是对POSIX线程的包装与映射,自己本身只是做了点微小的工作,特点如下:
- 移植性较差,需要适配各种libc库,但是由于被OS直接管理,因此在分配任务上可以充分借用内核的高效调度,能够高效利用物理核并实现真正的并行。
- 用户态与内核态切换有一定的消耗损失
2. 纯用户态实现(1:N)
将线程的调度在用户态实现,也称green thread
,自己写调度算法,可以将一个native线程映射为多个app thread(这里也可以叫做线程包),这里的代表有Ruby(1.8-),Java等老版本,特点如下:
- 移植性好,没有切换、映射到内核的损失
- 需要自己维护Scheduler
- 由于内核并不了解调度细节,很难进行多核利用
3. 混合实现(M:N)
可以同时运行M个kernel线程下管理N个app线程,比如golang。通过设置GOMAXPROCS
个native线程,然后通过go
关键词创建app线程,它的特点如下:
- 调度器实现比较困难
- 通过语法糖与管道简化了并发编程,切换损失低
- 部分调度需要自己主动释放时间片
golang threading model(N)
↓
↓ goroutine
↓
Kernal thread model(M)
详见libtask与许式伟的《go语言编程》
总结
- Concurrent是Parallels的父类
- 在启动一个程序后,将分配用户态与内核态任务,通过系统调用执行内核中的高权限任务
- POSIX是一种线程标准,或者是一种接口,由libc库实现
- 线程与进程最大的区别在于内核函数
clone
函数的flag不同,导致共享资源不同。最终创建、切换耗时不同;以及内存分配、内部通信复杂度不同。 - 在Java中,
java.lang.Thread
与内核线程一一对应;在某些旧版语言中,实现了一个内核线程对应多个高层线程;在golang中,通过goroutine
实现M个内核线程对应N个高层线程;
REFFERENCE
- https://www.zhihu.com/question/21461752
- https://blog.codinghorror.com/understanding-user-and-kernel-mode/
- http://stackoverflow.com/questions/1311402/differences-between-user-and-kernel-modes
- https://zh.wikipedia.org/wiki/%E5%BF%99%E7%A2%8C%E7%AD%89%E5%BE%85
- https://www.ibm.com/developerworks/cn/linux/l-system-calls/
JVM线程生命周期的native实现
本文不是入门文章,对读者的理论基础与折腾要求都有点高
- 了解Java层Thread的各种玩法
- 知道什么是pthread,什么是系统调用,什么是Glibc/NPTL(Native POSIX Thread Library)
- 了解操作系统内核中线程的理论流程
- 需要会配置Clion/VSCode
配置编译参数并测试
首先还是按照上次讲的使用Clion(GDB)调试小型JVM源码方法编译,只是编译参数改一下,以免日志过多
- ./configure --with-classpath-install-dir=/tmp/classpath --enable-trace
+ ./configure --with-classpath-install-dir=/tmp/classpath --enable-tracethread
编译成功后,分别用直接运行与strace进行测试
➜ cd $HOME
➜ $HOME/Desktop/jamvm-2.0.0/src/jamvm -cp . A
....
Hello
➜ strace $HOME/Desktop/jamvm-2.0.0/src/jamvm -cp . A
....
clone(child_stack=0x7f96bf542ff0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f96bf5439d0, tls=0x7f96bf543700, child_tidptr=0x7f96bf5439d0) = 17140
....
new Thread创建的源码分析
首先构造一个Java代码创建一个线程,并编译为Class文件
//A.java
class A{
public static void main(String[] args){
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"233");
}
}).start();
}
}
首先在Java层中使用普通JDK进行断点,并观察断点运行时的线程名称,位置如下
- start()
- run()
- exit()
- ThreadGroup.remove()断点
接着看Java中线程的源码
通过对Thread的<init>
与start
进行分析,可以发现start
中调用了native方法。这里的start
需要注意,在OracleJVM中使用了start0作为native方法,而在GNU Classpath/Android中出使用了java.lang.VMThread.create(this, stacksize);
实现,本文主要分析java.lang.VMThread.create
的实现createJavaThread。
在JVM上通过断点调试可以发现,此native方法被JVM路由到thread.c的createJavaThread
中
我们在这里打断点
// src/thread.c
void createJavaThread(Object *jThread, long long stack_size) {
...
Thread *thread = sysMalloc(sizeof(Thread));
...
// native 与 Java 线程的上下文环境进行绑定
thread->ee = ee;
ee->thread = jThread;
ee->stack_size = stack_size;
...
disableSuspend(self);
// 通过系统调用创建native线程, `threadStart`是创建成功后的回掉函数指针
if(pthread_create(&thread->tid, &attributes, threadStart, thread)) {
...
return;
}
/* 自旋等待线程变更状态,这里一般来说将会变为 RUNNING */
pthread_mutex_lock(&lock);
/* Wait for thread to start */
while(classlibGetThreadState(thread) == CREATING)
pthread_cond_wait(&cv, &lock);
pthread_mutex_unlock(&lock);
enableSuspend(self);
}
当通过系统调用创建线程完成后,在新的线程中调用threadStart
函数指针
// 新线程启动后将立刻调用的函数
void *threadStart(void *arg) {
Thread *thread = (Thread *)arg;
Object *jThread = thread->ee->thread;
/* Parent thread created thread with suspension disabled.
This is inherited so we need to enable */
enableSuspend(thread);
/* Complete initialisation of the thread structure, create the thread
stack and add the thread to the thread list */
initThread(thread, INST_DATA(jThread, int, daemon_offset), &thread);
/* Add thread to thread ID map hash table. */
addThreadToHash(thread);
/* Set state to running and notify creating thread */
// 设置状态为 RUNNING,通过`pthread_cond_broadcast`结束`pthread_cond_wait`的状态
signalThreadRunning(thread);
/* Execute the thread's run method */
/* 开始执行Java代码 class=java.lang.Thread,nam=run,sign=`()v` */
executeMethod(jThread, CLASS_CB(jThread->class)->method_table[run_mtbl_idx]);
/* Run has completed. Detach the thread from the VM and exit */
/* 这里执行了java.lang.ThreadGroup: removeThread(Ljava/lang/Thread;)V */
detachThread(thread);
TRACE("Thread %p id: %d exited\n", thread, thread->id);
return NULL;
}
如何通过Class引用打印名称 slash2DotsDup(CLASS_CB(class)->name)
可以发现,总体流程是非常清晰的
- 主线程通过pthread_create创建新线程
- 新线程执行
Thread.run()
的Java代码段,执行完成后detach,移除ThreadGroup,结束线程
总的来说,这里分析并不是非常细致,主要是把流程先通起来,后续有具体例子(比如线程池)时再分析
总结
在JamVM中
- Java线程与pthread线程一一对应,通过内核调度实现。但是不同厂商不一定。
- 在Java代码中Runnable中的代码段的执行实际上通过
pthread_create
的函数指针threadStart
进行包装后,最终调用executeMethod
实现解释字节码
附录
pthead相关知识
在线程模型的综述中已经讲过pthread相关知识,这里再回顾一下。JamVM使用pthread的头文件进行开发,在Linux中一般由Glibc实现,最后通过SYSCALL系统调用通过内核去创建线程。
java.lang.Thread
|
POSIX thread(user mode){
1. pthread_create(pthread.h,标准库头文件)
2. bionic/glibc等标准库下的so文件,进行SystemCall
3. 用户态陷入内核态
}
|
Kernal thread(kernal mode)
pthead_create参数解释
入参如下
extern int pthread_create (pthread_t *__restrict __newthread,
const pthread_attr_t *__restrict __attr,
void *(*__start_routine) (void *),
void *__restrict __arg) __THROWNL __nonnull ((1, 3));
相关参数
_newthread
: 即tid, 通过某种算法计算出唯一的Id_attr
: 线程类型,初学者可以配置为默认,即NULL_start_routine
: 是一个函数指针,类似于动态语言中的闭包(Closure)。线程创建后将执行此函数_arg
: 供_start_routine
使用的入参
Glibc中NPTL的pthread_create的实现
下次有人再问你线程与进程的区别这种烂大街的问题,你就回答它们就是flag不同,导致(通过COW)共享的内存也不同而已
// glibc-2.25/sysdeps/unix/sysv/linux/createthread.c
const int clone_flags = (CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SYSVSEM
| CLONE_SIGHAND | CLONE_THREAD
| CLONE_SETTLS | CLONE_PARENT_SETTID
| CLONE_CHILD_CLEARTID
| 0);
TLS_DEFINE_INIT_TP (tp, pd);
// 此部分调用 external `__clone`
if (__glibc_unlikely (ARCH_CLONE (&start_thread, STACK_VARIABLES_ARGS,
clone_flags, pd, &pd->tid, tp, &pd->tid)
== -1))
return errno;
__clone
的实现在内核中,通过 x86_64 ABI与网上找的例子,最终汇编代码如下
eax = 120 (syscall number for sys_clone)
ebx = unsigned long flags
ecx = void *child_stack
edx = void *ptid
esi = void *ctid
edi = struct pt_regs *regs
int 80H
内核收到80H中断请求后,将进行内核态线程的创建与调度,后面内核线程以后再写...
JVM中自动启动的线程
在JVM中,创建了main线程(Native线程,对应 JVM下的用户主线程) + 如下4个Java-level的线程 ,可以通过对createVMThread
进行findUsage定位
1. GC线程
通过WhileLoop与Sleep每隔1000ms进行一次GC,这里后期将专门分析GC流程(当初买的一本GC实现终于可以用啦)
/* Create and start VM thread for asynchronous GC */
if(args->asyncgc)
createVMThread("Async GC", asyncGCThreadLoop);
2. Reference处理线程
这里配合GC的线程,共2个,日后在详细讲
/* Create and start VM threads for the reference handler and finalizer */
createVMThread("Finalizer", finalizerThreadLoop);
createVMThread("Reference Handler", referenceHandlerThreadLoop);
3. 信号处理线程
通过WhileLoop+sigwait()
系统阻塞监听信号实现处理 SIGINT
与 SIGQUIT
/* Create the signal handler thread. It is responsible for
catching and handling SIGQUIT (thread dump) and SIGINT
(user-termination of the VM, e.g. via Ctrl-C). Note it
must be a valid Java-level thread as it needs to run the
shutdown hooks in the event of user-termination */
createVMThread("Signal Handler", classlibSignalThread);
参考
- 操作系统真象还原: 类似于读书笔记吧,功力在普通国产书之上
- http://syscalls.kernelgrok.com/: 在线查阅各种系统调用
开源项目中的线程池
下面介绍几个开源项目中如何配置的线程池
JDK自带Executors里面几个线程池
这几个就不介绍了,可以直接看JavaDoc
corePoolSize | maximumPoolSize | keepAlive | BlockingQueue | |
---|---|---|---|---|
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60s | SynchronousQueue |
newFixedThreadPool | ${nThreads} | ${nThreads} | 0s | LinkedBlockingQueue |
newSingleThreadExecutor | 1 | 1 | 0s | LinkedBlockingQueue |
初始线程数并不是越大越好,它最终取决于代码中可以并行运算的比例。当并发线程太多时,系统整体性能反而会下降,因为系统把很多时间花在了线程调度上,详见阿姆达尔定律
RxJava中几个线程池
在RxJava2中主要有如下4个线程池(划掉的不进行分析)
SingleScheduler- ComputationScheduler
- IoScheduler
NewThreadScheduler
本文只专注于它的“执行”过程,而不是调度过程,最重要的部分在Worker
中,因此可以这样构造代码研究
// Groovy代码
5.times {
Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(new Runnable() {
@Override
void run() {
println "inner = " + Thread.currentThread().getStackTrace().join('\n')
println "inner = " + Thread.currentThread().getName()
}
});
}
Thread.currentThread().join()
下面假设你已经把上面流程全部调通。以下为我的调试结果
线程池
: 一群只会干活的奋斗者,所有线程池构造都是newScheduledThreadPool(1, factory)
。Worker
: Worker是对线程池的包装,类似于基层领导,一个Worker被分配一个线程池,当然这个线程池可能身兼数职。EventLoopWorker
: 对线程池外部状态进行维护,可以看作某些公司的HR
Worker状态
创建
调用createWorker创建一个Worker
//IoScheduler
public Worker createWorker() {
//CachedWorkerPool: 变长队列,可以创建与缓存无数个线程池
return new EventLoopWorker(pool.get());
}
//ComputationScheduler
public Worker createWorker() {
//FixedSchedulerPool: 用数组pool[CPUSize]实现,通过取余调用(实现代码为`pool[i++%4]`)分配任务,不能创建更多的线程池
return new EventLoopWorker(pool.get().getEventLoop());
}
干活
work执行schedule
,最终调用线程池的executor.submit()
,并返回一个包装后的Future,这里没什么可讲的。
任务结束
onComplete
后RxJava框架调用FlowableSubscribeOn
的dispose
方法,最终主动调用worker.dispose()
//IoScheduler.EventLoopWorker
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
// 让线程池执行cancel命令,并清空所有Future
tasks.dispose();
// 设置Worker超时,加入“短命单”,60s后定时清理一次
// 这里原理是构造了每隔60s自动执行的`evictorService`,它是一个清洁线程池,定期执行`evictExpiredWorkers`方法,移除队列`expiringWorkerQueue`中超时的Worker
// releasing the pool<threadWorker> should be the last action
pool.release(threadWorker);
}
}
//ComputationScheduler.EventLoopWorker
@Override
public void dispose() {
if (!disposed) {
disposed = true;
// 斩立决,让线程池执行cancel命令,并清空所有Future
both.dispose();
}
}
IoScheduler的模式正是某些私营公司在工作量大时海量招聘时堆人,加班奋斗后变成甘蔗渣,最后进入短名单被HR定期优化的流程。
ComputationScheduler的模式是某些公司工作量大时加班再多也不招人,加班奋斗后变成甘蔗渣,最后直接被通知斩立决马上走人的流程。
通过断点也可以发现,由于IoScheduler是在static初始化并启动,因此无论是否使用IoScheduler,清洁工线程池evictorService将自动启动且永远不会自动关闭,除非将IoScheduler进行主动shutdown。发现了什么了吗,执行者只是可替代的资源,只有Pool才是核心竞争力!
Dubbo中的线程池
Dubbo是一款事务框架,这种纯框架中使用的线程池一般可以作为各种场景负载下的教科书,我们将Executors
作为关键字进行统计
打开IDEA,运行
cmd
+shift
+F
,搜索Executors.
即可
如果不使用IDEA,可以通过Shell的grep暴力搜索
grep 'Executors.' * -R|grep -v test|grep -v import|grep -v class
经过统计,最终如下
newCachedThreadPool: 10
最低线程限制为1个,处理实际业务与Socket(JBOSS、Netty、bootstrap)等
newScheduledThreadPool: 11
最低线程数在1~3之间
URL: 定期重连,清除过期者,心跳,延期等。调用BO失败后定期重试
文件: 更新定时统计/图表/日志, 检查文件更新(可用于热部署),
newFixedThreadPool: 1
线程限制固定为1个,用于文件缓存写入
在并发业务中,一般分为有状态的与无状态的服务,其中【有状态】的服务需要处理竞态场景(比如a++这样的操作),主要有两种解决方案
- 使用互斥锁保证独立性,但是由于需要线程挂起/切换/调度,导致效率较低,因此对它的要求是包裹越小越好,详情看这里的对比与以前写的理论分析。这里典型的实现有Java的
synchronized
以及其它语言的Mutex - 通过busy waiting不断尝试,即
while(CAS(old,new)) {};
。这里主要有JDK5.0以上的atomic库,以及常见的自旋锁(SpinLock,比如Linux内核/通信设备就有使用),它们底层均通过CAS指令保证原子性,这个是本文将要介绍的内容。
快速结论
- Java的CAS是通过汇编命令
cmpxchg
进行的包装与定制实现atomic memory operation (AMO) i - CAS由于是进行主动比较,因此
compareAndSet
一定会放在一个自旋中,它适用于冲突(自旋次数)比较少的场景。在某些激烈的场景下可能性能还不如互斥锁。 - CPU汇编层面没有实现CAS的公平调用,需要上层业务自行实现
CAS的简介
如果没有听过CAS的话,下面是简要入门,比如下面可以达到最终总是100,而不会小于100
AtomicInteger a = new AtomicInteger(0)
def runnable = {
println "a=${a.incrementAndGet()}, ${Thread.currentThread().getName()}"
}
100.times {
new Thread(runnable).start()
}
Thread.currentThread().join()
配置GDB断点(选读)
本文将通过GDB调试JamVM实现对CAS的源码分析,首先要搭建GDB调试环境,然后才能进行测试
测试代码如下
import java.util.concurrent.atomic.AtomicInteger;
class A{
public static void main(String[] args){
AtomicInteger a = new AtomicInteger(1);
System.out.println("a = " + a.incrementAndGet());
}
}
接着使用Javac进行编译
javac A.java
最后使用JamVM与GDB进行断点测试
jamvm -cp . A
断点打在natives.c
的compareAndSwapInt
方法中,就可以分析调用栈了
CAS在JVM上的流程分析
CAS在Java层上的实现
CAS在Java中最后的实现都是sun.misc.Unsafe
类,而它几乎全部是Native方法,几乎没有什么可以分析,值得注意的是,在不同JVM中实现的过程不一定一样,但是最后肯定都有一个循环与CAS,下文是OracleJDK的实现
// 1. 开发者调用JDK
AtomicInteger.incrementAndGet();
// 2. JDK内部自旋并不断尝试(getAndAddInt)
int var5;
do {
var5 = getIntVolatile(this, offset);
} while(!compareAndSwapInt(this, offset, var5, var5 + 1));
return var5;
其中offset
是通过Unsafe方法获取var5
相对于AtomicInteger这个class的native内存地址偏移
上面的 getIntVolatile
与 compareAndSwapInt
均是native方法,其中this
指AtomicInteger
这个结构体
- this与offset: 表示内存中的数值
- var5: 是旧的数值
- var5+1: 是新的数值
在多线程下,如果没有竞争,1~2次就可以完成循环;在有一定竞争情况下,也就是memory被反复改,它通过不断自旋实现重试
CAS在native层的实现
下面是迷你虚拟机JamVM的unsafe.compareAndSwapInt的C语言实现,最终还是调用到了汇编,通过CPU硬件实现
首先分析objectFieldOffset
,它本质是获取Class的Slot中的“value”对应的偏移量
// 在Java中,获取结构体的偏移位置
// Field f = AtomicInteger.class.getDeclaredField("value")
// valueOffset = unsafe.objectFieldOffset(f);
//JamVM natives.c
uintptr_t *objectFieldOffset(Class *class, MethodBlock *mb, uintptr_t *ostack) {
FieldBlock *fb = fbFromReflectObject((Object*)ostack[1]);
*(long long*)ostack = (long long)(uintptr_t)
&(INST_DATA((Object*)NULL, int, fb->u.offset));
return ostack + 2;
}
在本文的 AtomicInteger
中,valueOffset为12常量,为什么要做这一步呢,因此CAS基本上都是native操作,需要直接操作内存,后续将通过valueOffset
获取到 value
接着分析compareAndSwapInt
// 在Java中,调用如下
// compareAndSwapInt(this, offset, var5, var5 + 1)
//JamVM natives.c
uintptr_t *compareAndSwapInt(Class *class, MethodBlock *mb, uintptr_t *ostack) {
long long offset = *((long long *)&ostack[2]);
unsigned int *addr = (unsigned int*)((char *)ostack[1] + offset);
unsigned int expect = ostack[4];
unsigned int update = ostack[5];
int result;
//调用平台CPU特定的汇编代码实现
result = COMPARE_AND_SWAP_32(addr, expect, update);
*ostack++ = result;
return ostack;
}
// x86-64 的汇编实现宏
#define COMPARE_AND_SWAP_32(addr, old_val, new_val) \
({ \
char result; \
__asm__ __volatile__ (" \
lock; \
cmpxchgl %4, %1; \
sete %0" \
: "=q" (result), "=m" (*addr)/*out*/ \
: "m" (*addr), "a" (old_val), "r" (new_val) /*in*/ \
: "memory"); /*Clobbers, reload from memory*/ \
result; \
})
通过使用CLion对源码进行文本搜索
compareAndSwapInt
方法,可以快速找到CAS的实现类
`
这里的汇编格式阅读起来就比较费劲了,你可以选择学一下它的格式(如果要看JVM代码,早晚都得学),并通过这里与这里的文档了解q,m,r
是什么意思
如果你不想看的话,直接上HopperDisassembler将二进制的进行反编译,注意这里正好操作是反过来的
compareAndSwapInt:
0000000000030e80 mov rcx, qword [ds:rdx+0x10]
0000000000030e84 mov rax, qword [ds:rdx+0x20]
0000000000030e88 add rcx, qword [ds:rdx+0x8]
0000000000030e8c mov rsi, qword [ds:rdx+0x28]
/* COMPARE_AND_SWAP_32 start */
0000000000030e90 lock cmpxchg dword [ds:rcx], esi
0000000000030e94 sete al
0000000000030e97 movsx rax, al
/* COMPARE_AND_SWAP_32 end */
0000000000030e9b mov qword [ds:rdx], rax
0000000000030e9e lea rax, qword [ds:rdx+0x8]
0000000000030ea2 ret
你可能需要知道各个寄存器的相关约定
最终COMPARE_AND_SWAP_32
的伪代码如下,如果兴趣非常强,可以看网上开源的Verilog实现
//下文的 <- 指赋值的意思,Intel文档也是这样写的
--INPUT:
rcx <- memory <- addr
al <- old_val
esi <- new_val
--OUTPUT:
rax <- result
rcx <-memory <- addr
--PROCEDURE:
//cmpxchg dword [ds:rcx], esi
IF al == rcx
THEN
ZF <- 1;
rcx <- esi;
ELSE
ZF <- 0;
al <- rcx;
FI;
//sete al
IF ZF == 1
THEN
al <- 0;
ELSE
al <- 1;
FI;
//movsx rax, al
rax <- al
return rax
用C语言简化是这样的
if(old_val == *addr){
*addr = new_val;
return true;
} else{
old_val = *addr;
return false;
}
详细汇编分析可以参考这篇类似的文章与Is x86 CMPXCHG atomic?
注意上面CPU里的Lock是用来锁多核下的Bus实现内存屏障,而不是高级语言中锁代码段的,只是碰巧名称类似而已
附录
CMPXCHG 的一些介绍
通过查询Intel文档,可以得知
Opcode | Instruction | Description |
---|---|---|
0F B0/r | CMPXCHG r/m8,r8 | Compare AL(accumulator, 累加器) with r/m8. If equal, ZF is set and r8 is loaded into r/m8. Else, clear ZF and load r/m8 into AL. |
调用方法
CMPXCHG DEST, SRC
伪代码
IF accumulator == DEST
THEN
ZF <- 1;
DEST <- SRC;
ELSE
ZF <- 0;
accumulator <- DEST;
FI;
CAS中ABA的问题
使用AtomicStampedReference
todo
简单的例子
自增
int a = 0
ReentrantLock lock = new ReentrantLock()
100.times {
new Thread(new Runnable() {
@Override
void run() {
try {
lock.lock()
a++;
} finally {
lock.unlock()
}
}
}).run()
}
println "a = $a"
与 synchronized 的区别
下面是使用object作为lock实现
int a = 0
def lock = new Object()
100.times {
new Thread(new Runnable() {
@Override
void run() {
synchronized (lock){
a++
}
}
}).run()
}
println "a = $a"
在功能上,ReentrantLock 比 synchronized 多了tryLock,超时,公平调度等额外功能
在native实现上
- synchronized使用了Object中的monitor作为lock,调度通过OS实现(比如pthread_mutex等调用)
- ReentrantLock使用了AQS实现Java层的公平与非公平调度
在Groovy中通过闭包去掉模版代码
参考这里: http://blog.johanneslink.net/2011/10/25/simplified-use-of-locks-in-groovy/
CountDownLatch
英文名词
- predecessor| ˈpriːdɪsɛsə |: 前任者; 先輩. eg: the Prime Minister learned from his predecessor's mistakes.
- latch: 门闩,CountDownLatch表示自动倒计时关上的门锁
- elide | ɪˈlʌɪd |: 省略. eg: the null check may be elided.
应用
CountDownLatch一般用于异步转同步操作,比如SpringCloud/ElasticSearch中在主线程实现阻塞等待Response
比如SpringCloud中的Feign框架利用Rxjava等待异步网络请求,并转为同步操作
return command.submit(request)
// 类似于前端的await,在Java中用CountDownLatch实现
.toBlocking()
.single();
再比如ElasticSearch客户端中基于Netty异步请求,同样也是基于await实现的
CountDownLatch是并发编程中通过CAS队列包装实现的锁,先举个例子(Groovy代码)
def s = Executors.newCachedThreadPool()
def latch = new CountDownLatch(3)
3.times{
s.execute(new Runnable() {
@Override
void run() {
try {
println "start ${it}"
TimeUnit.SECONDS.sleep(2)
println "end ${it}"
}catch (Exception e){
e.printStackTrace()
}finally{
latch.countDown()
}
}
})
}
latch.await()
println "latch complete"
通过执行上面的Groovy代码段可以发现,无论executor线程池中怎么运行,只要不阻塞死,主线程一定会在所有线程池计数归零时才开始运行。
CountDownLatch的应用
App启动页
在某些App启动时,既要加载广告,又要加载首屏数据,涉及到两个请求,这时可以让两个线程同时跑,主线程
流式/链式调用回掉
WatchDog
CountDownLatch的源码分析
CountDownLatch代码很少,去掉注释只有100多行。它本质上是对队列AbstractQueuedSynchronizer
的包装,底层通过CAS实现了原子性,其中CAS之前文章已经写过了,本质是自旋与CPU硬件实现。
AbstractQueuedSynchronizer分析
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 此处通过包装CAS实现原子性
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Unsafe.compareAndSwapObject在JVM侧的C实现
其实这部分以前已经分析过,下面的代码是JamVM的实现,本质上是CPU硬件帮你把事情搞定了
// unsafe.compareAndSwapObject(node, nextOffset, expect, update)
uintptr_t *compareAndSwapObject(Class *class, MethodBlock *mb,
uintptr_t *ostack) {
long long offset = *((long long *)&ostack[2]);
uintptr_t *addr = (uintptr_t*)((char *)ostack[1] + offset);
uintptr_t expect = ostack[4];
uintptr_t update = ostack[5];
int result;
// 此部分通过 __asm__ 汇编代码段实现
result = COMPARE_AND_SWAP(addr, expect, update);
*ostack++ = result;
return ostack;
}
ReadWriteLock
读写锁
真实项目使用
- Mybatis的缓存组件
- Redisson(基于Redis)的读写锁
Synchronization is not guaranteed to work unless both read and write operations are synchronized.
the implementation of Synchnornized in JVM
快速结论
在JamVM中的实现如下
- 通过Object中的C结构体的
object->lock->monitor
实现锁的标志 - 调度通过pThread的
pthread_mutex_lock
等操作进行lock与unlock,相当于把最难的调度通过系统调用甩锅给操作系统了 - Synchnornized是java中互斥锁的实现,用于对并发编程进行支持
Object的线程方法
在了解Java的同步之前,先复习一下Java的线程模型,在Java中,使用Object作为最常用的锁。在Object中,有许多native方法,主要如下
函数 | 功能 |
---|---|
wait() | 释放monitor与时间片,只能超时或者有其他线程notify才能恢复 |
sleep() | 释放时间片,但是持有monitor |
notify() | 通知调度器唤醒等待此monitor的BLOCKED线程队列中的一个,转为blocked状态 |
notifyAll() | 唤醒所有等待此monitor的线程队列,转为blocked状态 |
线程的状态有如下几种
提醒一下,被
notify()
调用后是进入了block的队列,需要通过调度器“摇号”挑选其中一个后才能进入同步区块
互斥锁(Synchnornized)
互斥锁在其它编程语言中一般叫做Mutex(|mjuːtex|)
,用于实现对共享资源的独占性,当进入同步代码块时,获取并独占锁,离开时释放锁。在这段时间中,其它线程访问此资源时,会成为上图的BLOCKED状态。
在Java中线程是通过对C中的pThread包装实现的,pThread接口通过systemcall在内核实现,因此线程的管理本质都是在进行系统调用,有一定的切换消耗,故Synchnornized是一种比较重的锁。
在Java中,Synchnornized可以看作一种语法糖,不需要自己去判断、配置Mutex,只需要用synchnornized(object){}
进行包裹即可,接下来我们对这个字段进行一下入门。
1. 对象同步
对象同步中有两种方法,第一种是通过代码块{}
的包裹,编译时通过MONITORENTER
与MONITOREXIT
将Sample.this
作为锁对象;第二种是为method添加access_flag
为synchronized,JVM在执行时将根据FLAG自动加\减锁,如下两种写法在JVM中调用是效果相同的
public class Sample {
public void do_work() {
synchronized (Sample.this) {
//do synchronized work
}
}
public synchronized void do_work() {
//do synchronized work
}
}
详见: Java中非static的synchronized方法和synchronized(this)用的是一个锁么?。
synchronized在方法中不会被继承,虽然继承synchronized方法有点诡异
再说个题外话,synchronized的锁范围越小越好,不建议放在方法上作为修饰,而是使用
synchronized (xxx.this)
代码块。common-pool旧版代码中就出现过此问题,导致上游业务出现死锁。
2. 类同步
类同步的对象是Sample.class
,以下两种是相同的
public static synchronized do_work(){
//do synchronized work
}
public void do_work(){
synchronized (Sample.class){
//do synchronized work
}
}
单例的同步方法看这里:http://www.jianshu.com/p/eebcb81b1394
3. synchronized的应用
除了网上用烂的单例同步,多线程for循环自增,并发HashMap等例子外,还可以再举例
3.1. 释放锁与时间片
我曾经在OkHttp中,介绍过Socket的KeepAlive连接自动清理的实现,这里使用了wait进行阻塞,避免无谓的自旋(Spin-waiting)损耗。
while (true) {
//执行清理并返回下场需要清理的时间
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
synchronized (ConnectionPool.this) {
try {
//在timeout内释放锁与时间片
ConnectionPool.this.wait(TimeUnit.NANOSECONDS.toMillis(waitNanos));
} catch (InterruptedException ignored) {
}
}
}
}
注意wait在Java中一般都是需要在while循环中的,否则你的wait可能被路人线程给wakeup了,上文代码就是Best practice。
3.2. Dead Lock
虽然同步简化了编程,但是还有可能发生死锁,如下例子,当两个不同线程分别调用a.swap(b)
与b.swap(a)
时,在getValue()
时可能发生死锁。
//code from <Concurrent Programming in Java>
class Cell { // Do not use
private long value;
synchronized long getValue() { return value; }
synchronized void setValue(long v) { value = v; }
synchronized void swapValue(Cell other) {
long t = getValue();
long v = other.getValue();
setValue(v);
other.setValue(t);
}
}
Monitor(管程)
重点来了!管程是JVM中的互斥锁的实现,在Java中,Object.wait/notify,以及线程的wait/notify,synchronized生成的字节码MONITORENTER/EXIT等操作在JVM中都是通过Monitor控制与实现的。
在JVM中,每个java.lang.object
对象对应的C结构体均拥有一个lock指针,指针对应一个Monitor(也就是说,对象、lock指针与Monitor是一一对应的)。一个Monitor只能同时被一个线程持有。
管程的工作机制如下图
本图来自于维基百科,可以看作是对阻塞式条件变量
的一种封装
e
表示入口队列的线程,它将尝试获取对象的锁,处于blocked状态q
表示等待队列的线程,处于waiting状态wait
表示调用pThread_wait()
进行等待,进入q
队列,大多数由开发者手动调用与恢复,下文暂不考虑notified
表示调用pThread_notify()
后进入e
队列enter
与leave
表示独占线程对锁的获取与释放
比如现在有线程队列e
同时竞争一个对象,第一个跑的最快的线程enter后,立刻获取到了object的monitor,剩下的线程由于无法获得到monitor,就在e
队列中阻塞等待锁的释放。
enter the monitor:
enter the method
if the monitor is locked
add this thread to e
block this thread
else
lock the monitor
当任务完成后,调用调度器
leave the monitor:
schedule
return from the method
调度器将会释放monitor,并通过某种调度公平的调度策略(可能是FIFO,也可能是优先权值等)将monitor分配给下一个处于blocked态的线程。
schedule :
if there is a thread on e
select and remove one thread from e and restart it
(this thread will occupy the monitor next)
else
unlock the monitor
(the monitor will become unoccupied)
以Android6.0的vm为例,参考源码如下:
使用Object作为锁好不好?
Java中,通过设置Object结构体中的某个字段映射为锁,所有对象都可以成为锁,这样固然简化了编程,但是相比Ruby等语言使用Mutex作为专用锁,Java所耗用的结构体内存更多
参考
- http://android.group.iteye.com/group/wiki/3083-java-sync-communication
- http://ibruce.info/2013/12/07/java-interview-questions-concurrency/
- https://zh.wikipedia.org/wiki/%E7%9B%A3%E8%A6%96%E5%99%A8_(%E7%A8%8B%E5%BA%8F%E5%90%8C%E6%AD%A5%E5%8C%96)
- https://www.ibm.com/developerworks/cn/java/j-threads/
下面是在业务中常见的并发问题,优先用findBugs等工具扫描,并清空IDE的黄色报警,否则...
错误的单例
下面的double-lock单例代码是被FindBugs扫出的,改正如下。这里在底层主要是为汇编码加入了lock
指令防止指令重排
public class AESUtil{
- private static AES aes;
+ private static volatile AES aes;
static public AES getAES(){
if(aes == null){
synchronized(AESUtil.class){
if(aes == null){
aes = new AES();
}
}
return aes;
}
}
其实还有更简单的单例方法,就是在static代码段中构造field的实例化,由于ClassLoader加载class时`<clinit()>`阶段由JVM保证线程安全的,因此可以放心用,比如JDBC各种驱动就是这样初始化的。
错误的DataFormat
这个是在进行报表时出现的,当报表耗时大于定时任务间隔时,两个报表服务就会同时执行。如果`SimpleDateFormat`使用的是同一个,那么它的parse方法可能出现报错。具体原理我没有分享JDK源码,可能是它内部eval的状态机不是线程安全的。
解决方法: 1. 每次new一个 2. 使用ThreadLocal
title: Copy-on-write的介绍与应用 categories:
- 数据结构
本文目录
- 什么是写时复制
- 写时复制的应用场景
- 写时复制的实现
在并发编程中,如果需要实现对资源的冲突处理,一般采用互斥锁,队列、不可变来实现。上面的技术实现在很多书籍中都有,不过今天介绍的是一种新的方法--写时复制(Copy-on-write, COW)
。
关键词: COW, Copy on write, Redis
如果看的懂英文就直接看这里:
Copy on write (COW) is an optimization strategy that avoids copying large sized objects.
In a lot of real world programs, a value is copied to another variable and often is never written to. In most languages other than C++, all large sized objects are actually references. When you copy an object, all you copy is a pointer (shallow copy semantics). In such languages, COW is implemented at the language/runtime level, and not in the standard library.
In C++, copies are deep copies by default (value semantics), thus assigning large structures and strings are expensive, because the entire data is duplicated.
To avoid this, one can make a system where a copy is always shallow, but when you modify a copied object, the underlying object is duplicated, and then the changes are applied to the new copy.
总的来说,COW通过浅拷贝(shallow copy)只复制引用而避免复制值;当的确需要进行写入操作时,首先进行值拷贝,再对拷贝后的值执行写入操作,这样减少了无谓的复制耗时。
特点如下
- 读取安全(但是不保证缓存一致性),写入安全(代价是加了锁,而且需要全量复制)
- 不建议用于频繁读写场景下,全量复制很容易造成GC停顿,因此建议使用平时的ConcurrentXX包来实现。
- 适用于对象空间占用大,修改次数少,而且对数据实效性要求不高的场景。
这里的安全指在进行读取或者写入的过程中,数据不被修改。
写时复制的应用场景
写时复制最擅长的是并发读取场景,即多个线程/进程可以通过对一份相同快照,去处理实效性要求不是很高但是仍然要做的业务(比如实现FS\DB备份、日志、分布式路由),举例如下。
1. Unix下的fork()系统调用
fork()是一个系统调用,用于创建新的进程(process)。
fork() creates a new process by duplicating the calling process. The new process, referred to as the child, is an exact duplicate of the calling process, referred to as the parent.
在以前的文章中说过,fork内部实际上是对clone()系统函数的调用,它的参数CLONE_FLAG
决定了需要共享哪些数据。在fork中,没有CLONE_VM
参数,也就意味着不会共享\竞争同一个内存,而是复制一个内存快照给子进程,这个内存在32位下是4G的大小,占用空间相当的大,如果通过类似memcpy进行内存复制的话,fork调用的耗时将相当显著,甚至阻塞业务,那么为什么在真正开发调用时却没有发生呢?因为内部也是通过COW机制实现的。
内核实现:
在内核侧,在进行了内存“复制”后,子进程与父进程指向同一个只读的Page分页。当子进程或者父进程发送修改内存请求后,由于是分页是只读的,OS此时才将内存进行复制为两份,并将这两份内存设置为可写权限,最后再处理刚刚发送的修改内存请求。通过上述策略,实现了延迟复制,进程的创建是不是变快了?
2. Redis的持久化
Redis是一个基于KV的MemCache框架,可以将数据全部存储在内存中,特别适用于抢购、红包等高并发场景,当你希望对数据进行全量Dump(bgsave)到文件中或者进行主从同步时,将进行下面的步骤。
- Redis forks. We now have a child and a parent process.
- The child starts to write the dataset to a temporary RDB file.
- When the child is done writing the new RDB file, it replaces the old one.
可以看出,Redis通过fork()系统调用实现了写时复制,而没有自己去造轮子
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
start = ustime();
//指向子线程的pid如果为0,表示fork成功,为正表示为parent线程
if ((childpid = fork()) == 0) {
int retval;
/* Child进程要执行的代码 */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
...
return C_OK;
}
return C_OK; /* unreached */
}
在rdbSave中(目前已经为子线程中),具体实现如下,代码太长就不贴了
- 创建了一个
temp-${getPid()}.rdb
的文件 - 调用
rioInitWithFile(rio *r, FILE *tmp)
,将r
初始化为rioBufferIO
- 对全局变量
server
进行forEach反序列化,并保持到缓存r中,并写入文件,注意这个Server指针已经与父进程无关了 - 进行fflush、fsync、fclose系统调用清除OS的FS缓存(这也是OS内部的COW优化)
- 进行
rename
系统调用,进行重命名
系统调用都是默认线程安全的,所以不用担心多次重命名等问题
可以看出,在Redis中没有memcpy等内存复制过程,而是直接使用server指针进行读取并写入文件,因为在fork时,已经duplicated了快照。
写时复制的实现
以Java为例,在CopyOnWriteArrayList中,写数据在锁的保护下,而读取可以任意进行,代码如下。
private transient volatile Object[] array;
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//类似于memcpy,构造一个新的对象
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
//重新设置引用
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
public E get(int index) {
//获取到的数据没有实效性
return get(getArray(), index);
}
final Object[] getArray() {
return array;
}
其它可能需要深入了解的技能
#####1. 如何实现String\Map的写时复制?
这个一般只在糟糕的面试题中出现,因为写时复制主要用于处理大的数据,而大型的字符串、Map却很少见到场景(如果说非要来一个场景的话,就是Zookeeper中读取服务时,可能需要一个Map<String,Class>来实现)。在C++中,写时复制的String已经被废弃,并且Redis中设计的字符串可以更加优雅地扩容,在Java中,各类并发库已经很成熟,写时复制主要用于实现安全迭代,而没有String或者Map的需求。
如果非要让你写,可以这样处理:
- 在构造函数、写入函数中实现深拷贝,并加锁,比如put中就再包装一道HashMap。
- 在getter函数,实现无锁直接获取。
#####2. ConcurrentHashXXX与CopyOnWriteXXX的对比?
一个适用于写入量大的场景,一个适用于读取量大的场景,它们的线程安全关系如下
Normal | Concurrent | COW | |
---|---|---|---|
Read | Unsafe | Safe | Safe, may dirtyData |
Write | Unsafe | Safe | Safe, may slowest |
Ref
- https://zh.wikipedia.org/wiki/%E5%AF%AB%E5%85%A5%E6%99%82%E8%A4%87%E8%A3%BD
- http://ifeve.com/java-copy-on-write/
- http://www.ibm.com/developerworks/tivoli/library/t-snaptsm1/
- http://blog.csdn.net/jason314/article/details/5640969
- https://www.reddit.com/r/compsci/comments/31szui/trying_to_understand_fork_and_copyonwrite_cow/
- http://stackoverflow.com/questions/1570589/is-the-volatile-keyword-required-for-fields-accessed-via-a-reentrantlock
- https://www.quora.com/What-is-Copy-on-Write-and-how-is-it-used-in-C++
详见 PaaS