简栈文化

Java技术人的成长之路~

对象拷贝有哪些

对象拷贝(Object Copy)就是将一个对象的属性拷贝到另一个有着相同类类型的对象中去。在程序中拷贝对象是很常见的,主要是为了在新的上下文环境中复用对象的部分或全部数据。

Java中有三种类型的对象拷贝:

  • 浅拷贝(Shallow Copy)
  • 深拷贝(Deep Copy)
  • 延迟拷贝(Lazy Copy)

理解浅拷贝

什么是浅拷贝?
  • 浅拷贝是按位拷贝对象,它会创建一个新对象,这个对象有着原始对象属性值的一份精确拷贝。
    • 如果属性是基本类型,拷贝的就是基本类型的值;如果属性是内存地址(引用类型),拷贝的就是内存地址 ,因此如果其中一个对象改变了这个地址,就会影响到另一个对象。

    • http://static.cyblogs.com/QQ20200226-210909@2x.jpg

    • 在上图中,SourceObject有一个int类型的属性 “field1”和一个引用类型属性”refObj”(引用ContainedObject类型的对象)。当对SourceObject做浅拷贝时,创建了CopiedObject,它有一个包含”field1”拷贝值的属性”field2”以及仍指向refObj本身的引用。由于”field1”是基本类型,所以只是将它的值拷贝给”field2”,但是由于”refObj”是一个引用类型, 所以CopiedObject指向”refObj”相同的地址。因此对SourceObject中的”refObj”所做的任何改变都会影响到CopiedObject。

如何实现浅拷贝

下面来看一看实现浅拷贝的一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Subject {

private String name;
public Subject(String s) {
name = s;
}

public String getName() {
return name;
}

public void setName(String s) {
name = s;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Student implements Cloneable { 

// 对象引用
private Subject subj;
private String name;

public Student(String s, String sub) {
name = s;
subj = new Subject(sub);
}

public Subject getSubj() {
return subj;
}

public String getName() {
return name;
}

public void setName(String s) {
name = s;
}

/**
* 重写clone()方法
* @return
*/
public Object clone() {
//浅拷贝
try {
// 直接调用父类的clone()方法
return super.clone();
} catch (CloneNotSupportedException e) {
return null;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CopyDemoMain {
public static void main(String[] args) {
// 原始对象
Student stud = new Student("杨充", "潇湘剑雨");
System.out.println("原始对象: " + stud.getName() + " - " + stud.getSubj().getName());

// 拷贝对象
Student clonedStud = (Student) stud.clone();
System.out.println("拷贝对象: " + clonedStud.getName() + " - " + clonedStud.getSubj().getName());

// 原始对象和拷贝对象是否一样:
System.out.println("原始对象和拷贝对象是否一样: " + (stud == clonedStud));
// 原始对象和拷贝对象的name属性是否一样
System.out.println("原始对象和拷贝对象的name属性是否一样: " + (stud.getName() == clonedStud.getName()));
// 原始对象和拷贝对象的subj属性是否一样
System.out.println("原始对象和拷贝对象的subj属性是否一样: " + (stud.getSubj() == clonedStud.getSubj()));

stud.setName("小杨逗比");
stud.getSubj().setName("潇湘剑雨大侠");
System.out.println("更新后的原始对象: " + stud.getName() + " - " + stud.getSubj().getName());
System.out.println("更新原始对象后的克隆对象: " + clonedStud.getName() + " - " + clonedStud.getSubj().getName());
}
}

输出结果如下:

1
2
3
4
5
6
7
8
9
Connected to the target VM, address: '127.0.0.1:57836', transport: 'socket'
原始对象: 杨充 - 潇湘剑雨
拷贝对象: 杨充 - 潇湘剑雨
原始对象和拷贝对象是否一样: false
原始对象和拷贝对象的name属性是否一样: true
原始对象和拷贝对象的subj属性是否一样: true
更新后的原始对象: 小杨逗比 - 潇湘剑雨大侠
更新原始对象后的克隆对象: 杨充 - 潇湘剑雨大侠
Disconnected from the target VM, address: '127.0.0.1:57836', transport: 'socket'

可以得出的结论

  • 在这个例子中,让要拷贝的类Student实现了Clonable接口并重写Object类的clone()方法,然后在方法内部调用super.clone()方法。从输出结果中我们可以看到,对原始对象stud的”name”属性所做的改变并没有影响到拷贝对象clonedStud,但是对引用对象subj的”name”属性所做的改变影响到了拷贝对象clonedStud。

理解深拷贝

什么是深拷贝?
  • 深拷贝会拷贝所有的属性,并拷贝属性指向的动态分配的内存。当对象和它所引用的对象一起拷贝时即发生深拷贝。深拷贝相比于浅拷贝速度较慢并且花销较大。
    • http://static.cyblogs.com/QQ20200226-212333@2x.jpg
    • 在上图中,SourceObject有一个int类型的属性 “field1”和一个引用类型属性”refObj1”(引用ContainedObject类型的对象)。当对SourceObject做深拷贝时,创建了CopiedObject,它有一个包含”field1”拷贝值的属性”field2”以及包含”refObj1”拷贝值的引用类型属性”refObj2” 。因此对SourceObject中的”refObj”所做的任何改变都不会影响到CopiedObject
实现深拷贝案例

下面是实现深拷贝的一个例子。只是在浅拷贝的例子上做了一点小改动,Subject 和CopyTest 类都没有变化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class DeepStudent implements Cloneable {
// 对象引用
private Subject subj;
private String name;

public DeepStudent(String s, String sub) {
name = s;
subj = new Subject(sub);
}

public Subject getSubj() {
return subj;
}

public String getName() {
return name;
}

public void setName(String s) {
name = s;
}

/**
* 重写clone()方法
*
* @return
*/
public Object clone() {
// 深拷贝,创建拷贝类的一个新对象,这样就和原始对象相互独立
DeepStudent s = new DeepStudent(name, subj.getName());
return s;
}
}

输出结果如下:

1
2
3
4
5
6
7
8
9
Connected to the target VM, address: '127.0.0.1:60802', transport: 'socket'
原始对象: 杨充 - 潇湘剑雨
拷贝对象: 杨充 - 潇湘剑雨
原始对象和拷贝对象是否一样: false
原始对象和拷贝对象的name属性是否一样: true
原始对象和拷贝对象的subj属性是否一样: true
更新后的原始对象: 小杨逗比 - 潇湘剑雨大侠
更新原始对象后的克隆对象: 杨充 - 潇湘剑雨大侠
Disconnected from the target VM, address: '127.0.0.1:60802', transport: 'socket'

得出的结论

  • 很容易发现clone()方法中的一点变化。因为它是深拷贝,所以你需要创建拷贝类的一个对象。因为在Student类中有对象引用,所以需要在Student类中实现Cloneable接口并且重写clone方法。

序列化进行拷贝

序列化属于深拷贝

可能你会问,序列化是属于那种类型拷贝?答案是:通过序列化来实现深拷贝。可以思考一下,为何序列化对象要用深拷贝而不是用浅拷贝呢?

注意要点

可以序列化是干什么的?它将整个对象图写入到一个持久化存储文件中并且当需要的时候把它读取回来, 这意味着当你需要把它读取回来时你需要整个对象图的一个拷贝。这就是当你深拷贝一个对象时真正需要的东西。请注意,当你通过序列化进行深拷贝时,必须确保对象图中所有类都是可序列化的。

序列化案例

看一下下面案例,很简单,只需要实现Serializable这个接口。Android中还可以实现Parcelable接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ColoredCircle implements Serializable { 

private int x;
private int y;

public ColoredCircle(int x, int y) {
this.x = x;
this.y = y;
}

public int getX() {
return x;
}

public void setX(int x) {
this.x = x;
}

public int getY() {
return y;
}

public void setY(int y) {
this.y = y;
}

@Override
public String toString() {
return "x=" + x + ", y=" + y;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class DouBi implements Serializable {

private static final long serialVersionUID = -8752043194511106066L;

private int x;
private int y;

public DouBi(int x, int y) {
this.x = x;
this.y = y;
}

public int getX() {
return x;
}

public void setX(int x) {
this.x = x;
}

public int getY() {
return y;
}

public void setY(int y) {
this.y = y;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class CopyDemoMain3 {
public static void main(String[] args) {
ObjectOutputStream oos = null;
ObjectInputStream ois = null;
try {
// 创建原始的可序列化对象
DouBi c1 = new DouBi(100, 100);
System.out.println("原始的对象 = " + c1);
DouBi c2 = null;
// 通过序列化实现深拷贝
ByteArrayOutputStream bos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(bos);
// 序列化以及传递这个对象
oos.writeObject(c1);
oos.flush();
ByteArrayInputStream bin = new ByteArrayInputStream(bos.toByteArray());
ois = new ObjectInputStream(bin);
// 返回新的对象
c2 = (DouBi) ois.readObject();
// 校验内容是否相同
System.out.println("复制后的对象 = " + c2);
// 改变原始对象的内容
c1.setX(200);
c1.setY(200);
// 查看每一个现在的内容
System.out.println("查看原始的对象 = " + c1);
System.out.println("查看复制的对象 = " + c2);
} catch (IOException e) {
System.out.println("Exception in main = " + e);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
if (oos != null) {
try {
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (ois != null) {
try {
ois.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

输出结果如下:

1
2
3
4
5
6
Connected to the target VM, address: '127.0.0.1:62095', transport: 'socket'
原始的对象 = com.vernon.test.copy.DouBi@1134affc
复制后的对象 = com.vernon.test.copy.DouBi@1b604f19
查看原始的对象 = com.vernon.test.copy.DouBi@1134affc
查看复制的对象 = com.vernon.test.copy.DouBi@1b604f19
Disconnected from the target VM, address: '127.0.0.1:62095', transport: 'socket'

注意:需要做以下几件事儿:

  • 确保对象图中的所有类都是可序列化的

  • 创建输入输出流

  • 使用这个输入输出流来创建对象输入和对象输出流

  • 将你想要拷贝的对象传递给对象输出流

  • 从对象输入流中读取新的对象并且转换回你所发送的对象的类

  • 得出的结论

    • 在这个例子中,创建了一个DouBi对象c1然后将它序列化 (将它写到ByteArrayOutputStream中). 然后我反序列化这个序列化后的对象并将它保存到c2中。随后我修改了原始对象c1。然后结果如你所见,c1不同于c2,对c1所做的任何修改都不会影响c2。
    • 注意,序列化这种方式有其自身的限制和问题:因为无法序列化transient变量, 使用这种方法将无法拷贝transient变量。再就是性能问题。创建一个socket, 序列化一个对象, 通过socket传输它, 然后反序列化它,这个过程与调用已有对象的方法相比是很慢的。所以在性能上会有天壤之别。如果性能对你的代码来说是至关重要的,建议不要使用这种方式。它比通过实现Clonable接口这种方式来进行深拷贝几乎多花100倍的时间。

延迟拷贝

  • 延迟拷贝是浅拷贝和深拷贝的一个组合,实际上很少会使用。这个以前几乎都没听说过,后来看书才知道有这么一种拷贝!
  • 当最开始拷贝一个对象时,会使用速度较快的浅拷贝,还会使用一个计数器来记录有多少对象共享这个数据。当程序想要修改原始的对象时,它会决定数据是否被共享(通过检查计数器)并根据需要进行深拷贝。
  • 延迟拷贝从外面看起来就是深拷贝,但是只要有可能它就会利用浅拷贝的速度。当原始对象中的引用不经常改变的时候可以使用延迟拷贝。由于存在计数器,效率下降很高,但只是常量级的开销。而且, 在某些情况下, 循环引用会导致一些问题。

如何选择拷贝方式

  • 如果对象的属性全是基本类型的,那么可以使用浅拷贝。
  • 如果对象有引用属性,那就要基于具体的需求来选择浅拷贝还是深拷贝。
  • 意思是如果对象引用任何时候都不会被改变,那么没必要使用深拷贝,只需要使用浅拷贝就行了。如果对象引用经常改变,那么就要使用深拷贝。没有一成不变的规则,一切都取决于具体需求。

数组的拷贝

  • 数组除了默认实现了clone()方法之外,还提供了Arrays.copyOf方法用于拷贝,这两者都是浅拷贝。
基本数据类型数组

如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
public void test4() {
int[] lNumbers1 = new int[5];
int[] rNumbers1 = Arrays.copyOf(lNumbers1, lNumbers1.length);
rNumbers1[0] = 1;
boolean first = lNumbers1[0] == rNumbers1[0];
Log.d("小杨逗比", "lNumbers2[0]=" + lNumbers1[0] + ",rNumbers2[0]=" + rNumbers1[0]+"---"+first);

int[] lNumbers3 = new int[5];
int[] rNumbers3 = lNumbers3.clone();
rNumbers3[0] = 1;
boolean second = lNumbers3[0] == rNumbers3[0];
Log.d("小杨逗比", "lNumbers3[0]=" + lNumbers3[0] + ",rNumbers3[0]=" + rNumbers3[0]+"---"+second);
}

打印结果如下所示

1
2
2019-03-25 14:28:09.907 30316-30316/org.yczbj.ycrefreshview D/小杨逗比: lNumbers2[0]=0,rNumbers2[0]=1---false
2019-03-25 14:28:09.907 30316-30316/org.yczbj.ycrefreshview D/小杨逗比: lNumbers3[0]=0,rNumbers3[0]=1---false

引用数据类型数组

如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static void test5() {
People[] lNumbers1 = new People[5];
lNumbers1[0] = new People();
People[] rNumbers1 = lNumbers1;
boolean first = lNumbers1[0].equals(rNumbers1[0]);
Log.d("小杨逗比", "lNumbers1[0]=" + lNumbers1[0] + ",rNumbers1[0]=" + rNumbers1[0]+"--"+first);

People[] lNumbers2 = new People[5];
lNumbers2[0] = new People();
People[] rNumbers2 = Arrays.copyOf(lNumbers2, lNumbers2.length);
boolean second = lNumbers2[0].equals(rNumbers2[0]);
Log.d("小杨逗比", "lNumbers2[0]=" + lNumbers2[0] + ",rNumbers2[0]=" + rNumbers2[0]+"--"+second);

People[] lNumbers3 = new People[5];
lNumbers3[0] = new People();
People[] rNumbers3 = lNumbers3.clone();
boolean third = lNumbers3[0].equals(rNumbers3[0]);
Log.d("小杨逗比", "lNumbers3[0]=" + lNumbers3[0] + ",rNumbers3[0]=" + rNumbers3[0]+"--"+third);
}

public static class People implements Cloneable {

int age;
Holder holder;

@Override
protected Object clone() {
try {
return super.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
return null;
}

public static class Holder {
int holderValue;
}
}

打印日志如下

1
2
3
2019-03-25 14:53:17.054 31093-31093/org.yczbj.ycrefreshview D/小杨逗比: lNumbers1[0]=org.yczbj.ycrefreshview.MainActivity$People@46a2c18,rNumbers1[0]=org.yczbj.ycrefreshview.MainActivity$People@46a2c18--true
2019-03-25 14:53:17.054 31093-31093/org.yczbj.ycrefreshview D/小杨逗比: lNumbers2[0]=org.yczbj.ycrefreshview.MainActivity$People@d344671,rNumbers2[0]=org.yczbj.ycrefreshview.MainActivity$People@d344671--true
2019-03-25 14:53:17.054 31093-31093/org.yczbj.ycrefreshview D/小杨逗比: lNumbers3[0]=org.yczbj.ycrefreshview.MainActivity$People@91e9c56,rNumbers3[0]=org.yczbj.ycrefreshview.MainActivity$People@91e9c56--true

集合的拷贝

  • 集合的拷贝也是我们平时经常会遇到的,一般情况下,我们都是用浅拷贝来实现,即通过构造函数或者clone方法。
集合浅拷贝

构造函数和 clone() 默认都是浅拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void test6() {
ArrayList<People> lPeoples = new ArrayList<>();
People people1 = new People();
lPeoples.add(people1);
Log.d("小杨逗比", "lPeoples[0]=" + lPeoples.get(0));
ArrayList<People> rPeoples = (ArrayList<People>) lPeoples.clone();
Log.d("小杨逗比", "rPeoples[0]=" + rPeoples.get(0));
boolean b = lPeoples.get(0).equals(rPeoples.get(0));
Log.d("小杨逗比", "比较两个对象" + b);
}

public static class People implements Cloneable {

int age;
Holder holder;

@Override
protected Object clone() {
try {
People people = (People) super.clone();
people.holder = (People.Holder) this.holder.clone();
return people;
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
return null;
}

public static class Holder implements Cloneable {

int holderValue;

@Override
protected Object clone() {
try {
return super.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
return null;
}
}
}

打印日志

1
2
3
2019-03-25 14:56:56.931 31454-31454/org.yczbj.ycrefreshview D/小杨逗比: lPeoples[0]=org.yczbj.ycrefreshview.MainActivity$People@46a2c18
2019-03-25 14:56:56.931 31454-31454/org.yczbj.ycrefreshview D/小杨逗比: rPeoples[0]=org.yczbj.ycrefreshview.MainActivity$People@46a2c18
2019-03-25 14:56:56.931 31454-31454/org.yczbj.ycrefreshview D/小杨逗比: 比较两个对象true
集合深拷贝

在某些特殊情况下,如果需要实现集合的深拷贝,那就要创建一个新的集合,然后通过深拷贝原先集合中的每个元素,将这些元素加入到新的集合当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static void test7() {
ArrayList<People> lPeoples = new ArrayList<>();
People people1 = new People();
people1.holder = new People.Holder();
lPeoples.add(people1);
Log.d("小杨逗比", "lPeoples[0]=" + lPeoples.get(0));
ArrayList<People> rPeoples = new ArrayList<>();
for (People people : lPeoples) {
rPeoples.add((People) people.clone());
}
Log.d("小杨逗比", "rPeoples[0]=" + rPeoples.get(0));
boolean b = lPeoples.get(0).equals(rPeoples.get(0));
Log.d("小杨逗比", "比较两个对象" + b);
}

public static class People implements Cloneable {

int age;
Holder holder;

@Override
protected Object clone() {
try {
People people = (People) super.clone();
people.holder = (People.Holder) this.holder.clone();
return people;
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
return null;
}

public static class Holder implements Cloneable {

int holderValue;

@Override
protected Object clone() {
try {
return super.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
return null;
}
}
}

打印日志

1
2
3
2019-03-25 15:00:54.610 31670-31670/org.yczbj.ycrefreshview D/小杨逗比: lPeoples[0]=org.yczbj.ycrefreshview.MainActivity$People@46a2c18
2019-03-25 15:00:54.610 31670-31670/org.yczbj.ycrefreshview D/小杨逗比: rPeoples[0]=org.yczbj.ycrefreshview.MainActivity$People@d344671
2019-03-25 15:00:54.610 31670-31670/org.yczbj.ycrefreshview D/小杨逗比: 比较两个对象false

参考地址

前言

Java是一个安全的编程语言,它能最大程度的防止程序员犯一些低级的错误(大部分是和内存管理有关的)。但凡是不是绝对的,使用Unsafe程序员就可以操作内存,因此可能带来一个安全隐患。

这篇文章是就快速学习下sun.misc.Unsafe的公共API和一些有趣的使用例子。

Unsafe 实例化

在使用Unsafe之前我们需要先实例化它。但我们不能通过像Unsafe unsafe = new Unsafe()这种简单的方式来实现Unsafe的实例化,这是由于Unsafe的构造方法是私有的。Unsafe有一个静态的getUnsafe()方法,但是如果天真的以为调用该方法就可以的话,那你将遇到一个SecurityException异常,这是由于该方法只能在被信任的代码中调用。

1
2
3
4
5
6
public static Unsafe getUnsafe() {
Class cc = sun.reflect.Reflection.getCallerClass(2);
if (cc.getClassLoader() != null)
throw new SecurityException("Unsafe");
return theUnsafe;
}

那Java是如何判断我们的代码是否是受信的呢?它就是通过判断加载我们代码的类加载器是否是根类加载器。

我们可是通过这种方法将我们自己的代码变为受信的,使用jvm参数bootclasspath。如下所示:

1
java -Xbootclasspath:/usr/jdk1.7.0/jre/lib/rt.jar:. com.mishadoff.magic.UnsafeClient

但这种方式太难了

Unsafe类内部有一个名为theUnsafe的私有实例变量,我们可以通过反射来获取该实例变量。

1
2
3
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);

注意: 忽略你的IDE提示. 例如, eclipse可能会报这样的错误”Access restriction…” 单如果你运行你的代码,会发现一切正常。如果还是还是提示错误,你可以通过如下的方式关闭该错误提示:

1
2
Preferences -> Java -> Compiler -> Errors/Warnings ->
Deprecated and restricted API -> Forbidden reference -> Warning

Unsafe API

sun.misc.Unsafe 由150个方法组成。事实上这些方法只有几组是非常重要的用来操作不同的对象。下面我们就来看下这些方法中的一部分。

  1. Info 仅仅是返回一个低级别的内存相关的信息
    • addressSize
    • pageSize
  2. Objects. 提供操作对象和对象字段的方法
    • allocateInstance
    • objectFieldOffset
  3. Classes. 提供针对类和类的静态字段操作的方法
    • staticFieldOffset
    • defineClass
    • defineAnonymousClass
    • ensureClassInitialized
  4. Arrays. 数组操作
    • arrayBaseOffset
    • arrayIndexScale
  5. Synchronization. 低级别的同步原语
    • monitorEnter
    • tryMonitorEnter
    • monitorExit
    • compareAndSwapInt
    • putOrderedInt
  6. Memory. 直接访问内存的方法
    • allocateMemory
    • copyMemory
    • freeMemory
    • getAddress
    • getInt
    • putInt

有趣的使用case

跳过构造初始化

allocateInstance方法可能是有用的,当你需要在构造函数中跳过对象初始化阶段或绕过安全检查又或者你想要实例化哪些没有提供公共构造函数的类时就可以使用该方法。考虑下面的类:

1
2
3
4
5
6
7
8
9
class A {
private long a; // not initialized value

public A() {
this.a = 1; // initialization
}

public long a() { return this.a; }
}

通过构造函数,反射,Unsafe分别来实例化该类结果是不同的:

1
2
3
4
5
6
7
8
A o1 = new A(); // constructor
o1.a(); // prints 1

A o2 = A.class.newInstance(); // reflection
o2.a(); // prints 1

A o3 = (A) unsafe.allocateInstance(A.class); // unsafe
o3.a(); // prints 0

思考一下这些确保对Singletons模式的影响。

内存泄露

对C程序员来说这中情况是很常见的。

思考一下一些简单的类是如何坚持访问规则的:

1
2
3
4
5
6
7
class Guard {
private int ACCESS_ALLOWED = 1;

public boolean giveAccess() {
return 42 == ACCESS_ALLOWED;
}
}

客户端代码是非常安全的,调用giveAccess()检查访问规则。不幸的是对所有的客户端代码,它总是返回false。只有特权用户在某种程度上可以改变ACCESS_ALLOWED常量并且获得访问权限。

事实上,这不是真的。这是证明它的代码:

1
2
3
4
5
6
7
8
9
Guard guard = new Guard();
guard.giveAccess(); // false, no access

// bypass
Unsafe unsafe = getUnsafe();
Field f = guard.getClass().getDeclaredField("ACCESS_ALLOWED");
unsafe.putInt(guard, unsafe.objectFieldOffset(f), 42); // memory corruption

guard.giveAccess(); // true, access granted

现在所有的客户端都没有访问限制了。

事实上同样的功能也可以通过反射来实现。但有趣的是, 通过上面的方式我们修改任何对象,即使我们没有持有对象的引用。

举个例子, 在内存中有另外的一个Guard对象,并且地址紧挨着当前对象的地址,我们就可以通过下面的代码来修改该对象的ACCESS_ALLOWED字段的值。

1
unsafe.putInt(guard, 16 + unsafe.objectFieldOffset(f), 42); // memory corruption

注意,我们没有使用任何指向该对象的引用,16是Guard对象在32位架构上的大小。我们也可以通过sizeOf方法来计算Guard对象的大小。

sizeOf

使用objectFieldOffset方法我们可以实现C风格的sizeof方法。下面的方法实现返回对象的表面上的大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static long sizeOf(Object o) {
Unsafe u = getUnsafe();
HashSet<Field> fields = new HashSet<Field>();
Class c = o.getClass();
while (c != Object.class) {
for (Field f : c.getDeclaredFields()) {
if ((f.getModifiers() & Modifier.STATIC) == 0) {
fields.add(f);
}
}
c = c.getSuperclass();
}

// get offset
long maxSize = 0;
for (Field f : fields) {
long offset = u.objectFieldOffset(f);
if (offset > maxSize) {
maxSize = offset;
}
}

return ((maxSize/8) + 1) * 8; // padding
}

算法逻辑如下:收集所有包括父类在内的非静态字段,获得每个字段的偏移量,发现最大并添加填充。也许,我错过了一些东西,但是概念是明确的。

更简单的sizeof方法实现逻辑是:我们只读取该对象对应的class对象中关于大小的字段值。在JVM 1.7 32 位版本上该表示大小的字段偏移量是12。

1
2
3
4
public static long sizeOf(Object object){
return getUnsafe().getAddress(
normalize(getUnsafe().getInt(object, 4L)) + 12L);
}

normalize是一个将有符号的int类型转为无符号的long类型的方法。

1
2
3
4
private static long normalize(int value) {
if(value >= 0) return value;
return (~0L >>> 32) & value;
}

太棒了,这个方法返回的结果和我们之前的sizeof函数是相同的。

but it requires specifyng agent option in your JVM.

事实上,对于合适的,安全的,准确的sizeof函数最好使用java.lang.instrument包,但它需要特殊的JVM参数。

浅拷贝

在实现了计算对象浅层大小的基础上,我们可以非常容易的添加对象的拷贝方法。标准的办法需要修改我们的代码和Cloneable。或者你可以实现自定义的对象拷贝函数,但它不会变为通用的函数。

浅拷贝:

1
2
3
4
5
6
7
static Object shallowCopy(Object obj) {
long size = sizeOf(obj);
long start = toAddress(obj);
long address = getUnsafe().allocateMemory(size);
getUnsafe().copyMemory(start, address, size);
return fromAddress(address);
}

toAddressfromAddress 将对象转为它在内存中的地址或者从指定的地址内容转为对象。

1
2
3
4
5
6
7
8
9
10
11
12
static long toAddress(Object obj) {
Object[] array = new Object[] {obj};
long baseOffset = getUnsafe().arrayBaseOffset(Object[].class);
return normalize(getUnsafe().getInt(array, baseOffset));
}

static Object fromAddress(long address) {
Object[] array = new Object[] {null};
long baseOffset = getUnsafe().arrayBaseOffset(Object[].class);
getUnsafe().putLong(array, baseOffset, address);
return array[0];
}

该拷贝函数可以用来拷贝任何类型的对象,因为对象的大小是动态计算的。

注意 在完成拷贝动作后你需要将拷贝对象的类型强转为目标类型。

隐藏密码

在Unsafe的直接内存访问方法使用case中有一个非常有趣的用法就是删除内存中不想要的对象。

大多数获取用户密码的API方法的返回值不是byte[]就是char[],这是为什么呢?

这完全是出于安全原因, 因为我们可以在不需要它们的时候将数组元素置为失效。如果我们获取的密码是字符串类型,则密码字符串是作为一个对象保存在内存中的。要将该密码字符串置为无效,我们只能讲字符串引用职位null,但是该字符串的内容任然存在内存直到GC回收该对象后。

这个技巧在内存创建一个假的大小相同字符串对象来替换原来的:

1
2
3
4
5
6
7
8
9
10
String password = new String("l00k@myHor$e");
String fake = new String(password.replaceAll(".", "?"));
System.out.println(password); // l00k@myHor$e
System.out.println(fake); // ????????????

getUnsafe().copyMemory(
fake, 0L, null, toAddress(password), sizeOf(password));

System.out.println(password); // ????????????
System.out.println(fake); // ????????????

感觉安全了吗?

其实该方法不是真的安全。想要真的安全我们可以通过反射API将字符串对象中的字符数组value字段的值修改为null。

1
2
3
4
5
6
Field stringValue = String.class.getDeclaredField("value");
stringValue.setAccessible(true);
char[] mem = (char[]) stringValue.get(password);
for (int i=0; i < mem.length; i++) {
mem[i] = '?';
}

多重继承

在Java中本来是没有多重集成的。除非我们可以将任意的类型转为我们想要的任意类型。

1
2
3
long intClassAddress = normalize(getUnsafe().getInt(new Integer(0), 4L));
long strClassAddress = normalize(getUnsafe().getInt("", 4L));
getUnsafe().putAddress(intClassAddress + 36, strClassAddress);

这段代码将String类添加到Integer的超类集合中,所以我们的强转代码是没有运行时异常的。

1
(String) (Object) (new Integer(666))

有个问题是我们需要先将要转的对象转为Object,然后再转为我们想要的类型。这是为了欺骗编译器。

动态类

We can create classes in runtime, for example from compiled .class file. To perform that read class contents to byte array and pass it properly to defineClass method.

我们可以在运行时创建类, 例如通过一个编译好的class文件。将class文件的内容读入到字节数组中然后将该数组传递到合适的defineClass方法中。

1
2
3
4
byte[] classContents = getClassContent();
Class c = getUnsafe().defineClass(
null, classContents, 0, classContents.length);
c.getMethod("a").invoke(c.newInstance(), null); // 1

读取class文件内如的代码:

1
2
3
4
5
6
7
8
private static byte[] getClassContent() throws Exception {
File f = new File("/home/mishadoff/tmp/A.class");
FileInputStream input = new FileInputStream(f);
byte[] content = new byte[(int)f.length()];
input.read(content);
input.close();
return content;
}

该方式是非常有用的,如果你确实需要在运行时动态的创建类。比如生产代理类或切面类。

抛出一个异常

不喜欢受检异常?这不是问题。

1
getUnsafe().throwException(new IOException());

该方法抛出一个受检异常,但是你的代码不需要强制捕获该异常就像运行时异常一样。

快速序列化

这种使用方式更实用。

每个人都知道java标准的序列化的功能速度很慢而且它还需要类拥有公有的构造函数。

外部序列化是更好的方式,但是需要定义针对待序列化类的schema。

非常流行的高性能序列化库,像kryo是有使用限制的,比如在内存缺乏的环境就不合适。

但通过使用Unsafe类我们可以非常简单的实现完整的序列化功能。

序列化

  • 通过反射定义类的序列化。 这个可以只做一次。
  • 通过Unsafe的getLong, getInt, getObject等方法获取字段真实的值。
  • 添加可以恢复该对象的标识符。
  • 将这些数据写入到输出

当然也可以使用压缩来节省空间。

反序列化:

  • 创建一个序列化类的实例,可以通过方法allocateInstance。因为该方法不需要任何构造方法。
  • 创建schama, 和序列化类似
  • 从文件或输入读取或有的字段
  • 使用 UnsafeputLong, putInt, putObject等方法来填充对象。

Actually, there are much more details in correct inplementation, but intuition is clear.

事实上要正确实现序列化和反序列化需要注意很多细节,但是思路是清晰的。

这种序列化方式是非常快的。

顺便说一句,在 kryo 有许多使用Unsafe的尝试 http://code.google.com/p/kryo/issues/detail?id=75

大数组

如你所知Java数组长度的最大值是Integer.MAX_VALUE。使用直接内存分配我们可以创建非常大的数组,该数组的大小只受限于堆的大小。

这里有一个SuperArray的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SuperArray {
private final static int BYTE = 1;

private long size;
private long address;

public SuperArray(long size) {
this.size = size;
address = getUnsafe().allocateMemory(size * BYTE);
}

public void set(long i, byte value) {
getUnsafe().putByte(address + i * BYTE, value);
}

public int get(long idx) {
return getUnsafe().getByte(address + idx * BYTE);
}

public long size() {
return size;
}
}

一个简单的用法:

1
2
3
4
5
6
7
8
long SUPER_SIZE = (long)Integer.MAX_VALUE * 2;
SuperArray array = new SuperArray(SUPER_SIZE);
System.out.println("Array size:" + array.size()); // 4294967294
for (int i = 0; i < 100; i++) {
array.set((long)Integer.MAX_VALUE + i, (byte)3);
sum += array.get((long)Integer.MAX_VALUE + i);
}
System.out.println("Sum of 100 elements:" + sum); // 300

事实上该技术使用了非堆内存off-heap memory,在 java.nio 包中也有使用。

通过这种方式分配的内存不在堆上,并且不受GC管理。因此需要小心使用Unsafe.freeMemory()。该方法不会做任何边界检查,因此任何不合法的访问可能就会导致JVM奔溃。

这种使用方式对于数学计算是非常有用的,因为代码可以操作非常大的数据数组。 同样的编写实时程序的程序员对此也非常感兴趣,因为不受GC限制,就不会因为GC导致非常大的停顿。

并发

关于并发编程使用Unsafe的只言片语。compareAndSwap 方法是原子的,可以用来实现高性能的无锁化数据结构。

举个例子,多个线程并发的更新共享的对象这种场景:

首先我们定义一个简单的接口 Counter:

1
2
3
4
interface Counter {
void increment();
long getCounter();
}

我们定义工作线程 CounterClient, 它会使用 Counter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class CounterClient implements Runnable {
private Counter c;
private int num;

public CounterClient(Counter c, int num) {
this.c = c;
this.num = num;
}

@Override
public void run() {
for (int i = 0; i < num; i++) {
c.increment();
}
}
}

这是测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
int NUM_OF_THREADS = 1000;
int NUM_OF_INCREMENTS = 100000;
ExecutorService service = Executors.newFixedThreadPool(NUM_OF_THREADS);
Counter counter = ... // creating instance of specific counter
long before = System.currentTimeMillis();
for (int i = 0; i < NUM_OF_THREADS; i++) {
service.submit(new CounterClient(counter, NUM_OF_INCREMENTS));
}
service.shutdown();
service.awaitTermination(1, TimeUnit.MINUTES);
long after = System.currentTimeMillis();
System.out.println("Counter result: " + c.getCounter());
System.out.println("Time passed in ms:" + (after - before));

第一个实现-没有同步的计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
class StupidCounter implements Counter {
private long counter = 0;

@Override
public void increment() {
counter++;
}

@Override
public long getCounter() {
return counter;
}
}

Output:

1
2
Counter result: 99542945
Time passed in ms: 679

速度很多,但是没有对所有的线程进行协调所以结果是错误的。第二个版本,使用Java常见的同步方式来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
class SyncCounter implements Counter {
private long counter = 0;

@Override
public synchronized void increment() {
counter++;
}

@Override
public long getCounter() {
return counter;
}
}

Output:

1
2
Counter result: 100000000
Time passed in ms: 10136

彻底的同步当然会导致正确的结果。但是花费的时间令人沮丧。让我们试试 ReentrantReadWriteLock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class LockCounter implements Counter {
private long counter = 0;
private WriteLock lock = new ReentrantReadWriteLock().writeLock();

@Override
public void increment() {
lock.lock();
counter++;
lock.unlock();
}

@Override
public long getCounter() {
return counter;
}
}

Output:

1
2
Counter result: 100000000
Time passed in ms: 8065

结果依然是正确的,时间也短。那使用原子的类呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
class AtomicCounter implements Counter {
AtomicLong counter = new AtomicLong(0);

@Override
public void increment() {
counter.incrementAndGet();
}

@Override
public long getCounter() {
return counter.get();
}
}

Output:

1
2
Counter result: 100000000
Time passed in ms: 6552

使用AtomicCounter的效果更好一点。最后我们试试Unsafe的原子方法compareAndSwapLong看看是不是更进一步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class CASCounter implements Counter {
private volatile long counter = 0;
private Unsafe unsafe;
private long offset;

public CASCounter() throws Exception {
unsafe = getUnsafe();
offset = unsafe.objectFieldOffset(CASCounter.class.getDeclaredField("counter"));
}

@Override
public void increment() {
long before = counter;
while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
before = counter;
}
}

@Override
public long getCounter() {
return counter;
}
}

Output:

1
2
Counter result: 100000000
Time passed in ms: 6454

开起来和使用原子类是一样的效果,难道原子类使用了Unsafe?答案是YES。

事实上该例子非常简单但表现出了Unsafe的强大功能。

就像前面提到的 CAS原语可以用来实现高效的无锁数据结构。实现的原理很简单:

  • 拥有一个状态
  • 创建一个它的副本
  • 修改该副本
  • 执行 CAS 操作
  • 如果失败就重复执行

事实上,在真实的环境它的实现难度超过你的想象,这其中有需要类似ABA,指令重排序这样的问题。

如果你确实对此感兴趣,你可以参考关于无锁HashMap的精彩演示。

Bonus

Documentation for park method from Unsafe class contains longest English sentence I’ve ever seen:

Block current thread, returning when a balancing unpark occurs, or a balancing unpark has already occurred, or the thread is interrupted, or, if not absolute and time is not zero, the given time nanoseconds have elapsed, or if absolute, the given deadline in milliseconds since Epoch has passed, or spuriously (i.e., returning for no “reason”). Note: This operation is in the Unsafe class only because unpark is, so it would be strange to place it elsewhere.

结论

尽管Unsafe有这么多有用的应用,但是尽力不要使用。当然了使用JDK中利用了Unsafe实现的类是可以的。或者你对你代码功力非常自信。

参考地址

Volatile简介

volatile被称为轻量级的synchronized,运行时开销比synchronized更小,在多线程并发编程中发挥着同步共享变量禁止处理器重排序的重要作用。建议在学习volatie之前,先看一下Java内存模型《什么是Java内存模型?》,因为volatile和Java内存模型有着莫大的关系。

Java内存模型

在学习volatie之前,需要补充下Java内存模型的相关(JMM)知识,我们知道Java线程的所有操作都是在工作区进行的,那么工作区和主存之间的变量是怎么进行交互的呢,可以用下面的图来表示。

http://static.cyblogs.com/e0e01e43ly1g186enjfwfj20k80degmr.jpg
Java通过几种原子操作完成工作区内存主存的交互

  1. lock:作用于主存,把变量标识为线程独占状态。
  2. unlock:作用于主存,解除变量的独占状态。
  3. read:作用于主存,把一个变量的值通过主存传输到线程的工作区内存。
  4. load:作用于工作区内存,把read操作传过来的变量值储存到工作区内存的变量副本中。
  5. use:作用于工作内存,把工作区内存的变量副本传给执行引擎。
  6. assign:作用于工作区内存,把从执行引擎传过来的值赋值给工作区内存的变量副本。
  7. store:作用于工作区内存,把工作区内存的变量副本传给主存。
  8. write:作用于主存,把store操作传过来的值赋值给主存变量。

8个操作每个操作都是原子性的,但是几个操作连着一起就不是原子性了!

volatile原理

上面介绍了Java模型的8个操作,那么这8个操作和volatile又有着什么关系呢。

volatile的可见性

什么是可见性,用一个例子来解释,先看一段代码,加入线程1先执行,线程2再执行

1
2
3
4
5
6
7
8
//线程1
boolean stop = false;
while (!stop) {
do();
}

//线程2
stop = true;

线程1执行后会进入到一个死循环中,当线程2执行后,线程1的死循环就一定会马上结束吗?答案是不一定,因为线程2执行完stop = true后,并不会马上将变量stop的值true写回主存中,也就是上图中的assign执行完成之后,storewrite并不会随着执行,线程1没有立即将修改后的变量的值更新到主存中,即使线程2及时将变量stop的值写回主存中了,线程1也没有了解到变量stop的值已被修改而去主存中重新获取,也就是线程1loadread操作并不会马上执行造成线程1的工作区内存中的变量副本不是最新的。这两个原因造成了线程1的死循环也就不会马上结束。
那么如何避免上诉的问题呢?我们可以使用volatile关键字修饰变量stop,如下

1
2
3
4
5
6
7
8
//线程1
volatile boolean stop = false;
while (!stop) {
do();
}

//线程2
stop = true;

这样线程1每次读取变量stop的时候都会先去主存中获取变量stop最新的值,线程2每次修改变量stop的值之后都会马上将变量的值写回主存中,这样也就不会出现上述的问题了。

那么关键字volatie是如何做到的呢?volatie规定了上述8个操作的规则

  1. 只有当线程对变量执行的前一个操作load时,线程才能对变量执行use操作;只有线程的后一个操作是use时,线程才能对变量执行load操作。即规定了useloadread三个操作之间的约束关系,规定这三个操作必须连续的出现,保证了线程每次读取变量的值前都必须去主存获取最新的值
  2. 只有当前程对变量执行的前一个操作assign时,线程才能对变量执行store操作;只有线程的后一个操作是store时,线程才能对变量执行assign操作,即规定了assignstorewrite三个操作之间的约束关系,规定了这三个操作必须连续的出现,保证线程每次修改变量后都必须将变量的值写回主存

volatile的这两个规则,也正是保证了共享变量的可见性

volatile的有序性

有序性即程序执行的顺序按照代码的先后顺序执行,Java内存模型(JMM)允许编译器和处理器对指令进行重排序,但是规定了as-if-serial语义,即保证单线程情况下不管怎么重排序,程序的结果不能改变,如

1
2
3
double pi = 3.14;  //A
double r = 1; //B
double s = pi * r * r; //C

上面的代码可能按照A->B->C顺序执行,也有可能按照B->A->C顺序执行,这两种顺序都不会影响程序的结果。但是不会以C->A(B)->B(A)的顺序去执行,因为C语句是依赖于AB的,如果按照这样的顺序去执行就不能保证结果不变了(违背了as-if-serial)。

上面介绍的是单线程的执行,不管指令怎么重排序都不会影响结果,但是在多线程下就会出现问题了。
下面看个例子

1
2
3
4
5
6
7
8
9
10
11
12
double pi = 3.14;
double r = 0;
double s = 0;
boolean start = false;
//线程1
r = 10; //A
start = true; //B

//线程2
if (start) { //C
s = pi * r * r; //D
}

线程1和线程2同时执行,线程1AB的执行顺序可能是A->B或者B->A(因为A和B之间没有依赖关系,可以指令重排序)。如果线程1按照A->B的顺序执行,那么线程2执行后的结果s就是我们想要的正确结果,如果线程1按照B->A的顺序执行,那么线程2执行后的结果s可能就不是我们想要的结果了,因为线程1将变量stop的值修改为true后,线程2马上获取到stoptrue然后执行C语句,然后执行D语句即s = 3.14 * 0 * 0,然后线程1再执行B语句,那么结果就是有问题了。

那么为了解决这个问题,我们可以在变量true加上关键字volatile

1
2
3
4
5
6
7
8
9
10
11
12
double pi = 3.14;
double r = 0;
double s = 0;
volatile boolean start = false;
//线程1
r = 10; //A
start = true; //B

//线程2
if (start) { //C
s = pi * r * r; //D
}

这样线程1的执行顺序就只能是A->B了,因为关键字发挥了禁止处理器指令重排序的作用,所以线程2的执行结果就不会有问题了。

那么volatile是怎么实现禁止处理器重排序的呢?
编译器会在编译生成字节码的时候,在加有volatile关键字的变量的指令进行插入内存屏障来禁止特定类型的处理器重排序
我们先看内存屏障有哪些及发挥的作用
http://static.cyblogs.com/e0e01e43ly1g191y6o3paj21620f5qd7.jpg

  1. StoreStore屏障:禁止屏障上面变量的写和下面所有进行写的变量进行处理器重排序。
  2. StoreLoad屏障:禁止屏障上面变量的写和下面所有进行读的变量进行处理器重排序。
  3. LoadLoad屏障:禁止屏障上面变量的读和下面所有进行读的变量进行处理器重排序。
  4. LoadStore屏障:禁止屏障上面变量的读和下面所有进行写的变量进行处理器重排序。

再看volatile是怎么插入屏障的

  1. 在每个volatile变量的写前面插入一个StoreStore屏障。
  2. 在每个volatile变量的写后面插入一个StoreLoad屏障。
  3. 在每个volatile变量的读后面插入一个LoadLoad屏障。
  4. 在每个volatile变量的读后面插入一个LoadStore屏障。

注意:写操作是在volatile前后插入一个内存屏障,而读操作是在后面插入两个内存屏障。

http://static.cyblogs.com/e0e01e43ly1g186ext5z1j20h809vjsv.jpg

volatile变量通过插入内存屏障禁止了处理器重排序,从而解决了多线程环境下处理器重排序的问题

volatile有没有原子性?

上面分别介绍了volatile的可见性和有序性,那么volatile有原子性吗?我们先看一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Test {
public volatile int inc = 0;

public void increase() {
inc++;
}

public static void main(String[] args) {
final Test test = new Test();
for(int i=0;i<10;i++){
new Thread(){
public void run() {
for(int j=0;j<1000;j++)
test.increase();
};
}.start();
}

while(Thread.activeCount()>1) //保证前面的线程都执行完
Thread.yield();
System.out.println(test.inc);
}
}

我们开启10个线程对volatile变量进行自增操作,每个线程对volatile变量执行1000次自增操作,那结果变量inc会是10000吗?答案是,变量inc的值基本都是小于10000
可能你会有疑问,volatile变量inc不是保证了共享变量的可见性了吗,每次线程读取到的都是最新的值,是的没错,但是线程每次将值写回主存的时候并不能保证主存中的值没有被其他的线程修过过

http://static.cyblogs.com/WX20200212-225100@2x.png

如果所示:线程1在主存中获取了i的最新值(i=1),线程2也在主存中获取了i的最新值(i=1,注意这时候线程1并未对变量i进行修改,所以i的值还是1)),然后线程2将i自增后写回主存,这时候主存中i=2,到这里还没有问题,然后线程1又对i进行了自增写回了主存,这时候主存中i=2,也就是对i做了2次自增操作,结果i的结果只自增了1,问题就出来了这里。

为什么会有这个问题呢,前面我们提到了Java内存模型和主存之间交互的8个操作都是原子性的,但是他们的操作连在一起就不是原子性了,而volatile关键字也只是保证了useloadread三个操作连在一起时候的原子性,还有assignstorewrite这三个操作连在一起时候的原子性,也就是volatile关键字保证了变量读操作的原子性和写操作的原子性,而变量的自增过程需要对变量进行读和写两个过程,而这两个过程连在一起就不是原子性操作了。

所以说volatile变量对于变量的单独写操作/读操作是保证了原子性的,而常说的原子性包括读写操作连在一起,所以说对于volatile不保证原子性的。那么如何解决上面程序的问题呢?只能给increase方法加锁,让在多线程情况下只有一个线程能执行increase方法,也就是保证了一个线程对变量的读写是原子性的。当然还有个更优的方案,就是利用读写都为原子性的CAS,利用CASvolatile进行操作,既解决了volatile不保证原子性的问题,同时消耗也没加锁的方式大

volatile和CAS

学完volatile之后,是不是觉得volatileCAS有种似曾相识的感觉?那它们之间有什么关系或者区别呢。

  1. volatile只能保证共享变量的读和写操作单个操作的原子性,而CAS保证了共享变量的读和写两个操作一起的原子性(即CAS是原子性操作的)。
  2. volatile的实现基于JMM,而CAS的实现基于硬件。

参考

前几篇文章讲述了IO的几种模式及netty的基本概念,netty基于多路复用模型下的reactor模式,对 大量连接、单个处理短且快 的场景很适用 。

那在往底层思考,linux对于IO又是如何处理的呢?

C10K 问题

http://www.52im.net/thread-566-1-1.html

最初的服务器都是基于进程/线程模型的,新到来一个TCP连接,就需要分配1个进程(或者线程)。而进程又是操作系统最昂贵的资源,一台机器无法创建很多进程。如果是C10K就要创建1万个进程,那么单机而言操作系统是无法承受的(往往出现效率低下甚至完全瘫痪)。如果是采用分布式系统,维持1亿用户在线需要10万台服务器,成本巨大。基于上述考虑,如何突破单机性能局限,是高性能网络编程所必须要直面的问题。这些局限和问题最早被Dan Kegel 进行了归纳和总结,并首次成系统地分析和提出解决方案,后来这种普遍的网络现象和技术局限都被大家称为 C10K 问题。

C10K 问题的最大特点是:设计不够良好的程序,其性能和连接数及机器性能的关系往往是非线性的

举个例子:如果没有考虑过 C10K 问题,一个经典的基于 select 的程序能在旧服务器上很好处理 1000 并发的吞吐量,它在 2 倍性能新服务器上往往处理不了并发 2000 的吞吐量。这是因为在策略不当时,大量操作的消耗和当前连接数 n 成线性相关。会导致单个任务的资源消耗和当前连接数的关系会是 O(n)。而服务程序需要同时对数以万计的socket 进行 I/O 处理,积累下来的资源消耗会相当可观,这显然会导致系统吞吐量不能和机器性能匹配。

以上这就是典型的C10K问题在技术层面的表现。C10K问题本质上是操作系统的问题。对于Web1.0/2.0时代的操作系统而言, 传统的同步阻塞I/O模型都是一样的,处理的方式都是requests per second,并发10K和100的区别关键在于CPU。创建的进程线程多了,数据拷贝频繁(缓存I/O、内核将数据拷贝到用户进程空间、阻塞), 进程/线程上下文切换消耗大, 导致操作系统崩溃,这就是C10K问题的本质!
可见,解决C10K问题的关键就是尽可能减少这些CPU等核心计算资源消耗,从而榨干单台服务器的性能,突破C10K问题所描述的瓶颈。

概念说明

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

处在内核空间称为内核态,用户空间称为用户态! 内核的权限远大于用户空间权限,硬件、IO等等系统操作只能通过内核调用!

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态, 此时是不占用CPU资源的

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。I/O的socket操作也是一种文件描述符fd
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缺点:数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

I/O 多路复用之select、poll、epoll详解

详细部分可以参阅:
https://segmentfault.com/a/1190000003063859;
https://blog.csdn.net/wxy941011/article/details/80274233

select,poll,epoll都是IO多路复用的机制。**一个进程可以监视多个描述符,**一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。 本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

Netty与redis(单线程的下的I/O多路复用) 使用epoll模式。

select/poll的几大缺点
  1. select的本质是采用32个整数的32位,即32*32= 1024来标识,fd值为1-1024。

    (总结: 句柄上限 + 重复初始化 + 逐个排查所有文件句柄状态效率不高)

    1. 当fd的值超过1024限制时,就必须修改FD_SETSIZE(管理的句柄上限)的大小,这个时候就可以标识32*max值范围的fd。
    2. 在使用上,因为只有一个字段记录关注和发生事件,每次调用之前要重新初始化 fd_set 结构体。
    3. select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。
  2. poll主要解决 select 的前两个问题,但还是得逐个排查所有文件句柄状态效率不高:

    1. 通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制。
    2. pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。
  3. select/poll 将这个fd列表维持在用户态, 每次调用时都需要把fd集合从用户态拷贝到内核态, 并在内核中遍历传递进来的所有fd; 返回的的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件

epoll

epoll是poll的一种优化,在内核中维持了fd的列表,只遍历发生事件的fd集合
与poll/select不同,epoll不再是一个单独的系统调用,而是由epoll_create/epoll_ctl/epoll_wait三个系统调用组成,epoll在2.6以后的内核才支持。

综合的来说:

epoll在内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分。连接的套接字(socket句柄)是采用红黑树的结构存储在内核cache中的,并给内核中断处理程序注册一个回调函数,告诉内核:如果这个句柄的中断到了,就把它放到准备就绪list链表里。当有事件准备就绪时,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了(epoll的基础是回调)。当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可;有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。

1)调用epoll_create建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源, 创建了红黑树和就绪链表)
2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字 (如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据)
3)调用epoll_wait收集发生的事件的连接 (立刻返回准备就绪链表里的数据)

两种模式LT和ET

ET是边缘触发,LT是水平触发,一个表示只有在变化的边际触发,一个表示在某个阶段都会触发。

当一个socket句柄上有事件时,内核会把该句柄插入上面所说的准备就绪list链表,这时我们调用epoll_wait,会把准备就绪的socket拷贝到用户态内存,然后清空准备就绪list链表,最后,epoll_wait干了件事,就是检查这些socket,如果不是ET模式(就是LT模式的句柄了),并且这些socket上确实有未处理的事件时,又把该句柄放回到刚刚清空的准备就绪链表了。所以,非ET的句柄,只要它上面还有事件,epoll_wait每次都会返回这个句柄。(从上面这段,可以看出,LT还有个回放的过程,低效了)

场景假设

eg.有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?

在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。处理的流越多,每一次无差别轮询时间就越长!

参考地址

推荐一个咕泡学院的视频资源:链接:https://pan.baidu.com/s/1SmSzrmfgbm6XgKZO7utKWg 密码:e54x

先回答一下之前发布的《使用HashMap的时候小心点》同学不补充的问题,说最好说下HashMap在JDK8下是怎么解决死循环的问题的。

链表部分对应上面 transfer 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
  Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}

由于扩容是按两倍进行扩,即 N 扩为 N + N,因此就会存在低位部分 0 - (N-1),以及高位部分 N - (2N-1), 所以这里分为 loHead (low Head) 和 hiHead (high head)。

通过上面的分析,不难发现循环的产生是因为新链表的顺序跟旧的链表是完全相反的,所以只要保证建新链时还是按照原来的顺序的话就不会产生循环。

JDK8是用 headtail 来保证链表的顺序和之前一样,这样就不会产生循环引用。

传统 HashMap 的缺点

JDK 1.8 以前 HashMap 的实现是 数组+链表,即使哈希函数取得再好,也很难达到元素百分百均匀分布。

当 HashMap 中有大量的元素都存放到同一个桶中时,这个桶下有一条长长的链表,这个时候 HashMap 就相当于一个单链表,假如单链表有 n 个元素,遍历的时间复杂度就是 O(n),完全失去了它的优势。

针对这种情况,JDK 1.8 中引入了 红黑树(查找时间复杂度为 O(logn))来优化这个问题。

新增的数据结构 – 红黑树

http://static.cyblogs.com/adc0e45d0065227fa3b6f01acad60528.png

JDK 1.8 中 HashMap 中除了链表节点:

1
2
3
4
5
6
7
8
9
10
11
static class Node implements Map.Entry {
//哈希值,就是位置
final int hash;
//键
final K key;
//值
V value;
//指向下一个几点的指针
Node next;
//...
}

还有另外一种节点:TreeNode,它是 1.8 新增的,属于数据结构中的 红黑树(不了解红黑树的同学可以 点击这里了解红黑树):

1
2
3
4
5
6
7
static final class TreeNode extends LinkedHashMap.Entry {
TreeNode parent; // red-black tree links
TreeNode left;
TreeNode right;
TreeNode prev; // needed to unlink next upon deletion
boolean red;
}

可以看到就是个红黑树节点,有父亲、左右孩子、前一个元素的节点,还有个颜色值。

另外由于它继承自 LinkedHashMap.Entry ,而 LinkedHashMap.Entry 继承自 HashMap.Node ,因此还有额外的 6 个属性:

1
2
3
4
5
6
7
8
//继承 LinkedHashMap.Entry 的
Entry before, after;

//HashMap.Node 的
final int hash;
final K key;
V value;
Node next;

红黑树的三个关键参数

HashMap 中有三个关于红黑树的关键参数:

  • TREEIFY_THRESHOLD
  • UNTREEIFY_THRESHOLD
  • MIN_TREEIFY_CAPACITY

值及作用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//一个桶的树化阈值
//当桶中元素个数超过这个值时,需要使用红黑树节点替换链表节点
//这个值必须为 8,要不然频繁转换效率也不高
static final int TREEIFY_THRESHOLD = 8;

//一个树的链表还原阈值
//当扩容时,桶中元素个数小于这个值,就会把树形的桶元素 还原(切分)为链表结构
//这个值应该比上面那个小,至少为 6,避免频繁转换
static final int UNTREEIFY_THRESHOLD = 6;

//哈希表的最小树形化容量
//当哈希表中的容量大于这个值时,表中的桶才能进行树形化
//否则桶内元素太多时会扩容,而不是树形化
//为了避免进行扩容、树形化选择的冲突,这个值不能小于 4 * TREEIFY_THRESHOLD
static final int MIN_TREEIFY_CAPACITY = 64;

新增的操作:桶的树形化 treeifyBin()

Java 8 中,如果一个桶中的元素个数超过 TREEIFY_THRESHOLD(默认是 8 ),就使用红黑树来替换链表,从而提高速度。

这个替换的方法叫 treeifyBin() 即树形化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//将桶内所有的 链表节点 替换成 红黑树节点
final void treeifyBin(Node[] tab, int hash) {
int n, index; Node e;
//如果当前哈希表为空,或者哈希表中元素的个数小于 进行树形化的阈值(默认为 64),就去新建/扩容
if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
resize();
else if ((e = tab[index = (n - 1) & hash]) != null) {
//如果哈希表中的元素个数超过了 树形化阈值,进行树形化
// e 是哈希表中指定位置桶里的链表节点,从第一个开始
TreeNode hd = null, tl = null; //红黑树的头、尾节点
do {
//新建一个树形节点,内容和当前链表节点 e 一致
TreeNode p = replacementTreeNode(e, null);
if (tl == null) //确定树头节点
hd = p;
else {
p.prev = tl;
tl.next = p;
}
tl = p;
} while ((e = e.next) != null);
//让桶的第一个元素指向新建的红黑树头结点,以后这个桶里的元素就是红黑树而不是链表了
if ((tab[index] = hd) != null)
hd.treeify(tab);
}
}


TreeNode replacementTreeNode(Node p, Node next) {
return new TreeNode<>(p.hash, p.key, p.value, next);
}

上述操作做了这些事:

  • 根据哈希表中元素个数确定是扩容还是树形化
  • 如果是树形化
    • 遍历桶中的元素,创建相同个数的树形节点,复制内容,建立起联系
    • 然后让桶第一个元素指向新建的树头结点,替换桶的链表内容为树形内容

但是我们发现,之前的操作并没有设置红黑树的颜色值,现在得到的只能算是个二叉树。在 最后调用树形节点 hd.treeify(tab) 方法进行塑造红黑树,来看看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
   final void treeify(Node[] tab) {
TreeNode root = null;
for (TreeNode x = this, next; x != null; x = next) {
next = (TreeNode)x.next;
x.left = x.right = null;
if (root == null) { //头回进入循环,确定头结点,为黑色
x.parent = null;
x.red = false;
root = x;
}
else { //后面进入循环走的逻辑,x 指向树中的某个节点
K k = x.key;
int h = x.hash;
Class kc = null;
//又一个循环,从根节点开始,遍历所有节点跟当前节点 x 比较,调整位置,有点像冒泡排序
for (TreeNode p = root;;) {
int dir, ph; //这个 dir
K pk = p.key;
if ((ph = p.hash) > h) //当比较节点的哈希值比 x 大时, dir 为 -1
dir = -1;
else if (ph < h) //哈希值比 x 小时 dir 为 1
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
// 如果比较节点的哈希值、 x
dir = tieBreakOrder(k, pk);

//把 当前节点变成 x 的父亲
//如果当前比较节点的哈希值比 x 大,x 就是左孩子,否则 x 是右孩子
TreeNode xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
root = balanceInsertion(root, x);
break;
}
}
}
}
moveRootToFront(tab, root);
}

可以看到,将二叉树变为红黑树时,需要保证有序。这里有个双重循环,拿树中的所有节点和当前节点的哈希值进行对比(如果哈希值相等,就对比键,这里不用完全有序),然后根据比较结果确定在树种的位置。

新增的操作: 红黑树中添加元素 putTreeVal()

上面介绍了如何把一个桶中的链表结构变成红黑树结构。

在添加时,如果一个桶中已经是红黑树结构,就要调用红黑树的添加元素方法 putTreeVal()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
final TreeNode putTreeVal(HashMap map, Node[] tab,
int h, K k, V v) {
Class kc = null;
boolean searched = false;
TreeNode root = (parent != null) ? root() : this;
//每次添加元素时,从根节点遍历,对比哈希值
for (TreeNode p = root;;) {
int dir, ph; K pk;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((pk = p.key) == k || (k != null && k.equals(pk)))
//如果当前节点的哈希值、键和要添加的都一致,就返回当前节点(奇怪,不对比值吗?)
return p;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0) {
//如果当前节点和要添加的节点哈希值相等,但是两个节点的键不是一个类,只好去挨个对比左右孩子
if (!searched) {
TreeNode q, ch;
searched = true;
if (((ch = p.left) != null &&
(q = ch.find(h, k, kc)) != null) ||
((ch = p.right) != null &&
(q = ch.find(h, k, kc)) != null))
//如果从 ch 所在子树中可以找到要添加的节点,就直接返回
return q;
}
//哈希值相等,但键无法比较,只好通过特殊的方法给个结果
dir = tieBreakOrder(k, pk);
}

//经过前面的计算,得到了当前节点和要插入节点的一个大小关系
//要插入的节点比当前节点小就插到左子树,大就插到右子树
TreeNode xp = p;
//这里有个判断,如果当前节点还没有左孩子或者右孩子时才能插入,否则就进入下一轮循环
if ((p = (dir <= 0) ? p.left : p.right) == null) {
Node xpn = xp.next;
TreeNode x = map.newTreeNode(h, k, v, xpn);
if (dir <= 0)
xp.left = x;
else
xp.right = x;
xp.next = x;
x.parent = x.prev = xp;
if (xpn != null)
((TreeNode)xpn).prev = x;
//红黑树中,插入元素后必要的平衡调整操作
moveRootToFront(tab, balanceInsertion(root, x));
return null;
}
}
}

//这个方法用于 a 和 b 哈希值相同但是无法比较时,直接根据两个引用的地址进行比较
//这里源码注释也说了,这个树里不要求完全有序,只要插入时使用相同的规则保持平衡即可
static int tieBreakOrder(Object a, Object b) {
int d;
if (a == null || b == null ||
(d = a.getClass().getName().
compareTo(b.getClass().getName())) == 0)
d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
-1 : 1);
return d;
}

通过上面的代码可以知道,HashMap 中往红黑树中添加一个新节点 n 时,有以下操作:

  • 从根节点开始遍历当前红黑树中的元素 p,对比 n 和 p 的哈希值;
  • 如果哈希值相等并且键也相等,就判断为已经有这个元素(这里不清楚为什么不对比值);
  • 如果哈希值就通过其他信息,比如引用地址来给个大概比较结果,这里可以看到红黑树的比较并不是很准确,注释里也说了,只是保证个相对平衡即可;
  • 最后得到哈希值比较结果后,如果当前节点 p 还没有左孩子或者右孩子时才能插入,否则就进入下一轮循环;
  • 插入元素后还需要进行红黑树例行的平衡调整,还有确保根节点的领先地位。

新增的操作: 红黑树中查找元素 getTreeNode()

HashMap 的查找方法是 get():

1
2
3
4
public V get(Object key) {
Node e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}

它通过计算指定 key 的哈希值后,调用内部方法 getNode();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final Node getNode(int hash, Object key) {
Node[] tab; Node first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
if ((e = first.next) != null) {
if (first instanceof TreeNode)
return ((TreeNode)first).getTreeNode(hash, key);
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

这个 getNode() 方法就是根据哈希表元素个数与哈希值求模(使用的公式是 (n - 1) &hash)得到 key 所在的桶的头结点,如果头节点恰好是红黑树节点,就调用红黑树节点的 getTreeNode() 方法,否则就遍历链表节点。

1
2
3
final TreeNode getTreeNode(int h, Object k) {
return ((parent != null) ? root() : this).find(h, k, null);
}

getTreeNode 方法使通过调用树形节点的 find() 方法进行查找:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//从根节点根据 哈希值和 key 进行查找
final TreeNode find(int h, Object k, Class kc) {
TreeNode p = this;
do {
int ph, dir; K pk;
TreeNode pl = p.left, pr = p.right, q;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (k != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.find(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
return null;
}

由于之前添加时已经保证这个树是有序的,因此查找时基本就是折半查找,效率很高。

这里和插入时一样,如果对比节点的哈希值和要查找的哈希值相等,就会判断 key 是否相等,相等就直接返回(也没有判断值哎);不相等就从子树中递归查找。

新增的操作: 树形结构修剪 split()

HashMap 中, resize() 方法的作用就是初始化或者扩容哈希表。当扩容时,如果当前桶中元素结构是红黑树,并且元素个数小于链表还原阈值 UNTREEIFY_THRESHOLD (默认为 6),就会把桶中的树形结构缩小或者直接还原(切分)为链表结构,调用的就是 split():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
//参数介绍
//tab 表示保存桶头结点的哈希表
//index 表示从哪个位置开始修剪
//bit 要修剪的位数(哈希值)
final void split(HashMap map, Node[] tab, int index, int bit) {
TreeNode b = this;
// Relink into lo and hi lists, preserving order
TreeNode loHead = null, loTail = null;
TreeNode hiHead = null, hiTail = null;
int lc = 0, hc = 0;
for (TreeNode e = b, next; e != null; e = next) {
next = (TreeNode)e.next;
e.next = null;
//如果当前节点哈希值的最后一位等于要修剪的 bit 值
if ((e.hash & bit) == 0) {
//就把当前节点放到 lXXX 树中
if ((e.prev = loTail) == null)
loHead = e;
else
loTail.next = e;
//然后 loTail 记录 e
loTail = e;
//记录 lXXX 树的节点数量
++lc;
}
else { //如果当前节点哈希值最后一位不是要修剪的
//就把当前节点放到 hXXX 树中
if ((e.prev = hiTail) == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
//记录 hXXX 树的节点数量
++hc;
}
}


if (loHead != null) {
//如果 lXXX 树的数量小于 6,就把 lXXX 树的枝枝叶叶都置为空,变成一个单节点
//然后让这个桶中,要还原索引位置开始往后的结点都变成还原成链表的 lXXX 节点
//这一段元素以后就是一个链表结构
if (lc <= UNTREEIFY_THRESHOLD)
tab[index] = loHead.untreeify(map);
else {
//否则让索引位置的结点指向 lXXX 树,这个树被修剪过,元素少了
tab[index] = loHead;
if (hiHead != null) // (else is already treeified)
loHead.treeify(tab);
}
}
if (hiHead != null) {
//同理,让 指定位置 index + bit 之后的元素
//指向 hXXX 还原成链表或者修剪过的树
if (hc <= UNTREEIFY_THRESHOLD)
tab[index + bit] = hiHead.untreeify(map);
else {
tab[index + bit] = hiHead;
if (loHead != null)
hiHead.treeify(tab);
}
}
}

从上述代码可以看到,HashMap 扩容时对红黑树节点的修剪主要分两部分,先分类、再根据元素个数决定是还原成链表还是精简一下元素仍保留红黑树结构。

1.分类

指定位置、指定范围,让指定位置中的元素 (hash & bit) == 0 的,放到 lXXX 树中,不相等的放到 hXXX 树中。

2.根据元素个数决定处理情况

符合要求的元素(即 lXXX 树),在元素个数小于 6 时还原成链表,最后让哈希表中修剪的痛 tab[index] 指向 lXXX 树;在元素个数大于 6 时,还是用红黑树,只不过是修剪了下枝叶;

不符合要求的元素(即 hXXX 树)也是一样的操作,只不过最后它是放在了修剪范围外 tab[index + bit]。

总结

JDK 1.8 以后哈希表的 添加、删除、查找、扩容方法都增加了一种 节点为 TreeNode 的情况:

  • 添加时,当桶中链表个数超过 8 时会转换成红黑树;
  • 删除、扩容时,如果桶中结构为红黑树,并且树中元素个数太少的话,会进行修剪或者直接还原成链表结构;
  • 查找时即使哈希函数不优,大量元素集中在一个桶中,由于有红黑树结构,性能也不会差。

(图片来自:tech.meituan.com/java-hashma…)

shixinzhang

这篇文章根据源码分析了 HashMap 在 JDK 1.8 里新增的 TreeNode 的一些关键方法,可以看到,1.8 以后的 HashMap 结合了哈希表和红黑树的优点,不仅快速,而且在极端情况也能保证性能,设计者苦心孤诣可见一斑,写到这里不禁仰天长叹:什么时候我才能写出这么 NB 的代码啊!!!

参考地址

Hash算法

Hash算法在路由算法应用中,为了保证数据均匀的分布,例如有3个桶,分别是0号桶,1号桶和2号桶;现在有12个球,怎么样才能让12个球平均分布到3个桶中呢?使用Hash算法的做法是,将12个球从0开始编号,得到这样的一个序列:0,1,2,3,4,5,6,7,8,9,10,11。将这个序列中的每个值模3,不管数字是什么,得到的结果都是0,1,2,不会超过3,将结果为0的数字放入0号桶,结果为1的数子放入1号桶,结果为2的数字放入2号桶,12个球就均匀的分布到3个桶中,0,3,6,9,12号球放入0号桶,1,4,7,10号球放入1号桶,2,5,8,11号球放入2号桶。

一致性Hash算法

一致性Hash算法在1997年由麻省理工学院提出的一种分布式哈希(DHT)实现算法,设计目标是为了解决因特网中的热点(Hot Spot)问题,初衷和CARP十分相似。一致性Hash修正了CARP使用的简单哈希算法带来的问题,使得分布式哈希(DHT)可以在P2P环境中真正得到应用。

一致性Hash算法也是使用取模的方法,只是,刚才描述的取模法是对服务器的数量进行取模,而一致性Hash算法是对2^32取模,什么意思呢?简单来说,一致性Hash算法将整个哈希值空间组织成一个虚拟的圆环,如假设某哈希函数H的值空间为0-2^32-1(即哈希值是一个32位无符号整形)。整个空间按顺时针方向组织,圆环的正上方的点代表0,0点右侧的第一个点代表1,以此类推,2、3、4、5、6……直到2^32-1,也就是说0点左侧的第一个点代表2^32-102^32-1在零点中方向重合,我们把这个由2^32个点组成的圆环称为Hash环

http://static.cyblogs.com/hash算法001.png

特性定义

一致性Hash算法提出了在动态变化的Cache环境中,判定哈希算法好坏的四个定义:

**1、平衡性(Balance):**平衡性是指哈希的结果能够尽可能分布在所有的缓冲(Cache)中去,这样可以使得所有的缓冲空间得到利用。很多哈希算法都能够满足这一条件。

**2、单调性(Monotonicity):**单调性是指如果已经有一些内容通过哈希分派到了相应的缓冲中,又有新的缓冲加入到系统中。哈希的结果应该能够保证原有已分配的内容可以被映射到原有的或者新的缓冲中去,而不会映射到旧的缓冲集合中的其他缓冲区。

**3、分散性(Spread):**在分布式环境中,终端有可能看不到所有的缓冲,而只能看到其中的一部分。当终端希望通过哈希过程将内容映射到缓冲上去,由于不同终端所见的缓冲范围有可能不同,从而导致哈希的结果不一致,最终的结果是相同的内容被不同的终端映射到不同的缓冲区中。这种情况显然是应该避免的,因为它导致相同内容被存储到不同缓冲中去,降低了系统存储的效率。分散性的定义就是上述情况发生的严重程度。好的哈希算法应该能够尽量避免不一致的情况发生,也就是尽量降低分散性。

**4、负载(Load):**负载问题实际上是从另一个角度看待分散性问题。既然不同的终端可能将相同的内容映射到不同的缓冲区中,那么对于一个特定的缓冲区而言,也可能被不同的用户映射到不同的内容。与分散性一样,这种情况也是应当避免的,因此好的哈希算法应能够尽量降低缓冲的负荷。

下一步将各个服务器使用Hash进行一个哈希,具体可以选择服务器的IP或主机名作为关键字进行哈希,这样每台机器就能确定其在哈希环上的位置,这里假设将上文中四台服务器使用IP地址哈希后在环空间的位置如下:

http://static.cyblogs.com/hash算法002.png

接下来使用如下算法定位数据访问到相应服务器:将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针“行走”,第一台遇到的服务器就是其应该定位到的服务器!

例如我们有Object AObject BObject CObject D四个数据对象,经过哈希计算后,在环空间上的位置如下:

http://static.cyblogs.com/hash算法003.png

根据一致性Hash算法,数据A会被定为到Node A上,B被定为到Node B上,C被定为到Node C上,D被定为到Node D上。

容错和可扩展

现假设Node C不幸宕机,可以看到此时对象A、B、D不会受到影响,只有C对象被重定位到Node D。一般的,在一致性Hash算法中,如果一台服务器不可用,则受影响的数据仅仅是此服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它不会受到影响,如下所示:

http://static.cyblogs.com/hash算法004.png

下面考虑另外一种情况,如果在系统中增加一台服务器Node X,如下图所示:

http://static.cyblogs.com/hash算法005.png

此时对象Object A、B、D不受影响,只有对象C需要重定位到新的Node X !一般的,在一致性Hash算法中,如果增加一台服务器,则受影响的数据仅仅是新服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它数据也不会受到影响。

综上所述,一致性Hash算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性。

Hash环的数据倾斜问题

一致性Hash算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜(被缓存的对象大部分集中缓存在某一台服务器上)问题,例如系统中只有两台服务器,其环分布如下:

http://static.cyblogs.com/hash算法006.png

此时必然造成大量数据集中到Node A上,而只有极少量会定位到Node B上。为了解决这种数据倾斜问题,一致性Hash算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点。具体做法可以在服务器IP主机名的后面增加编号来实现。

例如上面的情况,可以为每台服务器计算三个虚拟节点,于是可以分别计算 “Node A#1”、“Node A#2”、“Node A#3”、“Node B#1”、“Node B#2”、“Node B#3”的哈希值,于是形成六个虚拟节点:

http://static.cyblogs.com/hash算法007.png

同时数据定位算法不变,只是多了一步虚拟节点到实际节点的映射,例如定位到“Node A#1”、“Node A#2”、“Node A#3”三个虚拟节点的数据均定位到Node A上。这样就解决了服务节点少时数据倾斜的问题。在实际应用中,通常将虚拟节点数设置为32甚至更大,因此即使很少的服务节点也能做到相对均匀的数据分布。

作者:FserSuN

来源:https://blog.csdn.net/Revivedsun/article/details/71022871

LoadBalance负责从多个Invoker中选出具体的一个用于本次调用,以分摊压力。Dubbo中LoadBalance结构如下图。

http://static.cyblogs.com/20170430135402902.jpeg

1
2
3
4
com.alibaba.dubbo.rpc.cluster.LoadBalance 
接口提供了
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
通过该方法,进行结点选择。
1
2
3
4
com.alibaba.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance 
实现了一些公共方法,并定义抽象方法
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
该方法由具体的负载均衡实现类去实现。

一致性哈希负载均衡配置

具体的负载均衡实现类包括4种。分别是随机、轮训、最少活跃、一致性Hash
一致性哈希负载均衡配置

配置如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dubbo:service interface="..." loadbalance="consistenthash" />
或:

<dubbo:reference interface="..." loadbalance="consistenthash" />
或:

<dubbo:service interface="...">
<dubbo:method name="..." loadbalance="consistenthash"/>
</dubbo:service>
或:

<dubbo:reference interface="...">
<dubbo:method name="..." loadbalance="consistenthash"/>
</dubbo:reference

一致性Hash负载均衡涉及到两个主要的配置参数为hash.arguments 与hash.nodes。

hash.arguments : 当进行调用时候根据调用方法的哪几个参数生成key,并根据key来通过一致性hash算法来选择调用结点。例如调用方法invoke(String s1,String s2); 若hash.arguments为1(默认值),则仅取invoke的参数1(s1)来生成hashCode。

hash.nodes: 为结点的副本数。

1
2
3
4
5
缺省只对第一个参数Hash,如果要修改,请配置
<dubbo:parameter key="hash.arguments" value="0,1" />

缺省用160份虚拟节点,如果要修改,请配置
<dubbo:parameter key="hash.nodes" value="320" />

Dubbo中一致性Hash的实现分析

dubbo的一致性哈希通过ConsistentHashLoadBalance类来实现。

ConsistentHashLoadBalance内部定义ConsistentHashSelector类,最终通过该类进行结点选择。ConsistentHashLoadBalance实现的doSelect方法来利用所创建的ConsistentHashSelector对象选择结点。

doSelect的实现如下。当调用该方法时,如果选择器不存在则去创建。随后通过ConsistentHashSelector的select方法选择结点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 获取调用方法名
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 生成调用列表hashCode
int identityHashCode = System.identityHashCode(invokers);
// 以调用方法名为key,获取一致性hash选择器
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 若不存在则创建新的选择器
if (selector == null || selector.getIdentityHashCode() != identityHashCode) {
// 创建ConsistentHashSelector时会生成所有虚拟结点
selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
// 获取选择器
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 选择结点
return selector.select(invocation);
}

ConsistentHashSelector在构造函数内部会创建replicaNumber个虚拟结点,并将这些虚拟结点存储于TreeMap。随后根据调用方法的参数来生成key,并在TreeMap中选择一个结点进行调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers; // 虚拟结点

private final int replicaNumber; // 副本数

private final int identityHashCode;// hashCode

private final int[] argumentIndex; // 参数索引数组

public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 创建TreeMap 来保存结点
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
// 生成调用结点HashCode
this.identityHashCode = System.identityHashCode(invokers);
// 获取Url
// dubbo://169.254.90.37:20880/service.DemoService?anyhost=true&application=srcAnalysisClient&check=false&dubbo=2.8.4&generic=false&interface=service.DemoService&loadbalance=consistenthash&methods=sayHello,retMap&pid=14648&sayHello.timeout=20000&side=consumer&timestamp=1493522325563
URL url = invokers.get(0).getUrl();
// 获取所配置的结点数,如没有设置则使用默认值160
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
// 获取需要进行hash的参数数组索引,默认对第一个参数进行hash
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i ++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 创建虚拟结点
// 对每个invoker生成replicaNumber个虚拟结点,并存放于TreeMap中
for (Invoker<T> invoker : invokers) {

for (int i = 0; i < replicaNumber / 4; i++) {
// 根据md5算法为每4个结点生成一个消息摘要,摘要长为16字节128位。
byte[] digest = md5(invoker.getUrl().toFullString() + i);
// 随后将128位分为4部分,0-31,32-63,64-95,95-128,并生成4个32位数,存于long中,long的高32位都为0
// 并作为虚拟结点的key。
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}

public int getIdentityHashCode() {
return identityHashCode;
}

// 选择结点
public Invoker<T> select(Invocation invocation) {
// 根据调用参数来生成Key
String key = toKey(invocation.getArguments());
// 根据这个参数生成消息摘要
byte[] digest = md5(key);
//调用hash(digest, 0),将消息摘要转换为hashCode,这里仅取0-31位来生成HashCode
//调用sekectForKey方法选择结点。
Invoker<T> invoker = sekectForKey(hash(digest, 0));
return invoker;
}

private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
// 由于hash.arguments没有进行配置,因为只取方法的第1个参数作为key
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}

//根据hashCode选择结点
private Invoker<T> sekectForKey(long hash) {
Invoker<T> invoker;
Long key = hash;
// 若HashCode直接与某个虚拟结点的key一样,则直接返回该结点
if (!virtualInvokers.containsKey(key)) {
// 若不一致,找到一个最小上届的key所对应的结点。
SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
// 若存在则返回,例如hashCode落在图中[1]的位置
// 若不存在,例如hashCode落在[2]的位置,那么选择treeMap中第一个结点
// 使用TreeMap的firstKey方法,来选择最小上界。
if (tailMap.isEmpty()) {
key = virtualInvokers.firstKey();
} else {

key = tailMap.firstKey();
}
}
invoker = virtualInvokers.get(key);
return invoker;
}

private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[0 + number * 4] & 0xFF))
& 0xFFFFFFFFL;
}

private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = null;
try {
bytes = value.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.update(bytes);
return md5.digest();
}
}

上述代码中 hash(byte[] digest, int number)方法用来生成hashCode。该函数将生成的结果转换为long类,这是因为生成的结果是一个32位数,若用int保存可能会产生负数。而一致性hash生成的逻辑环其hashCode的范围是在 0 - MAX_VALUE之间。因此为正整数,所以这里要强制转换为long类型,避免出现负数。

进行结点选择的方法为select,最后通过sekectForKey方法来选择结点。

1
2
3
4
5
6
7
8
9
10
11
// 选择结点
public Invoker<T> select(Invocation invocation) {
// 根据调用参数来生成Key
String key = toKey(invocation.getArguments());
// 根据这个参数生成消息摘要
byte[] digest = md5(key);
//调用hash(digest, 0),将消息摘要转换为hashCode,这里仅取0-31位来生成HashCode
//调用sekectForKey方法选择结点。
Invoker<T> invoker = sekectForKey(hash(digest, 0));
return invoker;
}

sekectForKey方法的实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Invoker<T> sekectForKey(long hash) {
Invoker<T> invoker;
Long key = hash;
// 若HashCode直接与某个虚拟结点的key一样,则直接返回该结点
if (!virtualInvokers.containsKey(key)) {
// 若不在,找到一个最小上届的key所对应的结点。
SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
// 若存在则返回,例如hashCode落在图中[1]的位置
// 若不存在,例如hashCode落在[2]的位置,那么选择treeMap中第一个结点
// 使用TreeMap的firstKey方法,来选择最小上界。
if (tailMap.isEmpty()) {
key = virtualInvokers.firstKey();
} else {
key = tailMap.firstKey();
}
}
invoker = virtualInvokers.get(key);
return invoker;
}

在进行选择时候若HashCode直接与某个虚拟结点的key一样,则直接返回该结点,例如hashCode落在某个结点上(圆圈所表示)。若不在,找到一个最小上届的key所对应的结点。例如进行选择时的key落在图中1所标注的位置。由于利用TreeMap存储,key所落在的位置可能无法找到最小上界,例如图中2所标注的位置。那么需要返回TreeMap中的最小值(构成逻辑环状结构,找不到,则返回最开头的结点)。

http://static.cyblogs.com/20170430142247665.png

作者:王念博客

来源:https://my.oschina.net/wangnian/blog/3064886

前言:在工作中,谈到有小数点的加减乘除都会想到用BigDecimal来解决,但是有很多人对于double或者float为啥会丢失精度一脸茫然。还有BigDecimal是怎么解决的?话不多说,我们开始。

浮点数是啥?

浮点数是计算机用来表示小数的一种数据类型,采用科学计数法。在java中,double是双精度,64位,浮点数,默认是0.0d。float是单精度,32位.浮点数,默认是0.0f;

在内存中存储

img

float 符号位(1bit) 指数(8 bit) 尾数(23 bit)
double 符号位(1bit) 指数(11 bit) 尾数(52 bit)

float在内存中指数是8bit,由于阶码实际存储的是指数的移码,假设指数的真值是e,阶码为E,则有E=e+(2^n-1 -1)。其中 2^n-1 -1是IEEE754标准规定的指数偏移量,根据这个公式我们可以得到 2^8 -1=127。于是,float的指数范围为-128 +127,而double的指数范围为-1024 +1023。其中负指数决定了浮点数所能表达的绝对值最小的非零数;而正指数决定了浮点数所能表达的绝对值最大的数,也即决定了浮点数的取值范围。

float的范围为-2^128 ~ +2^127,也即-3.40E+38 ~ +3.40E+38;
double的范围为-2^1024 ~ +2^1023,也即-1.79E+308 ~ +1.79E+308

走进失真之科学计数法

我们先说说科学计数法,科学计数法是一种简化计数的方法,用来近似表示一个极大或极小且位数较多的数,对于位数较小的数值,科学计数法没有什么优势,但对于位数较多的数值其计数方法的优势就非常明显了。例如:光的速速是300000000米/秒,全世界人口数大约是6100000000。类似光的速度和世界人口数这样大数值的数,读、写都很不方便,所以光的速度可以写成310^8,全世界人口数可以写成6.110^9。所以计算器用科学计数法表示光速是3E8,世界人口数大约是6.1E9。

我们小时候玩计算器喜欢疯狂的累加或者累减,到最后计算器就会显示下图。这个就是科学计数法显示的结果

img

那图中真实的值是 -4.86*10^11=-486000000000。十进制科学计数法要求有效数字的整数部分必须在【1,9】区间内。

走进失真之精度

计算机在处理数据都涉及到数据的转换和各种复杂运算,比如,不同单位换算,不同进制(如二进制十进制)换算等,很多除法运算不能除尽,比如10÷3=3.3333…..无穷无尽,而精度是有限的,3.3333333x3并不等于10,经过复杂的处理后得到的十进制数据并不精确,精度越高越精确。float和double的精度是由尾数的位数来决定的,其整数部分始终是一个隐含着的“1”,由于它是不变的,故不能对精度造成影响。float:2^23 = 8388608,一共七位,由于最左为1的一位省略了,这意味着最多能表示8位数: 28388608 = 16777216 。有8位有效数字,但绝对能保证的为7位,也即*float的精度为7~8位有效数字;double:2^52 = 4503599627370496,一共16位,同理,*double的精度为16~17位

img

当到达一定值自动开始使用科学计数法,并保留相关精度的有效数字,所以结果是个近似数,并且指数为整数。在十进制中小数有些是无法完整用二进制表示的。所以只能用有限位来表示,从而在存储时可能就会有误差。对于十进制的小数转换成二进制采用乘2取整法进行计算,取掉整数部分后,剩下的小数继续乘以2,直到小数部分全为0。

如遇到

img

输出是 0.19999999999999998

double类型 0.3-0.1的情况。需要将0.3转成二进制在运算

0.3 * 2 = 0.6 => .0 (.6)取0剩0.6
0.6 * 2 = 1.2 => .01 (.2)取1剩0.2
0.2 * 2 = 0.4 => .010 (.4)取0剩0.4
0.4 * 2 = 0.8 => .0100 (.8) 取0剩0.8
0.8 * 2 = 1.6 => .01001 (.6)取1剩0.6
………….

img

看完上面,大概清楚了为啥浮点数会有精度问题。简单来说float和double类型主要是为了科学计算和工程计算而设计,他们执行二进制浮点运算,这是为了在广泛的数值范围上提供较为精确的快速近和计算而精心设计的。然而,他们并没有提供完全精确的结果,所以不应该被用于精确的结果的场合。浮点数达到一定大的数会自动使用科学计数法,这样的表示只是近似真实数而不等于真实数。当十进制小数位转换二进制的时候也会出现无限循环或者超过浮点数尾数的长度。

那我们怎么用BigDecimal来解决?

大家看下面的两个输出

img

输出结果:

0.299999999999999988897769753748434595763683319091796875
0.3

图上阿里的代码约束插件已经标注警告,让我使用String参数的构造方法创建BigDecimal。因为double不能精确地表示为0.3(任何有限长度的二进制),构造方法传递的值也是不完全等于0.3。大家在使用BigDecimal的时候一定要用String参数的构造方法来创建。说到这里,是木有还有好奇的宝宝有疑问,BigDecimal的原理是啥?为啥它就没有问题呢?其实原理很简单,BigDecimal是不可变的,可以用来表示任意精度的带符号十进制数。double之所以会出问题,是因为小数点转二进制丢失精度。**BigDecimal在处理的时候把十进制小数扩大N倍让它在整数上进行计算,并保留相应的精度信息。**至于BigDecimal是怎么保存的可以翻阅一下源代码。

L1,L2,L3 指的都是CPU的缓存,他们比内存快,但是很昂贵,所以用作缓存,CPU查找数据的时候首先在L1,然后看L2,如果还没有,就到内存查找一些服务器还有L3 Cache,目的也是提高速度。

高速缓冲存储器Cache是位于CPU与内存之间的临时存储器,它的容量比内存小但交换速度快。在Cache中的数据是内存中的一小部分,但这一小部分是短时间内CPU即将访问的,当CPU调用大量数据时,就可避开内存直接从Cache中调用,从而加快读取速度。由此可见,在CPU中加入Cache是一种高效的解决方案,这样整个内存储器(Cache+内存)就变成了既有Cache的高速度,又有内存的大容量的存储系统了。CacheCPU的性能影响很大,主要是因为CPU的数据交换顺序和CPUCache间的带宽引起的。

http://static.cyblogs.com/20161111081230139.png

高速缓存的工作原理

1. 读取顺序

CPU要读取一个数据时,首先从Cache中查找,如果找到就立即读取并送给CPU处理;如果没有找到,就用相对慢的速度从内存中读取并送给CPU处理,同时把这个数据所在的数据块调入Cache中,可以使得以后对整块数据的读取都从Cache中进行,不必再调用内存。

正是这样的读取机制使CPU读取Cache的命中率非常高(大多数CPU可达90%左右),也就是说CPU下一次要读取的数据90%都在Cache中,只有大约10%需要从内存读取。这大大节省了CPU直接读取内存的时间,也使CPU读取数据时基本无需等待。总的来说,CPU读取数据的顺序是先Cache后内存。

2. 缓存分类

前面是把Cache作为一个整体来考虑的,现在要分类分析了。IntelPentium开始将Cache分开,通常分为一级高速缓存L1二级高速缓存L2

在以往的观念中,L1 Cache是集成在CPU中的,被称为片内Cache。在L1中还分数据Cache(I-Cache)和指令Cache(D-Cache)。它们分别用来存放数据和执行这些数据的指令,而且两个Cache可以同时被CPU访问,减少了争用Cache所造成的冲突,提高了处理器效能。

http://static.cyblogs.com/20161111081336921.png

在P4处理器中使用了一种先进的一级指令Cache——动态跟踪缓存。它直接和执行单元及动态跟踪引擎相连,通过动态跟踪引擎可以很快地找到所执行的指令,并且将指令的顺序存储在追踪缓存里,这样就减少了主执行循环的解码周期,提高了处理器的运算效率。

以前的L2 Cache没集成在CPU中,而在主板上或与CPU集成在同一块电路板上,因此也被称为片外Cache。但从PⅢ开始,由于工艺的提高L2 Cache被集成在CPU内核中,以相同于主频的速度工作,结束了L2 Cache与CPU大差距分频的历史,使L2 CacheL1 Cache在性能上平等,得到更高的传输速度。L2Cache只存储数据,因此不分数据Cache和指令Cache。在CPU核心不变化的情况下,增加L2 Cache的容量能使性能提升,同一核心的CPU高低端之分往往也是在L2 Cache上做手脚,可见L2 Cache的重要性。现在CPUL1 CacheL2 Cache惟一区别在于读取顺序。

3. 读取命中率

CPUCache中找到有用的数据被称为命中,当Cache中没有CPU所需的数据时(这时称为未命中),CPU才访问内存。从理论上讲,在一颗拥有2级CacheCPU中,读取L1 Cache的命中率为80%。也就是说CPUL1 Cache中找到的有用数据占数据总量的80%,剩下的20%从L2 Cache读取。由于不能准确预测将要执行的数据,读取L2的命中率也在80%左右(从L2读到有用的数据占总数据的16%)。那么还有的数据就不得不从内存调用,但这已经是一个相当小的比例了。在一些高端领域的CPU(像IntelItanium)中,我们常听到L3 Cache,它是为读取L2 Cache后未命中的数据设计的—种Cache,在拥有L3 CacheCPU中,只有约5%的数据需要从内存中调用,这进一步提高了CPU的效率。

为了保证CPU访问时有较高的命中率,Cache中的内容应该按一定的算法替换。一种较常用的算法是“最近最少使用算法”(LRU算法),它是将最近一段时间内最少被访问过的行淘汰出局。因此需要为每行设置一个计数器,LRU算法是把命中行的计数器清零,其他各行计数器加1。当需要替换时淘汰行计数器计数值最大的数据行出局。这是一种高效、科学的算法,其计数器清零过程可以把一些频繁调用后再不需要的数据淘汰出Cache,提高Cache的利用率。

缓存行填充

CPU访问内存时,并不是逐个字节访问,而是以字长为单位访问。比如32位的CPU,字长为4字节,那么CPU访问内存的单位也是4字节。

这么设计的目的,是减少CPU访问内存的次数,加大CPU访问内存的吞吐量。比如同样读取8个字节的数据,一次读取4个字节那么只需要读取2次。

我们来看看,编写程序时,变量在内存中是否按内存对齐的差异。有2个变量word1、word2

图如下:

http://static.cyblogs.com/WX20200211-152201@2x.png

我们假设CPU4字节为单位读取内存。如果变量在内存中的布局按4字节对齐,那么读取a变量只需要读取一次内存,即word1;读取b变量也只需要读取一次内存,即word2

而如果变量不做内存对齐,那么读取a变量也只需要读取一次内存,即word1;但是读取b变量时,由于b变量跨越了2个word,所以需要读取两次内存,分别读取word1word2的值,然后将word1偏移取后3个字节,word2偏移取前1个字节,最后将它们做或操作,拼接得到b变量的值。

显然,内存对齐在某些情况下可以减少读取内存的次数以及一些运算,性能更高。

另外,由于内存对齐保证了读取b变量是单次操作,在多核环境下,原子性更容易保证。

但是内存对齐提升性能的同时,也需要付出相应的代价。由于变量与变量之间增加了填充,并没有存储真实有效的数据,所以占用的内存会更大。这也是一个典型的空间换时间的应用场景。

参考文章

认识CPU Cache

CPU Cache概述

随着CPU的频率不断提升,而内存的访问速度却没有质的突破,为了弥补访问内存的速度慢,充分发挥CPU的计算资源,提高CPU整体吞吐量,在CPU与内存之间引入了一级Cache。随着热点数据体积越来越大,一级Cache L1已经不满足发展的要求,引入了二级Cache L2,三级Cache L3。(注:若无特别说明,本文的Cache指CPU Cache,高速缓存)CPU Cache在存储器层次结构中的示意如下图:

http://static.cyblogs.com/20161111081230139.png

计算机早已进入多核时代,软件也越来越多的支持多核运行。一个处理器对应一个物理插槽,多处理器间通过QPI总线相连。一个处理器包含多个核,一个处理器间的多核共享L3 Cache。一个核包含寄存器、L1 Cache、L2 Cache,下图是Intel Sandy Bridge CPU架构,一个典型的NUMA多处理器结构:

http://static.cyblogs.com/20161111081336921.png

作为程序员,需要理解计算机存储器层次结构,它对应用程序的性能有巨大的影响。如果需要的程序是在CPU寄存器中的,指令执行时1个周期内就能访问到他们。如果在CPU Cache中,需要130个周期;如果在主存中,需要50200个周期;在磁盘上,大概需要几千万个周期。充分利用它的结构和机制,可以有效的提高程序的性能。

以我们常见的X86芯片为例,Cache的结构下图所示:整个Cache被分为S个组,每个组是又由E行个最小的存储单元——Cache Line所组成,而一个Cache Line中有B(B=64)个字节用来存储数据,即每个Cache Line能存储64个字节的数据,每个Cache Line又额外包含一个有效位(valid bit)、t个标记位(tag bit),其中valid bit用来表示该缓存行是否有效;tag bit用来协助寻址,唯一标识存储在CacheLine中的块;而Cache Line里的64个字节其实是对应内存地址中的数据拷贝。根据Cache的结构题,我们可以推算出每一级Cache的大小为B×E×S。

http://static.cyblogs.com/20161111081422178.png

那么如何查看自己电脑CPU的Cache信息呢?

在windows下查看方式有多种方式,其中最直观的是,通过安装CPU-Z软件,直接显示Cache信息,如下图:

http://static.cyblogs.com/20161111081516632.png

此外,Windows下还有两种方法:

①Windows API调用GetLogicalProcessorInfo。
②通过命令行系统内部工具CoreInfo。

如果是Linux系统, 可以使用下面的命令查看Cache信息:

1
ls /sys/devices/system/cpu/cpu0/cache/index0

http://static.cyblogs.com/20161111081643978.png

还有lscpu等命令也可以查看相关信息,如果是Mac系统,可以用sysctl machdep.cpu 命令查看cpu信息。

如果我们用Java编程,还可以通过CacheSize API方式来获取Cache信息, CacheSize是一个谷歌的小项目,java语言通过它可以进行访问本机Cache的信息。示例代码如下:

1
2
3
4
5
6
7
public static void main(String[] args) throws CacheNotFoundException {
CacheInfo info = CacheInfo.getInstance();
CacheLevelInfo l1Datainf = info.getCacheInformation(CacheLevel.L1, CacheType.DATA_CACHE);
System.out.println("第一级数据缓存信息:"+l1Datainf.toString());
CacheLevelInfo l1Instrinf = info.getCacheInformation(CacheLevel.L1, CacheType.INSTRUCTION_CACHE);
System.out.println("第一级指令缓存信息:"+l1Instrinf.toString());
}

打印输出结果如下:

1
2
3
4
第一级数据缓存信息:CacheLevelInfo [cacheLevel=L1, cacheType=DATA_CACHE, cacheSets=64, cacheCoherencyLineSize=64, cachePhysicalLinePartitions=1, cacheWaysOfAssociativity=8, isFullyAssociative=false, isSelfInitializing=true, totalSizeInBytes=32768]

第一级指令缓存信息:CacheLevelInfo [cacheLevel=L1, cacheType=INSTRUCTION_CACHE, cacheSets=64, cacheCoherencyLineSize=64, cachePhysicalLinePartitions=1, cacheWaysOfAssociativity=8, isFullyAssociative=false, isSelfInitializing=true, totalSizeInBytes=32768]
还可以查询L2、L3级缓存的信息,这里不做示例。从打印的信息和CPU-Z显示的信息可以看出,本机的Cache信息是一致的,L1数据/指令缓存大小都为:C=B×E×S=64×8×64=32768字节=32KB。

Cache Line伪共享及解决方案

Cache Line伪共享分析

说伪共享前,先看看Cache Line 在java编程中使用的场景。如果CPU访问的内存数据不在Cache中(一级、二级、三级),这就产生了Cache Line miss问题,此时CPU不得不发出新的加载指令,从内存中获取数据。通过前面对Cache存储层次的理解,我们知道一旦CPU要从内存中访问数据就会产生一个较大的时延,程序性能显著降低,所谓远水救不了近火。为此我们不得不提高Cache命中率,也就是充分发挥局部性原理。

局部性包括时间局部性、空间局部性。时间局部性:对于同一数据可能被多次使用,自第一次加载到Cache Line后,后面的访问就可以多次从Cache Line中命中,从而提高读取速度(而不是从下层缓存读取)。空间局部性:一个Cache Line有64字节块,我们可以充分利用一次加载64字节的空间,把程序后续会访问的数据,一次性全部加载进来,从而提高Cache Line命中率(而不是重新去寻址读取)。

看个例子:内存地址是连续的数组(利用空间局部性),能一次被L1缓存加载完成。

如下代码,长度为16的row和column数组,在Cache Line 64字节数据块上内存地址是连续的,能被一次加载到Cache Line中,所以在访问数组时,Cache Line命中率高,性能发挥到极致。

1
2
3
4
5
6
7
public int run(int[] row, int[] column) {
int sum = 0;
for(int i = 0; i < 16; i++ ) {
sum += row[i] * column[i];
}
return sum;
}

而上面例子中变量i则体现了时间局部性,i作为计数器被频繁操作,一直存放在寄存器中,每次从寄存器访问,而不是从主存甚至磁盘访问。虽然连续紧凑的内存分配带来高性能,但并不代表它一直都能带来高性能。如果把它放在多线程中将会发生什么呢?如图:

http://static.cyblogs.com/20161111081903485.png

数据X、Y、Z被加载到同一Cache Line中,线程A在Core1修改X,线程B在Core2上修改Y。根据MESI大法,假设是Core1是第一个发起操作的CPU核,Core1上的L1 Cache Line由S(共享)状态变成M(修改,脏数据)状态,然后告知其他的CPU核,图例则是Core2,引用同一地址的Cache Line已经无效了;当Core2发起写操作时,首先导致Core1将X写回主存,Cache Line状态由M变为I(无效),而后才是Core2从主存重新读取该地址内容,Cache Line状态由I变成E(独占),最后进行修改Y操作, Cache Line从E变成M。可见多个线程操作在同一Cache Line上的不同数据,相互竞争同一Cache Line,导致线程彼此牵制影响,变成了串行程序,降低了并发性。此时我们则需要将共享在多线程间的数据进行隔离,使他们不在同一个Cache Line上,从而提升多线程的性能。

Cache Line伪共享处理方案

处理伪共享的两种方式:

增大数组元素的间隔使得不同线程存取的元素位于不同的cache line上。典型的空间换时间。(Linux cache机制与之相关)
在每个线程中创建全局数组各个元素的本地拷贝,然后结束后再写回全局数组。
在Java类中,最优化的设计是考虑清楚哪些变量是不变的,哪些是经常变化的,哪些变化是完全相互独立的,哪些属性一起变化。举个例子:

1
2
3
4
5
6
7
public class Data{
long modifyTime;
boolean flag;
long createTime;
char key;
int value;
}

假如业务场景中,上述的类满足以下几个特点:

当value变量改变时,modifyTime肯定会改变
createTime变量和key变量在创建后,就不会再变化。
flag也经常会变化,不过与modifyTime和value变量毫无关联。
当上面的对象需要由多个线程同时的访问时,从Cache角度来说,就会有一些有趣的问题。当我们没有加任何措施时,Data对象所有的变量极有可能被加载在L1缓存的一行Cache Line中。在高并发访问下,会出现这种问题:

http://static.cyblogs.com/20161111082043281.png

如上图所示,每次value变更时,根据MESI协议,对象其他CPU上相关的Cache Line全部被设置为失效。其他的处理器想要访问未变化的数据(key 和 createTime)时,必须从内存中重新拉取数据,增大了数据访问的开销。

Padding 方式
正确的方式应该将该对象属性分组,将一起变化的放在一组,与其他属性无关的属性放到一组,将不变的属性放到一组。这样当每次对象变化时,不会带动所有的属性重新加载缓存,提升了读取效率。在JDK1.8以前,我们一般是在属性间增加长整型变量来分隔每一组属性。被操作的每一组属性占的字节数加上前后填充属性所占的字节数,不小于一个cache line的字节数就可以达到要求:

1
2
3
4
5
6
7
8
9
10
11
public class DataPadding{
long a1,a2,a3,a4,a5,a6,a7,a8;//防止与前一个对象产生伪共享
int value;
long modifyTime;
long b1,b2,b3,b4,b5,b6,b7,b8;//防止不相关变量伪共享;
boolean flag;
long c1,c2,c3,c4,c5,c6,c7,c8;//
long createTime;
char key;
long d1,d2,d3,d4,d5,d6,d7,d8;//防止与下一个对象产生伪共享
}

通过填充变量,使不相关的变量分开

Contended注解方式

在JDK1.8中,新增了一种注解@sun.misc.Contended,来使各个变量在Cache line中分隔开。注意,jvm需要添加参数-XX:-RestrictContended才能开启此功能
用时,可以在类前或属性前加上此注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 类前加上代表整个类的每个变量都会在单独的cache line中
@sun.misc.Contended
@SuppressWarnings("restriction")
public class ContendedData {
int value;
long modifyTime;
boolean flag;
long createTime;
char key;
}
或者这种:
// 属性前加上时需要加上组标签
@SuppressWarnings("restriction")
public class ContendedGroupData {
@sun.misc.Contended("group1")
int value;
@sun.misc.Contended("group1")
long modifyTime;
@sun.misc.Contended("group2")
boolean flag;
@sun.misc.Contended("group3")
long createTime;
@sun.misc.Contended("group3")
char key;
}

采取上述措施图示:

http://static.cyblogs.com/20161111082403755.png

JDK1.8 ConcurrentHashMap的处理
java.util.concurrent.ConcurrentHashMap在这个如雷贯耳的Map中,有一个很基本的操作问题,在并发条件下进行++操作。因为++这个操作并不是原子的,而且在连续的Atomic中,很容易产生伪共享(false sharing)。所以在其内部有专门的数据结构来保存long型的数据:

1
2
3
4
5
6
7
8
9
10
11
(openjdk\jdk\src\share\classes\java\util\concurrent\ConcurrentHashMap.java line:2506):
/* ---------------- Counter support -------------- */

/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}

我们看到该类中,是通过@sun.misc.Contended达到防止false sharing的目的

JDK1.8 Thread 的处理

java.lang.Thread在java中,生成随机数是和线程有着关联。而且在很多情况下,多线程下产生随机数的操作是很常见的,JDK为了确保产生随机数的操作不会产生false sharing ,把产生随机数的三个相关值设为独占cache line。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
(openjdk\jdk\src\share\classes\java\lang\Thread.java line:2023
// The following three initially uninitialized fields are exclusively
// managed by class java.util.concurrent.ThreadLocalRandom. These
// fields are used to build the high-performance PRNGs in the
// concurrent code, and we can not risk accidental false sharing.
// Hence, the fields are isolated with @Contended.

/** The current seed for a ThreadLocalRandom */
@sun.misc.Contended("tlr")
long threadLocalRandomSeed;

/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;

/** Secondary seed isolated from public ThreadLocalRandom sequence */
@sun.misc.Contended("tlr")
int threadLocalRandomSecondarySeed;

Java中对Cache line经典设计

Disruptor框架
认识Disruptor

LMAX是在英国注册并受到FCA监管的外汇黄金交易所。也是欧洲第一家也是唯一一家采用多边交易设施Multilateral Trading Facility(MTF)拥有交易所牌照和经纪商牌照的欧洲顶级金融公司。LMAX的零售金融交易平台,是建立在JVM平台上,核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器的核心就是Disruptor(注,本文Disruptor基于当前最新3.3.6版本),这是一个Java实现的并发组件,能够在无锁的情况下实现网络的Queue并发操作,它确保任何数据只由一个线程拥有以进行写访问,从而消除写争用的设计, 这种设计被称作“破坏者”,也是这样命名这个框架的。

Disruptor是一个线程内通信框架,用于线程里共享数据。与LinkedBlockingQueue类似,提供了一个高速的生产者消费者模型,广泛用于批量IO读写,在硬盘读写相关的程序中应用的十分广泛,Apache旗下的HBase、Hive、Storm等框架都有在使用Disruptor。LMAX 创建Disruptor作为可靠消息架构的一部分,并将它设计成一种在不同组件中共享数据非常快的方法。Disruptor运行大致流程入下图:

http://static.cyblogs.com/20161111082723254.png

图中左侧(Input Disruptor部分)可以看作多生产者单消费者模式。外部多个线程作为多生产者并发请求业务逻辑处理器(Business Logic Processor),这些请求的信息经过Receiver存放在粉红色的圆环中,业务处理器则作为消费者从圆环中取得数据进行处理。右侧(Output Disruptor部分)则可看作单生产者多消费者模式。业务逻辑处理器作为单生产者,发布数据到粉红色圆环中,Publisher作为多个消费者接受业务逻辑处理器的结果。这里两处地方的数据共享都是通过那个粉红色的圆环,它就是Disruptor的核心设计RingBuffer。

Disruptor特点

1、无锁机制。
2、没有CAS操作,避免了内存屏障指令的耗时。
3、避开了Cache line伪共享的问题,也是Disruptor部分主要关注的主题。

Disruptor对伪共享的处理
RingBuffer类

RingBuffer类(即上节中粉红色的圆环)的类关系图如下:

http://static.cyblogs.com/20161111082805708.png

通过源码分析,RingBuffer的父类,RingBufferFields采用数组来实现存放线程间的共享数据。下图,第57行,entries数组。

http://static.cyblogs.com/20161111082838223.png

前面分析过数组比链表、树更具有缓存友好性,此处不做细表。不使用LinkedBlockingQueue队列,是基于无锁机制的考虑。详细分析可参考,并发编程网的翻译。这里我们主要分析RingBuffer的继承关系中的填充,解决缓存伪共享问题。如下图:

http://static.cyblogs.com/20161111082919849.png

http://static.cyblogs.com/20161111082958287.png

依据JVM对象继承关系中父类属性与子类属性,内存地址连续排列布局,RingBufferPad的protected long p1,p2,p3,p4,p5,p6,p7;作为缓存前置填充,RingBuffer中的protected long p1,p2,p3,p4,p5,p6,p7;作为缓存后置填充。这样任意线程访问RingBuffer时,RingBuffer放在父类RingBufferFields的属性,都是独占一行Cache line不会产生伪共享问题。如图,RingBuffer的操作字段在RingBufferFields中,使用rbf标识:

http://static.cyblogs.com/20161111083200062.png

按照一行缓存64字节计算,前后填充56字节(7个long),中间大于等于8字节的内容都能独占一行Cache line,此处rbf是大于8字节的。

Sequence类

Sequence类用来跟踪RingBuffer和事件处理器的增长步数,支持多个并发操作包括CAS指令和写指令。同时使用了Padding方式来实现,如下为其类结构图及Padding的类。

Sequence里在volatile long value前后放置了7个long padding,来解决伪共享的问题。示意如图,此处Value等于8字节:

也许读者应该会认为这里的图示比上面RingBuffer的图示更好理解,这里的操作属性只有一个value,两个图相互结合就更能理解了。

Sequencer的实现

在RingBuffer构造函数里面存在一个Sequencer接口,用来遍历数据,在生产者和消费者之间传递数据。Sequencer有两个实现类,单生产者模式的实现SingleProducerSequencer与多生产者模式的实现MultiProducerSequencer。它们的类结构如图:

单生产者是在Cache line中使用padding方式实现,源码如下:

http://static.cyblogs.com/20161111084637099.png

多生产者则是使用 sun.misc.Unsafe来实现的。如下图:

http://static.cyblogs.com/20161111084621458.png

总结与使用示例

可见padding方式在Disruptor中是处理伪共享常见的方式,JDK1.8的@Contended很好的解决了这个问题,不知道Disruptor后面的版本是否会考虑使用它。

Disruptor使用示例代码参考地址。

参考资料

7个示例科普CPU Cache:http://coolshell.cn/articles/10249.html
Linux Cache 机制:http://www.cnblogs.com/liloke/archive/2011/11/20/2255737.html
《深入理解计算机系统》:第六章部分
Disruptor官方文档:https://github.com/LMAX-Exchange/disruptor/tree/master/docs
Disruptor并发编程网文档翻译:http://ifeve.com/disruptor/

0%