简栈文化

Java技术人的成长之路~

背景

这几天确实太忙了,之前是日更,说上班后来个隔日更,还是坚持不了。完成Q1季度的考评后发现群里有人问了一个问题,非常的有意思。当时我也是非常的懵逼,然后想自己尝试的去解决一下。

问题是:0xee 0xb9short得到 -4423 为什么?对啊,为什么?我TM也想知道。

http://static.cyblogs.com/QQ20200411-001905@2x.jpg

其实到这里,我先总结一下:

  • 计算机为什么能计算你这么快,原因是它只会处理0与1,二级制;
  • 其实到硬件上面表现的就是电流信号、脉冲信号;
  • 计算机虽然支持减法,乘法,除法,实际上底层只会一种算法,那就是加法;

这也就是当时大学老师跟我们说的,别以为计算机很厉害,其实它很傻逼。每一步操作都需要设定好逻辑与程序。

恶补基础知识点

我们还是梳理一下基础知识吧~

  • 1、byte:有符号(意思是有正和负),在网络传输中都是会用到byte的,它占1个字节,共8位,比如说11111111就可以用1个byte表示,转化为10进制:- (2的6次+2的5次+2的4次+2的3次+2的2次+2的1次+2的0次) = -127。其中前7位表示数字,最高位表示符号,0为正,1为负。范围是 (-2的7次 ~ 2的7次 - 1),那为什么前面最小是-127,范围最小又是-128呢?因为规定-0(10000000)为-128。

  • 2、short:有符号,占2个字节,共16位。同byte一样,它的取值范围就是 (-2的15次 ~ 2的15次 - 1)。

  • 3、int :有符号,占4个字节,共32位。它的取值范围就是(-2的31次 ~ 2的31次)。

  • 4、long:有符号,占8个字节,共64位,它的取值范围就是(-2的63次 ~ 2的63次)。

  • 5、^:表示异或位运算,两者相同则为0,两者不同则为1。比如说15^2,15用二进制表示就是1111,2用2进制表示就是0010,两者进行异或运算,结果就是1101,转换为十进制就是13。

  • 6、|:表示或运算,两者只有有一个为1就为1, 比如说13|2,13用二进制表示就是1101,2用二进制表示就是0010,两者进行或运算,那么结果就是1111,转换为十进制就是15。

  • 7、&:表示与运算,两者都为1就为1,其余都为0,比如说15&2, 13用二进制表示就是1111,2用二进制表示就是0010, 两者进行与运算,那么结果就是0010,转换为十进制就是2。

  • 8、~:取反,就是本来是0变成1,本来是1变成0。

计算机中存储是用补码!!,同时注意一下计算省略了高位不变部分

我举个例子,比如说-15|3等于多少呢?有些人会觉得-15转化为二进制就是10001111,而3转化为二进制就是00000011,那么结果应该是10001111呀,转换为十进制就是-15呀?大家可以自己写个demo就会发现是不对的。要注意在计算机中所有的都是用补码的形式存储的,之所以上面介绍两个正数是对的,因为正数的反码和补码都是一样的。而负数求补码应该是出去符号位取反+1,我们再来看看这个题-15|3,其中-15的原码为10001111,反码为11110000,那么补码就是11110001,然后3的补码为00000011,两者进行或操作就是11110011,你以为结束了么?还没有,再又要求它的原码了,原码就是补码再求补码再+1(是不是已经晕掉了?),也就是10001101,结果就是-13。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
求 -15|3

[-15]原码 = 10001111
[-15]反码 = 11110000 //原码求反码符号位不变
[-15]补码 = 11110001 //反码+1等于补码

[3]原码 = 00000011
[3]反码 = 00000011 //正数都一致
[3]补码 = 00000011 //正数都一致

-15|3 = 11110011 //两个补码进行或操作

[结果]补码 = 11110011 //上面求得的值
[结果]反码 = 10001100 //符号位不变
[结果]原码 = 10001101 //反码+1

100001101 转化为十进制就是-13

开始解他那道题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@sophist 其实这个是基础知识,我们在计算机的存储里面short16位,最大数据存储量是65536,数据范围是-32768~32767之间。

0xee 0xb9 → -4423

16进制对应二进制:11101110 10111001

high << 8 | low & 255 低位保持不变

11101110 00000000

00000000 10111001
------------------
1110111010111001 第一位代表符号位,这个数字肯定是负数

取不骂 + 1
1001000101000110 + 1 = 1001000101000111 = -4423

这样子结果就得到了:-4423。

背景

在SpringBoot环境中,我们有“使用不完的”注解。这也是SpringBoot替代了传统的Spring项目中的xml配置的原因。在使用这些annotation的时候,我们一定要了解这些注解背后的原理以及约定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package org.springframework.boot.context.properties;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.core.annotation.AliasFor;

@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ConfigurationProperties {
......
}

支持的类型

List
1
2
3
custom.config.config1.folders[0]=/root
custom.config.config1.folders[1]=/home/user1
custom.config.config1.folders[2]=/home/user2

对应的Java实现

1
2
3
4
5
@ConfigurationProperties(prefix = "custom.config.config1")
public class Config1Properties{
private List<String> folders;
...
}
Map
1
2
3
4
5
custom.config.config1.map.key1=value1
custom.config.config1.map.key2=value2
custom.config.config1.map.key3=value3
custom.config.config1.map.key4=value4
custom.config.config1.map.key5=value5

对应的Java实现

1
2
3
4
5
@ConfigurationProperties(prefix = "custom.config.config1")
public class Config1Properties{
private Map<String, String> map;
...
}
Object
1
2
3
4
custom.config.config1.server.host=host1
custom.config.config1.server.port=22
custom.config.config1.server.username=username1
custom.config.config1.server.password=password1

对应的Java实现

1
2
3
4
5
6
7
8
9
10
11
12
@ConfigurationProperties(prefix = "custom.config.config1")
public class Config1Properties{
private ServerProperties server;
...
public static class ServerProperties {
private String host;
private int port;
private String username;
private String password;
...
}
}
Object List
1
2
3
4
5
6
7
8
custom.config.config1.servers[0].host=host1
custom.config.config1.servers[0].port=22
custom.config.config1.servers[0].username=username1
custom.config.config1.servers[0].password=password1
custom.config.config1.servers[1].host=host2
custom.config.config1.servers[1].port=22
custom.config.config1.servers[1].username=username2
custom.config.config1.servers[1].password=password2

对应的Java实现

1
2
3
4
5
6
7
8
9
10
11
12
@ConfigurationProperties(prefix = "custom.config.config1")
public class Config1Properties{
private List<ServerProperties> servers;
...
public static class ServerProperties {
private String host;
private int port;
private String username;
private String password;
...
}
}

Map的使用案例

比如,我们同时需要连接多个OSS(阿里对象存储),那我们就可以利用ConfigurationProperties的方式来配置多个。而且可以通过Spring的加载动态的注入到容器中去。

配置中心的配置:

1
2
3
4
5
6
7
8
9
10
11
12
# OSS1配置
oss.multi.clients.accout.accessKeyId=xxx
oss.multi.clients.accout.accessKeySecret=xxx
oss.multi.clients.accout.privateEndpoint=xxx
oss.multi.clients.accout.bucketName=bucket-b-test

# OSS2配置
oss.multi.enabled=true
oss.multi.clients.xdtrans.accessKeyId=xxx
oss.multi.clients.xdtrans.accessKeySecret=xxx
oss.multi.clients.xdtrans.privateEndpoint=xxx
oss.multi.clients.xdtrans.bucketName=bucket-a-test

对应的Java实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Data
@EqualsAndHashCode(callSuper = false)
@ConfigurationProperties(prefix = OssConstants.MULTI_CONFIG_PREFIX)
public class MultiOssProperties {
private Map<String, OssProperties> clients;

@Data
public static class OssProperties {
private String accessKeyId;
private String accessKeySecret;
private String publicEndpoint;
private String privateEndpoint;
private String bucketName;
private String object;
}

动态的定义我们需要的BeanDefinition。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class MultiOssScannerConfigurer implements BeanDefinitionRegistryPostProcessor, InitializingBean, ApplicationContextAware, BeanNameAware {

private ApplicationContext applicationContext;

@Setter
private MultiOssProperties multiOssProperties;

@Override
public void setBeanName(String name) {
log.info("init bean {}", name);
}

@Override
public void afterPropertiesSet() throws Exception {
Objects.requireNonNull(this.multiOssProperties, "multiOssProperties不能为空");
Objects.requireNonNull(this.applicationContext, "applicationContext不能为空");
}

// 动态的定义Bean
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
String beanSuffixName = StringUtils.capitalize(OssConstants.BEAN_SUFFIX_NAME);
// productCodes实际与oss.multi.clients.xdtrans的xdtrans保持一致
multiOssProperties.getClients().forEach((productCode, ossProperties) -> {
AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(OssClient.class,
() -> OssClientUtils.buildOssClient(ossProperties))
.getRawBeanDefinition();
beanDefinition.setInitMethodName("init");
beanDefinition.setDestroyMethodName("shutDown");
beanDefinitionRegistry.registerBeanDefinition(productCode + beanSuffixName, beanDefinition);
});
}

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

}

通过binder来让配置与对应的Java代码产生关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
@EnableConfigurationProperties(MultiOssProperties.class)
@ConditionalOnProperty(prefix = OssConstants.MULTI_CONFIG_PREFIX, value = "enabled")
public class MultiOssAutoConfiguration {

/**
* 初始化多个 ossClient 自动配置
*
* @param environment 环境变量属性
* @return OssClient 自动扫描注册器
*/
@Bean
public MultiOssScannerConfigurer multiOssScannerConfigurer(Environment environment) {
Binder binder = Binder.get(environment);
MultiOssProperties properties = binder.bind(OssConstants.MULTI_CONFIG_PREFIX, MultiOssProperties.class).get();
MultiOssScannerConfigurer multiOssScannerConfigurer = new MultiOssScannerConfigurer();
multiOssScannerConfigurer.setMultiOssProperties(properties);
return multiOssScannerConfigurer;
}
}

如何使用?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Getter
@AllArgsConstructor
public enum OssTypeEnum {
// 注意一下这里的beanName,要跟上面的postProcessBeanDefinitionRegistry保持一致
XDtransOssClient("xdtransOssClient", "oss1"),
DianDianOssClient("ddacctOssClient", "oss2"),
;

private final String beanName;
private final String desc;

// 根据BeanName来Spring容器中获取即可
public OssClient getBean() {
return SpringContextHolder.getBean(beanName, OssClient.class);
}

Binder是如何映射的?

通过上面的代码binder.bind(OssConstants.MULTI_CONFIG_PREFIX, MultiOssProperties.class).get();来进行bind。

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final <T> T bind(ConfigurationPropertyName name, Bindable<T> target, BindHandler handler, Context context, boolean allowRecursiveBinding) {
context.clearConfigurationProperty();
try {
target = handler.onStart(name, target, context);
if (target == null) {
return null;
}
Object bound = bindObject(name, target, handler, context,allowRecursiveBinding);
return handleBindResult(name, target, handler, context, bound);
} catch (Exception ex) {
return handleBindError(name, target, handler, context, ex);
}
}

如果我们的key是:oss.multi.clients.accout.xxx

实际上对应的是Map,那么它的引用名字就是clients。具体的key就是accout,那么对应的value就是OssProperties。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Object bindBean(ConfigurationPropertyName name, Bindable<?> target,
BindHandler handler, Context context, boolean allowRecursiveBinding) {
if (containsNoDescendantOf(context.getSources(), name)
|| isUnbindableBean(name, target, context)) {
return null;
}
BeanPropertyBinder propertyBinder = (propertyName, propertyTarget) -> bind(
name.append(propertyName), propertyTarget, handler, context, false);
Class<?> type = target.getType().resolve(Object.class);
if (!allowRecursiveBinding && context.hasBoundBean(type)) {
return null;
}
return context.withBean(type, () -> {
Stream<?> boundBeans = BEAN_BINDERS.stream()
.map((b) -> b.bind(name, target, context, propertyBinder));
return boundBeans.filter(Objects::nonNull).findFirst().orElse(null);
});
}

http://static.cyblogs.com/QQ20200422-222025@2x.jpg

具体的一个bind情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static final List<BeanBinder> BEAN_BINDERS;

static {
List<BeanBinder> binders = new ArrayList<>();
binders.add(new JavaBeanBinder());
BEAN_BINDERS = Collections.unmodifiableList(binders);
}

public <T> T bind(ConfigurationPropertyName name, Bindable<T> target, Context context,
BeanPropertyBinder propertyBinder) {
boolean hasKnownBindableProperties = hasKnownBindableProperties(name, context);
Bean<T> bean = Bean.get(target, hasKnownBindableProperties);
if (bean == null) {
return null;
}
BeanSupplier<T> beanSupplier = bean.getSupplier(target);
boolean bound = bind(propertyBinder, bean, beanSupplier);
return (bound ? beanSupplier.get() : null);
}
// 返回对应的对象
public BeanSupplier<T> getSupplier(Bindable<T> target) {
return new BeanSupplier<>(() -> {
T instance = null;
if (target.getValue() != null) {
instance = target.getValue().get();
}
if (instance == null) {
instance = (T) BeanUtils.instantiateClass(this.resolvedType);
}
return instance;
});
}

参考地址

概述

UDP不属于连接协议,具有资源消耗少,处理速度快的优点,所以通常音频,视频和普通数据在传送时,使用UDP较多,因为即使丢失少量的包,也不会对接受结果产生较大的影响。

传输层无法保证数据的可靠传输,只能通过应用层来实现了。实现的方式可以参照tcp可靠性传输的方式,只是实现不在传输层,实现转移到了应用层。

最简单的方式是在应用层模仿传输层TCP的可靠性传输。下面不考虑拥塞处理,可靠UDP的简单设计。

  • 1、添加seq/ack机制,确保数据发送到对端
  • 2、添加发送和接收缓冲区,主要是用户超时重传。
  • 3、添加超时重传机制。

详细说明:送端发送数据时,生成一个随机seq=x,然后每一片按照数据大小分配seq。数据到达接收端后接收端放入缓存,并发送一个ack=x的包,表示对方已经收到了数据。发送端收到了ack包后,删除缓冲区对应的数据。时间到后,定时任务检查是否需要重传数据。

目前有如下开源程序利用udp实现了可靠的数据传输。分别为 RUDP、RTP、UDT

开源程序

1、RUDP(Reliable User Datagram Protocol)

RUDP 提供一组数据服务质量增强机制,如拥塞控制的改进、重发机制及淡化服务器算法等 ,从而在包丢失和网络拥塞的情况下, RTP 客户机(实时位置)面前呈现的就是一个高质量的 RTP 流。在不干扰协议的实时特性的同时,可靠 UDP 的拥塞控制机制允许 TCP 方式下的流控制行为。

2、RTP(Real Time Protocol)

RTP为数据提供了具有实时特征的端对端传送服务,如在组播或单播网络服务下的交互式视频音频或模拟数据。

应用程序通常在 UDP 上运行 RTP 以便使用其多路结点和校验服务;这两种协议都提供了传输层协议的功能。但是 RTP 可以与其它适合的底层网络或传输协议一起使用。如果底层网络提供组播方式,那么 RTP 可以使用该组播表传输数据到多个目的地。

RTP 本身并没有提供按时发送机制或其它服务质量(QoS)保证,它依赖于底层服务去实现这一过程。 RTP 并不保证传送或防止无序传送,也不确定底层网络的可靠性。 RTP 实行有序传送, RTP 中的序列号允许接收方重组发送方的包序列,同时序列号也能用于决定适当的包位置,例如:在视频解码中,就不需要顺序解码。

3、UDT(UDP-based Data Transfer Protocol)

基于UDP的数据传输协议(UDP-basedData Transfer Protocol,简称UDT)是一种互联网数据传输协议。*UDT的主要目的是支持高速广域网上的海量数据传输*,而互联网上的标准数据传输协议TCP在高带宽长距离网络上性能很差。

顾名思义,UDT建于UDP之上,并引入新的拥塞控制和数据可靠性控制机制。UDT是面向连接的双向的应用层协议。它同时支持可靠的数据流传输和部分可靠的数据报传输。由于UDT完全在UDP上实现,它也可以应用在除了高速数据传输之外的其它应用领域,例如点到点技术(P2P),防火墙穿透,多媒体数据传输等等。

参考地址

粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。我们日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生粘包拆包问题,因此粘包拆包问题只发生在TCP协议中。

什么是粘包、拆包?

假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,那么服务端收到的数据可以分为三种,现列举如下:

**第一种情况:**接收端正常收到两个数据包,即没有发生拆包和粘包的现象,此种情况不在本文的讨论范围内。

img

**第二种情况:**接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于接收端来说很难处理。

img

**第三种情况:**这种情况有两种表现形式,如下图。接收端收到了两个数据包,但是这两个数据包要么是不完整的,要么就是多出来一块,这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。

img

img

为什么会发生TCP粘包、拆包?

发生TCP粘包、拆包主要是由于下面一些原因:

  • 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。

  • 应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包。

  • 进行MSS(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>MSS的时候将发生拆包。

  • 接收方法不及时读取套接字缓冲区数据,这将发生粘包。

粘包、拆包解决办法

TCP本身是面向流的,作为网络服务器,如何从这源源不断涌来的数据流中拆分出或者合并出有意义的信息呢?通常会有以下一些常用的方法:

  • 1、发送端给每个数据包添加包首部,首部中应该至少包含数据包的长度,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度了。

  • 2、发送端将每个数据包封装为固定长度(不够的可以通过补0填充),这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。

  • 3、可以在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开。

参考地址

在这之前我只记住了StringBuilder不是线程安全的,StringBuffer是线程安全的这个结论,至于StringBuilder为什么不安全从来没有去想过。

分析

在分析设个问题之前我们要知道StringBuilder和StringBuffer的内部实现跟String类一样,都是通过一个char数组存储字符串的,不同的是String类里面的char数组是final修饰的,是不可变的,而StringBuilder和StringBuffer的char数组是可变的。

首先通过一段代码去看一下多线程操作StringBuilder对象会出现什么问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class StringBuilderDemo {

public static void main(String[] args) throws InterruptedException {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 10; i++){
new Thread(new Runnable( {
@Override
public void run() {
for (int j = 0; j < 1000; j++){
stringBuilder.append("a");
}
}
}).start();
}

Thread.sleep(100);
System.out.println(stringBuilder.length());
}

}

我们能看到这段代码创建了10个线程,每个线程循环1000次往StringBuilder对象里面append字符。正常情况下代码应该输出10000,但是实际运行会输出什么呢?

http://static.cyblogs.com/16cc76d348d29915.png

我们看到输出了“9326”,小于预期的10000,并且还抛出了一个ArrayIndexOutOfBoundsException异常(异常不是必现)。

1、为什么输出值跟预期值不一样

我们先看一下StringBuilder的两个成员变量(这两个成员变量实际上是定义在AbstractStringBuilder里面的,StringBuilder和StringBuffer都继承了AbstractStringBuilder)

1
2
3
4
//存储字符串的具体内容
char[] value;
//已经使用的字符数组的数量
int count;

再看StringBuilder的append()方法:

1
2
3
4
5
@Override
public StringBuilder append(String str) {
super.append(str);
return this;
}

StringBuilder的append()方法调用的父类AbstractStringBuilder的append()方法

1
2
3
4
5
6
7
8
9
public AbstractStringBuilder append(String str) {
if (str == null)
return appendNull();
int len = str.length();
ensureCapacityInternal(count + len);
str.getChars(0, len, value, count);
count += len;
return this;
}

我们先不管代码的第五行和第六行干了什么,直接看第七行,count += len不是一个原子操作。假设这个时候count值为10,len值为1,两个线程同时执行到了第七行,拿到的count值都是10,执行完加法运算后将结果赋值给count,所以两个线程执行完后count值为11,而不是12。这就是为什么测试代码输出的值要比10000小的原因。

2、为什么会抛出ArrayIndexOutOfBoundsException异常。

我们看回AbstractStringBuilder的append()方法源码的第五行,ensureCapacityInternal()方法是检查StringBuilder对象的原char数组的容量能不能盛下新的字符串,如果盛不下就调用expandCapacity()方法对char数组进行扩容。

1
2
3
4
5
private void ensureCapacityInternal(int minimumCapacity) {
// overflow-conscious code
if (minimumCapacity - value.length > 0)
expandCapacity(minimumCapacity);
}

扩容的逻辑就是new一个新的char数组,新的char数组的容量是原来char数组的两倍再加2,再通过System.arryCopy()函数将原数组的内容复制到新数组,最后将指针指向新的char数组。

1
2
3
4
5
6
7
void expandCapacity(int minimumCapacity) {
//计算新的容量
int newCapacity = value.length * 2 + 2;
//中间省略了一些检查逻辑
...
value = Arrays.copyOf(value, newCapacity);
}

Arrys.copyOf()方法

1
2
3
4
5
6
7
public static char[] copyOf(char[] original, int newLength) {
char[] copy = new char[newLength];
//拷贝数组
System.arraycopy(original, 0, copy, 0,
Math.min(original.length, newLength));
return copy;
}

AbstractStringBuilder的append()方法源码的第六行,是将String对象里面char数组里面的内容拷贝到StringBuilder对象的char数组里面,代码如下:

1
str.getChars(0, len, value, count);

getChars()方法

1
2
3
4
5
public void getChars(int srcBegin, int srcEnd, char dst[], int dstBegin) {
//中间省略了一些检查
...
System.arraycopy(value, srcBegin, dst, dstBegin, srcEnd - srcBegin);
}

拷贝流程见下图

http://static.cyblogs.com/16cc76dc56ffc2f2.png

假设现在有两个线程同时执行了StringBuilder的append()方法,两个线程都执行完了第五行的ensureCapacityInternal()方法,此刻count=5。

http://static.cyblogs.com/16cc76df391e9f4f.png

这个时候线程1的cpu时间片用完了,线程2继续执行。线程2执行完整个append()方法后count变成6了

http://static.cyblogs.com/16cc76e4b1c9c16b.png

线程1继续执行第六行的str.getChars()方法的时候拿到的count值就是6了,执行char数组拷贝的时候就会抛出ArrayIndexOutOfBoundsException异常。

至此,StringBuilder为什么不安全已经分析完了。如果我们将测试代码的StringBuilder对象换成StringBuffer对象会输出什么呢?

http://static.cyblogs.com/16cc76e852c2a3cb.png

当然是输出10000啦!

那么StringBuffer用什么手段保证线程安全的?这个问题你点进StringBuffer的append()方法里面就知道了。

1
2
3
4
5
6
@Override
public synchronized StringBuffer append(String str) {
toStringCache = null;
super.append(str);
return this;
}

参考地址

概述

本方法(invokeBeanFactoryPostProcessors)会实例化和调用所有 BeanFactoryPostProcessor(包括其子类BeanDefinitionRegistryPostProcessor)。

BeanFactoryPostProcessor 接口是 Spring 初始化 BeanFactory 时对外暴露的扩展点,Spring IoC 容器允许 BeanFactoryPostProcessor 在容器实例化任何 bean 之前读取 bean 的定义,并可以修改它。

BeanDefinitionRegistryPostProcessor 继承自 BeanFactoryPostProcessor,比 BeanFactoryPostProcessor 具有更高的优先级,主要用来在常规的 BeanFactoryPostProcessor 检测开始之前注册其他 bean 定义。特别是,你可以通过 BeanDefinitionRegistryPostProcessor 来注册一些常规的 BeanFactoryPostProcessor,因为此时所有常规的 BeanFactoryPostProcessor 都还没开始被处理。

项目中的实战

BeanDefinitionRegistryPostProcessor初始化Bean的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 想根据配置文件来动态的生成我们的Bean对象
public class MultiOssScannerConfigurer implements BeanDefinitionRegistryPostProcessor, InitializingBean, ApplicationContextAware, BeanNameAware {
@Setter
private MultiOssProperties multiOssProperties;

@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
String beanSuffixName = StringUtils.capitalize(OssConstants.BEAN_SUFFIX_NAME);
multiOssProperties.getClients().forEach((productCode, ossProperties) -> {
AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(OssClient.class,
() -> OssClientUtils.buildOssClient(ossProperties))
.getRawBeanDefinition();
beanDefinition.setInitMethodName("init");
beanDefinition.setDestroyMethodName("shutDown");
beanDefinitionRegistry.registerBeanDefinition(productCode + beanSuffixName, beanDefinition);
});
}
}

BeanFactoryPostProcessor初始化的时候do something

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class KeplerBeanFactoryPostInitializer implements BeanFactoryPostProcessor {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
Map<String, AviatorFunction> aviatorFunctionMap = beanFactory.getBeansOfType(AviatorFunction.class);
if (aviatorFunctionMap.size() > 0) {
log.info("初始化自定义RuleLoader...");
aviatorFunctionMap.forEach((k, v) -> {
log.info("加载Rule:{}", k);
AviatorEvaluator.addFunction(v);
});
}
RuleLoader.initRule();
}
}

跟进源代码

1
2
3
4
5
6
7
8
9
10
protected void invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory beanFactory) {
PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());

// Detect a LoadTimeWeaver and prepare for weaving, if found in the meantime
// (e.g. through an @Bean method registered by ConfigurationClassPostProcessor)
if (beanFactory.getTempClassLoader() == null && beanFactory.containsBean(LOAD_TIME_WEAVER_BEAN_NAME)) {
beanFactory.addBeanPostProcessor(new LoadTimeWeaverAwareProcessor(beanFactory));
beanFactory.setTempClassLoader(new ContextTypeMatchClassLoader(beanFactory.getBeanClassLoader()));
}
}
getBeanFactoryPostProcessors
1
2
3
4
5
6
7
/**
* Return the list of BeanFactoryPostProcessors that will get applied
* to the internal BeanFactory.
*/
public List<BeanFactoryPostProcessor> getBeanFactoryPostProcessors() {
return this.beanFactoryPostProcessors;
}

这边 getBeanFactoryPostProcessors() 会拿到当前应用上下文中已经注册的 BeanFactoryPostProcessor,在默认情况下,this.beanFactoryPostProcessors 是返回空的。

1
2
3
4
5
6
7
8
public interface ApplicationContextInitializer<C extends ConfigurableApplicationContext> {

/**
* Initialize the given application context.
* @param applicationContext the application to configure
*/
void initialize(C applicationContext);
}
invokeBeanFactoryPostProcessors
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
public static void invokeBeanFactoryPostProcessors(
ConfigurableListableBeanFactory beanFactory, List<BeanFactoryPostProcessor> beanFactoryPostProcessors) {

// Invoke BeanDefinitionRegistryPostProcessors first, if any.
Set<String> processedBeans = new HashSet<String>();

// 1.判断beanFactory是否为BeanDefinitionRegistry,beanFactory为DefaultListableBeanFactory,
// 而DefaultListableBeanFactory实现了BeanDefinitionRegistry接口,因此这边为true
if (beanFactory instanceof BeanDefinitionRegistry) {
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
// 用于存放普通的BeanFactoryPostProcessor
List<BeanFactoryPostProcessor> regularPostProcessors = new LinkedList<BeanFactoryPostProcessor>();
// 用于存放BeanDefinitionRegistryPostProcessor
List<BeanDefinitionRegistryPostProcessor> registryProcessors = new LinkedList<BeanDefinitionRegistryPostProcessor>();

// 2.首先处理入参中的beanFactoryPostProcessors
// 遍历所有的beanFactoryPostProcessors, 将BeanDefinitionRegistryPostProcessor和普通BeanFactoryPostProcessor区分开
for (BeanFactoryPostProcessor postProcessor : beanFactoryPostProcessors) {
if (postProcessor instanceof BeanDefinitionRegistryPostProcessor) {
// 2.1 如果是BeanDefinitionRegistryPostProcessor
BeanDefinitionRegistryPostProcessor registryProcessor =
(BeanDefinitionRegistryPostProcessor) postProcessor;
// 2.1.1 直接执行BeanDefinitionRegistryPostProcessor接口的postProcessBeanDefinitionRegistry方法
registryProcessor.postProcessBeanDefinitionRegistry(registry);
// 2.1.2 添加到registryProcessors(用于最后执行postProcessBeanFactory方法)
registryProcessors.add(registryProcessor);
} else {
// 2.2 否则,只是普通的BeanFactoryPostProcessor
// 2.2.1 添加到regularPostProcessors(用于最后执行postProcessBeanFactory方法)
regularPostProcessors.add(postProcessor);
}
}

// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let the bean factory post-processors apply to them!
// Separate between BeanDefinitionRegistryPostProcessors that implement
// PriorityOrdered, Ordered, and the rest.
// 用于保存本次要执行的BeanDefinitionRegistryPostProcessor
List<BeanDefinitionRegistryPostProcessor> currentRegistryProcessors = new ArrayList<BeanDefinitionRegistryPostProcessor>();

// First, invoke the BeanDefinitionRegistryPostProcessors that implement PriorityOrdered.
// 3.调用所有实现PriorityOrdered接口的BeanDefinitionRegistryPostProcessor实现类
// 3.1 找出所有实现BeanDefinitionRegistryPostProcessor接口的Bean的beanName
String[] postProcessorNames =
beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
// 3.2 遍历postProcessorNames
for (String ppName : postProcessorNames) {
// 3.3 校验是否实现了PriorityOrdered接口
if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
// 3.4 获取ppName对应的bean实例, 添加到currentRegistryProcessors中,
// beanFactory.getBean: 这边getBean方法会触发创建ppName对应的bean对象, 目前暂不深入解析
currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
// 3.5 将要被执行的加入processedBeans,避免后续重复执行
processedBeans.add(ppName);
}
}
// 3.6 进行排序(根据是否实现PriorityOrdered、Ordered接口和order值来排序)
sortPostProcessors(currentRegistryProcessors, beanFactory);
// 3.7 添加到registryProcessors(用于最后执行postProcessBeanFactory方法)
registryProcessors.addAll(currentRegistryProcessors);
// 3.8 遍历currentRegistryProcessors, 执行postProcessBeanDefinitionRegistry方法
invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
// 3.9 执行完毕后, 清空currentRegistryProcessors
currentRegistryProcessors.clear();

// Next, invoke the BeanDefinitionRegistryPostProcessors that implement Ordered.
// 4.调用所有实现了Ordered接口的BeanDefinitionRegistryPostProcessor实现类(过程跟上面的步骤3基本一样)
// 4.1 找出所有实现BeanDefinitionRegistryPostProcessor接口的类, 这边重复查找是因为执行完上面的BeanDefinitionRegistryPostProcessor,
// 可能会新增了其他的BeanDefinitionRegistryPostProcessor, 因此需要重新查找
postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
for (String ppName : postProcessorNames) {
// 校验是否实现了Ordered接口,并且还未执行过
if (!processedBeans.contains(ppName) && beanFactory.isTypeMatch(ppName, Ordered.class)) {
currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
processedBeans.add(ppName);
}
}
sortPostProcessors(currentRegistryProcessors, beanFactory);
registryProcessors.addAll(currentRegistryProcessors);
// 4.2 遍历currentRegistryProcessors, 执行postProcessBeanDefinitionRegistry方法
invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
currentRegistryProcessors.clear();

// Finally, invoke all other BeanDefinitionRegistryPostProcessors until no further ones appear.
// 5.最后, 调用所有剩下的BeanDefinitionRegistryPostProcessors
boolean reiterate = true;
while (reiterate) {
reiterate = false;
// 5.1 找出所有实现BeanDefinitionRegistryPostProcessor接口的类
postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
for (String ppName : postProcessorNames) {
// 5.2 跳过已经执行过的
if (!processedBeans.contains(ppName)) {
currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
processedBeans.add(ppName);
// 5.3 如果有BeanDefinitionRegistryPostProcessor被执行, 则有可能会产生新的BeanDefinitionRegistryPostProcessor,
// 因此这边将reiterate赋值为true, 代表需要再循环查找一次
reiterate = true;
}
}
sortPostProcessors(currentRegistryProcessors, beanFactory);
registryProcessors.addAll(currentRegistryProcessors);
// 5.4 遍历currentRegistryProcessors, 执行postProcessBeanDefinitionRegistry方法
invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
currentRegistryProcessors.clear();
}

// Now, invoke the postProcessBeanFactory callback of all processors handled so far.
// 6.调用所有BeanDefinitionRegistryPostProcessor的postProcessBeanFactory方法(BeanDefinitionRegistryPostProcessor继承自BeanFactoryPostProcessor)
invokeBeanFactoryPostProcessors(registryProcessors, beanFactory);
// 7.最后, 调用入参beanFactoryPostProcessors中的普通BeanFactoryPostProcessor的postProcessBeanFactory方法
invokeBeanFactoryPostProcessors(regularPostProcessors, beanFactory);
} else {
// Invoke factory processors registered with the context instance.
invokeBeanFactoryPostProcessors(beanFactoryPostProcessors, beanFactory);
}

// 到这里 , 入参beanFactoryPostProcessors和容器中的所有BeanDefinitionRegistryPostProcessor已经全部处理完毕,
// 下面开始处理容器中的所有BeanFactoryPostProcessor

// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let the bean factory post-processors apply to them!
// 8.找出所有实现BeanFactoryPostProcessor接口的类
String[] postProcessorNames =
beanFactory.getBeanNamesForType(BeanFactoryPostProcessor.class, true, false);

// Separate between BeanFactoryPostProcessors that implement PriorityOrdered,
// Ordered, and the rest.
// 用于存放实现了PriorityOrdered接口的BeanFactoryPostProcessor
List<BeanFactoryPostProcessor> priorityOrderedPostProcessors = new ArrayList<BeanFactoryPostProcessor>();
// 用于存放实现了Ordered接口的BeanFactoryPostProcessor的beanName
List<String> orderedPostProcessorNames = new ArrayList<String>();
// 用于存放普通BeanFactoryPostProcessor的beanName
List<String> nonOrderedPostProcessorNames = new ArrayList<String>();
// 8.1 遍历postProcessorNames, 将BeanFactoryPostProcessor按实现PriorityOrdered、实现Ordered接口、普通三种区分开
for (String ppName : postProcessorNames) {
// 8.2 跳过已经执行过的
if (processedBeans.contains(ppName)) {
// skip - already processed in first phase above
} else if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
// 8.3 添加实现了PriorityOrdered接口的BeanFactoryPostProcessor
priorityOrderedPostProcessors.add(beanFactory.getBean(ppName, BeanFactoryPostProcessor.class));
} else if (beanFactory.isTypeMatch(ppName, Ordered.class)) {
// 8.4 添加实现了Ordered接口的BeanFactoryPostProcessor的beanName
orderedPostProcessorNames.add(ppName);
} else {
// 8.5 添加剩下的普通BeanFactoryPostProcessor的beanName
nonOrderedPostProcessorNames.add(ppName);
}
}

// First, invoke the BeanFactoryPostProcessors that implement PriorityOrdered.
// 9.调用所有实现PriorityOrdered接口的BeanFactoryPostProcessor
// 9.1 对priorityOrderedPostProcessors排序
sortPostProcessors(priorityOrderedPostProcessors, beanFactory);
// 9.2 遍历priorityOrderedPostProcessors, 执行postProcessBeanFactory方法
invokeBeanFactoryPostProcessors(priorityOrderedPostProcessors, beanFactory);

// Next, invoke the BeanFactoryPostProcessors that implement Ordered.
// 10.调用所有实现Ordered接口的BeanFactoryPostProcessor
List<BeanFactoryPostProcessor> orderedPostProcessors = new ArrayList<BeanFactoryPostProcessor>();
for (String postProcessorName : orderedPostProcessorNames) {
// 10.1 获取postProcessorName对应的bean实例, 添加到orderedPostProcessors, 准备执行
orderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
}
// 10.2 对orderedPostProcessors排序
sortPostProcessors(orderedPostProcessors, beanFactory);
// 10.3 遍历orderedPostProcessors, 执行postProcessBeanFactory方法
invokeBeanFactoryPostProcessors(orderedPostProcessors, beanFactory);

// Finally, invoke all other BeanFactoryPostProcessors.
// 11.调用所有剩下的BeanFactoryPostProcessor
List<BeanFactoryPostProcessor> nonOrderedPostProcessors = new ArrayList<BeanFactoryPostProcessor>();
for (String postProcessorName : nonOrderedPostProcessorNames) {
// 11.1 获取postProcessorName对应的bean实例, 添加到nonOrderedPostProcessors, 准备执行
nonOrderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
}
// 11.2 遍历nonOrderedPostProcessors, 执行postProcessBeanFactory方法
invokeBeanFactoryPostProcessors(nonOrderedPostProcessors, beanFactory);

// Clear cached merged bean definitions since the post-processors might have
// modified the original metadata, e.g. replacing placeholders in values...
// 12.清除元数据缓存(mergedBeanDefinitions、allBeanNamesByType、singletonBeanNamesByType),
// 因为后处理器可能已经修改了原始元数据,例如, 替换值中的占位符...
beanFactory.clearMetadataCache();
}
sortPostProcessors
1
2
3
4
5
6
7
8
9
10
11
12
13
private static void sortPostProcessors(List<?> postProcessors, ConfigurableListableBeanFactory beanFactory) {
Comparator<Object> comparatorToUse = null;
if (beanFactory instanceof DefaultListableBeanFactory) {
// 1.获取设置的比较器
comparatorToUse = ((DefaultListableBeanFactory) beanFactory).getDependencyComparator();
}
if (comparatorToUse == null) {
// 2.如果没有设置比较器, 则使用默认的OrderComparator
comparatorToUse = OrderComparator.INSTANCE;
}
// 3.使用比较器对postProcessors进行排序
Collections.sort(postProcessors, comparatorToUse);
}

默认情况下,比较器为 OrderComparator;如果配置了 annotation-config,并且值为true,使用的是 AnnotationAwareOrderComparatorAnnotationAwareOrderComparator 继承自 OrderComparator,只是重写了部分方法,比较器的部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public int compare(Object o1, Object o2) {
return doCompare(o1, o2, null);
}

private int doCompare(Object o1, Object o2, OrderSourceProvider sourceProvider) {
// 判断o1是否实现了PriorityOrdered接口
boolean p1 = (o1 instanceof PriorityOrdered);
// 判断o2是否实现了PriorityOrdered接口
boolean p2 = (o2 instanceof PriorityOrdered);
// 1.如果o1实现了PriorityOrdered接口, 而o2没有, 则o1排前面
if (p1 && !p2) {
return -1;
}
// 2.如果o2实现了PriorityOrdered接口, 而o1没有, 则o2排前面
else if (p2 && !p1) {
return 1;
}

// 3.如果o1和o2都实现(都没实现)PriorityOrdered接口
// Direct evaluation instead of Integer.compareTo to avoid unnecessary object creation.
// 拿到o1的order值, 如果没实现Ordered接口, 值为Ordered.LOWEST_PRECEDENCE
int i1 = getOrder(o1, sourceProvider);
// 拿到o2的order值, 如果没实现Ordered接口, 值为Ordered.LOWEST_PRECEDENCE
int i2 = getOrder(o2, sourceProvider);
// 4.通过order值(order值越小, 优先级越高)排序
return (i1 < i2) ? -1 : (i1 > i2) ? 1 : 0;
}

总结

第一点:整个 invokeBeanFactoryPostProcessors 方法围绕两个接口,BeanDefinitionRegistryPostProcessorBeanFactoryPostProcessor,其中 BeanDefinitionRegistryPostProcessor 继承了 BeanFactoryPostProcessorBeanDefinitionRegistryPostProcessor 主要用来在常规 BeanFactoryPostProcessor 检测开始之前注册其他 Bean 定义,说的简单点,就是 BeanDefinitionRegistryPostProcessor 具有更高的优先级,执行顺序在 BeanFactoryPostProcessor 之前。

第二点:整个 invokeBeanFactoryPostProcessors 方法操作了 3 种 bean 对象:

  • 入参 beanFactoryPostProcessors:拿的是 AbstractApplicationContext 类的 beanFactoryPostProcessors 属性值,也就是在之前已经添加到 beanFactoryPostProcessors 中的 BeanFactoryPostProcessor
  • BeanDefinitionRegistryPostProcessor 接口实现类:实现了 BeanDefinitionRegistryPostProcessor 接口,并且注册到 Spring IoC容器中。
  • 常规 BeanFactoryPostProcessor 接口实现类:实现了 BeanFactoryPostProcessor 接口,并且注册到 Spring IoC容器中。

第三点:操作3种 bean 对象具体指的是调用它们重写的方法,调用实现方法时会遵循以下的优先级:
两个用于排序的重要接口:PriorityOrderedOrdered,其中 PriorityOrdered 继承了 Ordered,并且 PriorityOrdered 的优先级要高于 Ordered,这跟 BeanDefinitionRegistryPostProcessor 继承 BeanFactoryPostProcessor 有点类似。实现 Ordered 接口需要重写 getOrder 方法,返回一个用于排序的 order 值,order 值的范围为 Integer.MIN_VALUE ~ Integer.MAX_VALUEorder 值越小优先级越高,Integer.MIN_VALUE 拥有最高优先级,而 Integer.MAX_VALUE 则对应的拥有最低优先级。

第四点:常见的 Java EE 相关的框架或者中间件,经常使用 BeanFactoryPostProcessor 来进行扩展,例如上面的 Mybatis,因此了解 BeanFactoryPostProcessor 的原理会对之后理解其他中间件的原理有帮助。

参考地址

本文主要是分析Spring bean的循环依赖,以及Spring的解决方式。 通过这种解决方式,我们可以应用在我们实际开发项目中。

什么是循环依赖?
怎么检测循环依赖
Spring怎么解决循环依赖
Spring对于循环依赖无法解决的场景
Spring解决循环依赖的方式我们能够学到什么?

什么是循环依赖?

循环依赖其实就是循环引用,也就是两个或则两个以上的bean互相持有对方,最终形成闭环。比如A依赖于B,B依赖于C,C又依赖于A。如下图:

http://static.cyblogs.com/20170912082357749.jpg

注意,这里不是函数的循环调用,是对象的相互依赖关系。循环调用其实就是一个死循环,除非有终结条件。

Spring中循环依赖场景有:

(1)构造器的循环依赖
(2)field属性的循环依赖。

怎么检测是否存在循环依赖

检测循环依赖相对比较容易,Bean在创建的时候可以给该Bean打标,如果递归调用回来发现正在创建中的话,即说明了循环依赖了。

Spring怎么解决循环依赖

Spring的循环依赖的理论依据其实是基于Java的引用传递,当我们获取到对象的引用时,对象的field或则属性是可以延后设置的(但是构造器必须是在获取引用之前)。

Spring的单例对象的初始化主要分为三步:

(1)createBeanInstance:实例化,其实也就是调用对象的构造方法实例化对象

(2)populateBean:填充属性,这一步主要是多bean的依赖属性进行填充

(3)initializeBean:调用spring xml中的init 方法。

从上面讲述的单例bean初始化步骤我们可以知道,循环依赖主要发生在第一、第二部。也就是构造器循环依赖和field循环依赖。

那么我们要解决循环引用也应该从初始化过程着手,对于单例来说,在Spring容器整个生命周期内,有且只有一个对象,所以很容易想到这个对象应该存在Cache中,Spring为了解决单例的循环依赖问题,使用了三级缓存。

首先我们看源码,三级缓存主要指:

1
2
3
4
5
6
7
8
/** Cache of singleton objects: bean name --> bean instance */
private final Map<String, Object> singletonObjects = new ConcurrentHashMap<String, Object>(256);

/** Cache of singleton factories: bean name --> ObjectFactory */
private final Map<String, ObjectFactory<?>> singletonFactories = new HashMap<String, ObjectFactory<?>>(16);

/** Cache of early singleton objects: bean name --> bean instance */
private final Map<String, Object> earlySingletonObjects = new HashMap<String, Object>(16);

这三级缓存分别指:
singletonFactories : 单例对象工厂的cache
earlySingletonObjects :提前暴光的单例对象的Cache
singletonObjects:单例对象的cache

我们在创建bean的时候,首先想到的是从cache中获取这个单例的bean,这个缓存就是singletonObjects。主要调用方法就就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected Object getSingleton(String beanName, boolean allowEarlyReference) {
Object singletonObject = this.singletonObjects.get(beanName);
if (singletonObject == null && isSingletonCurrentlyInCreation(beanName)) {
synchronized (this.singletonObjects) {
singletonObject = this.earlySingletonObjects.get(beanName);
if (singletonObject == null && allowEarlyReference) {
ObjectFactory<?> singletonFactory = this.singletonFactories.get(beanName);
if (singletonFactory != null) {
singletonObject = singletonFactory.getObject();
this.earlySingletonObjects.put(beanName, singletonObject);
this.singletonFactories.remove(beanName);
}
}
}
}
return (singletonObject != NULL_OBJECT ? singletonObject : null);
}

上面的代码需要解释两个参数:

isSingletonCurrentlyInCreation()判断当前单例bean是否正在创建中,也就是没有初始化完成(比如A的构造器依赖了B对象所以得先去创建B对象, 或则在A的populateBean过程中依赖了B对象,得先去创建B对象,这时的A就是处于创建中的状态。)
allowEarlyReference 是否允许从singletonFactories中通过getObject拿到对象
分析getSingleton()的整个过程,Spring首先从一级缓存singletonObjects中获取。如果获取不到,并且对象正在创建中,就再从二级缓存earlySingletonObjects中获取。如果还是获取不到且允许singletonFactories通过getObject()获取,就从三级缓存singletonFactory.getObject()(三级缓存)获取,如果获取到了则:

1
2
3
this.earlySingletonObjects.put(beanName, singletonObject);
this.singletonFactories.remove(beanName);
从singletonFactories中移除,并放入earlySingletonObjects中。其实也就是从三级缓存移动到了二级缓存。

从singletonFactories中移除,并放入earlySingletonObjects中。其实也就是从三级缓存移动到了二级缓存。

从上面三级缓存的分析,我们可以知道,Spring解决循环依赖的诀窍就在于singletonFactories这个三级cache。这个cache的类型是ObjectFactory,定义如下:

1
2
3
public interface ObjectFactory<T> {
T getObject() throws BeansException;
}

这个接口在下面被引用

1
2
3
4
5
6
7
8
9
10
protected void addSingletonFactory(String beanName, ObjectFactory<?> singletonFactory) {
Assert.notNull(singletonFactory, "Singleton factory must not be null");
synchronized (this.singletonObjects) {
if (!this.singletonObjects.containsKey(beanName)) {
this.singletonFactories.put(beanName, singletonFactory);
this.earlySingletonObjects.remove(beanName);
this.registeredSingletons.add(beanName);
}
}
}

这里就是解决循环依赖的关键,这段代码发生在createBeanInstance之后,也就是说单例对象此时已经被创建出来(调用了构造器)。这个对象已经被生产出来了,虽然还不完美(还没有进行初始化的第二步和第三步),但是已经能被人认出来了(根据对象引用能定位到堆中的对象),所以Spring此时将这个对象提前曝光出来让大家认识,让大家使用。

这样做有什么好处呢?让我们来分析一下“A的某个field或者setter依赖了B的实例对象,同时B的某个field或者setter依赖了A的实例对象”这种循环依赖的情况。

  • A首先完成了初始化的第一步,并且将自己提前曝光到singletonFactories中,此时进行初始化的第二步,发现自己依赖对象B,此时就尝试去get(B),发现B还没有被create,所以走create流程,B在初始化第一步的时候发现自己依赖了对象A,于是尝试get(A),尝试一级缓存singletonObjects(肯定没有,因为A还没初始化完全),尝试二级缓存earlySingletonObjects(也没有),尝试三级缓存singletonFactories,由于A通过ObjectFactory将自己提前曝光了,所以B能够通过ObjectFactory.getObject拿到A对象(虽然A还没有初始化完全,但是总比没有好呀),B拿到A对象后顺利完成了初始化阶段1、2、3,完全初始化之后将自己放入到一级缓存singletonObjects中。此时返回A中,A此时能拿到B的对象顺利完成自己的初始化阶段2、3,最终A也完成了初始化,进去了一级缓存singletonObjects中,而且更加幸运的是,由于B拿到了A的对象引用,所以B现在hold住的A对象完成了初始化。

知道了这个原理时候,肯定就知道为啥Spring不能解决“A的构造方法中依赖了B的实例对象,同时B的构造方法中依赖了A的实例对象”这类问题了!因为加入singletonFactories三级缓存的前提是执行了构造器,所以构造器的循环依赖没法解决。

参考地址

跳跃表(skiplist)是一种有序数据结构,它通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的。

跳跃表支持平均O(logN)、最坏O(N)复杂度的节点查找,还可以通过顺序性操作来批量处理节点。在大部分情况下,跳跃表的效率可以和平衡树相媲美,并且因为跳跃表的实现比平衡树要来得更为简单,所以有不少程序都使用跳跃表来代替平衡树。

Redis使用跳跃表作为有序集合键的底层实现之一,如果一个有序集合包含的元素数量比较多,又或者有序集合中元素的成员(member)是比较长的字符串时,Redis就会使用跳跃表来作为有序集合键的底层实现。

以下是个典型的跳跃表例子

http://static.cyblogs.com/skiplist.png

从图中可以看到, 跳跃表主要由以下部分构成:

  • 表头(head):负责维护跳跃表的节点指针。
  • 跳跃表节点:保存着元素值,以及多个层。
  • 层:保存着指向其他元素的指针。高层的指针越过的元素数量大于等于低层的指针,为了提高查找的效率,程序总是从高层先开始访问,然后随着元素值范围的缩小,慢慢降低层次。
  • 表尾:全部由 NULL 组成,表示跳跃表的末尾。

因为跳跃表的定义可以在任何一本算法或数据结构的书中找到, 所以本章不介绍跳跃表的具体实现方式或者具体的算法, 而只介绍跳跃表在 Redis 的应用、核心数据结构和 API 。

跳跃表的实现

为了满足自身的功能需要, Redis 基于 William Pugh 论文中描述的跳跃表进行了以下修改:

  1. 允许重复的 score 值:多个不同的 memberscore 值可以相同。
  2. 进行对比操作时,不仅要检查 score 值,还要检查 member :当 score 值可以重复时,单靠 score 值无法判断一个元素的身份,所以需要连 member 域都一并检查才行。
  3. 每个节点都带有一个高度为 1 层的后退指针,用于从表尾方向向表头方向迭代:当执行 ZREVRANGEZREVRANGEBYSCORE 这类以逆序处理有序集的命令时,就会用到这个属性。

这个修改版的跳跃表由 redis.h/zskiplist 结构定义:

1
2
3
4
5
6
7
8
typedef struct zskiplist {
// 头节点,尾节点
struct zskiplistNode *header, *tail;
// 节点数量
unsigned long length;
// 目前表内节点的最大层数
int level;
} zskiplist;

跳跃表的节点由 redis.h/zskiplistNode 定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct zskiplistNode {
// member 对象
robj *obj;
// 分值
double score;
// 后退指针
struct zskiplistNode *backward;
// 层
struct zskiplistLevel {
// 前进指针
struct zskiplistNode *forward;
// 这个层跨越的节点数量
unsigned int span;
} level[];
} zskiplistNode;

以下是操作这两个数据结构的 API ,API 的用途与相应的算法复杂度:

函数 作用 复杂度
zslCreateNode 创建并返回一个新的跳跃表节点 最坏 O(1)O(1)
zslFreeNode 释放给定的跳跃表节点 最坏 O(1)O(1)
zslCreate 创建并初始化一个新的跳跃表 最坏 O(1)O(1)
zslFree 释放给定的跳跃表 最坏 O(N)O(N)
zslInsert 将一个包含给定 scoremember 的新节点添加到跳跃表中 最坏 O(N)O(N) 平均 O(logN)O(log⁡N)
zslDeleteNode 删除给定的跳跃表节点 最坏 O(N)O(N)
zslDelete 删除匹配给定 memberscore 的元素 最坏 O(N)O(N) 平均 O(logN)O(log⁡N)
zslFirstInRange 找到跳跃表中第一个符合给定范围的元素 最坏 O(N)O(N) 平均 O(logN)O(log⁡N)
zslLastInRange 找到跳跃表中最后一个符合给定范围的元素 最坏 O(N)O(N) 平均 O(logN)O(log⁡N)
zslDeleteRangeByScore 删除 score 值在给定范围内的所有节点 最坏 O(N2)O(N2)
zslDeleteRangeByRank 删除给定排序范围内的所有节点 最坏 O(N2)O(N2)
zslGetRank 返回目标元素在有序集中的排位 最坏 O(N)O(N) 平均 O(logN)O(log⁡N)
zslGetElementByRank 根据给定排位,返回该排位上的元素节点 最坏 O(N)O(N) 平均 O(logN)O(log⁡N)

跳跃表的应用

和字典、链表或者字符串这几种在 Redis 中大量使用的数据结构不同, 跳跃表在 Redis 的唯一作用, 就是实现有序集数据类型。

跳跃表将指向有序集的 score 值和 member 域的指针作为元素, 并以 score 值为索引, 对有序集元素进行排序。

举个例子, 以下代码创建了一个带有 3 个元素的有序集:

1
2
3
4
5
6
7
8
9
10
redis> ZADD s 6 x 10 y 15 z
(integer) 3

redis> ZRANGE s 0 -1 WITHSCORES
1) "x"
2) "6"
3) "y"
4) "10"
5) "z"
6) "15"

在底层实现中, Redis 为 xyz 三个 member 分别创建了三个字符串, 值分别为 double 类型的 61015 , 然后用跳跃表将这些指针有序地保存起来, 形成这样一个跳跃表:

http://static.cyblogs.com/QQ20200329-225534@2x.jpg

为了方便展示, 在图片中我们直接将 memberscore 值包含在表节点中, 但是在实际的定义中, 因为跳跃表要和另一个实现有序集的结构(字典)分享 memberscore 值, 所以跳跃表只保存指向 memberscore 的指针。 更详细的信息,请参考《有序集》章节。

小结

❑跳跃表是有序集合的底层实现之一。

❑Redis的跳跃表实现由zskiplist和zskiplistNode两个结构组成,其中zskiplist用于保存跳跃表信息(比如表头节点、表尾节点、长度),而zskiplistNode则用于表示跳跃表节点。

❑每个跳跃表节点的层高都是1至32之间的随机数。

❑在同一个跳跃表中,多个节点可以包含相同的分值,但每个节点的成员对象必须是唯一的。

❑跳跃表中的节点按照分值大小进行排序,当分值相同时,节点按照成员对象的大小进行排序。

参考地址

Redis 通过 MULTIDISCARDEXECWATCH 四个命令来实现事务功能, 本章首先讨论使用 MULTIDISCARDEXEC 三个命令实现的一般事务, 然后再来讨论带有 WATCH 的事务的实现。

因为事务的安全性也非常重要, 所以本章最后通过常见的 ACID 性质对 Redis 事务的安全性进行了说明。

事务

事务提供了一种“将多个命令打包, 然后一次性、按顺序地执行”的机制, 并且事务在执行的期间不会主动中断 —— 服务器在执行完事务中的所有命令之后, 才会继续处理其他客户端的其他命令。

以下是一个事务的例子, 它先以 MULTI 开始一个事务, 然后将多个命令入队到事务中, 最后由 EXEC 命令触发事务, 一并执行事务中的所有命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
redis> MULTI
OK

redis> SET book-name "Mastering C++ in 21 days"
QUEUED

redis> GET book-name
QUEUED

redis> SADD tag "C++" "Programming" "Mastering Series"
QUEUED

redis> SMEMBERS tag
QUEUED

redis> EXEC
1) OK
2) "Mastering C++ in 21 days"
3) (integer) 3
4) 1) "Mastering Series"
2) "C++"
3) "Programming"

一个事务从开始到执行会经历以下三个阶段:

  1. 开始事务。
  2. 命令入队。
  3. 执行事务。

下文将分别介绍事务的这三个阶段。

开始事务

MULTI 命令的执行标记着事务的开始:

1
2
redis> MULTI
OK

这个命令唯一做的就是, 将客户端的 REDIS_MULTI 选项打开, 让客户端从非事务状态切换到事务状态。

http://static.cyblogs.com/QQ20200308-222741@2x.jpg

命令入队

1
2
3
4
5
redis> SET msg "hello moto"
OK

redis> GET msg
"hello moto"

但是, 当客户端进入事务状态之后, 服务器在收到来自客户端的命令时, 不会立即执行命令, 而是将这些命令全部放进一个事务队列里, 然后返回 QUEUED , 表示命令已入队:

1
2
3
4
5
6
7
8
redis> MULTI
OK

redis> SET msg "hello moto"
QUEUED

redis> GET msg
QUEUED

以下流程图展示了这一行为:

http://static.cyblogs.com/Redis事物队列.jpg

事务队列是一个数组, 每个数组项是都包含三个属性:

  1. 要执行的命令(cmd)。
  2. 命令的参数(argv)。
  3. 参数的个数(argc)。

举个例子, 如果客户端执行以下命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
redis> MULTI
OK

redis> SET book-name "Mastering C++ in 21 days"
QUEUED

redis> GET book-name
QUEUED

redis> SADD tag "C++" "Programming" "Mastering Series"
QUEUED

redis> SMEMBERS tag
QUEUED

那么程序将为客户端创建以下事务队列:

数组索引 cmd argv argc
0 SET ["book-name", "Mastering C++ in 21 days"] 2
1 GET ["book-name"] 1
2 SADD ["tag", "C++", "Programming", "Mastering Series"] 4
3 SMEMBERS ["tag"] 1

执行事务

前面说到, 当客户端进入事务状态之后, 客户端发送的命令就会被放进事务队列里。

但其实并不是所有的命令都会被放进事务队列, 其中的例外就是 EXECDISCARDMULTIWATCH 这四个命令 —— 当这四个命令从客户端发送到服务器时, 它们会像客户端处于非事务状态一样, 直接被服务器执行:

http://static.cyblogs.com/QQ20200308-225830@2x.jpg

如果客户端正处于事务状态, 那么当 EXEC 命令执行时, 服务器根据客户端所保存的事务队列, 以先进先出(FIFO)的方式执行事务队列中的命令: 最先入队的命令最先执行, 而最后入队的命令最后执行。

比如说,对于以下事务队列:

数组索引 cmd argv argc
0 SET ["book-name", "Mastering C++ in 21 days"] 2
1 GET ["book-name"] 1
2 SADD ["tag", "C++", "Programming", "Mastering Series"] 4
3 SMEMBERS ["tag"] 1

程序会首先执行 SET 命令, 然后执行 GET 命令, 再然后执行 SADD 命令, 最后执行 SMEMBERS 命令。

执行事务中的命令所得的结果会以 FIFO 的顺序保存到一个回复队列中。

比如说,对于上面给出的事务队列,程序将为队列中的命令创建如下回复队列:

数组索引 回复类型 回复内容
0 status code reply OK
1 bulk reply "Mastering C++ in 21 days"
2 integer reply 3
3 multi-bulk reply ["Mastering Series", "C++", "Programming"]

当事务队列里的所有命令被执行完之后, EXEC 命令会将回复队列作为自己的执行结果返回给客户端, 客户端从事务状态返回到非事务状态, 至此, 事务执行完毕。

事务的整个执行过程可以用以下伪代码表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def execute_transaction():

# 创建空白的回复队列
reply_queue = []

# 取出事务队列里的所有命令、参数和参数数量
for cmd, argv, argc in client.transaction_queue:

# 执行命令,并取得命令的返回值
reply = execute_redis_command(cmd, argv, argc)

# 将返回值追加到回复队列末尾
reply_queue.append(reply)

# 清除客户端的事务状态
clear_transaction_state(client)

# 清空事务队列
clear_transaction_queue(client)

# 将事务的执行结果返回给客户端
send_reply_to_client(client, reply_queue)

在事务和非事务状态下执行命令

无论在事务状态下, 还是在非事务状态下, Redis 命令都由同一个函数执行, 所以它们共享很多服务器的一般设置, 比如 AOF 的配置、RDB 的配置,以及内存限制,等等。

不过事务中的命令和普通命令在执行上还是有一点区别的,其中最重要的两点是:

  1. 非事务状态下的命令以单个命令为单位执行,前一个命令和后一个命令的客户端不一定是同一个;

    而事务状态则是以一个事务为单位,执行事务队列中的所有命令:除非当前事务执行完毕,否则服务器不会中断事务,也不会执行其他客户端的其他命令。

  2. 在非事务状态下,执行命令所得的结果会立即被返回给客户端;

    而事务则是将所有命令的结果集合到回复队列,再作为 EXEC 命令的结果返回给客户端。

事务状态下的 DISCARD 、 MULTI 和 WATCH 命令

除了 EXEC 之外, 服务器在客户端处于事务状态时, 不加入到事务队列而直接执行的另外三个命令是 DISCARDMULTIWATCH

DISCARD 命令用于取消一个事务, 它清空客户端的整个事务队列, 然后将客户端从事务状态调整回非事务状态, 最后返回字符串 OK 给客户端, 说明事务已被取消。

Redis 的事务是不可嵌套的, 当客户端已经处于事务状态, 而客户端又再向服务器发送 MULTI 时, 服务器只是简单地向客户端发送一个错误, 然后继续等待其他命令的入队。 MULTI 命令的发送不会造成整个事务失败, 也不会修改事务队列中已有的数据。

WATCH 只能在客户端进入事务状态之前执行, 在事务状态下发送 WATCH 命令会引发一个错误, 但它不会造成整个事务失败, 也不会修改事务队列中已有的数据(和前面处理 MULTI 的情况一样)。

带 WATCH 的事务

WATCH 命令用于在事务开始之前监视任意数量的键: 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。

以下示例展示了一个执行失败的事务例子:

1
2
3
4
5
6
7
8
9
10
11
redis> WATCH name
OK

redis> MULTI
OK

redis> SET name peter
QUEUED

redis> EXEC
(nil)

以下执行序列展示了上面的例子是如何失败的:

时间 客户端 A 客户端 B
T1 WATCH name
T2 MULTI
T3 SET name peter
T4 SET name john
T5 EXEC

在时间 T4 ,客户端 B 修改了 name 键的值, 当客户端 A 在 T5 执行 EXEC 时,Redis 会发现 name 这个被监视的键已经被修改, 因此客户端 A 的事务不会被执行,而是直接返回失败。

下文就来介绍 WATCH 的实现机制,并且看看事务系统是如何检查某个被监视的键是否被修改,从而保证事务的安全性的。

WATCH 命令的实现

在每个代表数据库的 redis.h/redisDb 结构类型中, 都保存了一个 watched_keys 字典, 字典的键是这个数据库被监视的键, 而字典的值则是一个链表, 链表中保存了所有监视这个键的客户端。

比如说,以下字典就展示了一个 watched_keys 字典的例子:

http://static.cyblogs.com/QQ20200308-225905@2x.jpg

其中, 键 key1 正在被 client2client5client1 三个客户端监视, 其他一些键也分别被其他别的客户端监视着。

WATCH 命令的作用, 就是将当前客户端和要监视的键在 watched_keys 中进行关联。

举个例子, 如果当前客户端为 client10086 , 那么当客户端执行 WATCH key1 key2 时, 前面展示的 watched_keys 将被修改成这个样子:

http://static.cyblogs.com/QQ20200308-225928@2x.jpg

通过 watched_keys 字典, 如果程序想检查某个键是否被监视, 那么它只要检查字典中是否存在这个键即可; 如果程序要获取监视某个键的所有客户端, 那么只要取出键的值(一个链表), 然后对链表进行遍历即可。

WATCH 的触发

在任何对数据库键空间(key space)进行修改的命令成功执行之后 (比如 FLUSHDBSETDELLPUSHSADDZREM ,诸如此类), multi.c/touchWatchedKey 函数都会被调用 —— 它检查数据库的 watched_keys 字典, 看是否有客户端在监视已经被命令修改的键, 如果有的话, 程序将所有监视这个/这些被修改键的客户端的 REDIS_DIRTY_CAS 选项打开:

http://static.cyblogs.com/QQ20200308-225953@2x.jpg

当客户端发送 EXEC 命令、触发事务执行时, 服务器会对客户端的状态进行检查:

  • 如果客户端的 REDIS_DIRTY_CAS 选项已经被打开,那么说明被客户端监视的键至少有一个已经被修改了,事务的安全性已经被破坏。服务器会放弃执行这个事务,直接向客户端返回空回复,表示事务执行失败。
  • 如果 REDIS_DIRTY_CAS 选项没有被打开,那么说明所有监视键都安全,服务器正式执行事务。

可以用一段伪代码来表示这个检查:

1
2
3
4
5
6
7
8
9
10
11
12
def check_safety_before_execute_trasaction():

if client.state & REDIS_DIRTY_CAS:
# 安全性已破坏,清除事务状态
clear_transaction_state(client)
# 清空事务队列
clear_transaction_queue(client)
# 返回空回复给客户端
send_empty_reply(client)
else:
# 安全性完好,执行事务
execute_transaction()

举个例子,假设数据库的 watched_keys 字典如下图所示:

http://static.cyblogs.com/QQ20200308-230029@2x.jpg

事务的 ACID 性质

在传统的关系式数据库中,常常用 ACID 性质来检验事务功能的安全性。

Redis 事务保证了其中的一致性(C)和隔离性(I),但并不保证原子性(A)和持久性(D)。

以下四小节是关于这四个性质的详细讨论。

原子性(Atomicity)

单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。

如果一个事务队列中的所有命令都被成功地执行,那么称这个事务执行成功。

另一方面,如果 Redis 服务器进程在执行事务的过程中被停止 —— 比如接到 KILL 信号、宿主机器停机,等等,那么事务执行失败。

当事务失败时,Redis 也不会进行任何的重试或者回滚动作。

一致性(Consistency)

Redis 的一致性问题可以分为三部分来讨论:入队错误、执行错误、Redis 进程被终结。

入队错误

在命令入队的过程中,如果客户端向服务器发送了错误的命令,比如命令的参数数量不对,等等, 那么服务器将向客户端返回一个出错信息, 并且将客户端的事务状态设为 REDIS_DIRTY_EXEC

当客户端执行 EXEC 命令时, Redis 会拒绝执行状态为 REDIS_DIRTY_EXEC 的事务, 并返回失败信息。

1
2
3
4
5
6
7
8
9
10
11
redis 127.0.0.1:6379> MULTI
OK

redis 127.0.0.1:6379> set key
(error) ERR wrong number of arguments for 'set' command

redis 127.0.0.1:6379> EXISTS key
QUEUED

redis 127.0.0.1:6379> EXEC
(error) EXECABORT Transaction discarded because of previous errors.

因此,带有不正确入队命令的事务不会被执行,也不会影响数据库的一致性。

执行错误

如果命令在事务执行的过程中发生错误,比如说,对一个不同类型的 key 执行了错误的操作, 那么 Redis 只会将错误包含在事务的结果中, 这不会引起事务中断或整个失败,不会影响已执行事务命令的结果,也不会影响后面要执行的事务命令, 所以它对事务的一致性也没有影响。

Redis 进程被终结

如果 Redis 服务器进程在执行事务的过程中被其他进程终结,或者被管理员强制杀死,那么根据 Redis 所使用的持久化模式,可能有以下情况出现:

  • 内存模式:如果 Redis 没有采取任何持久化机制,那么重启之后的数据库总是空白的,所以数据总是一致的。

  • RDB 模式:在执行事务时,Redis 不会中断事务去执行保存 RDB 的工作,只有在事务执行之后,保存 RDB 的工作才有可能开始。所以当 RDB 模式下的 Redis 服务器进程在事务中途被杀死时,事务内执行的命令,不管成功了多少,都不会被保存到 RDB 文件里。恢复数据库需要使用现有的 RDB 文件,而这个 RDB 文件的数据保存的是最近一次的数据库快照(snapshot),所以它的数据可能不是最新的,但只要 RDB 文件本身没有因为其他问题而出错,那么还原后的数据库就是一致的。

  • AOF 模式:因为保存 AOF 文件的工作在后台线程进行,所以即使是在事务执行的中途,保存 AOF 文件的工作也可以继续进行,因此,根据事务语句是否被写入并保存到 AOF 文件,有以下两种情况发生:

    1)如果事务语句未写入到 AOF 文件,或 AOF 未被 SYNC 调用保存到磁盘,那么当进程被杀死之后,Redis 可以根据最近一次成功保存到磁盘的 AOF 文件来还原数据库,只要 AOF 文件本身没有因为其他问题而出错,那么还原后的数据库总是一致的,但其中的数据不一定是最新的。

    2)如果事务的部分语句被写入到 AOF 文件,并且 AOF 文件被成功保存,那么不完整的事务执行信息就会遗留在 AOF 文件里,当重启 Redis 时,程序会检测到 AOF 文件并不完整,Redis 会退出,并报告错误。需要使用 redis-check-aof 工具将部分成功的事务命令移除之后,才能再次启动服务器。还原之后的数据总是一致的,而且数据也是最新的(直到事务执行之前为止)。

隔离性(Isolation)

Redis 是单进程程序,并且它保证在执行事务时,不会对事务进行中断,事务可以运行直到执行完所有事务队列中的命令为止。因此,Redis 的事务是总是带有隔离性的。

持久性(Durability)

因为事务不过是用队列包裹起了一组 Redis 命令,并没有提供任何额外的持久性功能,所以事务的持久性由 Redis 所使用的持久化模式决定:

  • 在单纯的内存模式下,事务肯定是不持久的。

  • 在 RDB 模式下,服务器可能在事务执行之后、RDB 文件更新之前的这段时间失败,所以 RDB 模式下的 Redis 事务也是不持久的。

  • 在 AOF 的“总是 SYNC ”模式下,事务的每条命令在执行成功之后,都会立即调用 fsyncfdatasync 将事务数据写入到 AOF 文件。但是,这种保存是由后台线程进行的,主线程不会阻塞直到保存成功,所以从命令执行成功到数据保存到硬盘之间,还是有一段非常小的间隔,所以这种模式下的事务也是不持久的。

    其他 AOF 模式也和“总是 SYNC ”模式类似,所以它们都是不持久的。

小结

  • 事务提供了一种将多个命令打包,然后一次性、有序地执行的机制。
  • 事务在执行过程中不会被中断,所有事务命令执行完之后,事务才能结束。
  • 多个命令会被入队到事务队列中,然后按先进先出(FIFO)的顺序执行。
  • WATCH 命令的事务会将客户端和被监视的键在数据库的 watched_keys 字典中进行关联,当键被修改时,程序会将所有监视被修改键的客户端的 REDIS_DIRTY_CAS 选项打开。
  • 只有在客户端的 REDIS_DIRTY_CAS 选项未被打开时,才能执行事务,否则事务直接返回失败。
  • Redis 的事务保证了 ACID 中的一致性(C)和隔离性(I),但并不保证原子性(A)和持久性(D)。

参考地址

一、Redis-Sentinel(哨兵)

1、介绍

Redis-Sentinel是redis官方推荐的高可用性解决方案,
当用redis作master-slave的高可用时,如果master本身宕机,redis本身或者客户端都没有实现主从切换的功能。

而redis-sentinel就是一个独立运行的进程,用于监控多个master-slave集群,
自动发现master宕机,进行自动切换slave > master。

sentinel主要功能如下:

  1. 不时的监控redis是否良好运行,如果节点不可达就会对节点进行下线标识
  2. 如果被标识的是主节点,sentinel就会和其他的sentinel节点“协商”,如果其他节点也认为主节点不可达,
   就会选举一个sentinel节点来完成自动故障转移
  3. 在master-slave进行切换后,master_redis.conf、slave_redis.conf和sentinel.conf的内容都会发生改变,
   即master_redis.conf中会多一行slaveof的配置,sentinel.conf的监控目标会随之调换

2、工作原理

每个Sentinel以每秒钟一次的频率向它所知的Master,Slave以及其他 Sentinel 实例发送一个 PING 命令

如果一个实例(instance)距离最后一次有效回复 PING 命令的时间超过 down-after-milliseconds 选项所指定的值, 则这个实例会被 Sentinel 标记为主观下线。

如果一个Master被标记为主观下线,则正在监视这个Master的所有 Sentinel 要以每秒一次的频率确认Master的确进入了主观下线状态。

当有足够数量的 Sentinel(大于等于配置文件指定的值)在指定的时间范围内确认Master的确进入了主观下线状态, 则Master会被标记为客观下线

在一般情况下, 每个 Sentinel 会以每 10 秒一次的频率向它已知的所有Master,Slave发送 INFO 命令

当Master被 Sentinel 标记为客观下线时,Sentinel 向下线的 Master 的所有 Slave 发送 INFO 命令的频率会从 10 秒一次改为每秒一次

若没有足够数量的 Sentinel 同意 Master 已经下线, Master 的客观下线状态就会被移除。

若 Master 重新向 Sentinel 的 PING 命令返回有效回复, Master 的主观下线状态就会被移除。

主观下线和客观下线

主观下线:Subjectively Down,简称 SDOWN,指的是当前 Sentinel 实例对某个redis服务器做出的下线判断。
客观下线:Objectively Down, 简称 ODOWN,指的是多个 Sentinel 实例在对Master Server做出 SDOWN 判断,并且通过 SENTINEL is-master-down-by-addr 命令互相交流之后,得出的Master Server下线判断,然后开启failover.

SDOWN适合于Master和Slave,只要一个 Sentinel 发现Master进入了ODOWN, 这个 Sentinel 就可能会被其他 Sentinel 推选出, 并对下线的主服务器执行自动故障迁移操作。

ODOWN只适用于Master,对于Slave的 Redis 实例,Sentinel 在将它们判断为下线前不需要进行协商, 所以Slave的 Sentinel 永远不会达到ODOWN。

3、master宕机处理

img

img

如果master宕机,我们应该先选一个slave出来,让他成为新的master,其他redis都修改成这个新的master的slave,但是redis本身或者客户端都没有实现主从切换的功能,当然,人为地修改配置文件,实现上图的功能也是可以的,但是如果是在深夜,所有人都睡觉了呢,谁来修改配置信息?这个时候就可以使用redis的Sentinel功能了,它就是实现了,当发现master宕机,自动帮我们去修改其他redis配置文件,选举出一个新master。

4、Sentinel功能实现图

img

img

img

5、redis一些查看命令

1
2
3
4
5
redis-cli info  # 查看redis数据库信息

redis-cli info replication # 查看redis的复制授权信息(主从复制)

redis-cli info sentinel # 查看redis的哨兵信息

6、Redis主从配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1.准备三个redis实例,一主两从
# redis-6379.conf配置
port 6379
daemonize yes
logfile "6379.log"
dbfilename "dump-6379.rdb"
dir "/var/redis/data/"


# redis-6380.conf配置
port 6380
daemonize yes
logfile "6380.log"
dbfilename "dump-6380.rdb"
dir "/var/redis/data/"
slaveof 127.0.0.1 6379



# redis-6381.conf配置
port 6381
daemonize yes
logfile "6381.log"
dbfilename "dump-6381.rdb"
dir "/var/redis/data/"
slaveof 127.0.0.1 6379


2. 准备好了三个数据库实例,启动三个数据库实例
# 启动前记得先去创建redis数据存放的文件夹
mkdir -p /var/redis/data/

# 启动实例
redis-server redis-6379.conf
redis-server redis-6380.conf
redis-server redis-6381.conf

# 查看redis服务是否已经启动
ps -ef | grep redis


3. 确定主从关系
redis-cli -p 6379 info replication
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6380,state=online,offset=336,lag=0
slave1:ip=127.0.0.1,port=6381,state=online,offset=336,lag=1


redis-cli -p 6380 info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379

7、Redis Sentinel安装配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
1. Sentinel配置解析
port 26379 // Sentinel的端口
dir /var/redis/data/ // Sentinel日志文件存放位置
logfile "26379.log" // Sentinel日志文件名字

// 当前Sentinel节点监控 127.0.0.1:6379 这个主节点
// 2代表判断主节点失败至少需要2个Sentinel节点节点同意
// mymaster是主节点的别名
sentinel monitor mymaster 127.0.0.1 6379 2

// 每个Sentinel节点都要定期PING命令来判断Redis数据节点和其余Sentinel节点是否可达
// 如果超过30000毫秒30s且没有回复,则判定不可达
sentinel down-after-milliseconds mymaster 30000

// 当Sentinel节点集合对主节点故障判定达成一致时,Sentinel领导者节点会做故障转移操作,选出新的主节点,
// 原来的从节点会向新的主节点发起复制操作,限制每次向新的主节点发起复制操作的从节点个数为1
sentinel parallel-syncs mymaster 1

//故障转移超时时间为180000毫秒
sentinel failover-timeout mymaster 180000

// 后台执行
daemonize yes


2. 准备三个哨兵,开始监控主从架构
# 哨兵配置文件redis-26379.conf
port 26379
dir /var/redis/data/
logfile "26379.log"
sentinel monitor zbjmaster 127.0.0.1 6379 2
sentinel down-after-milliseconds zbjmaster 30000
sentinel parallel-syncs zbjmaster 1
sentinel failover-timeout zbjmaster 180000
daemonize yes


# 哨兵配置文件redis-26380.conf
port 26380
dir /var/redis/data/
logfile "26380.log"
sentinel monitor zbjmaster 127.0.0.1 6379 2
sentinel down-after-milliseconds zbjmaster 30000
sentinel parallel-syncs zbjmaster 1
sentinel failover-timeout zbjmaster 180000
daemonize yes



# 哨兵配置文件redis-26381.conf
port 26381
dir /var/redis/data/
logfile "26381.log"
sentinel monitor zbjmaster 127.0.0.1 6379 2
sentinel down-after-milliseconds zbjmaster 30000
sentinel parallel-syncs zbjmaster 1
sentinel failover-timeout zbjmaster 180000
daemonize yes



3. 启动三个哨兵实例
redis-sentinel redis-26379.conf
redis-sentinel redis-26380.conf
redis-sentinel redis-26381.conf

注意!!如果发现实验不成功,需删掉所有的哨兵配置文件,从新来过
注意!!如果发现实验不成功,需删掉所有的哨兵配置文件,从新来过
注意!!如果发现实验不成功,需删掉所有的哨兵配置文件,从新来过

# 检查哨兵状态是否正常
# 只有发现如下信息,即为正常
redis-cli -p 26379 info sentinel
# Sentinel
sentinel_masters:1
sentinel_tilt:0
sentinel_running_scripts:0
sentinel_scripts_queue_length:0
sentinel_simulate_failure_flags:0
# 最重要的一句
master0:name=zbjmaster,status=ok,address=127.0.0.1:6380,slaves=2,sentinels=3


4. 测试哨兵的自动主从切换
1,干掉6379的redis数据库
kill -9 6379的PID

2,查看6380和6381的身份信息,是否自动的进行主从切换
我们设置的是30s后master没有响应,哨兵自动进行主从切换,因此30s后查看主从信息
redis-cli -p 6380 info replication
redis-cli -p 6381 info replication

3,手动启动6379挂掉的数据库,查看是否会被哨兵,添加进信息的主从集群
redis-server redis-6379.conf
redis-cli -p 6379 info replication

二、redis分区和集群

1、什么是分区和集群

  1. 分区
    分区是分割数据到多个Redis实例的处理过程,因此每个实例只保存key的一个子集。
    分区可以让Redis管理更大的内存,Redis将可以使用所有机器的内存。如果没有分区,你最多只能使用一台机器的内存。
    分区使Redis的计算能力通过简单地增加计算机得到成倍提升,Redis的网络带宽也会随着计算机和网卡的增加而成倍增长。

  2. 集群
    redis集群就是分区的一种的实现

2、为什么要用分区

  1. 并发问题
    官方声称 redis 每秒可以执行10万条命令
    但是假如业务需要每秒100万的命令执行呢(例如新浪微博某某明星出轨、官宣之类的)

  2. 数据量
    当数据量太大的时候,一台服务器内存正常是16~256G,假如你的业务需要500G内存,怎么办?

  3. 解决方案

    1. 方案一:
      配置一台超级牛逼的服务器,拥有超大内存和超强的cpu,
      但是这么做的成本是非常高的,而且,万一这台机器宕掉了,那你的服务还不是全挂了。
    2. 方案二:
      考虑分布式,加机器,把数据分到不同的位置,分摊集中式的压力,一堆机器做一件事。

3、分区的数据分布理论

redis是一个非关系型数据库,它的存储是key-value形式的,
redis实例集群主要思想是将redis数据的key进行散列,通过hash函数特定的key会映射到指定的redis节点上

分布式数据库首要解决把整个数据集按照分区规则映射到多个节点的问题,即把数据集划分到多个节点上,每个节点负责整个数据的一个子集。

常见的分区规则有哈希分区和顺序分区。

4、顺序分区

假设我有三个节点,100个redis的数据,按照平均值(几乎是平均的),顺序分区的规则就是:
把1-33个数据 放在 node1
把34-66个数据 放在node2
把67-100个数据 放在node3

5、哈希分区

  1. 节点取余
    例如按照节点取余的方式,分三个节点
    1~100的数据对3取余,可以分为三类
    余数为0
    余数为1
    余数为2
    把余数为0的数据存到同一个节点
    把余数为1的数据存到同一个节点
    把余数为2的数据存到同一个节点

那么同样的分4个节点就是hash(key)%4,余数相同的存到同一个节点
节点取余的优点是简单,客户端分片直接是哈希+取余

  1. 一致性哈希
    客户端进行分片,哈希+顺时针取余
  2. 虚拟槽分区
    本文研究哈希分区之虚拟槽分区,因此下面单独来聊一聊

三、哈希分区之虚拟槽分区

1、介绍

Redis Cluster采用的就是虚拟槽分区

虚拟槽分区巧妙地使用了哈希空间,使用分散度良好的哈希函数把所有的数据映射到一个固定范围内的整数集合,
这些整数就定义为槽(slot)。

Redis Cluster槽的范围是0 ~ 16383,即一共16384个槽

槽是集群内数据管理和迁移的基本单位。采用大范围的槽的主要目的是为了方便数据的拆分和集群的扩展,

每个节点(redis实例)负责一定数量的槽。

2、虚拟槽图解

img

3、搭建redis cluster

redis支持多实例的功能,我们在单机演示集群搭建,需要6个实例,三个是主节点,三个是从节点,数量为6个节点才能保证高可用的集群。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
1.准备6个节点,用于存储数据,分配槽位,每个节点的配置,如下,仅仅是端口的区别
# redis-7000.conf配置
port 7000
daemonize yes
dir "/opt/redis/data"
logfile "7000.log"
dbfilename "dump-7000.rdb"
cluster-enabled yes
cluster-config-file nodes-7000.conf 

其余5个配置跟上面一模一样,仅仅是端口的区别
# redis-7001.conf配置
# redis-7002.conf配置
# redis-7003.conf配置
# redis-7004.conf配置
# redis-7005.conf配置

注意:要创建存放日志的文件夹 mkdir -p /opt/redis/data


2.启动6个数据库实例
redis-server redis-7000.conf
redis-server redis-7001.conf
redis-server redis-7002.conf
redis-server redis-7003.conf
redis-server redis-7004.conf
redis-server redis-7005.conf

3.开始分配redis集群状态,以及槽位分配
Redis Cluster本身提供了自动将数据分散到Redis Cluster不同节点的能力,
但是槽位的分配就比较麻烦了,当然了,如果你是大神,你完全可以自定义槽位的分配,
一些大神已经写好了槽位分配的工具或脚本了,例如豆瓣公司开源的codis工具,还有ruby语言的作者,写的redsi.rb,
因此我们可以使用一些工具帮我们进行redis cluster的搭建

4.通过ruby脚本,一键创建redis-cluster,进行槽位分配

5.准备ruby的编程环境
1,下载ruby的源码包
wget https://cache.ruby-lang.org/pub/ruby/2.3/ruby-2.3.1.tar.gz

2,解压缩ruby远吗
tar -zxvf ruby-2.3.1.tar.gz

3,开始编译安装ruby
进入ruby源码包
./configure --prefix=/opt/ruby/

4,开始编译且编译安装
make && make install

5,配置ruby的环境变量
vim /etc/profile
写入如下配置
PATH=你原本的PATH:/opt/ruby/bin

source /etc/profile


6.安装ruby操作redis的模块
1,下载ruby操作redis的模块
wget http://rubygems.org/downloads/redis-3.3.0.gem

2,安装
gem install -l redis-3.3.0.gem

3,搜索创建redis集群的命令
find /opt -name redis-trib.rb
/opt/redis-4.0.10/src/redis-trib.rb

7.一键创建redis集群
/opt/redis-4.0.10/src/redis-trib.rb create --replicas 1 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
--replicas 进行身份授权
后面的1 代表,每个主节点,只有一个从节点
默认将 7000 7001 70002 设置为主库
将7003 7004 7005 设置为从库

8.检查集群状态
redis-cli -p 7000 cluster info

9.测试集群节点,看是否能正常写入数据
redis-cli -c -p 7000
-p 指定数据库端口
-c 指定开启集群模式

set age 18 # 设置一个key会自动分配槽位,重定向到槽位所在的节点即代表成功

在任意一个节点都可以get age,会自动重定向到 age 所在的节点。

10.redis-cluster 会默认将不同的key,进行CRC16算法,进行分配到不同槽位

11.数据正常重定向,即redis集群ok

参考地址

0%