简栈文化

Java技术人的成长之路~

前言

零拷贝(英语:Zero-copy)技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。

零拷贝操作减少了在用户空间与内核空间之间切换模式的次数

举例来说,如果要读取一个文件并通过网络发送它,传统方式下如下图,传统的I/O操作进行了4次用户空间与内核空间的上下文切换,以及4次数据拷贝。其中4次数据拷贝中包括了2次DMA拷贝和2次CPU拷贝。通过零拷贝技术完成相同的操作,减少了在用户空间与内核空间交互,并且不需要CPU复制数据。

http://static.cyblogs.com/浅析零拷贝技术-1.png

linux中零拷贝技术

Linux系统的“用户空间”和“内核空间”

从Linux系统上看,除了引导系统的BIN区,整个内存空间主要被分成两个部分:内核空间(Kernel space)、用户空间(User space)。

“用户空间”和“内核空间”的空间、操作权限以及作用都是不一样的。

内核空间是Linux自身使用的内存空间,主要提供给程序调度、内存分配、连接硬件资源等程序逻辑使用;用户空间则是提供给各个进程的主要空间。

用户空间不具有访问内核空间资源的权限,因此如果应用程序需要使用到内核空间的资源,则需要通过系统调用来完成:从用户空间切换到内核空间,然后在完成相关操作后再从内核空间切换回用户空间。

Linux 中零拷贝技术的实现方向
  • 直接 I/O:对于这种数据传输方式来说,应用程序可以直接访问硬件存储,操作系统内核只是辅助数据传输。这种方式依旧存在用户空间和内核空间的上下文切换,但是硬件上的数据不会拷贝一份到内核空间,而是直接拷贝至了用户空间,因此直接I/O不存在内核空间缓冲区和用户空间缓冲区之间的数据拷贝。
  • 在数据传输过程中,避免数据在用户空间缓冲区和系统内核空间缓冲区之间的CPU拷贝,以及数据在系统内核空间内的CPU拷贝。本文主要讨论的就是该方式下的零拷贝机制。
  • copy-on-write(写时复制技术):在某些情况下,Linux操作系统的内核空间缓冲区可能被多个应用程序所共享,操作系统有可能会将用户空间缓冲区地址映射到内核空间缓存区中。当应用程序需要对共享的数据进行修改的时候,才需要真正地拷贝数据到应用程序的用户空间缓冲区中,并且对自己用户空间的缓冲区的数据进行修改不会影响到其他共享数据的应用程序。所以,如果应用程序不需要对数据进行任何修改的话,就不会存在数据从系统内核空间缓冲区拷贝到用户空间缓冲区的操作。
mmap
1
2
buf = mmap(diskfd, len);
write(sockfd, buf, len);

应用程序调用mmap(),磁盘上的数据会通过DMA被拷贝的内核缓冲区,接着操作系统会把这段内核缓冲区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用write(),操作系统直接将内核缓冲区的内容拷贝到socket缓冲区中,这一切都发生在内核态,最后,socket缓冲区再把数据发到网卡去。

http://static.cyblogs.com/浅析零拷贝技术-2.png

使用mmap替代read很明显减少了一次拷贝,当拷贝数据量很大时,无疑提升了效率。但是使用mmap是有代价的。当你使用mmap时,你可能会遇到一些隐藏的陷阱。例如,当你的程序map了一个文件,但是当这个文件被另一个进程截断(truncate)时, write系统调用会因为访问非法地址而被SIGBUS信号终止。SIGBUS信号默认会杀死你的进程并产生一个coredump,如果你的服务器这样被中止了,那会产生一笔损失。

当然也有办法解决,本文忽略解决办法。

sendfile
1
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

http://static.cyblogs.com/浅析零拷贝-03.jpg

sendfile实现的零拷贝I/O只使用了2次用户空间与内核空间的上下文切换,以及3次数据的拷贝。其中3次数据拷贝中包括了2次DMA拷贝和1次CPU拷贝。

splice

sendfile只适用于将数据从文件拷贝到套接字上,限定了它的使用范围。Linux在2.6.17版本引入splice系统调用,用于在两个文件描述符中移动数据:

1
2
3
#define _GNU_SOURCE         /* See feature_test_macros(7) */
#include <fcntl.h>
ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);

Java中如何使用零拷贝

DMA简介

io读写的方式:中断,DMA

DMA 直接内存存取,是允许外设组件将I/O数据直接传送到主存储器中并且传输不需要CPU的参与,以此将CPU解放出来去完成其他的事情。,相较于中断方式,减少cpu中断次数,不用cpu拷贝数据。

主要流程如下:

  1. 用户进程发起数据读取请求
  2. 系统调度为该进程分配cpu
  3. cpu向DMA发送io请求
  4. 用户进程等待io完成,让出cpu
  5. 系统调度cpu执行其他任务
  6. 数据写入至io控制器的缓冲寄存器
  7. DMA不断获取缓冲寄存器中的数据(需要cpu时钟)
  8. 传输至内存(需要cpu时钟)
  9. 所需的全部数据获取完毕后向cpu发出中断信号

参考地址

  1. 浅析Linux中的零拷贝技术:https://www.jianshu.com/p/fad3339e3448
  2. 零复制:https://zh.wikipedia.org/wiki/零复制
  3. 零拷贝原理-数据的收发-软中断和DMA:https://blog.csdn.net/xiaofei0859/article/details/74321216
  4. 浅谈 Linux下的零拷贝机制:https://cloud.tencent.com/developer/article/1152609

参考地址

前提

这篇文章主要分析一下Introspector(内省)的用法。Introspector是一个专门处理JavaBean的工具类,用来获取JavaBean里描述符号,常用的JavaBean的描述符号相关类有BeanInfoPropertyDescriptorMethodDescriptorBeanDescriptorEventSetDescriptorParameterDescriptor。下面会慢慢分析这些类的使用方式,以及Introspector的一些特点。

JavaBean是什么

JavaBean是一种特殊(其实说普通也可以,也不是十分特殊)的类,主要用于传递数据信息,这种类中的方法主要用于访问私有的字段,且方法名符合某种命名规则(字段都是私有,每个字段具备SetterGetter方法,方法和字段命名满足首字母小写驼峰命名规则)。如果在两个模块之间传递信息,可以将信息封装进JavaBean中,这种对象称为值对象(Value Object)或者VO。这些信息储存在类的私有变量中,通过SetterGetter方法获得。JavaBean的信息在Introspector里对应的概念是BeanInfo,它包含了JavaBean所有的Descriptor(描述符),主要有PropertyDescriptorMethodDescriptor(MethodDescriptor里面包含ParameterDescriptor)、BeanDescriptorEventSetDescriptor

属性Field和属性描述PropertiesDescriptor的区别

如果是严格的JavaBean(Field名称不重复,并且Field具备SetterGetter方法),它的PropertyDescriptor会通过解析SetterGetter方法,合并解析结果,最终得到对应的PropertyDescriptor实例。所以PropertyDescriptor包含了属性名称和属性的SetterGetter方法(如果存在的话)。

内省Introspector和反射Reflection的区别

  • Reflection:反射就是运行时获取一个类的所有信息,可以获取到类的所有定义的信息(包括成员变量,成员方法,构造器等)可以操纵类的字段、方法、构造器等部分。可以想象为镜面反射或者照镜子,这样的操作是带有客观色彩的,也就是反射获取到的类信息是必定正确的。
  • Introspector:内省基于反射实现,主要用于操作JavaBean,基于JavaBean的规范进行Bean信息描述符的解析,依据于类的SetterGetter方法,可以获取到类的描述符。可以想象为“自我反省”,这样的操作带有主观的色彩,不一定是正确的(如果一个类中的属性没有SetterGetter方法,无法使用内省)。

常用的内省相关类

主要介绍一下几个核心类所提供的方法。

Introspector

Introspector类似于BeanInfo的静态工厂类,主要是提供静态方法通过Class实例获取到BeanInfo,得到BeanInfo之后,就能够获取到其他描述符。主要方法:

  • public static BeanInfo getBeanInfo(Class beanClass):通过Class实例获取到BeanInfo实例。

BeanInfo

BeanInfo是一个接口,具体实现是GenericBeanInfo,通过这个接口可以获取一个类的各种类型的描述符。主要方法:

  • BeanDescriptor getBeanDescriptor():获取JavaBean描述符。
  • EventSetDescriptor[] getEventSetDescriptors():获取JavaBean的所有的EventSetDescriptor
  • PropertyDescriptor[] getPropertyDescriptors():获取JavaBean的所有的PropertyDescriptor
  • MethodDescriptor[] getMethodDescriptors():获取JavaBean的所有的MethodDescriptor

这里要注意一点,通过BeanInfo#getPropertyDescriptors()获取到的PropertyDescriptor数组中,除了Bean属性的之外,还会带有一个属性名为classPropertyDescriptor实例,它的来源是ClassgetClass方法,如果不需要这个属性那么最好判断后过滤。

PropertyDescriptor

PropertyDescriptor类表示JavaBean类通过存储器(SetterGetter)导出一个属性,它应该是内省体系中最常见的类。主要方法:

  • synchronized Class getPropertyType():获得属性的Class对象。
  • synchronized Method getReadMethod():获得用于读取属性值的方法;
  • synchronized Method getWriteMethod():获得用于写入属性值的方法。
  • int hashCode():获取对象的哈希值。
  • synchronized void setReadMethod(Method readMethod):设置用于读取属性(Getter)值的方法。
  • synchronized void setWriteMethod(Method writeMethod):设置用于写入属性值(Setter)的方法。

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
复制public class Main {

public static void main(String[] args) throws Exception {
BeanInfo beanInfo = Introspector.getBeanInfo(Person.class);
PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
if (!"class".equals(propertyDescriptor.getName())) {
System.out.println(propertyDescriptor.getName());
System.out.println(propertyDescriptor.getWriteMethod().getName());
System.out.println(propertyDescriptor.getReadMethod().getName());
System.out.println("=======================");
}
}
}

public static class Person {

private Long id;
private String name;
private Integer age;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getAge() {
return age;
}

public void setAge(Integer age) {
this.age = age;
}
}
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
复制age
setAge
getAge
=======================
id
setId
getId
=======================
name
setName
getName
=======================

不正当使用Introspector会导致内存溢出

如果框架或者程序用到了JavaBeans Introspector,那么就相当于启用了一个系统级别的缓存,这个缓存会存放一些曾加载并分析过的Javabean的引用,当web服务器关闭的时候,由于这个缓存中存放着这些Javabean的引用,所以垃圾回收器不能对Web容器中的JavaBean对象进行回收,导致内存越来越大。还有一点值得注意,清除Introspector缓存的唯一方式是刷新整个缓存缓冲区,这是因为JDK没法判断哪些是属于当前的应用的引用,所以刷新整个Introspector缓存缓冲区会导致把服务器的所有应用的Introspector缓存都删掉。Spring中提供的org.springframework.web.util.IntrospectorCleanupListener就是为了解决这个问题,它会在Web服务器停止的时候,清理一下这个Introspector缓存,使那些Javabean能被垃圾回收器正确回收。

也就是说JDKIntrospector缓存管理是有一定缺陷的。但是如果使用在Spring体系则不会出现这种问题,因为SpringIntrospector缓存的管理移交到Spring自身而不是JDK(或者在Web容器销毁后完全不管),在加载并分析完所有类之后,会针对类加载器对Introspector缓存进行清理,避免内存泄漏的问题,详情可以看CachedIntrospectionResultsSpringBoot刷新上下文的方法AbstractApplicationContext#refresh()finally代码块中存在清理缓存的方法AbstractApplicationContext#resetCommonCaches();。但是有很多程序和框架在使用了JavaBeans Introspector之后,都没有进行清理工作,比如Quartz、Struts等,这类操作会成为内存泄漏的隐患。

小结

  • 在标准的JavaBean中,可以考虑使用Introspector体系解析JavaBean,主要是方便使用反射之前的时候快速获取到JavaBeanSetterGetter方法。
  • Spring体系中,为了防止JDK对内省信息的缓存无法被垃圾回收机制回收导致内存溢出,主要的操作除了可以通过配置IntrospectorCleanupListener预防,还有另外一种方式,就是通过CachedIntrospectionResults类自行管理Introspector中的缓存(这种方式才是优雅的方式,这样可以避免刷新整个Introspector的缓存缓冲区而导致其他应用的Introspector也被清空),也就是把Jdk自行管理的Introspector相关缓存交给Spring自己去管理。在SpringBoot刷新上下文的方法AbstractApplicationContext#refresh()中finally代码块中存在清理缓存的方法AbstractApplicationContext#resetCommonCaches();,里面调用到的CachedIntrospectionResults#clearClassLoader(getClassLoader())方法就是清理指定的ClassLoader下的所有Introspector中的缓存的引用。

(本文完 e-a-20191225 c-1-d 更新了博客主题,感觉比以前好看一点点…)

参考地址

Linux整体架构图

我们先来看一张Linux整体架构图。

http://static.cyblogs.com/3433091-63269eb8f87c2bb9.png

系统调用

​ 系统调用时操作系统的最小功能单位。根据不同的应用场景,不同的Linux发行版本提供的系统调用数量也不尽相同,大致在240-350之间。这些系统调用组成了用户态跟内核态交互的基本接口,例如:用户态想要申请一块20K大小的动态内存,就需要brk系统调用,将数据段指针向下偏移,如果用户态多处申请20K动态内存,同时又释放呢?这个内存的管理就变得非常的复杂。

库函数

​ 库函数就是屏蔽这些复杂的底层实现细节,减轻程序员的负担,从而更加关注上层的逻辑实现。它对系统调用进行封装,提供简单的基本接口给用户,这样增强了程序的灵活性,当然对于简单的接口,也可以直接使用系统调用访问资源,例如:open()write()read()等等。库函数根据不同的标准也有不同的版本,例如:glibc库,posix库等。

Shell

Shell顾名思义,就是外壳的意思。就好像把内核包裹起来的外壳。它是一种特殊的应用程序,俗称命令行。为了方便用户和系统交互,一般一个Shell对应一个终端,呈现给用户交互窗口。当然Shell也是编程的,它有标准的shell语法,符合其语法的文本叫Shell脚本。很多人都会用Shell脚本实现一些常用的功能,可以提高工作效率。

为什么要区分用户态与内核态?

CPU的所有指令中,有一些指令是非常危险的,如果错用,将导致整个系统崩溃。比如:清内存、设置时钟等。如果所有的程序都能使用这些指令,那么你的系统一天死机N回就不足为奇了。所以,CPU将指令分为特权指令和非特权指令,对于那些危险的指令,只允许操作系统及其相关模块使用,普通的应用程序只能使用那些不会造成灾难的指令。IntelCPU将特权级别分为4个级别:RING0RING1RING2RING3

​ 当一个任务(进程)执行系统调用而陷入内核代码中执行时,我们就称进程处于内核运行态(或简称为内核态)。此时处理器处于特权级最高的(0级)内核代码中执行。

  • 当进程处于内核态时,执行的内核代码会使用当前进程的内核栈。每个进程都有自己的内核栈。
  • 当进程在执行用户自己的代码时,则称其处于用户运行态(用户态)。即此时处理器在特权级最低的(3级)用户代码中运行。

​ 当正在执行用户程序而突然被中断程序中断时,此时用户程序也可以象征性地称为处于进程的内核态。Linux使用了Ring3级别运行用户态,Ring0作为 内核态,没有使用Ring1Ring2Ring3状态不能访问Ring0的地址空间,包括代码和数据。Linux进程的4GB地址空间,3G-4G部分大家是共享的,是内核态的地址空间,这里存放在整个内核的代码和所有的内核模块,以及内核所维护的数据。用户运行一个程序,该程序所创建的进程开始是运 行在用户态的,如果要执行文件操作,网络数据发送等操作,必须通过writesend等系统调用,这些系统调用会调用内核中的代码来完成操作,这时,必 须切换到Ring0,然后进入3GB-4GB中的内核地址空间去执行这些代码完成操作,完成后,切换回Ring3,回到用户态。

这样,用户态的程序就不能 随意操作内核地址空间,具有一定的安全保护作用。

处理器总处于以下状态中的一种:

  • 1、内核态,运行于进程上下文,内核代表进程运行于内核空间;

  • 2、内核态,运行于中断上下文,内核代表硬件运行于内核空间;

  • 3、用户态,运行于用户空间。

用户态到内核态怎样切换?

从用户态到内核态切换可以通过三种方式:

**系统调用:**这是用户态进程主动要求切换到内核态的一种方式,用户态进程通过系统调用申请使用操作系统提供的服务程序完成工作,比如前例中fork()实际上就是执行了一个创建新进程的系统调用。而系统调用的机制其核心还是使用了操作系统为用户特别开放的一个中断来实现,例如Linux的int 80h中断。

**异常:**当CPU在执行运行在用户态下的程序时,发生了某些事先不可知的异常,这时会触发由当前运行进程切换到处理此异常的内核相关程序中,也就转到了内核态,比如缺页异常。

**外设中断:**当外围设备完成用户请求的操作后,会向CPU发出相应的中断信号,这时CPU会暂停执行下一条即将要执行的指令转而去执行与中断信号对应的处理程序,如果先前执行的指令是用户态下的程序,那么这个转换的过程自然也就发生了由用户态到内核态的切换。比如硬盘读写操作完成,系统会切换到硬盘读写的中断处理程序中执行后续操作等。

这3种方式是系统在运行时由用户态转到内核态的最主要方式,其中系统调用可以认为是用户进程主动发起的,异常和外围设备中断则是被动的。

参考地址

淘宝根据自身业务需求研发了TDDL(Taobao Distributed Data Layer)框架,主要用于解决分库分表场景下的访问路由(持久层与数据访问层的配合)以及异构数据库之间的数据同步,它是一个基于集中式配置的JDBC DataSource实现,具有分库分表Master/Salve动态数据源配置等功能。

就目前而言,许多大厂也在出一些更加优秀和社区支持更广泛的DAL层产品,比如Hibernate Shards、Ibatis-Sharding等。TDDL位于数据库和持久层之间,它直接与数据库建立交道,如图所示:

http://static.cyblogs.com/20160601111503650.png

​ 淘宝很早就对数据进行过分库的处理,上层系统连接多个数据库,中间有一个叫做DBRoute的路由来对数据进行统一访问。DBRoute对数据进行多库的操作、数据的整合,让上层系统像操作一个数据库一样操作多个库。但是随着数据量的增长,对于库表的分法有了更高的要求,例如,你的商品数据到了百亿级别的时候,任何一个库都无法存放了,于是分成2个、4个、8个、16个、32个……直到1024个、2048个。好,分成这么多,数据能够存放了,那怎么查询它?这时候,数据查询的中间件就要能够承担这个重任了,它对上层来说,必须像查询一个数据库一样来查询数据,还要像查询一个数据库一样快(每条查询在几毫秒内完成),TDDL就承担了这样一个工作。在外面有些系统也用DAL(数据访问层) 这个概念来命名这个中间件。下图展示了一个简单的分库分表数据查询策略:

http://static.cyblogs.com/20160601111544711.png

TDDL的主要优点:

1
2
3
4
5
6
7
8
9
10
1、数据库主备和动态切换
2、带权重的读写分离
3、单线程读重试
4、集中式数据源信息管理和动态变更
5、剥离的稳定jboss数据源
6、支持mysql和oracle数据库
7、基于jdbc规范,很容易扩展支持实现jdbc规范的数据源
8、无server,client-jar形式存在,应用直连数据库
9、读写次数,并发度流程控制,动态变更
10、可分析的日志打印,日志流控,动态变更12345678910

TDDL的体系架构

TDDL其实主要可以划分为3层架构,分别是Matrix层、Group层和Atom层。

**Matrix层:**用于实现分库分表逻辑,底层持有多个Group实例。而Group层和Atom共同组成了动态数据源

**Group层:**实现了数据库的Master/Salve模式的写分离逻辑,底层持有多个Atom实例。

**Atom层 (TAtomDataSource):**实现数据库ip,port,password,connectionProperties等信息的动态推送,以及持有原子的数据源分离的JBOSS数据源)。

http://static.cyblogs.com/20160601111711995.png

​ 持久层只关心对数据源的CRUD操作,而多数据源的访问并不应该由它来关心。也就是说TDDL透明给持久层的数据源接口应该是统一且“单一”的,至于数据库到底如何分库分表持久层无需知道也无需编写对应的SQL去实行应对策略。这个时候对TDDL一些疑问就出现了,TDDL需要对SQL进行二次解析和拼装吗?答案是不解析仅拼装。TDDL只需要从持久层拿到发出的SQL再按照一些分库分表条件,进行特定的SQL扩充以此满足访问路路由操作。
TDDL除了拿到分库分表条件外,还需要拿到order bygroup bylimitjoin等信息,SUMMAXMIN等聚合函数信息,DISTINCT信息。具有这些关键字的SQL将会在单库和多库情况下进行,语义是不同的。TDDL必须对使用这些关键字的SQL返回的结果做出合适的处理;
TDDL行复制需重新拼写SQL,带上sync_version字段。不通过SQL解析,因为TDDL遵守JDBC规范,它不可能去扩充JDBC规范里面的接口,所以只能通过SQL中加额外的字符条件(也就是HINT方式)或者ThreadLocal方式进行传递,前者使SQL过长,后者难以维护,开发debug时不容易跟踪,而且需要判定是在一条SQL执行后失效还是1个连接关闭后才失效;
TDDL现在也同时支持Hint方式和ThreadLocal方式传递这些信息;

参考地址

前言

Java提供了种类丰富的锁,每种锁因其特性的不同,在适当的场景下能够展现出非常高的效率。本文旨在对锁相关源码(本文中的源码来自JDK 8和Netty 3.10.6)、使用场景进行举例,为读者介绍主流锁的知识点,以及不同的锁的适用场景。

Java中往往是按照是否含有某一特性来定义锁,我们通过特性将锁进行分组归类,再使用对比的方式进行介绍,帮助大家更快捷的理解相关知识。下面给出本文内容的总体分类目录:

img

1. 乐观锁 VS 悲观锁

乐观锁与悲观锁是一种广义上的概念,体现了看待线程同步的不同角度。在Java和数据库中都有此概念对应的实际应用。

先说概念。对于同一个数据的并发操作,悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。Java中,synchronized关键字和Lock的实现类都是悲观锁。

而乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其他线程更新,则根据不同的实现方式执行不同的操作(例如报错或者自动重试)。

乐观锁在Java中是通过使用无锁编程来实现,最常采用的是CAS算法,Java原子类中的递增操作就通过CAS自旋实现的。

img

根据从上面的概念描述我们可以发现:

  • 悲观锁适合写操作多的场景,先加锁可以保证写操作时数据正确。
  • 乐观锁适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升。

光说概念有些抽象,我们来看下乐观锁和悲观锁的调用方式示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ------------------------- 悲观锁的调用方式 -------------------------
// synchronized
public synchronized void testMethod() {
// 操作同步资源
}
// ReentrantLock
private ReentrantLock lock = new ReentrantLock(); // 需要保证多个线程使用的是同一个锁
public void modifyPublicResources() {
lock.lock();
// 操作同步资源
lock.unlock();
}

// ------------------------- 乐观锁的调用方式 -------------------------
private AtomicInteger atomicInteger = new AtomicInteger(); // 需要保证多个线程使用的是同一个AtomicInteger
atomicInteger.incrementAndGet(); //执行自增1

通过调用方式示例,我们可以发现悲观锁基本都是在显式的锁定之后再操作同步资源,而乐观锁则直接去操作同步资源。那么,为何乐观锁能够做到不锁定同步资源也可以正确的实现线程同步呢?我们通过介绍乐观锁的主要实现方式 “CAS” 的技术原理来为大家解惑。

CAS全称 Compare And Swap(比较与交换),是一种无锁算法。在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。java.util.concurrent包中的原子类就是通过CAS来实现了乐观锁。

CAS算法涉及到三个操作数:

  • 需要读写的内存值 V。
  • 进行比较的值 A。
  • 要写入的新值 B。

当且仅当 V 的值等于 A 时,CAS通过原子方式用新值B来更新V的值(“比较+更新”整体是一个原子操作),否则不会执行任何操作。一般情况下,“更新”是一个不断重试的操作。

之前提到java.util.concurrent包中的原子类,就是通过CAS来实现了乐观锁,那么我们进入原子类AtomicInteger的源码,看一下AtomicInteger的定义:

img

根据定义我们可以看出各属性的作用:

  • unsafe: 获取并操作内存的数据。
  • valueOffset: 存储value在AtomicInteger中的偏移量。
  • value: 存储AtomicInteger的int值,该属性需要借助volatile关键字保证其在线程间是可见的。

接下来,我们查看AtomicInteger的自增函数incrementAndGet()的源码时,发现自增函数底层调用的是unsafe.getAndAddInt()。但是由于JDK本身只有Unsafe.class,只通过class文件中的参数名,并不能很好的了解方法的作用,所以我们通过OpenJDK 8 来查看Unsafe的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// ------------------------- JDK 8 -------------------------
// AtomicInteger 自增方法
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

// Unsafe.class
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}

// ------------------------- OpenJDK 8 -------------------------
// Unsafe.java
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}

根据OpenJDK 8的源码我们可以看出,getAndAddInt()循环获取给定对象o中的偏移量处的值v,然后判断内存值是否等于v。如果相等则将内存值设置为 v + delta,否则返回false,继续循环进行重试,直到设置成功才能退出循环,并且将旧值返回。整个“比较+更新”操作封装在compareAndSwapInt()中,在JNI里是借助于一个CPU指令完成的,属于原子操作,可以保证多个线程都能够看到同一个变量的修改值。

后续JDK通过CPU的cmpxchg指令,去比较寄存器中的 A 和 内存中的值 V。如果相等,就把要写入的新值 B 存入内存中。如果不相等,就将内存值 V 赋值给寄存器中的值 A。然后通过Java代码中的while循环再次调用cmpxchg指令进行重试,直到设置成功为止。

CAS虽然很高效,但是它也存在三大问题,这里也简单说一下:

  1. ABA问题

    。CAS需要在操作值的时候检查内存值是否发生变化,没有发生变化才会更新内存值。但是如果内存值原来是A,后来变成了B,然后又变成了A,那么CAS进行检查时会发现值没有发生变化,但是实际上是有变化的。ABA问题的解决思路就是在变量前面添加版本号,每次变量更新的时候都把版本号加一,这样变化过程就从“A-B-A”变成了“1A-2B-3A”。

    • JDK从1.5开始提供了AtomicStampedReference类来解决ABA问题,具体操作封装在compareAndSet()中。compareAndSet()首先检查当前引用和当前标志与预期引用和预期标志是否相等,如果都相等,则以原子方式将引用值和标志的值设置为给定的更新值。
  2. 循环时间长开销大。CAS操作如果长时间不成功,会导致其一直自旋,给CPU带来非常大的开销。

  3. 只能保证一个共享变量的原子操作

    。对一个共享变量执行操作时,CAS能够保证原子操作,但是对多个共享变量操作时,CAS是无法保证操作的原子性的。

    • Java从1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。
2. 自旋锁 VS 适应性自旋锁

在介绍自旋锁前,我们需要介绍一些前提知识来帮助大家明白自旋锁的概念。

阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长。

在许多场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失。如果物理机器有多个处理器,能够让两个或以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃CPU的执行时间,看看持有锁的线程是否很快就会释放锁。

而为了让当前线程“稍等一下”,我们需让当前线程进行自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。

img

自旋锁本身是有缺点的,它不能代替阻塞。自旋等待虽然避免了线程切换的开销,但它要占用处理器时间。如果锁被占用的时间很短,自旋等待的效果就会非常好。反之,如果锁被占用的时间很长,那么自旋的线程只会白浪费处理器资源。所以,自旋等待的时间必须要有一定的限度,如果自旋超过了限定次数(默认是10次,可以使用-XX:PreBlockSpin来更改)没有成功获得锁,就应当挂起线程。

自旋锁的实现原理同样也是CAS,AtomicInteger中调用unsafe进行自增操作的源码中的do-while循环就是一个自旋操作,如果修改数值失败则通过循环来执行自旋,直至修改成功。

img

自旋锁在JDK1.4.2中引入,使用-XX:+UseSpinning来开启。JDK 6中变为默认开启,并且引入了自适应的自旋锁(适应性自旋锁)。

自适应意味着自旋的时间(次数)不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也是很有可能再次成功,进而它将允许自旋等待持续相对更长的时间。如果对于某个锁,自旋很少成功获得过,那在以后尝试获取这个锁时将可能省略掉自旋过程,直接阻塞线程,避免浪费处理器资源。

在自旋锁中 另有三种常见的锁形式:TicketLock、CLHlock和MCSlock,本文中仅做名词介绍,不做深入讲解,感兴趣的同学可以自行查阅相关资料。

3. 无锁 VS 偏向锁 VS 轻量级锁 VS 重量级锁

这四种锁是指锁的状态,专门针对synchronized的。在介绍这四种锁状态之前还需要介绍一些额外的知识。

首先为什么Synchronized能实现线程同步?

在回答这个问题之前我们需要了解两个重要的概念:“Java对象头”、“Monitor”。

Java对象头

synchronized是悲观锁,在操作同步资源之前需要给同步资源先加锁,这把锁就是存在Java对象头里的,而Java对象头又是什么呢?

我们以Hotspot虚拟机为例,Hotspot的对象头主要包括两部分数据:Mark Word(标记字段)、Klass Pointer(类型指针)。

Mark Word:默认存储对象的HashCode,分代年龄和锁标志位信息。这些信息都是与对象自身定义无关的数据,所以Mark Word被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据。它会根据对象的状态复用自己的存储空间,也就是说在运行期间Mark Word里存储的数据会随着锁标志位的变化而变化。

Klass Point:对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。

Monitor

Monitor可以理解为一个同步工具或一种同步机制,通常被描述为一个对象。每一个Java对象就有一把看不见的锁,称为内部锁或者Monitor锁。

Monitor是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个monitor关联,同时monitor中有一个Owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。

现在话题回到synchronized,synchronized通过Monitor来实现线程同步,Monitor是依赖于底层的操作系统的Mutex Lock(互斥锁)来实现的线程同步。

如同我们在自旋锁中提到的“阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长”。这种方式就是synchronized最初实现同步的方式,这就是JDK 6之前synchronized效率低的原因。这种依赖于操作系统Mutex Lock所实现的锁我们称之为“重量级锁”,JDK 6中为了减少获得锁和释放锁带来的性能消耗,引入了“偏向锁”和“轻量级锁”。

所以目前锁一共有4种状态,级别从低到高依次是:无锁、偏向锁、轻量级锁和重量级锁。锁状态只能升级不能降级。

通过上面的介绍,我们对synchronized的加锁机制以及相关知识有了一个了解,那么下面我们给出四种锁状态对应的的Mark Word内容,然后再分别讲解四种锁状态的思路以及特点:

锁状态 存储内容 存储内容
无锁 对象的hashCode、对象分代年龄、是否是偏向锁(0) 01
偏向锁 偏向线程ID、偏向时间戳、对象分代年龄、是否是偏向锁(1) 01
轻量级锁 指向栈中锁记录的指针 00
重量级锁 指向互斥量(重量级锁)的指针 10

无锁

无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。

无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。上面我们介绍的CAS原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。

偏向锁

偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。

在大多数情况下,锁总是由同一线程多次获得,不存在多线程竞争,所以出现了偏向锁。其目标就是在只有一个线程执行同步代码块时能够提高性能。

当一个线程访问同步代码块并获取锁时,会在Mark Word里存储锁偏向的线程ID。在线程进入和退出同步块时不再通过CAS操作来加锁和解锁,而是检测Mark Word里是否存储着指向当前线程的偏向锁。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次CAS原子指令即可。

偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。

偏向锁在JDK 6及以后的JVM里是默认启用的。可以通过JVM参数关闭偏向锁:-XX:-UseBiasedLocking=false,关闭之后程序默认会进入轻量级锁状态。

轻量级锁

是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。

在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,然后拷贝对象头中的Mark Word复制到锁记录中。

拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock Record里的owner指针指向对象的Mark Word。

如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。

如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。

若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。

重量级锁

升级为重量级锁时,锁标志的状态值变为“10”,此时Mark Word中存储的是指向重量级锁的指针,此时等待锁的线程都会进入阻塞状态。

整体的锁状态升级流程如下:

img

综上,偏向锁通过对比Mark Word解决加锁问题,避免执行CAS操作。而轻量级锁是通过用CAS操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。重量级锁是将除了拥有锁的线程以外的线程都阻塞。

4. 公平锁 VS 非公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。

非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。

直接用语言描述可能有点抽象,这里作者用从别处看到的一个例子来讲述一下公平锁和非公平锁。

img

如上图所示,假设有一口水井,有管理员看守,管理员有一把锁,只有拿到锁的人才能够打水,打完水要把锁还给管理员。每个过来打水的人都要管理员的允许并拿到锁之后才能去打水,如果前面有人正在打水,那么这个想要打水的人就必须排队。管理员会查看下一个要去打水的人是不是队伍里排最前面的人,如果是的话,才会给你锁让你去打水;如果你不是排第一的人,就必须去队尾排队,这就是公平锁。

但是对于非公平锁,管理员对打水的人没有要求。即使等待队伍里有排队等待的人,但如果在上一个人刚打完水把锁还给管理员而且管理员还没有允许等待队伍里下一个人去打水时,刚好来了一个插队的人,这个插队的人是可以直接从管理员那里拿到锁去打水,不需要排队,原本排队等待的人只能继续等待。如下图所示:

img

接下来我们通过ReentrantLock的源码来讲解公平锁和非公平锁。

img

根据代码可知,ReentrantLock里面有一个内部类Sync,Sync继承AQS(AbstractQueuedSynchronizer),添加锁和释放锁的大部分操作实际上都是在Sync中实现的。它有公平锁FairSync和非公平锁NonfairSync两个子类。ReentrantLock默认使用非公平锁,也可以通过构造器来显示的指定使用公平锁。

下面我们来看一下公平锁与非公平锁的加锁方法的源码:

img

通过上图中的源代码对比,我们可以明显的看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()。

img

再进入hasQueuedPredecessors(),可以看到该方法主要做一件事情:主要是判断当前线程是否位于同步队列中的第一个。如果是则返回true,否则返回false。

综上,公平锁就是通过同步队列来实现多个线程按照申请锁的顺序来获取锁,从而实现公平的特性。非公平锁加锁时不考虑排队等待问题,直接尝试获取锁,所以存在后申请却先获得锁的情况。

5. 可重入锁 VS 非可重入锁

可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者class),不会因为之前已经获取过还没释放而阻塞。Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。下面用示例代码来进行分析:

1
2
3
4
5
6
7
8
9
10
public class Widget {
public synchronized void doSomething() {
System.out.println("方法1执行...");
doOthers();
}

public synchronized void doOthers() {
System.out.println("方法2执行...");
}
}

在上面的代码中,类中的两个方法都是被内置锁synchronized修饰的,doSomething()方法中调用doOthers()方法。因为内置锁是可重入的,所以同一个线程在调用doOthers()时可以直接获得当前对象的锁,进入doOthers()进行操作。

如果是一个不可重入锁,那么当前线程在调用doOthers()之前需要将执行doSomething()时获取当前对象的锁释放掉,实际上该对象锁已被当前线程所持有,且无法释放。所以此时会出现死锁。

而为什么可重入锁就可以在嵌套调用时可以自动获得锁呢?我们通过图示和源码来分别解析一下。

还是打水的例子,有多个人在排队打水,此时管理员允许锁和同一个人的多个水桶绑定。这个人用多个水桶打水时,第一个水桶和锁绑定并打完水之后,第二个水桶也可以直接和锁绑定并开始打水,所有的水桶都打完水之后打水人才会将锁还给管理员。这个人的所有打水流程都能够成功执行,后续等待的人也能够打到水。这就是可重入锁。

img

但如果是非可重入锁的话,此时管理员只允许锁和同一个人的一个水桶绑定。第一个水桶和锁绑定打完水之后并不会释放锁,导致第二个水桶不能和锁绑定也无法打水。当前线程出现死锁,整个等待队列中的所有线程都无法被唤醒。

img

之前我们说过ReentrantLock和synchronized都是重入锁,那么我们通过重入锁ReentrantLock以及非可重入锁NonReentrantLock的源码来对比分析一下为什么非可重入锁在重复调用同步资源时会出现死锁。

首先ReentrantLock和NonReentrantLock都继承父类AQS,其父类AQS中维护了一个同步状态status来计数重入次数,status初始值为0。

当线程尝试获取锁时,可重入锁先尝试获取并更新status值,如果status == 0表示没有其他线程在执行同步代码,则把status置为1,当前线程开始执行。如果status != 0,则判断当前线程是否是获取到这个锁的线程,如果是的话执行status+1,且当前线程可以再次获取锁。而非可重入锁是直接去获取并尝试更新当前status的值,如果status != 0的话会导致其获取锁失败,当前线程阻塞。

释放锁时,可重入锁同样先获取当前status的值,在当前线程是持有锁的线程的前提下。如果status-1 == 0,则表示当前线程所有重复获取锁的操作都已经执行完毕,然后该线程才会真正释放锁。而非可重入锁则是在确定当前线程是持有锁的线程之后,直接将status置为0,将锁释放。

img

6. 独享锁 VS 共享锁

独享锁和共享锁同样是一种概念。我们先介绍一下具体的概念,然后通过ReentrantLock和ReentrantReadWriteLock的源码来介绍独享锁和共享锁。

独享锁也叫排他锁,是指该锁一次只能被一个线程所持有。如果线程T对数据A加上排它锁后,则其他线程不能再对A加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。JDK中的synchronized和JUC中Lock的实现类就是互斥锁。

共享锁是指该锁可被多个线程所持有。如果线程T对数据A加上共享锁后,则其他线程只能对A再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。

独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。

下图为ReentrantReadWriteLock的部分源码:

img

我们看到ReentrantReadWriteLock有两把锁:ReadLock和WriteLock,由词知意,一个读锁一个写锁,合称“读写锁”。再进一步观察可以发现ReadLock和WriteLock是靠内部类Sync实现的锁。Sync是AQS的一个子类,这种结构在CountDownLatch、ReentrantLock、Semaphore里面也都存在。

在ReentrantReadWriteLock里面,读锁和写锁的锁主体都是Sync,但读锁和写锁的加锁方式不一样。读锁是共享锁,写锁是独享锁。读锁的共享锁可保证并发读非常高效,而读写、写读、写写的过程互斥,因为读锁和写锁是分离的。所以ReentrantReadWriteLock的并发性相比一般的互斥锁有了很大提升。

那读锁和写锁的具体加锁方式有什么区别呢?在了解源码之前我们需要回顾一下其他知识。 在最开始提及AQS的时候我们也提到了state字段(int类型,32位),该字段用来描述有多少线程获持有锁。

在独享锁中这个值通常是0或者1(如果是重入锁的话state值就是重入的次数),在共享锁中state就是持有锁的数量。但是在ReentrantReadWriteLock中有读、写两把锁,所以需要在一个整型变量state上分别描述读锁和写锁的数量(或者也可以叫状态)。于是将state变量“按位切割”切分成了两个部分,高16位表示读锁状态(读锁个数),低16位表示写锁状态(写锁个数)。如下图所示:

img

了解了概念之后我们再来看代码,先看写锁的加锁源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState(); // 取到当前锁的个数
int w = exclusiveCount(c); // 取写锁的个数w
if (c != 0) { // 如果已经有线程持有了锁(c!=0)
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 如果写线程数(w)为0(换言之存在读锁) 或者持有锁的线程不是当前线程就返回失败
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 如果写入锁的数量大于最大数(65535,2的16次方-1)就抛出一个Error。
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) // 如果当且写线程数为0,并且当前线程需要阻塞那么就返回失败;或者如果通过CAS增加写线程数失败也返回失败。
return false;
setExclusiveOwnerThread(current); // 如果c=0,w=0或者c>0,w>0(重入),则设置当前线程或锁的拥有者
return true;
}
  • 这段代码首先取到当前锁的个数c,然后再通过c来获取写锁的个数w。因为写锁是低16位,所以取低16位的最大值与当前的c做与运算( int w = exclusiveCount©; ),高16位和0与运算后是0,剩下的就是低位运算的值,同时也是持有写锁的线程数目。
  • 在取到写锁线程的数目后,首先判断是否已经有线程持有了锁。如果已经有线程持有了锁(c!=0),则查看当前写锁线程的数目,如果写线程数为0(即此时存在读锁)或者持有锁的线程不是当前线程就返回失败(涉及到公平锁和非公平锁的实现)。
  • 如果写入锁的数量大于最大数(65535,2的16次方-1)就抛出一个Error。
  • 如果当且写线程数为0(那么读线程也应该为0,因为上面已经处理c!=0的情况),并且当前线程需要阻塞那么就返回失败;如果通过CAS增加写线程数失败也返回失败。
  • 如果c=0,w=0或者c>0,w>0(重入),则设置当前线程或锁的拥有者,返回成功!

tryAcquire()除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:必须确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。

因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,然后等待的读写线程才能够继续访问读写锁,同时前次写线程的修改对后续的读写线程可见。

接着是读锁的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1; // 如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

可以看到在tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是“1<<16”。所以读写锁才能实现读读的过程共享,而读写、写读、写写的过程互斥。

此时,我们再回头看一下互斥锁ReentrantLock中公平锁和非公平锁的加锁源码:

img

我们发现在ReentrantLock虽然有公平锁和非公平锁两种,但是它们添加的都是独享锁。根据源码所示,当某一个线程调用lock方法获取锁时,如果同步资源没有被其他线程锁住,那么当前线程在使用CAS更新state成功后就会成功抢占该资源。而如果公共资源被占用且不是被当前线程占用,那么就会加锁失败。所以可以确定ReentrantLock无论读操作还是写操作,添加的锁都是都是独享锁。

结语

本文Java中常用的锁以及常见的锁的概念进行了基本介绍,并从源码以及实际应用的角度进行了对比分析。限于篇幅以及个人水平,没有在本篇文章中对所有内容进行深层次的讲解。

其实Java本身已经对锁本身进行了良好的封装,降低了研发同学在平时工作中的使用难度。但是研发同学也需要熟悉锁的底层原理,不同场景下选择最适合的锁。而且源码中的思路都是非常好的思路,也是值得大家去学习和借鉴的。

参考文章

背景

上一篇只是单纯的从原理上以及控制台上去实践系统之间的打通,但是如果能从页面上去看每一个请求日志的链路情况就更好了。其实zipkin是提供了一个UI后台管理给到我们的。

**注意点:**关于 Zipkin 的服务端,在使用 Spring Boot 2.x 版本后,官方就不推荐自行定制编译了,反而是直接提供了编译好的 jar 包来给我们使用。具体请查阅:https://zipkin.io/pages/quickstart.html (最直接、权威的就是阅读官方网站)

1
2
curl -sSL https://zipkin.io/quickstart.sh | bash -s
java -jar zipkin.jar

快速上手

Zipkin 分为两端,一个是 Zipkin 服务端,一个是 Zipkin 客户端,客户端也就是微服务的应用。
客户端会配置服务端的 URL 地址,一旦发生服务间的调用的时候,会被配置在微服务里面的 Sleuth 的监听器监听,并生成相应的 Trace 和 Span 信息发送给服务端。
发送的方式主要有两种,一种是 HTTP 报文的方式,还有一种是消息总线的方式如 RabbitMQ。

不论哪种方式,我们都需要:

  • 一个 Eureka 服务注册中心,这里我们就用之前的 eureka 项目来当注册中心。
  • 一个 Zipkin 服务端。
  • 三个微服务应用,trace-atrace-btrace-c,其中 trace-a 中有一个 REST 接口 /trace-a,调用该接口后将触发对 trace-b 应用的调用,最后调用trace-c。其实2个服务就够了~

环境说明

也是因为一些环境问题,总是导致ui后管一直没有数据,这次参考了很多文章一级官网网址来操作的,才得以显示数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
</parent>

<properties>
<spring-cloud-dependencies.version>Finchley.RELEASE</spring-cloud-dependencies.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud-dependencies.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

方式一:通过HTTP

客户端的pom文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

客户端的properties文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
spring.application.name=trace-a
server.port=10001

eureka.client.serviceUrl.defaultZone=http://localhost:8001/eureka/

spring.sleuth.web.client.enabled=true
spring.sleuth.sampler.probability=1.0
spring.zipkin.base-url=http://localhost:9411

# log trace detail
logging.level.org.springframework.web.servlet.DispatcherServlet=DEBUG

访问层部分Java代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.allei;

import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
@RestController
@EnableDiscoveryClient
@SpringBootApplication
@Slf4j
public class TraceAApplication {
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}

@RequestMapping(value = "/trace-a", method = RequestMethod.GET)
public String trace() throws InterruptedException {
log.info("call trace-a----->");
TimeUnit.SECONDS.sleep(1l);
return restTemplate().getForEntity("http://trace-b/trace-b", String.class).getBody();
}

public static void main(String[] args) {
SpringApplication.run(TraceAApplication.class, args);
}
}

还是确定一下注册中心,保证服务都有起来。

http://static.cyblogs.com/WX20200113-155940@2x.png

通过上面的脚本拉取最新的zipkin服务,此时的版本是:2.19.2

1
2
3
4
5
6
7
8
9
➜  Desktop  curl -sSL https://zipkin.io/quickstart.sh | bash -s
Thank you for trying Zipkin!
This installer is provided as a quick-start helper, so you can try Zipkin out
without a lengthy installation process.

Fetching version number of latest io.zipkin:zipkin-server release...
Latest release of io.zipkin:zipkin-server seems to be 2.19.2

Downloading io.zipkin:zipkin-server:2.19.2:exec to zipkin.jar...

并且启动服务:嗯~ logo已经更换的更加的骚气~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
➜  Desktop  java -jar zipkin.jar

oo
oooo
oooooo
oooooooo
oooooooooo
oooooooooooo
ooooooo ooooooo
oooooo ooooooo
oooooo ooooooo
oooooo o o oooooo
oooooo oo oo oooooo
ooooooo oooo oooo ooooooo
oooooo ooooo ooooo ooooooo
oooooo oooooo oooooo ooooooo
oooooooo oo oo oooooooo
ooooooooooooo oo oo ooooooooooooo
oooooooooooo oooooooooooo
oooooooo oooooooo
oooo oooo

________ ____ _ _____ _ _
|__ /_ _| _ \| |/ /_ _| \ | |
/ / | || |_) | ' / | || \| |
/ /_ | || __/| . \ | || |\ |
|____|___|_| |_|\_\___|_| \_|

:: version 2.19.2 :: commit 56d907b ::

当浏览器访问trace-a服务的时候,去后管页面查看就能看到服务列表以及日志情况。

图一:

http://static.cyblogs.com/WX20200113-145151@2x.png

图二:

http://static.cyblogs.com/WX20200113-145220@2x.png

方式二:RabbitMQ

安装RabbitMQ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
➜  ~  brew install rabbitmq
Updating Homebrew...
==> Auto-updated Homebrew!
Updated 4 taps (homebrew/cask, caskroom/versions, caskroom/cask and adoptopenjdk/openjdk).

==> Downloading https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.0/rabbitmq-server-generic-unix-3.8.0.tar.xz
==> Downloading from https://github-production-release-asset-2e65be.s3.amazonaws.com/924551/71891480-e44b-11e9-80fd-c06739f04c5e?X-Amz-Algorith
######################################################################## 100.0%
==> /usr/bin/unzip -qq -j /usr/local/Cellar/rabbitmq/3.8.0/plugins/rabbitmq_management-3.8.0.ez rabbitmq_management-3.8.0/priv/www/cli/rabbitmq
==> Caveats
Management Plugin enabled by default at http://localhost:15672

Bash completion has been installed to:
/usr/local/etc/bash_completion.d

To have launchd start rabbitmq now and restart at login:
brew services start rabbitmq
Or, if you don't want/need a background service you can just run:
rabbitmq-server
==> Summary
🍺 /usr/local/Cellar/rabbitmq/3.8.0: 277 files, 18.2MB, built in 34 seconds
==> `brew cleanup` has not been run in 30 days, running now...
Removing: /usr/local/Cellar/jpeg/9b... (20 files, 724KB)
Removing: /usr/local/Cellar/libpng/1.6.34... (26 files, 1.2MB)
Removing: /usr/local/Cellar/libtiff/4.0.8_5... (245 files, 3.4MB)
Removing: /usr/local/Cellar/wxmac/3.0.3.1_1... (810 files, 24.3MB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/Cask/minikube--0.25.0... (41.3MB)
Removing: /Users/chenyuan/Library/Logs/Homebrew/go... (64B)
Removing: /Users/chenyuan/Library/Logs/Homebrew/readline... (64B)
Removing: /Users/chenyuan/Library/Logs/Homebrew/sqlite... (64B)
Removing: /Users/chenyuan/Library/Logs/Homebrew/kubernetes-cli... (64B)
Pruned 0 symbolic links and 2 directories from /usr/local

安装的路径在:/usr/local/Cellar/rabbitmq/3.8.0,利用brew安装软件速度可能会很慢,大家可以去更换一下源就变好了。但这个其实也不一定。

启动RabbitMQ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
➜  sbin  rabbitmq-server

## ## RabbitMQ 3.8.0
## ##
########## Copyright (C) 2007-2019 Pivotal Software, Inc.
###### ##
########## Licensed under the MPL. See https://www.rabbitmq.com/

Doc guides: https://rabbitmq.com/documentation.html
Support: https://rabbitmq.com/contact.html
Tutorials: https://rabbitmq.com/getstarted.html
Monitoring: https://rabbitmq.com/monitoring.html

Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
/usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

Config file(s): (none)

Starting broker... completed with 6 plugins.

http://static.cyblogs.com/WX20200111-064208@2x.png

默认密码账号为guest/guest。登录进入后就能看到界面了。

http://static.cyblogs.com/WX20200111-064349@2x.png

配置客户端pom文件:

1
2
3
4
5
# 新加入
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

为了觉得不是http的模式导致的数据,故意让地址写错

1
spring.zipkin.base-url=http://localhost:9412

然后客户端就什么都不用管了。但是服务端因为是下载的Jar的方式,让其支持RabbitMQ只能通过环境变量的方式来启动。

1
RABBIT_ADDRESSES=localhost java -jar zipkin.jar

这个时候会发现RabbitMQ会有一个zipkin的队列。

http://static.cyblogs.com/WX20200113-150636@2x.png可配置的环境变量如下表所示:

属性 环境变量 描述
zipkin.collector.rabbitmq.concurrency RABBIT_CONCURRENCY 并发消费者数量,默认为 1
zipkin.collector.rabbitmq.connection-timeout RABBIT_CONNECTION_TIMEOUT 建立连接时的超时时间,默认为 60000 毫秒,即 1 分钟
zipkin.collector.rabbitmq.queue RABBIT_QUEUE 从中获取 span 信息的队列,默认为 zipkin
zipkin.collector.rabbitmq.uri RABBIT_URI 符合 RabbitMQ URI 规范 的 URI,例如 amqp://user:pass@host:10000/vhost

如果设置了 URI,则以下属性将被忽略。

属性 环境变量 描述
zipkin.collector.rabbitmq.addresses RABBIT_ADDRESSES 用逗号分隔的 RabbitMQ 地址列表,例如 localhost:5672,localhost:5673
zipkin.collector.rabbitmq.password RABBIT_PASSWORD 连接到 RabbitMQ 时使用的密码,默认为 guest
zipkin.collector.rabbitmq.username RABBIT_USER 连接到 RabbitMQ 时使用的用户名,默认为 guest
zipkin.collector.rabbitmq.virtual-host RABBIT_VIRTUAL_HOST 使用的 RabbitMQ virtual host,默认为 /
zipkin.collector.rabbitmq.use-ssl RABBIT_USE_SSL 设置为 true 则用 SSL 的方式与 RabbitMQ 建立链接

然后访问 http://localhost:10001/trace-a 并刷新 Zipkin UI,看到如下内容,就说明 Sleuth+Zipkin+RabbitMQ 整合成功了。

http://static.cyblogs.com/WX20200113-150738@2x.png

参考地址

前言

通过我之前的Tomcat系列文章,相信看我博客的同学对Tomcat应该有一个比较清晰的了解了,在前几篇博客我们讨论了Tomcat在SpringBoot框架中是如何启动的,讨论了Tomcat的内部组件是如何设计以及请求是如何流转的,那么我们这篇博客聊聊Tomcat的异步Servlet,Tomcat是如何实现异步Servlet的以及异步Servlet的使用场景。

手撸一个异步的Servlet

我们直接借助SpringBoot框架来实现一个Servlet,这里只展示Servlet代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@WebServlet(urlPatterns = "/async",asyncSupported = true)
@Slf4j
public class AsyncServlet extends HttpServlet {

ExecutorService executorService =Executors.newSingleThreadExecutor();

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
//开启异步,获取异步上下文
final AsyncContext ctx = req.startAsync();
// 提交线程池异步执行
executorService.execute(new Runnable() {


@Override
public void run() {
try {
log.info("async Service 准备执行了");
//模拟耗时任务
Thread.sleep(10000L);
ctx.getResponse().getWriter().print("async servlet");
log.info("async Service 执行了");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//最后执行完成后完成回调。
ctx.complete();
}
});
}

上面的代码实现了一个异步的Servlet,实现了doGet方法注意在SpringBoot中使用需要再启动类加上@ServletComponentScan注解来扫描Servlet。既然代码写好了,我们来看看实际运行效果。

http://static.cyblogs.com/async%20Servlet%20result.png

我们发送一个请求后,看到页面有响应,同时,看到请求时间花费了10.05s,那么我们这个Servlet算是能正常运行啦。有同学肯定会问,这不是异步servlet吗?你的响应时间并没有加快,有什么用呢?对,我们的响应时间并不能加快,还是会取决于我们的业务逻辑,但是我们的异步servlet请求后,依赖于业务的异步执行,我们可以立即返回,也就是说,Tomcat的线程可以立即回收,默认情况下,Tomcat的核心线程是10,最大线程数是200,我们能及时回收线程,也就意味着我们能处理更多的请求,能够增加我们的吞吐量,这也是异步Servlet的主要作用。

异步Servlet的内部原理

了解完异步Servlet的作用后,我们来看看,Tomcat是如何是先异步Servlet的。其实上面的代码,主要核心逻辑就两部分,final AsyncContext ctx = req.startAsync() ctx.complete()那我们来看看他们究竟做了什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public AsyncContext startAsync(ServletRequest request,
ServletResponse response) {
if (!isAsyncSupported()) {
IllegalStateException ise =
new IllegalStateException(sm.getString("request.asyncNotSupported"));
log.warn(sm.getString("coyoteRequest.noAsync",
StringUtils.join(getNonAsyncClassNames())), ise);
throw ise;
}

if (asyncContext == null) {
asyncContext = new AsyncContextImpl(this);
}

asyncContext.setStarted(getContext(), request, response,
request==getRequest() && response==getResponse().getResponse());
asyncContext.setTimeout(getConnector().getAsyncTimeout());

return asyncContext;
}

我们发现req.startAsync()只是保存了一个异步上下文,同时设置一些基础信息,比如Timeout,顺便提一下,这里设置的默认超时时间是30S,如果你的异步处理逻辑超过30S,此时执行ctx.complete()就会抛出IllegalStateException 异常。

我们来看看ctx.complete()的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
  public void complete() {
if (log.isDebugEnabled()) {
logDebug("complete ");
}
check();
request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
}
//类:AbstractProcessor
public final void action(ActionCode actionCode, Object param) {
case ASYNC_COMPLETE: {
clearDispatches();
if (asyncStateMachine.asyncComplete()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}
}
//类:AbstractProcessor
protected void processSocketEvent(SocketEvent event, boolean dispatch) {
SocketWrapperBase<?> socketWrapper = getSocketWrapper();
if (socketWrapper != null) {
socketWrapper.processSocket(event, dispatch);
}
}
//类:AbstractEndpoint
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
//省略部分代码
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}

return true;
}

所以,这里最终会调用AbstractEndpointprocessSocket方法,之前看过我前面博客的同学应该有印象,EndPoint是用来接受和处理请求的,接下来就会交给Processor去进行协议处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
类:AbstractProcessorLight
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
//省略部分diam
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {

} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
state = service(socketWrapper);
} else {
state = SocketState.CLOSED;
}

} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);

return state;
}

这部分是重点,AbstractProcessorLight会根据SocketEvent的状态来判断是不是要去调用service(socketWrapper),该方法最终会去调用到容器,从而完成业务逻辑的调用,我们这个请求是执行完成后调用的,肯定不能进容器了,不然就是死循环了,这里通过isAsync() 判断,就会进入dispatch(status),最终会调用CoyoteAdapterasyncDispatch方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public boolean asyncDispatch(org.apache.coyote.Request req, org.apache.coyote.Response res,
SocketEvent status) throws Exception {
//省略部分代码
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
boolean success = true;
AsyncContextImpl asyncConImpl = request.getAsyncContextInternal();
try {
if (!request.isAsync()) {
response.setSuspended(false);
}

if (status==SocketEvent.TIMEOUT) {
if (!asyncConImpl.timeout()) {
asyncConImpl.setErrorState(null, false);
}
} else if (status==SocketEvent.ERROR) {

}

if (!request.isAsyncDispatching() && request.isAsync()) {
WriteListener writeListener = res.getWriteListener();
ReadListener readListener = req.getReadListener();
if (writeListener != null && status == SocketEvent.OPEN_WRITE) {
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
res.onWritePossible();//这里执行浏览器响应,写入数据
if (request.isFinished() && req.sendAllDataReadEvent() &&
readListener != null) {
readListener.onAllDataRead();
}
} catch (Throwable t) {

} finally {
request.getContext().unbind(false, oldCL);
}
}
}
}
//这里判断异步正在进行,说明这不是一个完成方法的回调,是一个正常异步请求,继续调用容器。
if (request.isAsyncDispatching()) {
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
if (t != null) {
asyncConImpl.setErrorState(t, true);
}
}
//注意,这里,如果超时或者出错,request.isAsync()会返回false,这里是为了尽快的输出错误给客户端。
if (!request.isAsync()) {
//这里也是输出逻辑
request.finishRequest();
response.finishResponse();
}
//销毁request和response
if (!success || !request.isAsync()) {
updateWrapperErrorCount(request, response);
request.recycle();
response.recycle();
}
}
return success;
}

上面的代码就是ctx.complete()执行最终的方法了(当然省略了很多细节),完成了数据的输出,最终输出到浏览器。

这里有同学可能会说,我知道异步执行完后,调用ctx.complete()会输出到浏览器,但是,第一次doGet请求执行完成后,Tomcat是怎么知道不用返回到客户端的呢?关键代码在CoyoteAdapter中的service方法,部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
postParseSuccess = postParseRequest(req, request, res, response);
//省略部分代码
if (postParseSuccess) {
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
if (request.isAsync()) {
async = true;
} else {
//输出数据到客户端
request.finishRequest();
response.finishResponse();
if (!async) {
updateWrapperErrorCount(request, response);
//销毁request和response
request.recycle();
response.recycle();
}

这部分代码在调用完Servlet后,会通过request.isAsync()来判断是否是异步请求,如果是异步请求,就设置 async = true。如果是非异步请求就执行输出数据到客户端逻辑,同时销毁requestresponse。这里就完成了请求结束后不响应客户端的操作。

为什么说Spring Boot的@EnableAsync注解不是异步Servlet

因为之前准备写本篇文章的时候就查询过很多资料,发现很多资料写SpringBoot异步编程都是依赖于@EnableAsync注解,然后在Controller用多线程来完成业务逻辑,最后汇总结果,完成返回输出。这里拿一个掘金大佬的文章来举例《新手也能看懂的 SpringBoot 异步编程指南》,这篇文章写得很通俗易懂,非常不错,从业务层面来说,确实是异步编程,但是有一个问题,抛开业务的并行处理来说,针对整个请求来说,并不是异步的,也就是说不能立即释放Tomcat的线程,从而不能达到异步Servlet的效果。这里我参考上文也写了一个demo,我们来验证下,为什么它不是异步的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@RestController
@Slf4j
public class TestController {
@Autowired
private TestService service;

@GetMapping("/hello")
public String test() {
try {
log.info("testAsynch Start");
CompletableFuture<String> test1 = service.test1();
CompletableFuture<String> test2 = service.test2();
CompletableFuture<String> test3 = service.test3();
CompletableFuture.allOf(test1, test2, test3);
log.info("test1=====" + test1.get());
log.info("test2=====" + test2.get());
log.info("test3=====" + test3.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "hello";
}
@Service
public class TestService {
@Async("asyncExecutor")
public CompletableFuture<String> test1() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test1");
}

@Async("asyncExecutor")
public CompletableFuture<String> test2() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test2");
}

@Async("asyncExecutor")
public CompletableFuture<String> test3() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test3");
}
}
@SpringBootApplication
@EnableAsync
public class TomcatdebugApplication {

public static void main(String[] args) {
SpringApplication.run(TomcatdebugApplication.class, args);
}

@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(3);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsynchThread-");
executor.initialize();
return executor;
}

这里我运行下,看看效果

http://static.cyblogs.com/Spring%20boot%20异步1.gif

这里我请求之后,在调用容器执行业务逻辑之前打了一个断点,然后在返回之后的同样打了一个断点,在Controller执行完之后,请求才回到了CoyoteAdapter中,并且判断request.isAsync(),根据图中看到,是为false,那么接下来就会执行 request.finishRequest()response.finishResponse() 来执行响应的结束,并销毁请求和响应体。很有趣的事情是,我实验的时候发现,在执行request.isAsync()之前,浏览器的页面上已经出现了响应体,这是SpringBoot框架已经通过StringHttpMessageConverter类中的writeInternal方法已经进行输出了。

以上分析的核心逻辑就是,Tomcat的线程执行CoyoteAdapter调用容器后,必须要等到请求返回,然后再判断是否是异步请求,再处理请求,然后执行完毕后,线程才能进行回收。而我一最开始的异步Servlet例子,执行完doGet方法后,就会立即返回,也就是会直接到request.isAsync()的逻辑,然后整个线程的逻辑执行完毕,线程被回收。

聊聊异步Servlet的使用场景

分析了这么多,那么异步Servlet的使用场景有哪些呢?其实我们只要抓住一点就可以分析了,就是异步Servlet提高了系统的吞吐量,可以接受更多的请求。假设web系统中Tomcat的线程不够用了,大量请求在等待,而此时Web系统应用层面的优化已经不能再优化了,也就是无法缩短业务逻辑的响应时间了,这个时候,如果想让减少用户的等待时间,提高吞吐量,可以尝试下使用异步Servlet。

举一个实际的例子:比如做一个短信系统,短信系统对实时性要求很高,所以要求等待时间尽可能短,而发送功能我们实际上是委托运营商去发送的,也就是说我们要调用接口,假设并发量很高,那么这个时候业务系统调用我们的发送短信功能,就有可能把我们的Tomcat线程池用完,剩下的请求就会在队列中等待,那这个时候,短信的延时就上去了,为了解决这个问题,我们可以引入异步Servlet,接受更多的短信发送请求,从而减少短信的延时。

总结

这篇文章我从手写一个异步Servlet来开始,分析了异步Servlet的作用,以及Tomcat内部是如何实现异步Servlet的,然后我也根据互联网上流行的SpringBoot异步编程来进行说明,其在Tomcat内部并不是一个异步的Servlet。最后,我谈到了异步Servlet的使用场景,分析了什么情况下可以尝试异步Servlet。

学习东西要知行合一,如果只是知道理论而没实践过,那么掌握的也不会特别扎实,估计过几天就会忘记,接下来我们一起实践来学习Spring事务的传播属性。

传播属性

传播属性定义的是当一个事务方法碰到另一个事务方法时的处理行为,一共有七种行为,定义如下

传播性 描述
PROPAGATION_REQUIRED 0 支持当前事务,如果没有就新建事务
PROPAGATION_SUPPORTS 1 支持当前事务,如果没有就不以事务的方式运行
PROPAGATION_MANDATORY 2 支持当前事务,如果当前没事务就抛异常
PROPAGATION_REQUIRES_NEW 3 无论当前是否有事务,都会新起一个事务
PROPAGATION_NOT_SUPPORTED 4 不支持事务,如果当前存在事务,就将此事务挂起不以事务方式运行
PROPAGATION_NEVER 5 不支持事务,如果有事务就抛异常
PROPAGATION_NESTED 6 如果当前存在事务,在当前事务中再新起一个事务

其实只看概念的话已经很直截了当了说明了每个传播性的作用,此时我们再用具体的例子演示一下每个传播性属性下的行为。

此次演示我们使用的是H2数据库,这个数据库是作用在内存里面的,所以对于我们演示事务效果来说正好,无需我们在进行其他的配置了,我们新建一个表。将下面语句放在schema.sql文件里面即可,SpringBoot程序在启动的时候就会自动为我们在内存里面建立这样的一个表。

1
CREATE TABLE FOO (ID INT IDENTITY, BAR VARCHAR(64));

演示之前我们会定义两个类FooServiceBarService。我们使用BarService 里面的方法进行调用FooService 中的方法。

环境准备

在进行事务演示之前,其实可以分为以下几种情况,根据排列组合,我们可以得出以下八种情况

  • 调用者:有无事务
  • 调用者:是否有异常
  • 被调用者:有无事务**(这个是通过传播属性进行控制的)**所以并不在排列组合中
  • 被调用者:是否有异常
调用者是否有事务 调用者是否有异常 被调用者是否有异常
异常类

其中的RollbackException是我们自己定义的一个异常类

1
2
3
4
5
6
7
8
9
10
11
@Service
public class BarServiceImpl implements BarService{
@Autowired
private FooService fooService;

// PROPAGATION_REQUIRED演示 无事务
@Override
public void testRequiredNoTransactional() throws RollbackException {
fooService.testRequiredTransactional();
}
}
调用者

BarService中定义两个方法,一个是带着事务的,一个是不带事务的

1
2
3
4
5
6
7
8
9
10
11
12
// 有事务
@Override
@Transactional(rollbackFor = Exception.class)
public void hasTransactional() throws RollbackException {

}

// 无事务
@Override
public void noTransactional() throws RollbackException {

}

接下来我们就根据俄上面定义的八种情况进行事务传播属性的学习。

PROPAGATION_REQUIRED

在此传播属性下,被调用方是否新建事务取决去调用者是否带着事务。

想要了解这个传播属性的特性,其实我们演示上面八种情况的两个例子就够了

调用者是否有事务 调用者是否有异常 被调用者是否有异常
  • 第一种情况我们在被调用者抛出异常的情况下,如果查询不到插入的数据,那么就说明被调用者在调用者没有事务的情况下自己新建了事务。
  • 第二种情况我们在调用者抛出异常的情况下,如果查询不到插入的数据,那么就说明被调用者在调用者有事务的情况下就加入当前事务了。

我们先来看一下被调用者的类的方法例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Service
public class FooServiceImpl implements FooService {
@Autowired
private JdbcTemplate jdbcTemplate;

// REQUIRED传播属性-被调用者有异常抛出
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
public void testRequiredHasException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ("+Global.REQUIRED_HAS_EXCEPTION+")");
throw new RollbackException();
}

// REQUIRED传播属性-被调用者无异常抛出
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
public void testRequiredNoException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ("+Global.REQUIRED_NO_EXCEPTION+")");
}
}

接下来我们看一下调用者方法的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
public class BarServiceImpl implements BarService{
@Autowired
private FooService fooService;

// 有事务
@Override
@Transactional(rollbackFor = Exception.class)
public void hasTransactional() throws RollbackException {
// 调用者有事务,抛异常 被调用者无异常
fooService.testRequiredNoException();
throw new RollbackException();
}

// 无事务
@Override
public void noTransactional() throws RollbackException {
// 调用者无事务,不抛异常 被调用者有异常
fooService.testRequiredHasException();
}
}

此时我们在程序调用时进行查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
String noException = Global.REQUIRED_NO_EXCEPTION;
String hasException = Global.REQUIRED_HAS_EXCEPTION;

try {
barService.noTransactional();
}catch (Exception e){
log.info("第一种情况 {}",
jdbcTemplate
.queryForObject("SELECT COUNT(*) FROM FOO WHERE BAR='"+hasException+"'", Long.class));
}

try {
barService.hasTransactional();
}catch (Exception e){
log.info("第二种情况 {}",
jdbcTemplate
.queryForObject("SELECT COUNT(*) FROM FOO WHERE BAR='"+noException+"'", Long.class));
}

查看打印出来的日志

1
2
2019-10-16 13:02:04.142  INFO 11869 --- [           main] c.e.t.t.TransactionApplication           : 第一种情况 0
2019-10-16 13:02:04.143 INFO 11869 --- [ main] c.e.t.t.TransactionApplication : 第二种情况 0

我们看到我们都没有查到相应的数据,说明数据都回滚了。此时我们应该就理解了那句话支持当前事务,如果没有就新建事务

PROPAGATION_SUPPORTS

被调用者是否有事务,完全依赖于调用者,调用者有事务则有事务,调用者没事务则没事务。

接下来我们还是用上面的两个例子进行演示

调用者是否有事务 调用者是否有异常 被调用者是否有异常
  • 第一种情况:被调用者抛出异常的情况下,如果仍能查询到数据,说明事务没有回滚,说明被调用者没有事务
  • 第二种情况:调用者抛出异常情况下,如果查不到数据,说明两个方法在一个事务中

接下来仍然是例子演示

被调用者,只是将@Transactional 注解中的propagation 属性更换为了Propagation.SUPPORTS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SUPPORTS传播属性-被调用者有异常抛出
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.SUPPORTS)
public void testSupportsHasException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.SUPPORTS_HAS_EXCEPTION+"')");
throw new RollbackException();
}

// SUPPORTS传播属性-被调用者无异常抛出
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.SUPPORTS)
public void testSupportsNoException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.SUPPORTS_NO_EXCEPTION+"')");
}

调用者和上面的例子调用一样,我们直接查看执行效果

1
2
2019-10-16 13:50:27.738  INFO 12174 --- [           main] c.e.t.t.TransactionApplication           : 第一种情况 1
2019-10-16 13:50:27.741 INFO 12174 --- [ main] c.e.t.t.TransactionApplication : 第二种情况 0

我们看到了在第一种情况下查到了数据,说明在第一种情况下被调用者是没有事务的。此时我们应该就理解了这句话 支持当前事务,如果没有就不以事务的方式运行

PROPAGATION_MANDATORY

依然是这两个例子进行演示

调用者是否有事务 调用者是否有异常 被调用者是否有异常
  • 第一种情况:因为调用者没有事务,所以此传播属性下应该是抛异常的
  • 第二种情况:被调用者的事务和调用者事务是同样的

接下来是被调用者的代码例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// MANDATORY传播属性-被调用者有异常抛出
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.MANDATORY)
public void testMandatoryHasException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.SUPPORTS_HAS_EXCEPTION+"')");
throw new RollbackException();
}

// MANDATORY传播属性-被调用者无异常抛出
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.MANDATORY)
public void testMandatoryNoException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.SUPPORTS_NO_EXCEPTION+"')");
}

调用者和上面的例子调用一样,我们直接查看执行效果

1
2
3
2019-10-16 13:58:39.178 ERROR 12317 --- [           main] c.e.t.t.TransactionApplication           : org.springframework.transaction.IllegalTransactionStateException: No existing transaction found for transaction marked with propagation 'mandatory'
2019-10-16 13:58:39.276 INFO 12317 --- [ main] c.e.t.t.TransactionApplication : 第一种情况 0
2019-10-16 13:58:39.281 INFO 12317 --- [ main] c.e.t.t.TransactionApplication : 第二种情况 0

我们发现和我们推测一样,说明被调用者是不会自己新建事务的,此时我们应该就理解了这句话支持当前事务,如果当前没事务就抛异常

PROPAGATION_REQUIRES_NEW

此传播属性下,无论调用者是否有事务,被调用者都会新建一个事务

调用者是否有事务 调用者是否有异常 被调用者是否有异常
  • 第一种情况:调用者无事务,被调用者会新建事务,所以查不到数据
  • 第二种情况:调用者有事务,被调用者会新建一个事务,所以调用者抛异常影响不到被调用者,所以能查到数据

接下来我们演示代码。

被调用者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// REQUIRES_NEW传播属性-被调用者有异常抛出
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRES_NEW)
public void testRequiresNewHasException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.REQUIRES_NEW_HAS_EXCEPTION+"')");
throw new RollbackException();
}

// REQUIRES_NEW传播属性-被调用者无异常抛出
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRES_NEW)
public void testRequiresNewNoException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.REQUIRES_NEW_NO_EXCEPTION+"')");
}

调用者的例子和上面的相同,我们直接来看执行情况

1
2
2019-10-16 16:29:20.296  INFO 15553 --- [           main] c.e.t.t.TransactionApplication           : 第一种情况 0
2019-10-16 16:29:20.298 INFO 15553 --- [ main] c.e.t.t.TransactionApplication : 第二种情况 1

我们发现和我们的推论是一样的,说明调用者的事务和被调用者的事务完全无关。此时我们应该就理解这句话了无论当前是否有事务,都会新起一个事务

PROPAGATION_NOT_SUPPORTED

无论调用者是否有事务,被调用者都不以事务的方法运行

同样是这两个例子

调用者是否有事务 调用者是否有异常 被调用者是否有异常
  • 第一种情况:被调用者都不会有事务,那么在抛异常之后就能查到相应的数据
  • 第二种情况:在调用者有事务的情况下,被调用者也会在无事务环境下运行,所以我们依然能查到数据

接下来验证我们的猜测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// NOT_SUPPORTED传播属性-被调用者有异常抛出
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.NOT_SUPPORTED)
public void testNotSupportHasException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.NOT_SUPPORTS_HAS_EXCEPTION+"')");
throw new RollbackException();
}

// NOT_SUPPORTED传播属性-被调用者无异常抛出
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.NOT_SUPPORTED)
public void testNotSupportNoException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.NOT_SUPPORTS_NO_EXCEPTION+"')");
}

然后查看执行结果

1
2
2019-10-16 16:38:35.065  INFO 15739 --- [           main] c.e.t.t.TransactionApplication           : 第一种情况 1
2019-10-16 16:38:35.067 INFO 15739 --- [ main] c.e.t.t.TransactionApplication : 第二种情况 1

我们可以看到在最后两种情况都查到了数据,根据演示效果应该可以理解这句话了,不支持事务,如果当前存在事务,就将此事务挂起不以事务方式运行

PROPAGATION_NEVER

调用者有事务,被调用者就会抛出异常

调用者是否有事务 调用者是否有异常 被调用者是否有异常

这个就不演示,相信大家看到这里应该都会明白在第一种情况下我们是能够查到数据的。在第二种情况下由于调用者带着事务,所以会抛异常。

PROPAGATION_NESTED

此传播属性下,被调用者的事务是调用者的事务的子集。

我们重点说一下NESTED的传播属性的特性

调用者是否有事务 说明
被调用者会新起一个事务,此事务和调用者事务是一个嵌套的关系
被调用者会自己新起一个事务

关于什么是嵌套事务的关系,我们用下面三个例子能够进行演示。

调用者是否有事务 调用者是否有异常 被调用者是否有异常
  • 第一种情况:如果查不到数据,则说明在调用者无事务情况下,被调用者会新起一个事务
  • 第二种情况:如果查不到数据,说明外层事务能够影响内层事务
  • 第三种情况:如果查到数据,说明内层事务不影响外层事务

接下来我们编写具体的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// NESTED传播属性-回滚事务
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.NESTED)
public void testNestedHasException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.NESTED_HAS_EXCEPTION+"')");
// TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
throw new RollbackException();
}

// NESTED传播属性-不回滚事务
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.NESTED)
public void testNestedNoException() throws RollbackException {
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.NESTED_NO_EXCEPTION+"')");
}

然后接下来的调用者也会有点区别

1
2
3
4
5
6
7
@Override
@Transactional()
public void hasTransactionalNoException() throws RollbackException {
// NESTED传播属性 - 调用者有事务,不抛异常 被调用者有异常
jdbcTemplate.execute("INSERT INTO FOO (BAR) VALUES ('"+Global.NESTED_HAS_EXCEPTION_TWO+"')");
fooService.testNestedHasException();
}

然后执行效果

1
2
3
2019-10-16 18:01:06.387  INFO 17172 --- [           main] c.e.t.t.TransactionApplication           : 第一种情况 0
2019-10-16 18:01:06.389 INFO 17172 --- [ main] c.e.t.t.TransactionApplication : 第二种情况 0
2019-10-16 18:01:06.390 INFO 17172 --- [ main] c.e.t.t.TransactionApplication : 第三种情况 1

可以看出来嵌套事务的本质就是外层会影响内层,内层不影响外层。而REQUIRES_NEW则是互不影响

总结

到现在我们已经全部分析完了七种传播属性,从写这篇文章开始到结束其中也碰到过一些坑,有些是不自己实践一遍是根本不知道的,所以我还是建议读者看完这篇文章以后自己进行实践,演示各种情况,只有这样才能够烂熟于心。

Zipkin服务追踪原理

创造一些追踪标识符(tracingId,spanId,parentId),最终将一个request的流程树构建出来,各业务系统在彼此调用时,将特定的跟踪消息传递至zipkin,zipkin在收集到跟踪信息后将其聚合处理、存储、展示等,用户可通过web UI方便获得网络延迟、调用链路、系统依赖等等。

http://static.cyblogs.com/140563-20170613131120462-1419921670.png

transport作用:收集被trace的services的spans,并将它们转化为zipkin common Span,之后把这些Spans传递的存储层

collector会对一个到来的被trace的数据(span)进行验证、存储并设置索引(Cassandra/ES-search/Memory)

Zipkin基本概念&核心数据结构
  • Annotation(用途:用于定位一个request的开始和结束,cs/sr/ss/cr含有额外的信息,比如说时间点):
    • cs:Client Start,表示客户端发起请求一个span的开始
    • sr:Server Receive,表示服务端收到请求
    • ss:Server Send,表示服务端完成处理,并将结果发送给客户端
    • cr:Client Received,表示客户端获取到服务端返回信息一个span的结束,当这个annotation被记录了,这个RPC也被认为完成。客户端调用时间=cr-cs,服务端处理时间=sr-ss。
  • Span:一个请求(包含一组Annotation和BinaryAnnotation);它是基本工作单元,一次链路调用(可以是RPC,DB等没有特定的限制)创建一个span,通过一个64位ID标识它。
    • span通过还有其他的数据,例如描述信息,时间戳,key-value对的(Annotation)tag信息,parent-id等,其中parent-id可以表示span调用链路来源,通俗的理解span就是一次请求信息
  • Trace:类似于树结构的Span集合,表示一条调用链路,存在唯一标识通过traceId(全局的跟踪ID,是跟踪的入口点,根据需求来决定在哪生成traceId)、spanId(请求跟踪ID,比如一次rpc等)和parentId(上一次 请求跟踪ID,用来将前后的请求串联起来),被收集到的span会汇聚成一个tree,从而提供出一个request的整体流程。

整个描述:在一次Trace中,每个服务的每一次调用,就是一个基本工作单元,就像上图中的每一个树节点,称之为span。每一个span都有一个id作为唯一标识,同样每一次Trace都会生成一个traceId在span中作为追踪标识,另外再通过一个parentId标明本次调用的发起者(就是发起者的span-id)。当span有了上面三个标识后,就可以很清晰的将多个span进行梳理串联,最终归纳出一条完整的跟踪链路。

代码实践操作

无存储方式,只需要控制台

本文的源代码在:git@github.com:chengcheng222e/springcloud-learn.git 欢迎大家去fork。

/Users/chenyuan/Workspaces/Github/springcloud-learn/eureka-server

1
2
3
4
5
6
7
@EnableEurekaServer
@SpringBootApplication
public class EurekaApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaApplication.class,args);
}
}

application.properties

1
2
3
4
5
6
spring.application.name=eureka-server
server.port=8001
eureka.instance.hostname=localhost
eureka.client.service-url.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka
eureka.client.register-with-eureka=false
eureka.client.fetch-registry=false

/Users/chenyuan/Workspaces/Github/springcloud-learn/zipkin-server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.allei;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import zipkin.server.internal.EnableZipkinServer;

@EnableZipkinServer
@EnableEurekaClient
@SpringBootApplication
public class ZipkinApplication {
public static void main(String[] args) {
SpringApplication.run(ZipkinApplication.class, args);
}
}

application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
spring.application.name=zipkin-server
server.port=10000

eureka.client.serviceUrl.defaultZone=http://localhost:8001/eureka/

zipkin.instance.hostname=localhost
spring.zipkin.base-url=http://${zipkin.instance.hostname}:${server.port}/

spring.sleuth.enabled=false
spring.sleuth.sampler.probability=1.0
spring.sleuth.hystrix.strategy.enabled=true

spring.main.allow-bean-definition-overriding=true

/Users/chenyuan/Workspaces/Github/springcloud-learn/trace-a

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.allei;

import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
@EnableDiscoveryClient
@SpringBootApplication
@Slf4j
public class TraceAApplication {

@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}

@RequestMapping(value = "/trace-a", method = RequestMethod.GET)
public String trace() throws InterruptedException {
log.info("call trace-a----->");
TimeUnit.SECONDS.sleep(1l);
return restTemplate().getForEntity("http://trace-b/trace-b", String.class).getBody();
}

public static void main(String[] args) {
SpringApplication.run(TraceAApplication.class, args);
}
}

application.properties

1
2
3
4
5
6
7
8
9
spring.application.name=trace-a
server.port=10001

eureka.client.serviceUrl.defaultZone=http://localhost:8001/eureka/

spring.zipkin.base-url=http://localhost:10000
spring.sleuth.sampler.percentage=1
# log trace detail
logging.level.org.springframework.web.servlet.DispatcherServlet=DEBUG

/Users/chenyuan/Workspaces/Github/springcloud-learn/trace-b

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.allei;

import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.math.RandomUtils;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
@EnableDiscoveryClient
@SpringBootApplication
@Slf4j
public class TraceBApplication {

@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}

@RequestMapping(value = "/trace-b", method = RequestMethod.GET)
public String trace() throws InterruptedException {
log.info("call trace-b----->");
TimeUnit.SECONDS.sleep(RandomUtils.nextInt(2));
return restTemplate().getForEntity("http://trace-c/trace-c", String.class).getBody();
}

public static void main(String[] args) {
SpringApplication.run(TraceBApplication.class, args);
}
}

application.properties

1
2
3
4
5
6
7
8
9
10
11
spring.application.name=trace-b
server.port=10002

eureka.client.serviceUrl.defaultZone=http://localhost:8001/eureka/

spring.zipkin.base-url=http://localhost:10000

spring.sleuth.sampler.percentage=1

# log trace detail
logging.level.org.springframework.web.servlet.DispatcherServlet=DEBUG

/Users/chenyuan/Workspaces/Github/springcloud-learn/trace-c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.allei;

import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.math.RandomUtils;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
@EnableDiscoveryClient
@SpringBootApplication
@Slf4j
public class TraceCApplication {
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}

@RequestMapping(value = "/trace-c", method = RequestMethod.GET)
public String trace() throws InterruptedException {
log.info("call trace-c----->");
TimeUnit.SECONDS.sleep(RandomUtils.nextInt(2));
return "hello trace";
}

public static void main(String[] args) {
SpringApplication.run(TraceCApplication.class, args);
}
}

application.properties

1
2
3
4
5
6
7
spring.application.name=trace-c
server.port=10003
eureka.client.serviceUrl.defaultZone=http://localhost:8001/eureka/
spring.zipkin.base-url=http://localhost:10000
spring.sleuth.sampler.percentage=1
# log trace detail
logging.level.org.springframework.web.servlet.DispatcherServlet=DEBUG

全部把服务启动起来,然后我们去注册中心看一下启动的服务情况。

http://localhost:8001/eureka/

http://static.cyblogs.com/WX20200110-120329@2x.png

我们从trace-a服务触发一个服务:

http://static.cyblogs.com/WX20200110-120512@2x.png

查阅一下日志

/Users/chenyuan/Workspaces/Github/springcloud-learn/trace-a 日志

http://static.cyblogs.com/WX20200110-120630.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2020-01-10 12:04:51.226 DEBUG [trace-a,c8df468b969a0917,c8df468b969a0917,false] 2626 --- [io-10001-exec-1] o.s.web.servlet.DispatcherServlet        : GET "/trace-a", parameters={}
2020-01-10 12:04:51.240 INFO [trace-a,c8df468b969a0917,c8df468b969a0917,false] 2626 --- [io-10001-exec-1] com.allei.TraceAApplication : call trace-a----->
2020-01-10 12:04:52.528 INFO [trace-a,c8df468b969a0917,a4c77f10471bb209,false] 2626 --- [io-10001-exec-1] c.netflix.config.ChainedDynamicProperty : Flipping property: trace-b.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-01-10 12:04:52.558 INFO [trace-a,c8df468b969a0917,a4c77f10471bb209,false] 2626 --- [io-10001-exec-1] c.n.u.concurrent.ShutdownEnabledTimer : Shutdown hook installed for: NFLoadBalancer-PingTimer-trace-b
2020-01-10 12:04:52.558 INFO [trace-a,c8df468b969a0917,a4c77f10471bb209,false] 2626 --- [io-10001-exec-1] c.netflix.loadbalancer.BaseLoadBalancer : Client: trace-b instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=trace-b,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null
2020-01-10 12:04:52.564 INFO [trace-a,c8df468b969a0917,a4c77f10471bb209,false] 2626 --- [io-10001-exec-1] c.n.l.DynamicServerListLoadBalancer : Using serverListUpdater PollingServerListUpdater
2020-01-10 12:04:52.588 INFO [trace-a,c8df468b969a0917,a4c77f10471bb209,false] 2626 --- [io-10001-exec-1] c.netflix.config.ChainedDynamicProperty : Flipping property: trace-b.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-01-10 12:04:52.590 INFO [trace-a,c8df468b969a0917,a4c77f10471bb209,false] 2626 --- [io-10001-exec-1] c.n.l.DynamicServerListLoadBalancer : DynamicServerListLoadBalancer for client trace-b initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=trace-b,current list of Servers=[172.18.232.69:10002],Load balancer stats=Zone stats: {defaultzone=[Zone:defaultzone; Instance count:1; Active connections count: 0; Circuit breaker tripped count: 0; Active connections per server: 0.0;]
},Server stats: [[Server:172.18.232.69:10002; Zone:defaultZone; Total Requests:0; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 08:00:00 CST 1970; First connection made: Thu Jan 01 08:00:00 CST 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:0.0; 90 percentile resp time:0.0; 95 percentile resp time:0.0; min resp time:0.0; max resp time:0.0; stddev resp time:0.0]
]}ServerList:org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList@128e37fa
2020-01-10 12:04:53.573 INFO [trace-a,,,] 2626 --- [erListUpdater-0] c.netflix.config.ChainedDynamicProperty : Flipping property: trace-b.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-01-10 12:04:54.397 DEBUG [trace-a,c8df468b969a0917,c8df468b969a0917,false] 2626 --- [io-10001-exec-1] o.s.web.servlet.DispatcherServlet : Completed 200 OK
2020-01-10 12:04:54.731 DEBUG [trace-a,4eb7430157ffd40f,4eb7430157ffd40f,false] 2626 --- [io-10001-exec-2] o.s.web.servlet.DispatcherServlet : GET "/favicon.ico", parameters={}
2020-01-10 12:04:54.749 DEBUG [trace-a,4eb7430157ffd40f,4eb7430157ffd40f,false] 2626 --- [io-10001-exec-2] o.s.web.servlet.DispatcherServlet : Completed 200 OK
2020-01-10 12:06:16.665 INFO [trace-a,,,] 2626 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
2020-01-10 12:11:16.661 INFO [trace-a,,,] 2626 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration

/Users/chenyuan/Workspaces/Github/springcloud-learn/trace-b 日志

http://static.cyblogs.com/WX20200110-120711.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2020-01-10 12:04:52.733 DEBUG [trace-b,c8df468b969a0917,a4c77f10471bb209,false] 2862 --- [io-10002-exec-1] o.s.web.servlet.DispatcherServlet        : GET "/trace-b", parameters={}
2020-01-10 12:04:52.752 INFO [trace-b,c8df468b969a0917,a4c77f10471bb209,false] 2862 --- [io-10002-exec-1] com.allei.TraceBApplication : call trace-b----->
2020-01-10 12:04:53.062 INFO [trace-b,c8df468b969a0917,e9651abffa9bf1d2,false] 2862 --- [io-10002-exec-1] c.netflix.config.ChainedDynamicProperty : Flipping property: trace-c.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-01-10 12:04:53.108 INFO [trace-b,c8df468b969a0917,e9651abffa9bf1d2,false] 2862 --- [io-10002-exec-1] c.n.u.concurrent.ShutdownEnabledTimer : Shutdown hook installed for: NFLoadBalancer-PingTimer-trace-c
2020-01-10 12:04:53.108 INFO [trace-b,c8df468b969a0917,e9651abffa9bf1d2,false] 2862 --- [io-10002-exec-1] c.netflix.loadbalancer.BaseLoadBalancer : Client: trace-c instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=trace-c,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null
2020-01-10 12:04:53.116 INFO [trace-b,c8df468b969a0917,e9651abffa9bf1d2,false] 2862 --- [io-10002-exec-1] c.n.l.DynamicServerListLoadBalancer : Using serverListUpdater PollingServerListUpdater
2020-01-10 12:04:53.146 INFO [trace-b,c8df468b969a0917,e9651abffa9bf1d2,false] 2862 --- [io-10002-exec-1] c.netflix.config.ChainedDynamicProperty : Flipping property: trace-c.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-01-10 12:04:53.148 INFO [trace-b,c8df468b969a0917,e9651abffa9bf1d2,false] 2862 --- [io-10002-exec-1] c.n.l.DynamicServerListLoadBalancer : DynamicServerListLoadBalancer for client trace-c initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=trace-c,current list of Servers=[172.18.232.69:10003],Load balancer stats=Zone stats: {defaultzone=[Zone:defaultzone; Instance count:1; Active connections count: 0; Circuit breaker tripped count: 0; Active connections per server: 0.0;]
},Server stats: [[Server:172.18.232.69:10003; Zone:defaultZone; Total Requests:0; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 08:00:00 CST 1970; First connection made: Thu Jan 01 08:00:00 CST 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:0.0; 90 percentile resp time:0.0; 95 percentile resp time:0.0; min resp time:0.0; max resp time:0.0; stddev resp time:0.0]
]}ServerList:org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList@4e144ec0
2020-01-10 12:04:54.124 INFO [trace-b,,,] 2862 --- [erListUpdater-0] c.netflix.config.ChainedDynamicProperty : Flipping property: trace-c.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-01-10 12:04:54.374 DEBUG [trace-b,c8df468b969a0917,a4c77f10471bb209,false] 2862 --- [io-10002-exec-1] o.s.web.servlet.DispatcherServlet : Completed 200 OK
2020-01-10 12:06:57.545 INFO [trace-b,,,] 2862 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
2020-01-10 12:11:57.541 INFO [trace-b,,,] 2862 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration

/Users/chenyuan/Workspaces/Github/springcloud-learn/trace-c 日志

http://static.cyblogs.com/WX20200110-120747.png

1
2
3
4
5
2020-01-10 12:04:53.306 DEBUG [trace-c,c8df468b969a0917,e9651abffa9bf1d2,false] 3211 --- [io-10003-exec-1] o.s.web.servlet.DispatcherServlet        : GET "/trace-c", parameters={}
2020-01-10 12:04:53.320 INFO [trace-c,c8df468b969a0917,e9651abffa9bf1d2,false] 3211 --- [io-10003-exec-1] com.allei.TraceCApplication : call trace-c----->
2020-01-10 12:04:54.346 DEBUG [trace-c,c8df468b969a0917,e9651abffa9bf1d2,false] 3211 --- [io-10003-exec-1] o.s.web.servlet.DispatcherServlet : Completed 200 OK
2020-01-10 12:08:09.576 INFO [trace-c,,,] 3211 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
2020-01-10 12:13:09.572 INFO [trace-c,,,] 3211 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration

参考地址

什么是mysql的主从复制?

MySQL 主从复制是指数据可以从一个MySQL数据库服务器主节点复制到一个或多个从节点。MySQL 默认采用异步复制方式,这样从节点不用一直访问主服务器来更新自己的数据,数据的更新可以在远程连接上进行,从节点可以复制主数据库中的所有数据库或者特定的数据库,或者特定的表。

Mysql复制原理

原理:

http://static.cyblogs.com/MySQL主从同步.jpg

(1)Master服务器将数据的改变记录二进制Binlog日志,当Master上的数据发生改变时,则将其改变写入二进制日志中;

(2)Slave服务器会在一定时间间隔内对Master二进制日志进行探测其是否发生改变,如果发生改变,则开始一个I/OThread请求Master二进制事件

(3)同时主节点为每个I/O线程启动一个Dump线程,用于向其发送二进制事件,并保存至从节点本地的中继日志中,从节点将启动SQL线程从中继日志中读取二进制日志,在本地重放,使得其数据和主节点的保持一致,最后I/OThreadSQLThread将进入睡眠状态,等待下一次被唤醒。

Undo log与Redo log原理分析

Undo log原理

Undo log是把所有没有COMMIT的事务回滚到事务开始前的状态,系统崩溃时,可能有些事务还没有COMMIT,在系统恢复时,这些没有COMMIT的事务就需要借助Undo log来进行回滚。

使用Undo log时要求:

1
2
1、记录修改日志时(Redo log),(T,x,v)中v为x修改前的值,这样才能借助这条日志来回滚;
2、事务提交后,必须在事务的所有修改(包括记录的修改日志)都持久化后才能写COMMIT T日志;这样才能保证,宕机恢复时,已经COMMIT的事务的所有修改都已经持久化,不需要回滚。

使用Undo log时事务执行顺序

1
2
3
4
1、记录START T 
2、记录需要修改的记录的旧值(要求持久化)
3、根据事务的需要更新数据库(要求持久化)
4、记录COMMIT T

使用Undo log进行宕机回滚

1
2
1、扫描日志,找出所有已经START,还没有COMMIT的事务。
2、针对所有未COMMIT的日志,根据Redo log来进行回滚。

如果数据库访问很多,日志量也会很大,宕机恢复时,回滚的工作量也就很大,为了加快回滚,可以通过Checkpoint机制来加速回滚。

1
2
3
从后往前,扫描Undo log
1、如果先遇到checkpoint_start, 则将checkpoint_start之后的所有未提交的事务进行回滚;
2、如果先遇到checkpoint_end, 则将前一个checkpoint_start之后所有未提交的事务进行回滚;(在checkpoint的过程中,可能有很多新的事务START或者COMMIT)。

使用Undo log,在写COMMIT日志时,要求Redo log以及事务的所有修改都必须已经持久化,这种做法通常很影响性能。

Redo log原理

Redo log是指在回放日志的时候把已经COMMIT的事务重做一遍,对于没有COMMIT的事务按照abort处理,不进行任何操作。

使用Redo log时,要求:

1
2
1、记录Redo log时,(T,x,v)中的v必须是x修改后的值,否则不能通过Redo log来恢复已经COMMIT的事务。
2、写COMMIT T日志之前,事务的修改不能进行持久化,否则恢复时,对于未COMMIT的操作,可能有数据已经修改,但重放Redo log不会对该事务做任何处理,从而不能保证事务的原子性。

使用Redo log时事务执行顺序

1
2
3
4
1、记录START T
2、记录事务需要修改记录的新值(要求持久化)
3、记录COMMIT T(要求持久化)
4、将事务相关的修改写入数据库

使用Redo log重做事务

1
2
1、扫描日志,找到所有已经COMMIT的事务;
2、对于已经COMMIT的事务,根据Redo log重做事务;

根据Checkpoint来加速恢复

1
2
3
从后往前,扫描Redo log
1,如果先遇到checkpoint_start, 则把T1~Tn以及checkpoint_start之后的所有已经COMMIT的事务进行重做;
2. 如果先遇到checkpoint_end, 则T1~Tn以及前一个checkpoint_start之后所有已经COMMIT的事务进行重做;

Undo log类似,在使用时对持久化以及事务操作顺序的要求都比较高,可以将两者结合起来使用,在恢复时,对于已经COMMIT的事务使用Redo log进行重做,对于没有COMMIT的事务,使用Undo log进行回滚。Redo/Undo log结合起来使用时,要求同时记录操作修改前和修改后的值,如(T,x,v,w),v为x修改前的值,w为x修改后的值,具体操作顺序为:

1
2
3
4
1. 记录START T
2. 记录修改日志(T,x,v,w)(要求持久化,其中v用于undo,w用于redo)
3. 更新数据库
4. 记录 COMMIT T

实战操作

上一篇已经对于Binlog设置做了一些初步的实践:http://www.cyblogs.com/mysql-binlogshe-zhi/,还是在本地利用Docker的方式启动了2个容器。

1
2
3
4
➜  ~  docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
662e8531eb70 centos:7 "/bin/bash" 2 hours ago Up 2 hours 0.0.0.0:33062->3306/tcp docker-mysql-slave
c738746e9623 centos:7 "/bin/bash" 4 hours ago Up 4 hours 0.0.0.0:33061->3306/tcp docker-mysql-master

一个是docker-mysql-master作为主节点,docker-mysql-slave作为从节点,最后实现一个主从同步的功能。

Master节点

设置slave_account账户
1
2
3
4
5
6
7
8
9
[root@c738746e9623 bin]# ./mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 2
Server version: 5.6.45-log MySQL Community Server (GPL)
mysql> grant replication slave on *.* to 'slave_account'@'%' identified by '123456';
Query OK, 0 rows affected (0.01 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.01 sec)
Master节点的my.cnf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@c738746e9623 bin]# cat /etc/my.cnf
[client]
default-character-set=utf8

[mysql]
default-character-set=utf8

[mysqld]
user=mysql
default-storage-engine=INNODB
character-set-server=utf8
basedir = /usr/local/mysql
datadir = /usr/local/mysql/data
port = 3306
socket = /tmp/mysql.sock

server-id = 1
log-bin=mysql-bin

binlog-ignore-db = mysql
binlog-ignore-db = information_schema

sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
查看master节点状态
1
2
3
4
5
6
7
mysql> show master status;
+------------------+----------+--------------+--------------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+--------------------------+-------------------+
| mysql-bin.000002 | 120 | | mysql,information_schema | |
+------------------+----------+--------------+--------------------------+-------------------+
1 row in set (0.00 sec)

Slave节点

Slave节点my.cnf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@662e8531eb70 mysql]#cat /etc/my.cnf
[client]
default-character-set=utf8

[mysql]
default-character-set=utf8

[mysqld]
user=mysql
default-storage-engine=INNODB
character-set-server=utf8
basedir = /usr/local/mysql
datadir = /usr/local/mysql/data
port = 3306
socket = /tmp/mysql.sock

server-id = 2

sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
配置与主节点同步的配置
1
2
mysql> change master to master_host='172.17.0.2',master_user='slave_account',master_password='123456',master_log_file='mysql-bin.000002',master_log_pos=120;
Query OK, 0 rows affected, 2 warnings (0.06 sec)
启动同步
1
2
mysql> start slave;
Query OK, 0 rows affected (0.01 sec)
查看一个主从同步的状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
mysql> show slave status\G;
*************************** 1. row ***************************
Slave_IO_State: Waiting for master to send event
Master_Host: 172.17.0.2
Master_User: slave_account
Master_Port: 3306
Connect_Retry: 60
Master_Log_File: mysql-bin.000002
Read_Master_Log_Pos: 120
Relay_Log_File: 662e8531eb70-relay-bin.000002
Relay_Log_Pos: 283
Relay_Master_Log_File: mysql-bin.000002
Slave_IO_Running: Yes
Slave_SQL_Running: Yes
Replicate_Do_DB:
Replicate_Ignore_DB:
Replicate_Do_Table:
Replicate_Ignore_Table:
Replicate_Wild_Do_Table:
Replicate_Wild_Ignore_Table:
Last_Errno: 0
Last_Error:
Skip_Counter: 0
Exec_Master_Log_Pos: 120
Relay_Log_Space: 463
Until_Condition: None
Until_Log_File:
Until_Log_Pos: 0
Master_SSL_Allowed: No
Master_SSL_CA_File:
Master_SSL_CA_Path:
Master_SSL_Cert:
Master_SSL_Cipher:
Master_SSL_Key:
Seconds_Behind_Master: 0
Master_SSL_Verify_Server_Cert: No
Last_IO_Errno: 0
Last_IO_Error:
Last_SQL_Errno: 0
Last_SQL_Error:
Replicate_Ignore_Server_Ids:
Master_Server_Id: 1
Master_UUID: 7323857e-254b-11ea-9b62-0242ac110002
Master_Info_File: /usr/local/mysql/data/master.info
SQL_Delay: 0
SQL_Remaining_Delay: NULL
Slave_SQL_Running_State: Slave has read all relay log; waiting for the slave I/O thread to update it
Master_Retry_Count: 86400
Master_Bind:
Last_IO_Error_Timestamp:
Last_SQL_Error_Timestamp:
Master_SSL_Crl:
Master_SSL_Crlpath:
Retrieved_Gtid_Set:
Executed_Gtid_Set:
Auto_Position: 0
1 row in set (0.00 sec)

Master节点写数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
mysql> CREATE TABLE `person_01` (
-> `id` int(11) DEFAULT NULL,
-> `first_name` varchar(20) DEFAULT NULL,
-> `age` int(11) DEFAULT NULL,
-> `gender` char(1) DEFAULT NULL
-> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
-> ;
Query OK, 0 rows affected (0.03 sec)
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| person |
| person_01 |
+----------------+
2 rows in set (0.01 sec)

mysql> INSERT INTO test.person_01 (id, first_name, age, gender) VALUES (1, 'Bob', 25, 'M');
Query OK, 1 row affected (0.01 sec)
mysql> INSERT INTO test.person_01 (id, first_name, age, gender) VALUES (2, 'Jane', 20, 'F');
Query OK, 1 row affected (0.01 sec)
mysql> INSERT INTO test.person_01 (id, first_name, age, gender) VALUES (3, 'Jack', 30, 'M');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO test.person_01 (id, first_name, age, gender) VALUES (4, 'Bill', 32, 'M');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO test.person_01 (id, first_name, age, gender) VALUES (5, 'Nick', 22, 'M');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO test.person_01 (id, first_name, age, gender) VALUES (6, 'Kathy', 18, 'F');
Query OK, 1 row affected (0.01 sec)
mysql> INSERT INTO test.person_01 (id, first_name, age, gender) VALUES (7, 'Steve', 36, 'M');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO test.person_01 (id, first_name, age, gender) VALUES (8, 'Anne', 25, 'F');
Query OK, 1 row affected (0.00 sec)

mysql> select * from person_01;
+------+------------+------+--------+
| id | first_name | age | gender |
+------+------------+------+--------+
| 1 | Bob | 25 | M |
| 2 | Jane | 20 | F |
| 3 | Jack | 30 | M |
| 4 | Bill | 32 | M |
| 5 | Nick | 22 | M |
| 6 | Kathy | 18 | F |
| 7 | Steve | 36 | M |
| 8 | Anne | 25 | F |
+------+------------+------+--------+
8 rows in set (0.01 sec)
mysql> exit
Bye
[root@c738746e9623 bin]# 主节点

Slave节点查数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| person_01 |
+----------------+
1 row in set (0.00 sec)

mysql> select * from person_01;
+------+------------+------+--------+
| id | first_name | age | gender |
+------+------------+------+--------+
| 1 | Bob | 25 | M |
| 2 | Jane | 20 | F |
| 3 | Jack | 30 | M |
| 4 | Bill | 32 | M |
| 5 | Nick | 22 | M |
| 6 | Kathy | 18 | F |
| 7 | Steve | 36 | M |
| 8 | Anne | 25 | F |
+------+------------+------+--------+
8 rows in set (0.00 sec)
mysql> exit
Bye
[root@662e8531eb70 mysql]# 从节点

这样子就做好了最简单的主从同步。主从同步只是最基础的高可用架构。

参考地址

0%