本书介绍

Java体系

要求熟悉以下

  • JVM实现: 最好看过C源码,具体有ClassLoader,线程池,锁,NIO、反射
  • GC: 常见GC的算法与区别
  • EE方向: JMX, HotDeploy
  • 各种业务引擎: RuleEngine, TaskEngine, BPM

本文直接分析JVM源码

使用Clion(GDB)调试小型JVM源码

详见使用Clion(GDB)调试小型JVM源码

JVM与源码阅读工具的选择

初次学习JVM时,不建议去看Android Art、Hotspot等重量级JVM的实现,它内部的防御代码很多,还有android与libcore、bionic库紧密耦合,以及分层、内联甚至能把编译器的语义分析绕进去,因此找一个教学用的、嵌入式小型的JVM有利于节约自己的时间。因为以前折腾过OpenWrt,听过有大神推荐过jamvm,只有不到200个源文件,非常适合学习。

在工具的选择上,个人推荐SourceInsight。对比了好几个工具clion,vscode,sublime,只有sourceinsight对索引、符号表的解析最准确。

sourceInsight的配置

阅读C代码除了Clion外,还推荐使用sourceInsight,这个软件一般在H等硬件厂商用的多,虽然很丑但是速度很快,使用教程见这里

  • 记得在View中打开Reference与ContextWindow
  • 使用时建议关闭浏览器与各种套壳应用(否则Reference会很慢)

什么是MarkSwip

虽然以前在各种文档、面试题中看过这个算法(标记清除算法),但是它完整的定义是这样的:

标记清除-将所有活动对象都做上标记,接着将没有标记的(非活动)对象进行回收的过程。

在Java中,

伪代码如下

funtion(obj){
  return 
}

引用计数法(Refference Count)

在Python、iOS中,使用了引用计数法作为GC机制(当然iOS中的更高级,叫做ARC,是在编译时就搞定了自动控制内存)

如何分析full-gc

ClassLoader

ClassLoader也就是类的加载器

比如

  • Spring与Quartz的ResourceLoaderClassLoadHelper
  • Mybatis中的ClassLoaderWrapper

Java Class文件的结构

在*.class文件中,以Byte流的形式进行Class的存储,通过一系列Load,Parse后,Java代码实际上可以映射为下图的C结构体,这里可以用javap -v -p命令或者IDE插件进行查看。

typedef struct {
    u4             magic;/*0xCAFEBABE*/
    u2             minor_version; /*网上有表可查*/
    u2             major_version; /*网上有表可查*/
    u2             constant_pool_count;
    cp_info        constant_pool[constant_pool_count-1];
    u2             access_flags;
    u2             this_class;
    u2             super_class;
    u2             interfaces_count;
    u2             interfaces[interfaces_count];
    //重要
    u2             fields_count;
    field_info     fields[fields_count];
    //重要
    u2             methods_count;
    method_info    methods[methods_count];
    u2             attributes_count;
    attribute_info attributes[attributes_count];
}ClassBlock;
  • 常量池(constant pool):类似于C中的DATA段与BSS段,提供常量、字符串、方法名等值或者符号(可以看作偏移定值的指针)的存放

  • access_flags: 对Class的flag修饰

      typedef enum {
      	ACC_PUBLIC = 0x0001,
      	ACC_FINAL = 0x0010,
      	ACC_SUPER = 0x0020,
      	ACC_INTERFACE = 0x0200,
      	ACC_ACSTRACT = 0x0400
      }AccessFlag
    
  • this class/super class/interface: 一个长度为u2的指针,指向常量池中真正的地址,将在Link阶段进行符号解引。

  • filed: 字段信息,结构体如下

    typedef struct fieldblock {
       char *name;
       char *type;
       char *signature;
       u2 access_flags;
       u2 constant;
       union {
           union {
               char data[8];
               uintptr_t u;
               long long l;
               void *p;
               int i;
           } static_value; 
           u4 offset;
       } u;
    } FieldBlock;
    
  • method: 提供descriptor, access_flags, Code等索引,并指向常量池:

    它的结构体如下,详细在这里

      method_info {
          u2             access_flags;
          u2             name_index;
          //the parameters that the method takes and the 
          //value that it return
          u2             descriptor_index;
          u2             attributes_count;
          attribute_info attributes[attributes_count];
      }
    

以上具体内容可以参考

  1. JVM文档
  2. 周志明的《深入理解Java虚拟机》,少见的国内精品书籍
  3. 一些国外教程的解析

Class与Object

typedef struct object Class;

typedef struct object {
   uintptr_t lock;
   Class *class;
} Object;

本文指的Class

前提: 已经获取到Class结构体对应的指针

下面是经过删减与注释的代码(删去了状态判断、Lock与异常处理),并替换宏变量为字符串

// class.c
Class *initClass(Class *class) {
   ClassBlock *cb = CLASS_CB(class);
   ConstantPool *cp = &cb->constant_pool;
   FieldBlock *fb = cb->fields;
   MethodBlock *mb;
   Object *excep;
   int state, i;

   linkClass(class);

   cb->state = CLASS_INITING;
   cb->initing_tid = threadSelf()->id;

   if(!(cb->access_flags & ACC_INTERFACE) && cb->super
              && (CLASS_CB(cb->super)->state != CLASS_INITED)) {
      initClass(cb->super);
   }

   /* Never used to bother with this as only static finals use it and
      the constant value's copied at compile time.  However, separate
      compilation can result in a getstatic to a (now) constant field,
      and the VM didn't initialise it... */

   for(i = 0; i < cb->fields_count; i++,fb++)
      if((fb->access_flags & ACC_STATIC) && fb->constant) {
         if((*fb->type == 'J') || (*fb->type == 'D'))
            fb->u.static_value.l = *(u8*)&(CP_INFO(cp, fb->constant));
         else
            fb->u.static_value.u = resolveSingleConstant(class, fb->constant);
      }

   if((mb = findMethod(class, "<clinit>", "()V")) != NULL)
      executeStaticMethod(class, mb);
 
   return class;
}

贴出上面主要是为了让你明白

  • 调用linkClass进行连接
  • 调用super的class中的initClass
  • 调用<clinit> 方法,也就是static代码段

上面部分实际上是对Class进行模式匹配(pattmatch)的遍历,伪代码如下

(define initClass
  (lambda (exp)
    (linkClass exp)
    (match [(?isSuperInited superClass) (initClass superClass)])
  	(clinit exp)))

在平时开发中,只需要背住就可以了。

Class与依赖注入

很多人认为学习JVM是“高手”才去做的,平时写业务时没用,下面举一个Spring断点调试技巧。在分析Spring的依赖注入时,很多人看到复杂源码就无法接着分析了。其实可以这样,首先在Bean中加入static代码段,并打上断点,然后启动程序。

@Bean(name="SSR")
public class DemoBean{
    static{
      	// 此处打上断点
        System.out.println("class loaded");
    }
  	....
}

等断点跳到这里时,可以发现是Class.newInstance()方法被调用,进而调用<clinit>,此时再向上分析Spring的代码堆栈,阅读源码与流程就轻而易举了。

附录

下面2个是常见面试题

求打印顺序

实例化AChildChild后,求输出顺序

public class AParent {
    static {println("AParent clinit");}//1
    public AParent() {System.out.println("AParent init");}//4
}
public class AChild extends AParent {
    static {println("AChild clinit");}//2
    public AChild() {println("AChild init");}// 5
}
public class AChildChild extends AChild {
    static {println("AChildChild clinit");}//3
    public AChildChild() {println("AChildChild init");}//6
}
AChildChild acc = new AChildChild();

其中1,2,3的顺序本文已经可以解释,4,5,6下次讲解init时进行分析

JDBC的加载

下图是加载mysql的例子,当程序员调用Class.forName时,static代码段就会执行

public class Driver extends NonRegisteringDriver implements java.sql.Driver {
    static {
        try {
            java.sql.DriverManager.registerDriver(new Driver());
        } catch (SQLException E) {
            throw new RuntimeException("Can't register driver!");
        }
    }
    public Driver() throws SQLException {}
}

JUC(java.util.current)、线程安全与并发的综述

本文先入一个门,方便学习并发

  • 线程安全的实现
  • 线程,线程池,JVM内部的实现
  • CAS、Sync(以及区别)
  • 平时遇到的并发问题

什么是并发(Concurrent),什么是并行(Parallels)?

这个概率比较混乱,在书上与StackOverFlow上答案各有各的说法。以下是一本专业书籍的引用

并发程序是指可以被同时发起执行的程序。而并行程序则是被设计成可以在并行的硬件上执行的并发程序。换句话说,并发程序代表了所有可以实现真正的或者可能的并发行为的程序。它是一个比较宽泛的概念。这其中包含了并行程序。并行程序是并发程序中的一种。 -《Go并发编程实战》

总的来说,就是并行需要硬件支持,并发同时可以用硬件或者时间片模拟。因此并行是并发的子类。

详细可以参考parallel-vs-concurrent

线程安全的实现

线程安全与并发似乎天天都遇到,

1. 无状态函数

无状态函数是指无副作用的函数,也就是函数式编程中所谓的“纯函数”,比如map,filter,它们在LISP语言中叫做Lambda表达式。因为不涉及共享变量,所以总是线程安全的。

2. Final不可变

3. 锁

3.1. CAS锁

3.2. 互斥锁

书籍推荐

  • 七周七并发模型
  • Java虚拟机并发编程

final变量

fina指read-only的意思,很容易理解

简单入门

class Student {
    final String name

    Student(String name) {
        this.name = name
    }
}

Student student = new Student("smith")
// 报错
student.name = "changed"

为什么String是final的

String通过final class 与 final array表示String即无法继承,也无法修改,让String在语义上保证不可变性

ThreadLocal

ThreadLocal是线程本地独有可见的变量(各管各的),一般用于上下文管理

简单入门

举个常见的反序列化例子

private static ThreadLocal<SimpleDateFormat> f = new ThreadLocal<SimpleDateFormat>() {
    @Override
    protected SimpleDateFormat initialValue() {
        return new SimpleDateFormat("yyyy-MM-dd");
    }
};

原理

threadLocal看起来像是在“栈”上,实际上它还是Thread的成员变量,还是在堆上的,每次实际上通过访问如下获得

//类型如下
//Thread
//ThreadLocal.ThreadLocalMap
//ThreadLocal.ThreadLocalMap.Entry[]
//T
Thread.currentThread().threadLocals.table[i].value

其中

  • i: 通过ThreadLocal的全局成员变量nextHashCode进行自增,每多一个ThreadLocal实例,自增一次
  • value: 通过initialValue传入的对象

在企业开发中的作用

ThreadLocal是线程本地独有可见的变量,一般用于上下文管理

比如

  • 管理Spring-transaction: 数据库事务
  • 管理BPM流程上下文: 这个主要是在企业级应用中使用,自己跳自己的伞,一个线程走到底
  • 业务路由: Tomcat中的getRequestDispatcher进行重定向,比如资源静态化的Filter

在序列化的作用

在进行序列化与反序列化时,内部一定会通过一个状态机来维护上下文,如果它不是线程安全的,那么状态机一定会出问题

  • SimpleDataFormat.parse(不是线程安全)
  • BigDecimal.toString(线程安全)
  • String.<init>(线程安全)

线程/进程区别

通过strace分析两个C语言写的forkpthread_create,最终可以发现是系统调用clone的共享FLAG不用

a review for thread mode


本文首先介绍了一些线程基础,比如并发、并行、内存分配、系统调用、POSIX线程。接着通过strace分析了线程与进程的区别。最后以Android、Golang等线程模型进行了分析。

1. 基础

1.1. OS下如何进行内存分配?用户区与内核区有什么区别?

在32位的Linux操作系统中,当一个进程启动后,将被分配4G的虚拟内存。内存可以分为两个空间,一个是用户空间(0~3G),另一个是内核空间(3G~4G)。其中用户空间就是代码运行的空间,比如堆栈、BSS(未初始化数据段)、DATA(已经初始化数据段)、TEXT(代码二进制段);而在内核空间中,是OS内核的映射,只有在执行syscall系统调用时,才能进行重写。

32 Bit OS Virtual Memory

在用户态中,执行用户代码,比如直接运行C程序、或者运行JVM虚拟机等。

在内核中,主要负责I/O(显示,层三以下的网络,FS),Memory(虚拟内存,页面替换/缓存), Process(信号、线程/进程管理,CPU调度)的管理,直接控制CPU、内存等硬件,权限(privilege)非常大;

1.2. 系统调用中断(SCI)是什么?

系统调用是用户与内核间的一个桩(stub),当在用户态执行高权限任务,需要通过系统调用切换入内核态去执行最底层任务。比如在C语言中调用getTime()时,大致流程如下

1. app method(User Application)
    |
    |调用stdlibc标准库
    |
2. systemcall_stub(std libc)
    |
    |系统调用,进入内核态
    |
3. system_call_table[call_number](Kernel)
    |
    |通过查表调用硬件函数
    |
4. hardware_call(Kernel)
  1. 在App层面,开发者不需要自己写系统调用,系统会提供相关C标准库的SDK供开发者使用,比如开发者调用getTime()时,实际是使用了标准库的time.h头文件。
  2. 代码在执行时,OS自动加载标准库。比如在android的bionic库中,实际执行getTime的系统调用是这里的平台相关的汇编代码,将系统调用的ID、参数传入内核。
  3. 内核通过系统调用ID进行表的索引,寻找真正的硬件调用函数
  4. 进行硬件相关的调用

在Mac下打开ActivityManager或者在Terminal中运行top,就可以显示地看到用户与系统的CPU占用

User and Kernel CPU usage

1.3. POSIX线程模型

POSIX是IEEE P1003.1中的线程标准,目前所有的系统,甚至windows都支持POSIX。它提供了用户态下的线程编程接口,开发者在进行线程开发时,只用引用pthread.h头文件调用即可。程序在运行时通过系统调用,在内核中进行线程的实现。它有很多函数,比如create, exit, join, yield等,具体可以去各个平台下的libc源码/sdk中去看Header文件中方法的定义,比如android中使用biolibc中pthread.h的代码在这里,这里的头文件是对内核线程的包装。

2. 线程与进程的区别

这是一道经典的面试题,大多数回答者都是回忆起当初学习操作系统课本中的知识。然而课本中太偏向于内核,从开始就学习内核底层,而脱离开发,笔者认为是不太明智的。因此通过设计一个系统调用栈的分析,让读者有更清晰的了解。

本文线程特指32位下使用glibc的Linux系统中的POSIX模型,即用户面线程,进程特指unstd.h中的fork产生的进程。

本测试基于Ubuntu 14.04 i386

1. 测试代码设计

1.1. 线程测试代码

//modified from https://computing.llnl.gov/tutorials/pthreads/samples/hello.c

//todo run:
//clang -Wall -g pthread.c -o pthread.out -lpthread
//strace -Cfo ./pthread.strace.log ./pthread.out

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void*
PrintHello(void *threadid)
{
   long tid;
   tid = (long)threadid;
   printf("Hello World! It's me, thread #%ld!\n", tid);
   pthread_exit(NULL);
}

int 
main(int argc, char *argv[]){
   pthread_t thread;
   int rc = 0;
   long t = 0;
   printf("In main: creating thread %ld\n", t);
   //注意这里是一个函数指针,不要傻眼了
   rc = pthread_create(&thread, NULL, PrintHello, (void *)t);
   if (rc){
     exit(-1);
   }
}

1.2. 进程测试代码

//todo run:
//clang -Wall -g fork.c -o fork.out
//strace -Cfo ./fork.strace.log ./fork.out

#include <unistd.h>

int 
main(int argc, char *argv[])
{
  pid_t pid;
  pid = fork();
  if(pid < 0){
    return -1;
  }    
  return 0;
}

2. 测试结果

在编译完成后,调用strace命令后,结果如下

2.1. 进程的strace路线如下

19948 execve("./fork.out", ["./fork.out"], [/* 68 vars */]) = 0
19948 brk(0)                            = 0x9bc000
19948 open("/lib/x86_64-linux-gnu/libc.so.6", O_RDONLY|O_CLOEXEC) = 3
19948 read(3, "\177ELF\2\1\1\0\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\320\37\2\0\0\0\0\0"..., 832) = 832
.....
19948 clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f5adac4ca10) = 19949
....
19949 +++ exited with 0 +++

2.2. 线程的strace路线如下

21958 execve("./pthread.out", ["./pthread.out"], [/* 68 vars */]) = 0
21958 open("/lib/x86_64-linux-gnu/libpthread.so.0", O_RDONLY|O_CLOEXEC) = 3
....
21958 access("/etc/ld.so.nohwcap", F_OK) = -1 ENOENT (No such file or directory)
21958 open("/lib/x86_64-linux-gnu/libc.so.6", O_RDONLY|O_CLOEXEC) = 3
21958 read(3, "\177ELF\2\1\1\0\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\320\37\2\0\0\0\0\0"..., 832) = 832
21958 fstat(3, {st_mode=S_IFREG|0755, st_size=1845024, ...}) = 0
21958 mmap(NULL, 3953344, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x7f34229e4000
....
21958 clone(child_stack=0x7f34229e2fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f34229e39d0, tls=0x7f34229e3700, child_tidptr=0x7f34229e39d0) = 21959
....
21958 +++ exited with 0 +++

3. 测试结论

通过上述的调用栈分析,可以得知均是通过调用x86_64-linux-gnu下的libc库,接着通过systemcall函数clone()实现对内核Process的控制,主要区别在函数参数中clone_flag上的不同,clone_flag指定了可以共享的资源,下图显示了进程与线程的flag

//clone flag between thread and process
//⚠️: 省略了`CLONE_`前缀

//进程的FLAG参数
flags=CHILD_CLEARTID|CHILD_SETTID|SIGCHLD

//线程的FLAG参数
flags=VM|FS|FILES|SIGHAND|THREAD|SYSVSEM|SETTLS|PARENT_SETTID|CHILD_CLEARTID

通过对clone进行man查询,解释如下

进程的参数解释:

  • CLONE_CHILD_CLEARTID: Erase child thread ID at location ctid in child memory when the child exits, and do a wakeup on the futex at that address。
  • CLONE_SETTLS: thread local storage (TLS) area,注意这个不可移植
  • CLONE_SIGHAND: 共享signal handlers

线程的一些参数解释:

  • CLONE_VM: the calling process and the child process run in the same memory space. (注意这里说的是memory space,指通过mmap()分配的内存。再多说一点,线程中的栈内存由pthread_attr_t属性中的pthread_attr_setstacksize()函数实现,默认可能为8MB(你可以运行ulimit -a查看最大值),当然在实际中我们使用栈内存大多都是几KB而已;堆内存是共享的,这里不讨论)
  • CLONE_FS: 共享文件系统,如下函数chroot(2), chdir(2), or umask(2)会被影响。
  • CLONE_FILES: 共享file descriptor table
  • CLONE_SIGHAND: 共享signal handlers
  • CLONE_THREAD: 共享thread group,即有相同的PID,独立的TID;
  • CLONE_SYSVSEM: 共享System V semaphore undo values列表,俺表示目前还不懂。
  • CLONE_SETTLS: thread local storage (TLS) area,注意这个不可移植
  • CLONE_PARENT_SETTID: Store child thread ID at location ptid in parent and child memory.
  • CLONE_CHILD_CLEARTID: Erase child thread ID at location ctid in child memory when the child exits, and do a wakeup on the futex at that address。

接着结合一些教科书,可以得知最终结论

进程线程
用户层函数fork()pthread_create()
内核实现clone()clone()
内存新复制的内存(Copy-on-Write),独立4G(1G+3G)共享4G内存:其中8M左右的栈内存是私有的,可以通过参数决定;共享堆内存
创建耗时复制的flag少,所以耗时多
上下文切换耗时主要是切换内存空间几乎只有进出内核的损失
内部通信IPC(Socket, Pipe, ASM...)共享的数据段(比如说DATA段的全局变量,更简单)
举例Redis备份,运行含全局锁的脚本语言...I/O Select 的消息处理、线程池...

高级语言对内核线程的封装实现

除了通过POSIX标准外,高级语言也可以自己通过系统调用对内核的线程进行实现,主要有如下三种。

1. 纯内核线程实现(1:1)

此线程模型将内核线程与App线程一一对应,可以看作为一种简单的映射关系,这里的代表有POSIX线程模型(pthread),以及依赖pThread标准库的Java与Ruby(1.9+)线程模型。

以在Android/ARTJvm下创建线程为例,具体实现调用栈如下

java.lang.Thread
    |
POSIX thread(user mode){
    0. art.runtime.Thread::CreateNativeThread(cpp, in jvm)
    1. pthread_create(pthread.h,标准库头文件)
    2. bionic标准库下的so文件,进行SystemCall(libc)
    3. 用户态陷入内核态
}
    |
Kernal thread(kernal mode)

可以看出,在JVM下的实现主要是对POSIX线程的包装与映射,自己本身只是做了点微小的工作,特点如下:

  1. 移植性较差,需要适配各种libc库,但是由于被OS直接管理,因此在分配任务上可以充分借用内核的高效调度,能够高效利用物理核并实现真正的并行。
  2. 用户态与内核态切换有一定的消耗损失

2. 纯用户态实现(1:N)

将线程的调度在用户态实现,也称green thread,自己写调度算法,可以将一个native线程映射为多个app thread(这里也可以叫做线程包),这里的代表有Ruby(1.8-),Java等老版本,特点如下:

  1. 移植性好,没有切换、映射到内核的损失
  2. 需要自己维护Scheduler
  3. 由于内核并不了解调度细节,很难进行多核利用

3. 混合实现(M:N)

可以同时运行M个kernel线程下管理N个app线程,比如golang。通过设置GOMAXPROCS个native线程,然后通过go关键词创建app线程,它的特点如下:

  1. 调度器实现比较困难
  2. 通过语法糖与管道简化了并发编程,切换损失低
  3. 部分调度需要自己主动释放时间片
golang threading model(N)
    ↓
    ↓ goroutine
    ↓
Kernal thread model(M)

详见libtask与许式伟的《go语言编程》

总结

  1. Concurrent是Parallels的父类
  2. 在启动一个程序后,将分配用户态与内核态任务,通过系统调用执行内核中的高权限任务
  3. POSIX是一种线程标准,或者是一种接口,由libc库实现
  4. 线程与进程最大的区别在于内核函数clone函数的flag不同,导致共享资源不同。最终创建、切换耗时不同;以及内存分配、内部通信复杂度不同。
  5. 在Java中,java.lang.Thread与内核线程一一对应;在某些旧版语言中,实现了一个内核线程对应多个高层线程;在golang中,通过goroutine实现M个内核线程对应N个高层线程;

REFFERENCE

  1. https://www.zhihu.com/question/21461752
  2. https://blog.codinghorror.com/understanding-user-and-kernel-mode/
  3. http://stackoverflow.com/questions/1311402/differences-between-user-and-kernel-modes
  4. https://zh.wikipedia.org/wiki/%E5%BF%99%E7%A2%8C%E7%AD%89%E5%BE%85
  5. https://www.ibm.com/developerworks/cn/linux/l-system-calls/

JVM线程生命周期的native实现


本文不是入门文章,对读者的理论基础与折腾要求都有点高

  • 了解Java层Thread的各种玩法
  • 知道什么是pthread,什么是系统调用,什么是Glibc/NPTL(Native POSIX Thread Library)
  • 了解操作系统内核中线程的理论流程
  • 需要会配置Clion/VSCode

配置编译参数并测试

首先还是按照上次讲的使用Clion(GDB)调试小型JVM源码方法编译,只是编译参数改一下,以免日志过多

- ./configure --with-classpath-install-dir=/tmp/classpath --enable-trace
+ ./configure --with-classpath-install-dir=/tmp/classpath --enable-tracethread

编译成功后,分别用直接运行与strace进行测试

➜  cd $HOME
➜  $HOME/Desktop/jamvm-2.0.0/src/jamvm -cp . A
....
Hello
➜  strace $HOME/Desktop/jamvm-2.0.0/src/jamvm -cp . A
....
clone(child_stack=0x7f96bf542ff0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f96bf5439d0, tls=0x7f96bf543700, child_tidptr=0x7f96bf5439d0) = 17140
....

new Thread创建的源码分析

首先构造一个Java代码创建一个线程,并编译为Class文件

//A.java
class A{
    public static void main(String[] args){
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"233");
            }
        }).start();
    }
}

首先在Java层中使用普通JDK进行断点,并观察断点运行时的线程名称,位置如下

  • start()
  • run()
  • exit()
  • ThreadGroup.remove()断点

接着看Java中线程的源码

通过对Thread的<init>start进行分析,可以发现start中调用了native方法。这里的start需要注意,在OracleJVM中使用了start0作为native方法,而在GNU Classpath/Android中出使用了java.lang.VMThread.create(this, stacksize);实现,本文主要分析java.lang.VMThread.create的实现createJavaThread。

在JVM上通过断点调试可以发现,此native方法被JVM路由到thread.c的createJavaThread

我们在这里打断点

// src/thread.c
void createJavaThread(Object *jThread, long long stack_size) {
   	
  	...
  	Thread *thread = sysMalloc(sizeof(Thread));
	...
  	// native 与 Java 线程的上下文环境进行绑定
    thread->ee = ee;
    ee->thread = jThread;
    ee->stack_size = stack_size;
    ...

    disableSuspend(self);
	// 通过系统调用创建native线程, `threadStart`是创建成功后的回掉函数指针
    if(pthread_create(&thread->tid, &attributes, threadStart, thread)) {
        ...
        return;
    }

  	/* 自旋等待线程变更状态,这里一般来说将会变为 RUNNING */
    pthread_mutex_lock(&lock);
    /* Wait for thread to start */
    while(classlibGetThreadState(thread) == CREATING)
        pthread_cond_wait(&cv, &lock);
    pthread_mutex_unlock(&lock);
    enableSuspend(self);
}

当通过系统调用创建线程完成后,在新的线程中调用threadStart函数指针

// 新线程启动后将立刻调用的函数
void *threadStart(void *arg) {
    Thread *thread = (Thread *)arg;
    Object *jThread = thread->ee->thread;

    /* Parent thread created thread with suspension disabled.
       This is inherited so we need to enable */
    enableSuspend(thread);

    /* Complete initialisation of the thread structure, create the thread
       stack and add the thread to the thread list */
    initThread(thread, INST_DATA(jThread, int, daemon_offset), &thread);

    /* Add thread to thread ID map hash table. */
    addThreadToHash(thread);

    /* Set state to running and notify creating thread */
  	// 设置状态为 RUNNING,通过`pthread_cond_broadcast`结束`pthread_cond_wait`的状态
    signalThreadRunning(thread);

    /* Execute the thread's run method */
  	/* 开始执行Java代码 class=java.lang.Thread,nam=run,sign=`()v` */
    executeMethod(jThread, CLASS_CB(jThread->class)->method_table[run_mtbl_idx]);

    /* Run has completed.  Detach the thread from the VM and exit */
  	/* 这里执行了java.lang.ThreadGroup: removeThread(Ljava/lang/Thread;)V */
    detachThread(thread);

    TRACE("Thread %p id: %d exited\n", thread, thread->id);
    return NULL;
}

如何通过Class引用打印名称 slash2DotsDup(CLASS_CB(class)->name)

可以发现,总体流程是非常清晰的

  1. 主线程通过pthread_create创建新线程
  2. 新线程执行Thread.run() 的Java代码段,执行完成后detach,移除ThreadGroup,结束线程

总的来说,这里分析并不是非常细致,主要是把流程先通起来,后续有具体例子(比如线程池)时再分析

总结

在JamVM中

  • Java线程与pthread线程一一对应,通过内核调度实现。但是不同厂商不一定。
  • 在Java代码中Runnable中的代码段的执行实际上通过pthread_create的函数指针threadStart进行包装后,最终调用executeMethod实现解释字节码

附录

pthead相关知识

线程模型的综述中已经讲过pthread相关知识,这里再回顾一下。JamVM使用pthread的头文件进行开发,在Linux中一般由Glibc实现,最后通过SYSCALL系统调用通过内核去创建线程。

java.lang.Thread
    |
POSIX thread(user mode){
    1. pthread_create(pthread.h,标准库头文件)
    2. bionic/glibc等标准库下的so文件,进行SystemCall
    3. 用户态陷入内核态
}
    |
Kernal thread(kernal mode)

pthead_create参数解释

入参如下

extern int pthread_create (pthread_t *__restrict __newthread,
			   const pthread_attr_t *__restrict __attr,
			   void *(*__start_routine) (void *),
			   void *__restrict __arg) __THROWNL __nonnull ((1, 3));

相关参数

  • _newthread: 即tid, 通过某种算法计算出唯一的Id
  • _attr: 线程类型,初学者可以配置为默认,即NULL
  • _start_routine: 是一个函数指针,类似于动态语言中的闭包(Closure)。线程创建后将执行此函数
  • _arg: 供_start_routine使用的入参

Glibc中NPTL的pthread_create的实现

下次有人再问你线程与进程的区别这种烂大街的问题,你就回答它们就是flag不同,导致(通过COW)共享的内存也不同而已

// glibc-2.25/sysdeps/unix/sysv/linux/createthread.c
const int clone_flags = (CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SYSVSEM
			   | CLONE_SIGHAND | CLONE_THREAD
			   | CLONE_SETTLS | CLONE_PARENT_SETTID
			   | CLONE_CHILD_CLEARTID
			   | 0);

TLS_DEFINE_INIT_TP (tp, pd);

// 此部分调用 external `__clone`
if (__glibc_unlikely (ARCH_CLONE (&start_thread, STACK_VARIABLES_ARGS,
                                  clone_flags, pd, &pd->tid, tp, &pd->tid)
                      == -1))
  return errno;

__clone的实现在内核中,通过 x86_64 ABI与网上找的例子,最终汇编代码如下

eax = 120 (syscall number for sys_clone)
ebx = unsigned long flags
ecx = void *child_stack
edx = void *ptid
esi = void *ctid
edi = struct pt_regs *regs  
int 80H

内核收到80H中断请求后,将进行内核态线程的创建与调度,后面内核线程以后再写...

JVM中自动启动的线程

在JVM中,创建了main线程(Native线程,对应 JVM下的用户主线程) + 如下4个Java-level的线程 ,可以通过对createVMThread进行findUsage定位

1. GC线程

通过WhileLoop与Sleep每隔1000ms进行一次GC,这里后期将专门分析GC流程(当初买的一本GC实现终于可以用啦)

/* Create and start VM thread for asynchronous GC */
if(args->asyncgc)
  createVMThread("Async GC", asyncGCThreadLoop);
2. Reference处理线程

这里配合GC的线程,共2个,日后在详细讲

/* Create and start VM threads for the reference handler and finalizer */
createVMThread("Finalizer", finalizerThreadLoop);
createVMThread("Reference Handler", referenceHandlerThreadLoop);
3. 信号处理线程

通过WhileLoop+sigwait()系统阻塞监听信号实现处理 SIGINTSIGQUIT

 /* Create the signal handler thread.  It is responsible for
 catching and handling SIGQUIT (thread dump) and SIGINT
 (user-termination of the VM, e.g. via Ctrl-C).  Note it
 must be a valid Java-level thread as it needs to run the
 shutdown hooks in the event of user-termination */
createVMThread("Signal Handler", classlibSignalThread);

参考

开源项目中的线程池

下面介绍几个开源项目中如何配置的线程池

JDK自带Executors里面几个线程池

这几个就不介绍了,可以直接看JavaDoc

corePoolSizemaximumPoolSizekeepAliveBlockingQueue
newCachedThreadPool0Integer.MAX_VALUE60sSynchronousQueue
newFixedThreadPool${nThreads}${nThreads}0sLinkedBlockingQueue
newSingleThreadExecutor110sLinkedBlockingQueue

初始线程数并不是越大越好,它最终取决于代码中可以并行运算的比例。当并发线程太多时,系统整体性能反而会下降,因为系统把很多时间花在了线程调度上,详见阿姆达尔定律
AmdahlsLaw.svg.png

RxJava中几个线程池

在RxJava2中主要有如下4个线程池(划掉的不进行分析)

  • SingleScheduler
  • ComputationScheduler
  • IoScheduler
  • NewThreadScheduler

本文只专注于它的“执行”过程,而不是调度过程,最重要的部分在Worker中,因此可以这样构造代码研究

// Groovy代码
5.times {
    Scheduler.Worker worker = Schedulers.io().createWorker();
    worker.schedule(new Runnable() {
        @Override
        void run() {
            println "inner = " + Thread.currentThread().getStackTrace().join('\n')
            println "inner = " + Thread.currentThread().getName()
        }
    });
}
Thread.currentThread().join()

下面假设你已经把上面流程全部调通。以下为我的调试结果

  • 线程池: 一群只会干活的奋斗者,所有线程池构造都是newScheduledThreadPool(1, factory)
  • Worker: Worker是对线程池的包装,类似于基层领导,一个Worker被分配一个线程池,当然这个线程池可能身兼数职。
  • EventLoopWorker: 对线程池外部状态进行维护,可以看作某些公司的HR

Worker状态

创建

调用createWorker创建一个Worker

//IoScheduler
public Worker createWorker() {
    //CachedWorkerPool: 变长队列,可以创建与缓存无数个线程池
    return new EventLoopWorker(pool.get());
}

//ComputationScheduler
public Worker createWorker() {
    //FixedSchedulerPool: 用数组pool[CPUSize]实现,通过取余调用(实现代码为`pool[i++%4]`)分配任务,不能创建更多的线程池
    return new EventLoopWorker(pool.get().getEventLoop());
}
干活

work执行schedule,最终调用线程池的executor.submit(),并返回一个包装后的Future,这里没什么可讲的。

任务结束

onComplete后RxJava框架调用FlowableSubscribeOndispose方法,最终主动调用worker.dispose()

//IoScheduler.EventLoopWorker
@Override
public void dispose() {
  if (once.compareAndSet(false, true)) {
    // 让线程池执行cancel命令,并清空所有Future
    tasks.dispose();
    // 设置Worker超时,加入“短命单”,60s后定时清理一次
    // 这里原理是构造了每隔60s自动执行的`evictorService`,它是一个清洁线程池,定期执行`evictExpiredWorkers`方法,移除队列`expiringWorkerQueue`中超时的Worker
    // releasing the pool<threadWorker> should be the last action
    pool.release(threadWorker);
  }
}

 //ComputationScheduler.EventLoopWorker
 @Override
 public void dispose() {
   if (!disposed) {
     disposed = true;
     // 斩立决,让线程池执行cancel命令,并清空所有Future
     both.dispose();
   }
 }

IoScheduler的模式正是某些私营公司在工作量大时海量招聘时堆人,加班奋斗后变成甘蔗渣,最后进入短名单被HR定期优化的流程。

ComputationScheduler的模式是某些公司工作量大时加班再多也不招人,加班奋斗后变成甘蔗渣,最后直接被通知斩立决马上走人的流程。

通过断点也可以发现,由于IoScheduler是在static初始化并启动,因此无论是否使用IoScheduler,清洁工线程池evictorService将自动启动且永远不会自动关闭,除非将IoScheduler进行主动shutdown。发现了什么了吗,执行者只是可替代的资源,只有Pool才是核心竞争力!

Dubbo中的线程池

Dubbo是一款事务框架,这种纯框架中使用的线程池一般可以作为各种场景负载下的教科书,我们将Executors作为关键字进行统计

打开IDEA,运行cmd+shift+F,搜索Executors.即可

如果不使用IDEA,可以通过Shell的grep暴力搜索

grep 'Executors.' * -R|grep -v test|grep -v import|grep -v class

经过统计,最终如下

newCachedThreadPool: 10
最低线程限制为1个,处理实际业务与Socket(JBOSS、Netty、bootstrap)等

newScheduledThreadPool: 11
最低线程数在1~3之间

URL: 定期重连,清除过期者,心跳,延期等。调用BO失败后定期重试

文件: 更新定时统计/图表/日志, 检查文件更新(可用于热部署),

newFixedThreadPool: 1
线程限制固定为1个,用于文件缓存写入

在并发业务中,一般分为有状态的与无状态的服务,其中【有状态】的服务需要处理竞态场景(比如a++这样的操作),主要有两种解决方案

  • 使用互斥锁保证独立性,但是由于需要线程挂起/切换/调度,导致效率较低,因此对它的要求是包裹越小越好,详情看这里的对比与以前写的理论分析。这里典型的实现有Java的synchronized以及其它语言的Mutex
  • 通过busy waiting不断尝试,即while(CAS(old,new)) {};。这里主要有JDK5.0以上的atomic库,以及常见的自旋锁(SpinLock,比如Linux内核/通信设备就有使用),它们底层均通过CAS指令保证原子性,这个是本文将要介绍的内容。

快速结论

  • Java的CAS是通过汇编命令cmpxchg进行的包装与定制实现atomic memory operation (AMO) i
  • CAS由于是进行主动比较,因此compareAndSet一定会放在一个自旋中,它适用于冲突(自旋次数)比较少的场景。在某些激烈的场景下可能性能还不如互斥锁。
  • CPU汇编层面没有实现CAS的公平调用,需要上层业务自行实现

CAS的简介

如果没有听过CAS的话,下面是简要入门,比如下面可以达到最终总是100,而不会小于100

AtomicInteger a = new AtomicInteger(0)
def runnable = {
    println "a=${a.incrementAndGet()}, ${Thread.currentThread().getName()}"
}
100.times {
    new Thread(runnable).start()
}
Thread.currentThread().join()

CAS入门文章

配置GDB断点(选读)

本文将通过GDB调试JamVM实现对CAS的源码分析,首先要搭建GDB调试环境,然后才能进行测试

测试代码如下

import java.util.concurrent.atomic.AtomicInteger;

class A{
    public static void main(String[] args){
        AtomicInteger a = new AtomicInteger(1);
        System.out.println("a = " + a.incrementAndGet());
    }
}

接着使用Javac进行编译

javac A.java

最后使用JamVM与GDB进行断点测试

jamvm -cp . A

断点打在natives.ccompareAndSwapInt方法中,就可以分析调用栈了

CAS在JVM上的流程分析

CAS在Java层上的实现

CAS在Java中最后的实现都是sun.misc.Unsafe类,而它几乎全部是Native方法,几乎没有什么可以分析,值得注意的是,在不同JVM中实现的过程不一定一样,但是最后肯定都有一个循环与CAS,下文是OracleJDK的实现

// 1. 开发者调用JDK
AtomicInteger.incrementAndGet();
// 2. JDK内部自旋并不断尝试(getAndAddInt)
int var5;
do {
  var5 = getIntVolatile(this, offset);
} while(!compareAndSwapInt(this, offset, var5, var5 + 1));
return var5;

其中offset是通过Unsafe方法获取var5相对于AtomicInteger这个class的native内存地址偏移

上面的 getIntVolatilecompareAndSwapInt 均是native方法,其中thisAtomicInteger这个结构体

  • this与offset: 表示内存中的数值
  • var5: 是旧的数值
  • var5+1: 是新的数值

在多线程下,如果没有竞争,1~2次就可以完成循环;在有一定竞争情况下,也就是memory被反复改,它通过不断自旋实现重试

CAS在native层的实现

下面是迷你虚拟机JamVM的unsafe.compareAndSwapInt的C语言实现,最终还是调用到了汇编,通过CPU硬件实现

首先分析objectFieldOffset,它本质是获取Class的Slot中的“value”对应的偏移量

// 在Java中,获取结构体的偏移位置
// Field f = AtomicInteger.class.getDeclaredField("value")
// valueOffset = unsafe.objectFieldOffset(f);
//JamVM natives.c
uintptr_t *objectFieldOffset(Class *class, MethodBlock *mb, uintptr_t *ostack) {
    FieldBlock *fb = fbFromReflectObject((Object*)ostack[1]);

    *(long long*)ostack = (long long)(uintptr_t)
                          &(INST_DATA((Object*)NULL, int, fb->u.offset));
    return ostack + 2;
}

在本文的 AtomicInteger 中,valueOffset为12常量,为什么要做这一步呢,因此CAS基本上都是native操作,需要直接操作内存,后续将通过valueOffset获取到 value

接着分析compareAndSwapInt

// 在Java中,调用如下
// compareAndSwapInt(this, offset, var5, var5 + 1)
//JamVM natives.c
uintptr_t *compareAndSwapInt(Class *class, MethodBlock *mb, uintptr_t *ostack) {
    long long offset = *((long long *)&ostack[2]);
    unsigned int *addr = (unsigned int*)((char *)ostack[1] + offset);
    unsigned int expect = ostack[4];
    unsigned int update = ostack[5];
    int result;
      //调用平台CPU特定的汇编代码实现
    result = COMPARE_AND_SWAP_32(addr, expect, update);
    *ostack++ = result;
    return ostack;
}

// x86-64 的汇编实现宏
#define COMPARE_AND_SWAP_32(addr, old_val, new_val)        \
({                                                         \
    char result;                                           \
    __asm__ __volatile__ ("                                \
        lock;                                              \
        cmpxchgl %4, %1;                                   \
        sete %0"                                           \
    : "=q" (result), "=m" (*addr)/*out*/                   \
    : "m" (*addr), "a" (old_val), "r" (new_val) /*in*/     \
    : "memory");     /*Clobbers, reload from memory*/      \
    result;                                                \
})

通过使用CLion对源码进行文本搜索compareAndSwapInt方法,可以快速找到CAS的实现类

`

这里的汇编格式阅读起来就比较费劲了,你可以选择学一下它的格式(如果要看JVM代码,早晚都得学),并通过这里这里的文档了解q,m,r是什么意思

如果你不想看的话,直接上HopperDisassembler将二进制的进行反编译,注意这里正好操作是反过来的

                     compareAndSwapInt:
0000000000030e80         mov        rcx, qword [ds:rdx+0x10]
0000000000030e84         mov        rax, qword [ds:rdx+0x20]
0000000000030e88         add        rcx, qword [ds:rdx+0x8]
0000000000030e8c         mov        rsi, qword [ds:rdx+0x28]
/* COMPARE_AND_SWAP_32 start */
0000000000030e90         lock cmpxchg dword [ds:rcx], esi
0000000000030e94         sete       al
0000000000030e97         movsx      rax, al
/* COMPARE_AND_SWAP_32 end */
0000000000030e9b         mov        qword [ds:rdx], rax
0000000000030e9e         lea        rax, qword [ds:rdx+0x8]
0000000000030ea2         ret

你可能需要知道各个寄存器的相关约定

最终COMPARE_AND_SWAP_32的伪代码如下,如果兴趣非常强,可以看网上开源的Verilog实现

//下文的 <- 指赋值的意思,Intel文档也是这样写的
--INPUT:
rcx <- memory <- addr
al <- old_val
esi <- new_val
--OUTPUT:
rax <- result
rcx <-memory <- addr
--PROCEDURE:
//cmpxchg dword [ds:rcx], esi
IF al == rcx
THEN
  ZF <- 1;
  rcx <- esi;
ELSE
  ZF <- 0;
  al <- rcx;
FI;
//sete       al
IF ZF == 1
THEN
  al <- 0;
ELSE
  al <- 1;
FI;
//movsx      rax, al
rax <- al
return rax

用C语言简化是这样的

if(old_val == *addr){
  *addr = new_val;
  return true;
} else{
  old_val = *addr;
  return false;
}

详细汇编分析可以参考这篇类似的文章Is x86 CMPXCHG atomic?

注意上面CPU里的Lock是用来锁多核下的Bus实现内存屏障,而不是高级语言中锁代码段的,只是碰巧名称类似而已

附录

CMPXCHG 的一些介绍

通过查询Intel文档,可以得知

OpcodeInstructionDescription
0F B0/rCMPXCHG r/m8,r8Compare AL(accumulator, 累加器) with r/m8. If equal, ZF is set and r8 is loaded into r/m8. Else, clear ZF and load r/m8 into AL.

调用方法

CMPXCHG DEST, SRC

伪代码

IF accumulator == DEST
THEN
  ZF <- 1;
  DEST <- SRC;
ELSE
  ZF <- 0;
  accumulator <- DEST;
FI;

CAS中ABA的问题

使用AtomicStampedReference

todo

简单的例子

自增

int a = 0
ReentrantLock lock = new ReentrantLock()
100.times {
    new Thread(new Runnable() {
        @Override
        void run() {
            try {
                lock.lock()
                a++;
            } finally {
                lock.unlock()
            }
        }
    }).run()
}
println "a = $a"

与 synchronized 的区别

下面是使用object作为lock实现

int a = 0
def lock = new Object()
100.times {
    new Thread(new Runnable() {
        @Override
        void run() {
            synchronized (lock){
                a++
            }
        }
    }).run()
}
println "a = $a"

在功能上,ReentrantLock 比 synchronized 多了tryLock,超时,公平调度等额外功能

在native实现上

  • synchronized使用了Object中的monitor作为lock,调度通过OS实现(比如pthread_mutex等调用)
  • ReentrantLock使用了AQS实现Java层的公平与非公平调度

在Groovy中通过闭包去掉模版代码

参考这里: http://blog.johanneslink.net/2011/10/25/simplified-use-of-locks-in-groovy/

CountDownLatch

英文名词

  • predecessor| ˈpriːdɪsɛsə |: 前任者; 先輩. eg: the Prime Minister learned from his predecessor's mistakes.
  • latch: 门闩,CountDownLatch表示自动倒计时关上的门锁
  • elide | ɪˈlʌɪd |: 省略. eg: the null check may be elided.

应用

CountDownLatch一般用于异步转同步操作,比如SpringCloud/ElasticSearch中在主线程实现阻塞等待Response

比如SpringCloud中的Feign框架利用Rxjava等待异步网络请求,并转为同步操作

return command.submit(request)
    // 类似于前端的await,在Java中用CountDownLatch实现
    .toBlocking()
    .single();

再比如ElasticSearch客户端中基于Netty异步请求,同样也是基于await实现的

CountDownLatch是并发编程中通过CAS队列包装实现的锁,先举个例子(Groovy代码)

def s = Executors.newCachedThreadPool()
def latch = new CountDownLatch(3)
3.times{
    s.execute(new Runnable() {
        @Override
        void run() {
            try {
                println "start ${it}"
                TimeUnit.SECONDS.sleep(2)
                println "end ${it}"
            }catch (Exception e){
                e.printStackTrace()
            }finally{
                latch.countDown()
            }
        }
    })
}
latch.await()
println "latch complete"

通过执行上面的Groovy代码段可以发现,无论executor线程池中怎么运行,只要不阻塞死,主线程一定会在所有线程池计数归零时才开始运行。

CountDownLatch的应用

App启动页

在某些App启动时,既要加载广告,又要加载首屏数据,涉及到两个请求,这时可以让两个线程同时跑,主线程

流式/链式调用回掉

WatchDog

CountDownLatch的源码分析

CountDownLatch代码很少,去掉注释只有100多行。它本质上是对队列AbstractQueuedSynchronizer的包装,底层通过CAS实现了原子性,其中CAS之前文章已经写过了,本质是自旋与CPU硬件实现。

AbstractQueuedSynchronizer分析

private void doAcquireSharedInterruptibly(int arg)
  throws InterruptedException {
  // 此处通过包装CAS实现原子性
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

Unsafe.compareAndSwapObject在JVM侧的C实现

其实这部分以前已经分析过,下面的代码是JamVM的实现,本质上是CPU硬件帮你把事情搞定了

//  unsafe.compareAndSwapObject(node, nextOffset, expect, update)
uintptr_t *compareAndSwapObject(Class *class, MethodBlock *mb,
                                uintptr_t *ostack) {

    long long offset = *((long long *)&ostack[2]);
    uintptr_t *addr = (uintptr_t*)((char *)ostack[1] + offset);
    uintptr_t expect = ostack[4];
    uintptr_t update = ostack[5];
    int result;
	// 此部分通过 __asm__ 汇编代码段实现
    result = COMPARE_AND_SWAP(addr, expect, update);
    *ostack++ = result;
    return ostack;
}

ReadWriteLock

读写锁

真实项目使用

  • Mybatis的缓存组件
  • Redisson(基于Redis)的读写锁

Synchronization is not guaranteed to work unless both read and write operations are synchronized.

the implementation of Synchnornized in JVM


快速结论

在JamVM中的实现如下

  • 通过Object中的C结构体的object->lock->monitor实现锁的标志
  • 调度通过pThread的pthread_mutex_lock等操作进行lock与unlock,相当于把最难的调度通过系统调用甩锅给操作系统了
  • Synchnornized是java中互斥锁的实现,用于对并发编程进行支持

Object的线程方法

在了解Java的同步之前,先复习一下Java的线程模型,在Java中,使用Object作为最常用的锁。在Object中,有许多native方法,主要如下

函数功能
wait()释放monitor与时间片,只能超时或者有其他线程notify才能恢复
sleep()释放时间片,但是持有monitor
notify()通知调度器唤醒等待此monitor的BLOCKED线程队列中的一个,转为blocked状态
notifyAll()唤醒所有等待此monitor的线程队列,转为blocked状态

线程的状态有如下几种

source: &lt;http://n.ethz.ch/~anoever/recitation05.pdf&gt;

提醒一下,被notify()调用后是进入了block的队列,需要通过调度器“摇号”挑选其中一个后才能进入同步区块

互斥锁(Synchnornized)

互斥锁在其它编程语言中一般叫做Mutex(|mjuːtex|),用于实现对共享资源的独占性,当进入同步代码块时,获取并独占锁,离开时释放锁。在这段时间中,其它线程访问此资源时,会成为上图的BLOCKED状态。

在Java中线程是通过对C中的pThread包装实现的,pThread接口通过systemcall在内核实现,因此线程的管理本质都是在进行系统调用,有一定的切换消耗,故Synchnornized是一种比较重的锁。

在Java中,Synchnornized可以看作一种语法糖,不需要自己去判断、配置Mutex,只需要用synchnornized(object){}进行包裹即可,接下来我们对这个字段进行一下入门。

1. 对象同步

对象同步中有两种方法,第一种是通过代码块{}的包裹,编译时通过MONITORENTERMONITOREXITSample.this作为锁对象;第二种是为method添加access_flag为synchronized,JVM在执行时将根据FLAG自动加\减锁,如下两种写法在JVM中调用是效果相同的

public class Sample {

  public void do_work() {
    synchronized (Sample.this) {
      //do synchronized work
    }
  }

  public synchronized void do_work() {
    //do synchronized work
  }
}

详见: Java中非static的synchronized方法和synchronized(this)用的是一个锁么?

synchronized在方法中不会被继承,虽然继承synchronized方法有点诡异

再说个题外话,synchronized的锁范围越小越好,不建议放在方法上作为修饰,而是使用synchronized (xxx.this)代码块。common-pool旧版代码中就出现过此问题,导致上游业务出现死锁。

2. 类同步

类同步的对象是Sample.class,以下两种是相同的

public static synchronized do_work(){
    //do synchronized work
}

public void do_work(){
    synchronized (Sample.class){
          //do synchronized work
    }
}

单例的同步方法看这里:http://www.jianshu.com/p/eebcb81b1394

3. synchronized的应用

除了网上用烂的单例同步,多线程for循环自增,并发HashMap等例子外,还可以再举例

3.1. 释放锁与时间片

我曾经在OkHttp中,介绍过Socket的KeepAlive连接自动清理的实现,这里使用了wait进行阻塞,避免无谓的自旋(Spin-waiting)损耗。

while (true) {
  //执行清理并返回下场需要清理的时间
  long waitNanos = cleanup(System.nanoTime());
  if (waitNanos == -1) return;
  if (waitNanos > 0) {
    synchronized (ConnectionPool.this) {
      try {
        //在timeout内释放锁与时间片
        ConnectionPool.this.wait(TimeUnit.NANOSECONDS.toMillis(waitNanos));
      } catch (InterruptedException ignored) {
      }
    }
  }
}

注意wait在Java中一般都是需要在while循环中的,否则你的wait可能被路人线程给wakeup了,上文代码就是Best practice。

3.2. Dead Lock

虽然同步简化了编程,但是还有可能发生死锁,如下例子,当两个不同线程分别调用a.swap(b)b.swap(a)时,在getValue()时可能发生死锁。

//code from <Concurrent Programming in Java>
class Cell {      // Do not use
 private long value;
 synchronized long getValue() { return value; }

 synchronized void setValue(long v) { value = v; }
 synchronized void swapValue(Cell other) {
  long t = getValue();
  long v = other.getValue();
  setValue(v);
  other.setValue(t);
 }
}

Monitor(管程)

重点来了!管程是JVM中的互斥锁的实现,在Java中,Object.wait/notify,以及线程的wait/notify,synchronized生成的字节码MONITORENTER/EXIT等操作在JVM中都是通过Monitor控制与实现的。

在JVM中,每个java.lang.object对象对应的C结构体均拥有一个lock指针,指针对应一个Monitor(也就是说,对象、lock指针与Monitor是一一对应的)。一个Monitor只能同时被一个线程持有。

管程的工作机制如下图

source: http://en.wikipedia.org/wiki/Monitor_(synchronization)

本图来自于维基百科,可以看作是对阻塞式条件变量的一种封装

  • e表示入口队列的线程,它将尝试获取对象的锁,处于blocked状态
  • q表示等待队列的线程,处于waiting状态
  • wait表示调用pThread_wait()进行等待,进入q队列,大多数由开发者手动调用与恢复,下文暂不考虑
  • notified表示调用pThread_notify()后进入e队列
  • enterleave表示独占线程对锁的获取与释放

比如现在有线程队列e同时竞争一个对象,第一个跑的最快的线程enter后,立刻获取到了object的monitor,剩下的线程由于无法获得到monitor,就在e队列中阻塞等待锁的释放。

 enter the monitor:
    enter the method
    if the monitor is locked
        add this thread to e
        block this thread
    else
        lock the monitor

当任务完成后,调用调度器

 leave the monitor:
    schedule
    return from the method

调度器将会释放monitor,并通过某种调度公平的调度策略(可能是FIFO,也可能是优先权值等)将monitor分配给下一个处于blocked态的线程。

  schedule :
    if there is a thread on e
      select and remove one thread from e and restart it
      (this thread will occupy the monitor next)
    else
      unlock the monitor
      (the monitor will become unoccupied)

以Android6.0的vm为例,参考源码如下:

使用Object作为锁好不好?

Java中,通过设置Object结构体中的某个字段映射为锁,所有对象都可以成为锁,这样固然简化了编程,但是相比Ruby等语言使用Mutex作为专用锁,Java所耗用的结构体内存更多

参考

  1. http://android.group.iteye.com/group/wiki/3083-java-sync-communication
  2. http://ibruce.info/2013/12/07/java-interview-questions-concurrency/
  3. https://zh.wikipedia.org/wiki/%E7%9B%A3%E8%A6%96%E5%99%A8_(%E7%A8%8B%E5%BA%8F%E5%90%8C%E6%AD%A5%E5%8C%96)
  4. https://www.ibm.com/developerworks/cn/java/j-threads/

下面是在业务中常见的并发问题,优先用findBugs等工具扫描,并清空IDE的黄色报警,否则...

错误的单例

下面的double-lock单例代码是被FindBugs扫出的,改正如下。这里在底层主要是为汇编码加入了lock指令防止指令重排

public class AESUtil{
-   private static AES aes;
+   private static volatile AES  aes;
    static public AES getAES(){
        if(aes == null){
        synchronized(AESUtil.class){
            if(aes == null){
                aes = new AES();
            }
        }
        return aes;
    }

}

其实还有更简单的单例方法,就是在static代码段中构造field的实例化,由于ClassLoader加载class时`<clinit()>`阶段由JVM保证线程安全的,因此可以放心用,比如JDBC各种驱动就是这样初始化的。

错误的DataFormat

这个是在进行报表时出现的,当报表耗时大于定时任务间隔时,两个报表服务就会同时执行。如果`SimpleDateFormat`使用的是同一个,那么它的parse方法可能出现报错。具体原理我没有分享JDK源码,可能是它内部eval的状态机不是线程安全的。

解决方法: 1. 每次new一个 2. 使用ThreadLocal


title: Copy-on-write的介绍与应用 categories:

  • 数据结构

本文目录

  • 什么是写时复制
  • 写时复制的应用场景
  • 写时复制的实现

在并发编程中,如果需要实现对资源的冲突处理,一般采用互斥锁,队列、不可变来实现。上面的技术实现在很多书籍中都有,不过今天介绍的是一种新的方法--写时复制(Copy-on-write, COW)

关键词: COW, Copy on write, Redis

如果看的懂英文就直接看这里:

Copy on write (COW) is an optimization strategy that avoids copying large sized objects.

In a lot of real world programs, a value is copied to another variable and often is never written to. In most languages other than C++, all large sized objects are actually references. When you copy an object, all you copy is a pointer (shallow copy semantics). In such languages, COW is implemented at the language/runtime level, and not in the standard library.

In C++, copies are deep copies by default (value semantics), thus assigning large structures and strings are expensive, because the entire data is duplicated.

To avoid this, one can make a system where a copy is always shallow, but when you modify a copied object, the underlying object is duplicated, and then the changes are applied to the new copy.

总的来说,COW通过浅拷贝(shallow copy)只复制引用而避免复制值;当的确需要进行写入操作时,首先进行值拷贝,再对拷贝后的值执行写入操作,这样减少了无谓的复制耗时。

特点如下

  • 读取安全(但是不保证缓存一致性),写入安全(代价是加了锁,而且需要全量复制)
  • 不建议用于频繁读写场景下,全量复制很容易造成GC停顿,因此建议使用平时的ConcurrentXX包来实现。
  • 适用于对象空间占用大,修改次数少,而且对数据实效性要求不高的场景。

这里的安全指在进行读取或者写入的过程中,数据不被修改。

写时复制的应用场景

写时复制最擅长的是并发读取场景,即多个线程/进程可以通过对一份相同快照,去处理实效性要求不是很高但是仍然要做的业务(比如实现FS\DB备份、日志、分布式路由),举例如下。

1. Unix下的fork()系统调用

fork()是一个系统调用,用于创建新的进程(process)。

fork() creates a new process by duplicating the calling process. The new process, referred to as the child, is an exact duplicate of the calling process, referred to as the parent.

在以前的文章中说过,fork内部实际上是对clone()系统函数的调用,它的参数CLONE_FLAG决定了需要共享哪些数据。在fork中,没有CLONE_VM参数,也就意味着不会共享\竞争同一个内存,而是复制一个内存快照给子进程,这个内存在32位下是4G的大小,占用空间相当的大,如果通过类似memcpy进行内存复制的话,fork调用的耗时将相当显著,甚至阻塞业务,那么为什么在真正开发调用时却没有发生呢?因为内部也是通过COW机制实现的。

内核实现:

在内核侧,在进行了内存“复制”后,子进程与父进程指向同一个只读的Page分页。当子进程或者父进程发送修改内存请求后,由于是分页是只读的,OS此时才将内存进行复制为两份,并将这两份内存设置为可写权限,最后再处理刚刚发送的修改内存请求。通过上述策略,实现了延迟复制,进程的创建是不是变快了?

2. Redis的持久化

Redis是一个基于KV的MemCache框架,可以将数据全部存储在内存中,特别适用于抢购、红包等高并发场景,当你希望对数据进行全量Dump(bgsave)到文件中或者进行主从同步时,将进行下面的步骤。

  • Redis forks. We now have a child and a parent process.
  • The child starts to write the dataset to a temporary RDB file.
  • When the child is done writing the new RDB file, it replaces the old one.

可以看出,Redis通过fork()系统调用实现了写时复制,而没有自己去造轮子

int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);

    start = ustime();
    //指向子线程的pid如果为0,表示fork成功,为正表示为parent线程
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child进程要执行的代码 */
        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename);
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        ...
        return C_OK;
    }
    return C_OK; /* unreached */
}

在rdbSave中(目前已经为子线程中),具体实现如下,代码太长就不贴了

  1. 创建了一个temp-${getPid()}.rdb的文件
  2. 调用rioInitWithFile(rio *r, FILE *tmp),将r初始化为rioBufferIO
  3. 对全局变量server进行forEach反序列化,并保持到缓存r中,并写入文件,注意这个Server指针已经与父进程无关了
  4. 进行fflush、fsync、fclose系统调用清除OS的FS缓存(这也是OS内部的COW优化)
  5. 进行rename系统调用,进行重命名

系统调用都是默认线程安全的,所以不用担心多次重命名等问题

可以看出,在Redis中没有memcpy等内存复制过程,而是直接使用server指针进行读取并写入文件,因为在fork时,已经duplicated了快照。

写时复制的实现

以Java为例,在CopyOnWriteArrayList中,写数据在锁的保护下,而读取可以任意进行,代码如下。

private transient volatile Object[] array;


public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        //类似于memcpy,构造一个新的对象
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        //重新设置引用
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

public E get(int index) {
	//获取到的数据没有实效性
    return get(getArray(), index);
}

final Object[] getArray() {
        return array;
}

其它可能需要深入了解的技能

#####1. 如何实现String\Map的写时复制?

这个一般只在糟糕的面试题中出现,因为写时复制主要用于处理大的数据,而大型的字符串、Map却很少见到场景(如果说非要来一个场景的话,就是Zookeeper中读取服务时,可能需要一个Map<String,Class>来实现)。在C++中,写时复制的String已经被废弃,并且Redis中设计的字符串可以更加优雅地扩容,在Java中,各类并发库已经很成熟,写时复制主要用于实现安全迭代,而没有String或者Map的需求。

如果非要让你写,可以这样处理:

  • 在构造函数、写入函数中实现深拷贝,并加锁,比如put中就再包装一道HashMap。
  • 在getter函数,实现无锁直接获取。

#####2. ConcurrentHashXXX与CopyOnWriteXXX的对比?

一个适用于写入量大的场景,一个适用于读取量大的场景,它们的线程安全关系如下

NormalConcurrentCOW
ReadUnsafeSafeSafe, may dirtyData
WriteUnsafeSafeSafe, may slowest

Ref

  1. https://zh.wikipedia.org/wiki/%E5%AF%AB%E5%85%A5%E6%99%82%E8%A4%87%E8%A3%BD
  2. http://ifeve.com/java-copy-on-write/
  3. http://www.ibm.com/developerworks/tivoli/library/t-snaptsm1/
  4. http://blog.csdn.net/jason314/article/details/5640969
  5. https://www.reddit.com/r/compsci/comments/31szui/trying_to_understand_fork_and_copyonwrite_cow/
  6. http://stackoverflow.com/questions/1570589/is-the-volatile-keyword-required-for-fields-accessed-via-a-reentrantlock
  7. https://www.quora.com/What-is-Copy-on-Write-and-how-is-it-used-in-C++

详见 PaaS