简栈文化

Java技术人的成长之路~

今天学习下ArrayList的源代码,不同于其他人写的博客,很多都是翻译源代码中的注释,然后直接贴到文章中去。小编打算换一种书写风格,带着问题看源码可能收获会更大,本文将围绕着下面几个问题展开讨论。

一、问题产生

  • 1、为什么ArrayList集合中存储元素的容器声明为transient Object[] elementData;
  • 2、既然ArrayList可以自动扩容,那么它的扩容机制是怎样实现的?
  • 3、调用ArrayListiterator()返回的迭代器是怎样的?
  • 4、采用ArrayList的迭代器遍历集合时,对集合执行相关修改操作时为什么会抛出ConcurrentModificationException,我们该如何避免?
  • 5、当集合扩容或者克隆时免不了对集合进行拷贝操作,那么ArrayList的数组拷贝是怎么实现的?
  • 6、ArrayList中的序列化机制

小编对ArrayList源码大概浏览了之后,总结出以上几个问题,带着这些问题,让我们一起翻开源码解决吧!

二、问题解答

1、为什么ArrayList集合中存储元素的容器声明为transient Object[] elementData;

ArrayList是一个集合容器,既然是一个容器,那么肯定需要存储某些东西,既然需要存储某些东西,那总得有一个存储的地方吧!就好比说你需要装一吨的水,总得有个池子给你装吧!或者说你想装几十毫升水,总得那个瓶子或者袋子给你装吧!区别就在于不同大小的水,我们需要的容器大小也不相同而已!

既然ArrayList已经支持泛型了,那么为什么ArrayList源码的容器定义为什么还要定义成下面的Object[]类型呢?

1
transient Object[] elementData;

其实无论你采用transient E[] elementData;的方式声明,或者是采用transient Object[] elementData;声明,都是允许的,差别在于前者要求我们我们在具体实例化elementData时需要做一次类型转换,而这次类型转换要求我们程序员保证这种转换不会出现任何错误。为了提醒程序员关注可能出现的类型转换异常,编译器会发出一个Type safety: Unchecked cast from String[] to E[]警告,这样讲不知道会不会很懵比,下面的代码告诉你:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MyList<E> {
// 声明数组,类型为E[]
E[] DATAS;
// 初始化数组,必须做一次类型转换
public MyList(int initialCapacity) {
DATAS = (E[]) new Object[initialCapacity];
}
public E getDATAS(int index) {
return DATAS[index];
}
public void setDATAS(E[] dATAS) {
DATAS = dATAS;
}
}

上面的代码在1处我们声明了E[]数组,具体类型取决于你传入E的实际类型,但是要注意,当你对DATAS进行初始化时,你不能像下面这样初始化:

1
E[] DATAS = new E[10]; // 这句代码将报错

也就是说,泛型数组是不能具体化的,也就是不能通过new 泛型[size];的方式进行具体化,那么怎么解决呢?有两种方式:

  • 1、进行前面说的做一次转换,但不推荐

    就像上面代码所展示的,我们可以初始化成Object[]类型之后再转换成E[],但前提是你得保证这次转换不会出现任何错误,通常我们不建议这样子写!

  • 2、直接声明为Object[]

    这种方式也是ArrayList源码的定义方式,那么我们来看看ArrayList是怎么初始化的:

1
2
3
4
5
6
7
8
9
10
11
public ArrayList(int initialCapacity) {
if (initialCapacity > 0) {
// 此处直接new Object[],不会出现任何错误
this.elementData = new Object[initialCapacity];
} else if (initialCapacity == 0) {
this.elementData = EMPTY_ELEMENTDATA;
} else {
throw new IllegalArgumentException("Illegal Capacity: "+
initialCapacity);
}
}

但是有一点还需要注意,但你调用ArrayListtoArray方法将集合转换为对象数组时,有可能出现意想不到的结果,具体可参考小编的另外一篇博文。

总结: 总的来说,我们要知道泛型数组是不能具体化的,以及其解决办法!你可能会很好奇我为什么没有讲transient,这个小编放到下面序列化反序列化时讲。

2、既然ArrayList可以自动扩容,那么它的扩容机制是怎样实现的?

有时候,我们得保证当增加水的时,原来的容器也可以装入新的的水而不至于溢出,也就是ArrayList的自动扩容机制。我们可以想象,假如列表大小为10,那么正常情况下只能装10个元素,我们很好奇在此之后调用add()方法时底层做了什么神奇的事,所以我们看看add()方法是怎么实现的:

1
2
3
4
5
6
7
// 增加一个元素
public boolean add(E e) {
// 确保内部容量大小,size指的是当前列表的实际元素个数
ensureCapacityInternal(size + 1);
elementData[size++] = e;
return true;
}

从上面方法可以看出先判断内部容量是否足够满足size + 1个元素,如果可以,就直接elementData[size++] = e;,否则就需要扩容,那么怎么扩容呢?我们到ensureCapacityInternal()方法看看,这里有一点很重要,请记住下面的参数:

  • minCapacity永远代表增加之后实际的总元素个数
  • newCapacity永远表示列表能够满足存储minCapacity个元素列表所需要扩容的大小
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 校验内部容量大小
private void ensureCapacityInternal(int minCapacity) {
ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
}
// 这个方法只有首次调用时会用到,不然默认返回 minCapacity
private static int calculateCapacity(Object[] elementData, int minCapacity) {
// 这里如果成立,表示该ArrayList是刚刚初始化,还没有add进任何元素
if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
return Math.max(DEFAULT_CAPACITY, minCapacity);
}
return minCapacity;
}
// 扩容判断
private void ensureExplicitCapacity(int minCapacity) {
modCount++;
// 判断是否需要扩容,elementData.length表示列表的空间总大小,不是列表的实际元素个数,size才是列表的实际元素个数
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}

上面会判断集合是否刚刚初始化,即还没有调用过add()方法,如果成立,则将集合默认扩容至10,DEFAULT_CAPACITY的值为10,取最大值。最后一个方法的grow()成立的条件是容器的元素大于10且没有可用空间,即需要扩容了,我们再看看grow()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void grow(int minCapacity) {
// 获取旧的列表大小
int oldCapacity = elementData.length;
// 扩容之后的新的容器大小,默认增加一半 ..............................1
int newCapacity = oldCapacity + (oldCapacity >> 1);
// 如果扩容一半之后还不足,则新的容器大小等于minCapacity.............................2
if (newCapacity - minCapacity < 0) newCapacity = minCapacity;
// 如果新的容器大小比MAX_ARRAY_SIZE还大,
if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity);
// 数组拷贝操作
elementData = Arrays.copyOf(elementData, newCapacity);
}
// 最大不能超过Integer.MAX_VALUE
private static int hugeCapacity(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
return (minCapacity > MAX_ARRAY_SIZE) ? Integer.MAX_VALUE : MAX_ARRAY_SIZE;
}

上面1>>表示右移,也就是相当于除以2,减为一半,2处可能调用addAll()方法时成立。

下面我们列举几种情况:

ID 情况描述 调用add()? 调用addAll(size)? + size大小 执行结果
1 列表刚初始化 初始化一个长度为10的列表,即容器扩容至10个单位
2 列表实际元素个数为10,实际大小也为10,此时调用add操作 容器扩容至15,容器元素个数为11,即有4个位置空闲
3 列表实际元素个数为10,列表长度也为10,此时调用addAll操作 是 + 5 容器扩容至15,没有空余
4 列表实际元素个数为5,列表长度为10,此时调用addAll()操作 是 + 10 容器扩容至15,没有空余

总结:

扩容机制如下:

  • 1、先默认将列表大小newCapacity增加原来一半,即如果原来是10,则新的大小为15;
  • 2、如果新的大小newCapacity依旧不能满足add进来的元素总个数minCapacity,则将列表大小改为和minCapacity一样大;即如果扩大一半后newCapacity为15,但add进来的总元素个数minCapacity为20,则15明显不能存储20个元素,那么此时就将newCapacity大小扩大到20,刚刚好存储20个元素;
  • 3、如果扩容后的列表大小大于2147483639,也就是说大于Integer.MAX_VALUE - 8,此时就要做额外处理了,因为实际总元素大小有可能比Integer.MAX_VALUE还要大,当实际总元素大小minCapacity的值大于Integer.MAX_VALUE,即大于2147483647时,此时minCapacity的值将变为负数,因为int是有符号的,当超过最大值时就变为负数

小编认为,上面第3点也体现了一种智慧,即当一样东西有可能出错时,我们应该提前对其做处理,而不要等到错误发生时再对其进行处理。也就是我们运维要做监控的目的。

3、调用ArrayListiterator()返回的迭代器是怎样的?

我们都知道所有集合都是Collection接口的实现类,又因为Collection继承了Iterable接口,因此所有集合都是可迭代的。我们常常会采用集合的迭代器来遍历集合元素,就像下面的代码:

1
2
3
4
5
6
7
8
9
ArrayList<String> list = new ArrayList<>();
list.add("a");
list.add("b");
// 获取集合的迭代器对象
Iterator<String> iter = list.iterator();
while (iter.hasNext()) {
String item = iter.next();
System.err.println(item);
}

我们可以通过调用集合的iterator()方法获取集合的迭代器对象,那么在ArrayList中,iterator()方法是怎么实现的呢?

1
2
3
public Iterator<E> iterator() {
return new Itr();
}

超级简单,原来是新建了一个叫Itr的对象那么这个Itr又是什么呢?打开源码我们发现Itr类其实是ArrayList的一个内部类,定义如下:

1
2
3
4
5
6
7
8
9
10
11
 private class Itr implements Iterator<E> {
int cursor; // index of next element to return
int lastRet = -1; // index of last element returned; -1 if no such
int expectedModCount = modCount;......................... 1
Itr() {}
public boolean hasNext() {...}// 具体实现被我删除了
public E next() {...}
public void remove() {...}
public void forEachRemaining(Consumer<? super E> consumer) {...}
final void checkForComodification() {...}
}

该迭代器实现了Iterator接口并实现了相关方法,提供我们对集合的遍历能力。总结:ArrayList的迭代器默认是其内部类实现,实现一个自定义迭代器只需要实现Iterator接口并实现相关方法即可。而实现Iterable接口表示该实现类具有像for-each loop迭代遍历的能力。

4、采用ArrayList的迭代器遍历集合时,对集合执行相关修改操作时为什么会抛出ConcurrentModificationException,我们该如何避免?

上面第3小节我们查看了ArrayList迭代器的源代码,我们都知道,如果在迭代的过程中调用非迭代器内部的remove或者clear方法将会抛出ConcurrentModificationException异常,那到底是为什么呢?我们一起来看看。首先这里设计两个很重要的变量,一个是expectedModCount,另一个是modCount,expectedModCount在集合内部迭代器中定义,就像上面第三小节源码1处所示,modCountAbstractList中定义。就像第三小节1处所看到的,默认两者是相等的,即expectedModCount = modCount,只有当其不想等的情况下就会抛出异常。真的是不想等就抛异常吗?我们来看看迭代器内部的next方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public E next() {
// 在迭代前会对两个变量进行检查
checkForComodification();
int i = cursor;
if (i >= size)
throw new NoSuchElementException();
Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length)
throw new ConcurrentModificationException();
cursor = i + 1;
return (E) elementData[lastRet = i];
}
// 具体检查
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}

可以看出确实是当它们两者之间不想等时就报错,问题来了,那么什么时候会导致它们不想等呢?不急,我们来看看ArrayListremove方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public E remove(int index) {
rangeCheck(index);
// 这里会修改modCount的值
modCount++;
E oldValue = elementData(index);

int numMoved = size - index - 1;
if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
elementData[--size] = null; // clear to let GC do its work

return oldValue;
}

可以看出当调用remove()方法时确实是修改了modCount的值,导致报错。那我们怎么做才能不报错有想在迭代过程中增加或者删除数据呢?答案是使用迭代器内部的remove()方法。

总结:

迭代器迭代集合时不能对被迭代集合进行修改,原因是modCountexpectedModCount两个变量值不想等导致的!

5、当集合扩容或者克隆时免不了对集合进行拷贝操作,那么ArrayList的数组拷贝是怎么实现的?

ArrayList中对集合的拷贝是通过调用ArrayscopyOf方法实现的,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public static <T> T[] copyOf(T[] original, int newLength) {
return (T[]) copyOf(original, newLength, original.getClass());.................2
}
public static <T,U> T[] copyOf(U[] original, int newLength, Class<? extends T[]> newType) {
// 在创建新数组对象之前会先对传入的数据类型进行判定
@SuppressWarnings("unchecked")
T[] copy = ((Object)newType == (Object)Object[].class)
? (T[]) new Object[newLength]
: (T[]) Array.newInstance(newType.getComponentType(), newLength);
System.arraycopy(original, 0, copy, 0,
Math.min(original.length, newLength));
return copy;
}

最后还调用了Systemarraycopy方法。

6、ArrayList中的序列化机制

第一小节我们知道ArrayList存储数据的定义方式为:

1
transient Object[] elementData;

我们会觉得非常奇怪,这是一个集合存储元素的核心,却声明为transient,是不是就说就不序列化了?这不科学呀!其实集合存储的数据还是会序列化的,具体我们看看ArrayList中的writeObject方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
writeObject
private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException{
// Write out element count, and any hidden stuff
int expectedModCount = modCount;
s.defaultWriteObject();

// Write out size as capacity for behavioural compatibility with clone()
s.writeInt(size);

// 这个地方做一个序列化操作
for (int i=0; i<size; i++) {
s.writeObject(elementData[i]);
}

if (modCount != expectedModCount) {
throw new ConcurrentModificationException();
}
}

从上面的代码中我们可以看出ArrayList其实是有对elementData进行序列化的,只不过这样做的原因是因为elementData中可能会有很多的null元素,为了不把null元素也序列化出去,所以自定义了writeObjectreadObject方法。

参考地址

也许大家对这个问题都不陌生,实际装过系统用过电脑的朋友可能都有这样的经历:自己电脑配的是4G的内存条,可是装完系统之后发现电脑上显示的只有3.2G左右可用内存,其它的内存跑到哪去了?网上也有很多朋友给出了一些解释,大部分我觉得都没有解释得很清楚,今天我们就来看一下其中的具体缘由。

在此之前先来了解一些计算机系统结构和PC硬件方面的一些知识。

总线结构和主板的构成

  说起总线大家肯定不陌生,而且大家平时肯定跟它打过交道,我们在用U盘拷贝数据的时候先要把U盘通过USB接口与电脑相连才能拷贝。USB接口实际上就是一种总线,一般称这种总线为USB总线(也叫做通用串行总线)。在很久之前是没有USB总线的,那个时候每个外设各自采用自己的接口标准,举个最简单的例子:鼠标生产厂商采用鼠标特有的接口,键盘生产厂商用键盘特有的接口,这样一来的话,PC机上就必须提供很多接口,这样一来增加了硬件设计难度和成本,直到后来USB接口的出现,它统一了很多外设接口的标准,不仅使得用户可以很方便地连接一些外设,更增强了PC的可扩展性。所以现在大家看到的鼠标、键盘、U盘、打印机等等这些外设都可以直接通过USB接口直接插到电脑上的。

  在计算机系统中总线是非常重要的一个概念,正是因为有了总线,所有的组成部件才能一起正常协同分工合作。在很久以前的PC机中,采用的是三总线结构,即:数据总线、地址总线、控制总线。它们分别用来传输不同类型的数据,数据总线用来传输数据,地址总线用来传输地址,控制总线用来传输一些控制信号。下面这幅图很清楚地展示了三总线结构:

  img

随着时代的发展,这种简单的总线结构逐渐被淘汰。下面这幅图是现代计算采用的结构:

  img

  事实上这也是现代主板所采用的结构,当然可能部分地方有略微不同(大体结构是差不多的),仔细观察过主板构成的朋友可能对上面一幅图很熟悉。在主板上主要有两大主要部分:北桥(North Bridge也称Host Bridge)和南桥(South Bridge)。北桥主要负责CPU和内存、显卡这些部件的数据传送,而南桥主要负责I/O设备、外部存储设备以及BIOS之间的通信。现在有些主板已经没有北桥了,因为芯片厂商已经把北桥所负责的功能直接集成到CPU中了(不过暂且我们以上副图的模型来讨论)。

  在上副图中,我没有画出 数据总线和地址总线等,因为在某些总线标准中它们被集成到一起了,比如在PCI总线中,地址总线和数据总线总是分时复用的(也就是说假如PCI总线有32位数据总线,这32位总线在某个时刻可以充当数据总线的作用,在下一时刻可以充当地址总线的作用)。有的总线同时提供了数据总线和地址总线。

  下面来说一下几个主要总线和南北桥的作用:

  FSB总线:即前端总线(Front Side Bus),CPU和北桥之间的桥梁,CPU和北桥传递的所有数据必须经过FSB总线,可以这么说FSB总线的频率直接影响到CPU访问内存的速度。

  北桥:北桥是CPU和内存、显卡等部件进行数据交换的唯一桥梁,也就是说CPU想和其他任何部分通信必须经过北桥。北桥芯片中通常集成的还有内存控制器等,用来控制与内存的通信。现在的主板上已经看不到北桥了,它的功能已经被集成到CPU当中了。

  PCI总线:PCI总线是一种高性能局部总线,其不受CPU限制,构成了CPU和外设之间的高速通道。比如现在的显卡一般都是用的PCI插槽,PCI总线传输速度快,能够很好地让显卡和CPU进行数据交换。

  南桥:主要负责I/O设备之间的通信,CPU要想访问外设必须经过南桥芯片。

  在了解了这些基础东西之后,下面来讲解一下为何32位系统最大只支持4GB内存。

来由

  在使用计算机时,其最大支持的内存是由 操作系统 和 硬件 两方面决定的。

  先说一下硬件方面的因素,在上面已经提到了地址总线,在计算机中 CPU的地址总线数目 决定了CPU 的 寻址 范围,这种由地址总线对应的地址称作为物理地址。假如CPU有32根地址总线(一般情况下32位的CPU的地址总线是32位,也有部分32位的CPU地址总线是36位的,比如用做服务器的CPU),那么提供的可寻址物理地址范围 为 232=4GB(在这里要注意一点,我们平常所说的32位CPU和64位CPU指的是CPU一次能够处理的数据宽度,即位宽,不是地址总线的数目)。自从64位CPU出现之后,一次便能够处理64位的数据了,其地址总线一般采用的是36位或者40位(即CPU能够寻址的物理地址空间为64GB或者1T)。在CPU访问其它任何部件的时候,都需要一个地址,就像一个快递员送快递,没有地址他是不知道往哪里送达的,举个例子,CPU想从显存单元读取数据,必须知道要读取的显存单元的实际物理地址才能实现读取操作,同样地,从内存条上的内存单元读取数据也需要知道内存单元的物理地址。换句话说,CPU访问任何存储单元必须知道其物理地址。

  用户在使用计算机时能够访问的最大内存不单是由CPU地址总线的位数决定的,还需要考虑操作系统的实现。实际上用户在使用计算机时,进程所访问到的地址是逻辑地址,并不是真实的物理地址,这个逻辑地址是操作系统提供的,CPU在执行指令时需要先将指令的逻辑地址变换为物理地址才能对相应的存储单元进行数据的读取或者写入(注意逻辑地址和物理地址是一一对应的)。

  对于32位的windows操作系统,其逻辑地址编码采用的地址位数是32位的,那么操作系统所提供的逻辑地址寻址范围是4GB,而在intel x86架构下,采用的是内存映射技术(Memory-Mapped I/O, MMIO),也就说将4GB逻辑地址中一部分要划分出来与BIOS ROM、CPU寄存器、I/O设备这些部件的物理地址进行映射,那么逻辑地址中能够与内存条的物理地址进行映射的空间肯定没有4GB了,看下面这幅图就明白了:

  img

  所以当我们装了32位的windows操作系统,即使我们买了4GB的内存条,实际上能被操作系统访问到的肯定小于4GB,一般情况是3.2GB左右。假如说地址总线位数没有32位,比如说是20位,那么CPU能够寻址到1MB的物理地址空间,此时操作系统即使能支持4GB的逻辑地址空间并且假设内存条是4GB的,能够被用户访问到的空间不会大于1MB(当然此处不考虑虚拟内存技术),所以用户能够访问到的最大内存空间是由硬件和操作系统两者共同决定的,两者都有制约关系。

  对于64位的操作系统,其逻辑地址编码采用的地址位数是40位,能够最大支持1T的逻辑地址空间。考虑一种情况,假如CPU是64位的,地址总线位数是40位,操作系统也是64位的,逻辑地址编码采用的地址位数也是40位,内存条大小是64GB,那么是不是内存条的64GB全部都能被利用了呢?答案是不一定,因为这里面还要考虑一个因素就是内存控制器,内存控制器位于北桥之内(现在基本都是放在CPU里面了),内存控制器的实际连接内存的地址线决定了可以支持的内存容量,也就是说内存控制器与内存槽实际连接的地址线如果没有40位的话,是无法完全利用64GB的内存条的存储空间的。当然对于内存控制器这个问题几乎可以不用考虑,因为现在大多数的内存控制器至少都采用的是40位地址总线。

  关于这个问题就说这么多了,有兴趣深入研究的朋友可以自己查阅更多的资料。

参考地址

堆外内存的优势在于IO操作,相比堆内存可以减少一次copy和gc的次数。下面通过源码去了解堆外内存的分配和回收。一般分配堆外内存通过ByteBuffer allocateDirect(int capacity)方法,其内部是通过如下构造函数来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
DirectByteBuffer(int cap) {               
super(-1, 0, cap, cap);// mark, pos, lim, cap
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
Bits.reserveMemory(size, cap);
long base = 0;
try {
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) { // 修改内存起始地址
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}

首先调用父类的构造方法初始化ByteBuffer的四个基本属性,接下来reserveMemory方法是判断堆外剩余内存是否满足。这里的剩余并不是系统真是的剩余内存,参数-XX:MaxDirectMemorySize指定JVM最多可用的堆外内存。

如果堆外内存不足,则触发System.gc,这里有些难已理解,明明是堆外内存不足,System.gc的作用是建议VM进行full gc,再怎么说也是堆内存的回收。这里先保留这个疑问,继续往下看。

根据VM参数判断是否内存页对齐计算真实分配内存的大小,由-XX:+PageAlignDirectMemory控制,默认为false。allocateMemory是真正分配内存如果失败则回收内存。setMemory为填充内存。

接下来根据是否内存页对齐来计算内存的起始地址。我们知道HeapByteBuffer是基于byte数组来实现,不需要我们去考虑回收由JVM去处理。但是堆外内存JVM无法想堆内存那样回收,因此就有了Cleaner和Deallocator的存在。

每一个DirectBytebuffer都对应一个Deallocator和Cleaner对象,而Deallocator是Cleaner的一个属性。Deallocator继承了Runnable接口,当然run方法内部是释放内存的逻辑。

1
2
3
4
5
6
7
public void run() {
// 释放
unsafe.freeMemory(address);
address = 0;
// 修改堆外内存的占用大小
Bits.unreserveMemory(size, capacity);
}

在分析Cleaner之前我们先复习下PhantomReference(虚引用)

虚引用,正如其名,对一个对象而言,这个引用形同虚设,有和没有一样。如果一个对象与GC Roots之间仅存在虚引用,则称这个对象为虚可达(phantom reachable)对象。
当垃圾回收器准备回收一个对象时,如果发现它还有虚引用,就会在垃圾回收后,将这个虚引用加入引用队列,可以通过检查引用队列中是否有相应的虚引用来判断对象是否已经被回收了。

Cleaner继承自PhantomReference,在谈谈Java Reference的原理中介绍了Reference框架的大体逻辑,在PendingHandlerThread会把Pending list的引用对象移入Reference Queue,这个过程中如果Reference是Cleaner类型,那么会执行clean方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void clean() {
if (remove(this)) {
try {
this.thunk.run();
} catch (final Throwable var2) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
if (System.err != null) {
(new Error("Cleaner terminated abnormally", var2)).printStackTrace();
}
System.exit(1);
return null;
}
});
}
}
}

thunk是Deallocator类型,也就是说它run方法最终是由PendingHandlerThread线程执行的。这就是JDK的自动回收堆外内存。

thunk是Deallocator类型,也就是说它run方法最终是由PendingHandlerThread线程执行的。这就是JDK的自动回收堆外内存。

总结一下:DirectByteBuffer对象指向堆外的内存,它保存了一块内存的基本属性和Cleaner和Deallocator对象等。占用的空间相比堆外内存只是冰山一角,当DirectByteBuffer对象被回收,Cleaner对象也就是虚引用被加入到Pending list,PendingHandlerThread线程执行Cleaner的clean方法,最终释放堆外内存。这也就解释了为什么执行gc可以回收堆外内存了。也可以手动释放,首先拿到DirectByteBuffer的Cleaner对象,执行它的clean方法。

由于cleaner是private访问权限,所以自然想到使用反射来实现。
DirectByteBuffer实现了DirectBuffer接口,这个接口有cleaner方法可以获取cleaner对象

参考地址

Feign简介

​ 在上一篇文章中分析了Eureka的注册、续约、服务剔除、服务自我保护等机制,地址在https://blog.csdn.net/lgq2626/article/details/80288992 。这篇分析SpringCloudfeignSpringCloud微服务项目之间调用是通过httprest请求来进行服务调用的,之前我们会用到HttpClient等工具来进行服务请求,Spring对这种请求进行了处理,封装成了可声明式的web客户端,使得编写web客户端更容易,feign还支持可插拔的编码器和解码器,Spring在用的时候增加了对@requestMapping的处理,同时,SpringCloud还对feign集成了注册中心(eureka)和客户端负载均衡(ribbon),使得我们拥有一个客户端负载均衡的web请求客户端。

Feign在项目中的配置和使用

​ 在Springcloud中使用feign的时候,需要在配置类中加入一个@EnableFeignClients注解。代码如下:

1
2
3
4
5
6
7
8
@SpringBootApplication//springboot 启动类
@EnableFeignClients//开启eureka扫描
@EnableDiscoveryClient//开启eureka客户端
public class Application {
public static void main( String[] args ) throws ClassNotFoundException {
SpringApplication.run(Application.class, args);
}
}

配置feign调用客户端

1
2
3
4
5
@FeignClient(value = "xxx-server",configuration = FeignConfiguration.class)
public interface ConsumerSmsService extends SMSService{
@RequestMapping(value = "/sms/smsMessage", method = RequestMethod.POST)
RespSMSDto sendSms(ReqSMSDto smsReqDto);
}

经过上面的配置,直接在项目里面注入容器调用接口就可以了。

Feign源码分析

​ 在@EnableFeignClients标签中,import了一个FeignClientsRegistrar类,那么这个FeignClientsRegistrar#registerBeanDefinitions()在什么时候调用的呢?跟着Spring的源码走下去,看过源码的人都会直接看到AbstractApplicationContext#refresh()方法,整体整理一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// 扫描本项目里面的java文件,把bean对象封装成BeanDefinitiaon对象,然后调用DefaultListableBeanFactory#registerBeanDefinition()方法把beanName放到DefaultListableBeanFactory 的 List<String> beanDefinitionNames 中去
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();

// Prepare the bean factory for use in this context.
prepareBeanFactory(beanFactory);

try {
postProcessBeanFactory(beanFactory);

// 在这里调用到FeignClientsRegistrar对象的registerBeanDefinitions()方法
invokeBeanFactoryPostProcessors(beanFactory);

//从DefaultListableBeanFactory里面的beanDefinitionNames中找到所有实现了BeanPostProcessor接口的方法,如果有排序进行排序后放到list中
registerBeanPostProcessors(beanFactory);

//Spring的国际化
initMessageSource();

//
initApplicationEventMulticaster();

// Initialize other special beans in specific context subclasses.
onRefresh();

//
registerListeners();

// Spring的IOC、ID处理。Spring的AOP。事务都是在IOC完成之后调用了BeanPostProcessor#postProcessBeforeInitialization()和postProcessBeforeInitialization()方法,AOP(事务)就是在这里处理的
finishBeanFactoryInitialization(beanFactory);

// 执行完之后调用实现了所有LifecycleProcessor接口的类的onRefresh()方法,同时调用所有观察了ApplicationEvent接口的事件(观察者模式)
finishRefresh();
}

catch (BeansException ex) {

// 找到所有实现了DisposableBean接口的方法,调用了destroy()方法,这就是bean的销毁
destroyBeans();

// Reset 'active' flag.
cancelRefresh(ex);

throw ex;
}

finally {
resetCommonCaches();
}
}
}

​ 根据上面整理的代码发现,FeignClientsRegistrar#registerBeanDefinitions()方法是在扫描完bean之后,只放了一个beanname的情况下, 并没有进行IOC注册的时候调用的,这就是Spring动态扩展Bean,实现BeanDefinitionRegistryPostProcessor接口的所有方法也会在这里调用下postProcessBeanDefinitionRegistry()方法。关于Spring的东西就分析到这里。下面回到正题,分析FeignClientsRegistrar#registerBeanDefinitions()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
registerDefaultConfiguration(metadata, registry);//扫描EnableFeignClients标签里配置的信息,注册到beanDefinitionNames中。
registerFeignClients(metadata, registry);
}

public void registerFeignClients(AnnotationMetadata metadata,BeanDefinitionRegistry registry) {
AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(FeignClient.class);
//省略代码...根据EnableFeignClients配置的basePackages找到包下所有FeignClient注解的类,Spring的Commponet也是这么干的
for (String basePackage : basePackages) {
Set<BeanDefinition> candidateComponents = scanner.findCandidateComponents(basePackage);
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(),
"@FeignClient can only be specified on an interface");
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(
FeignClient.class.getCanonicalName());

String name = getClientName(attributes);
/**
* 关键地方:Feign子容器概念:
* 在注入FeignAutoConfiguration类的时候,注入了一个FeignContext对象,这个就是Feign的子容器。
* 这里面装了List<FeignClientSpecification>对象,FeignClientSpecification对象的实质就是在@feignClient上配置的name为key,value为configuration对象的值
* 比如feignclient 这样配置的@FeignClient(url="https://api.weixin.qq.com",name="${usercenter.name}", configuration = UserCenterFeignConfiguration.class, primary= false)
* 那么在FeignContext中就会出现一个FeignClientSpecification{name='sms-server', configuration=[class com.jfbank.sms.configuration.FeignConfiguration]}这样的数据。
* 这个地方比较关键,主要是因为后期对feign客户端的编码解码会用到自定义的类
*/
//这个方法就是在ioc容器中塞入一个FeignClientSpecification对象,从而构建FeignContext子容器。
registerClientConfiguration(registry, name,
attributes.get("configuration"));
//重点分析这个
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}
}

private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(FeignClientFactoryBean.class);//对FeignClientFactoryBean对象生成一个BeanDefinition对象
...读取配置
String alias = name + "FeignClient";
AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();

boolean primary = (Boolean)attributes.get("primary"); // has a default, won't be null

beanDefinition.setPrimary(primary);

String qualifier = getQualifier(attributes);
if (StringUtils.hasText(qualifier)) {
alias = qualifier;
}

BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
new String[] { alias });
//注册到beanDefinitionNames中对象
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);//
}

​ 读过Dubbo源码的同学都知道,当在DubboNamespaceHandler中解析reference标签的时候,传入了一个ReferenceBean对象,把xml中配置的属性都塞到这个对象上,也是装到了beanDefinitionNames中,然后发现ReferenceBean类和FeignClientFactoryBean都实现了FactoryBean的接口,并且里面都有getObject()getObjectType()方法。当接口调用到这个feign客户端的时候,会从IOC中读取这个FeignClientFactoryBean并且调用getObject方法。下面就是分析getObject方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
 @Override
public Object getObject() throws Exception {
FeignContext context = applicationContext.getBean(FeignContext.class);
//从上文中的子容器中获取编码器,解码器等自定义类,然后封装一个Feign.Builder类
Feign.Builder builder = feign(context);
if (!StringUtils.hasText(this.url)) {//当@FeignClient没有配置url的时候
String url;
if (!this.name.startsWith("http")) {
url = "http://" + this.name;
}
else {
url = this.name;
}
url += cleanPath();
return loadBalance(builder, context, new HardCodedTarget<>(this.type,
this.name, url));//集成了ribbon客户端负载均衡,下一篇分析
}
//当@FeignClient配置了url的时候
if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
this.url = "http://" + this.url;
}
String url = this.url + cleanPath();
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof LoadBalancerFeignClient) {
// not lod balancing because we have a url,
// but ribbon is on the classpath, so unwrap
client = ((LoadBalancerFeignClient)client).getDelegate();
}
builder.client(client);
}
Targeter targeter = get(context, Targeter.class);
return targeter.target(this, builder, context, new HardCodedTarget<>(this.type, this.name, url));
}

首先看配置了url的,指定了urlfeignclient解析,一直跟着代码跟到了Feign.Builder#target()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
public <T> T target(Target<T> target) {
return build().newInstance(target);
}

public Feign build() {
SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
logLevel, decode404);
ParseHandlersByName handlersByName =
new ParseHandlersByName(contract, options, encoder, decoder,
errorDecoder, synchronousMethodHandlerFactory);
return new ReflectiveFeign(handlersByName, invocationHandlerFactory);
}

直接看ReflectiveFeign#newInstance()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//ReflectiveFeign#newInstance()
public <T> T newInstance(Target<T> target) {
//动态代理的handler类目前穿进来的是ParseHandlersByName类,所以这里要看ParseHandlersByName#apply()直接看下一个方法
Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if(Util.isDefault(method)) {//默认方法会走到这里,比如toString(),hashCode()等方法
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {//这里才是装配的调用类,上文分析到计息的handler是SynchronousMethodHandler#invoke()
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
InvocationHandler handler = factory.create(target, methodToHandler);
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);//jdk动态代理

for(DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}

//ParseHandlersByName#apply类,构建动态代理的handler
public Map<String, MethodHandler> apply(Target key) {
List<MethodMetadata> metadata = contract.parseAndValidatateMetadata(key.type());
Map<String, MethodHandler> result = new LinkedHashMap<String, MethodHandler>();
for (MethodMetadata md : metadata) {
BuildTemplateByResolvingArgs buildTemplate;
if (!md.formParams().isEmpty() && md.template().bodyTemplate() == null) {
buildTemplate = new BuildFormEncodedTemplateFromArgs(md, encoder);//通过自定义的encoder去解析参数
} else if (md.bodyIndex() != null) {
buildTemplate = new BuildEncodedTemplateFromArgs(md, encoder);//通过自定义的encoder去解析参数
} else {
buildTemplate = new BuildTemplateByResolvingArgs(md);
}
//创建handler,再看Factory#create()方法,下一个方法
result.put(md.configKey(),factory.create(key, md, buildTemplate, options, decoder, errorDecoder));
}
return result;
}
//Factory#create(),构建一个SynchronousMethodHandler去处理请求,调用invoke方法
public MethodHandler create(Target<?> target, MethodMetadata md,
RequestTemplate.Factory buildTemplateFromArgs,
Options options, Decoder decoder, ErrorDecoder errorDecoder) {
return new SynchronousMethodHandler(target, client, retryer, requestInterceptors, logger,
logLevel, md, buildTemplateFromArgs, options, decoder,
errorDecoder, decode404);
}

//SynchronousMethodHandler#invoke()方法:实际调用的方法
//@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);//构建requestTemplate对象
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template);//下面不分析了,就是执行execute方法并且解码饭后返回值
} catch (RetryableException e) {
retryer.continueOrPropagate(e);
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}

Feign源码总结

从读取注解到注入IOC容器,再到编码参数,发起请求,解码结果,整个封装过程都对我们开发带来了极大得便利,此文只是分析了feign带有url参数得解析方式,集成eureka和ribbon的在https://blog.csdn.net/lgq2626/article/details/80481514中做了分析。下面流程图总结下流程:
http://static.cyblogs.com/20180525131939613.png

参考地址

前文已经讲过,CMS是老年代垃圾收集器,在收集过程中可以与用户线程并发操作。它可以与Serial收集器和Parallel New收集器搭配使用。CMS牺牲了系统的吞吐量来追求收集速度,适合追求垃圾收集速度的服务器上。

CMS相关参数

参数 类型 默认值 作用
-XX:+UseConcMarkSweepGC boolean false 老年代采用CMS收集器收集
–XX:ParallelGCThreads=n int (ncpus <= 8) ? ncpus : 3 + ((ncpus * 5) / 8) 老年代采用CMS收集器收集
-XX:CMSInitiatingOccupancyFraction int 92 年代堆空间的使用率。比如value=75意味着第一次CMS垃圾收集会在老年代被占用75%时被触发。
-XX:+UseCMSInitiatingOccupancyOnly boolean false 只用设定的回收阈值(上面指定的70%),如果不指定,JVM仅在第一次使用设定值,后续则自动调整

触发条件

周期性GC

由后台线程ConcurrentMarkSweepThread循环判断(默认2s)是否需要触发。

如果没有设置-XX:+UseCMSInitiatingOccupancyOnly,虚拟机会根据收集的数据决定是否触发
老年代使用率达到阈值 CMSInitiatingOccupancyFraction,默认92%。
永久代的使用率达到阈值 CMSInitiatingPermOccupancyFraction,默认92%,前提是开启 CMSClassUnloadingEnabled
新生代的晋升担保失败。

主动触发

YGC过程发生Promotion Failed,进而对老年代进行回收
比如执行了System.gc(),前提是没有参数ExplicitGCInvokesConcurrent

收集过程

初始标记

这是CMS中两次stop-the-world事件中的一次。这一步的作用是标记存活的对象,有两部分:

标记老年代中所有的GC Roots对象
标记年轻代中活着的对象引用到的老年代的对象

CMS-initial-mark:961330K(1572864K)指标记时老年代的已用空间和总空间

并发标记

该阶段GC线程和应用线程并发执行,遍历InitialMarking阶段标记出来的存活对象,然后继续递归标记这些对象可达的对象。

1
2
[CMS-concurrent-mark-start]
[CMS-concurrent-mark: 2.787/3.329 secs] [Times: user=12.12 sys=0.64, real=3.33 secs]

第一行CMS-concurrent-mark-start标识标记阶段开始。第二行中的“2.787/3.329 secs”表示标记阶段的耗时。
表示花费了2.787cpu时间,3.329系统时间。

预清理阶段

由于在并发标记阶段,应用线程和GC线程是并发执行的,因此可能产生新的对象或对象关系发生变化,例如:

新生代的对象晋升到老年代;
直接在老年代分配对象;
老年代对象的引用关系发生变更;

http://static.cyblogs.com/20190702174034511.png

该阶段会把上述对象所在的Card标识为Dirty,后续只需扫描这些Dirty Card的对象,避免扫描整个老年代。
标记dirty card 能够到达的对象

http://static.cyblogs.com/20190702180248905.png

1
2
[CMS-concurrent-preclean-start]
[CMS-concurrent-preclean: 0.342/0.477 secs] [Times: user=1.79 sys=0.10, real=0.48 secs]
可终止的预处理

该阶段发生的前提是,新生代Eden区的内存使用量大于参数CMSScheduleRemarkEdenSizeThreshold 默认是2M,如果新生代的对象太少,就没有必要执行该阶段,直接执行重新标记阶段。
在该阶段,主要循环的做两件事:

处理 FromTo 区的对象,标记可达的老年代对象
和上一个阶段一样,扫描处理Dirty Card中的对象

重新标记

暂停所有用户线程,重新扫描堆中的对象,进行可达性分析,标记活着的对象。有了前面的基础,这个阶段的工作量被大大减轻,停顿时间因此也会减少。注意这个阶段是多线程的。

遍历新生代对象,重新标记
根据GC Roots,重新标记
遍历老年代的Dirty Card,重新标记,这里的Dirty Card大部分已经在clean阶段处理过

并发清理

通过以上5个阶段的标记,老年代所有存活的对象已经被标记并且现在要通过Garbage Collector采用清扫的方式回收那些不能用的对象了。
这个阶段主要是清除那些没有标记的对象并且回收空间;
由于CMS并发清理阶段用户线程还在运行着,伴随程序运行自然就还会有新的垃圾不断产生,这一部分垃圾出现在标记过程之后,CMS无法在当次收集中处理掉它们,只好留待下一次GC时再清理掉。

注意事项

  1. 减少remark阶段停顿

一般CMSGC耗时80%都在remark阶段,如果发现remark阶段停顿时间很长,可以尝试添加该参数:
-XX:+CMSScavengeBeforeRemark。在执行remark操作之前先做一次Young GC,目的在于减少年轻代对老年代的无效引用,降低remark时的开销。

  1. 内存碎片问题

CMS是基于标记-清除算法的,CMS只会删除无用对象,不会对内存做压缩,会造成内存碎片,这时候我们需要用到这个参数:-XX:CMSFullGCsBeforeCompaction=n
意思是说在上一次CMS并发GC执行过后,到底还要再执行多少次full GC才会做压缩。默认是0

  1. Concurrent mode failure

这个异常发生在cms正在回收的时候。执行CMS GC的过程中,同时业务线程也在运行,当年轻带空间满了,执行YGC时,需要将存活的对象放入到老年代,而此时老年代空间不足,这时CMS还没有机会回收老年带产生的,或者在做Minor GC的时候,新生代救助空间放不下,需要放入老年代,而老年代也放不下而产生的。

  • 过早提升与提升失败
    Minor GC 过程中,Survivor Unused 可能不足以容纳 Eden 和另一个 Survivor 中的存活对象, 那么多余的将被移到老年代, 称为过早提升(Premature Promotion),这会导致老年代中短期存活对象的增长, 可能会引发严重的性能问题。 再进一步,如果老年代满了, Minor GC 后会进行 Full GC, 这将导致遍历整个堆, 称为提升失败(Promotion Failure)。

  • 早提升的原因
    Survivor空间太小,容纳不下全部的运行时短生命周期的对象,如果是这个原因,可以尝试将Survivor调大,否则端生命周期的对象提升过快,导致老年代很快就被占满,从而引起频繁的full gc
    对象太大,SurvivorEden没有足够大的空间来存放这些大对象。

  • 提升失败原因
    当提升的时候,发现老年代也没有足够的连续空间来容纳该对象。为什么是没有足够的连续空间而不是空闲空间呢?老年代容纳不下提升的对象有两种情况:

    • 老年代空闲空间不够用了;
    • 老年代虽然空闲空间很多,但是碎片太多,没有连续的空闲空间存放该对象。
  • 解决方法
    如果是因为内存碎片导致的大对象提升失败,cms需要进行空间整理压缩;
    如果是因为提升过快导致的,说明Survivor 空闲空间不足,那么可以尝试调大 Survivor;
    如果是因为老年代空间不够导致的,尝试将CMS触发的阈值调低。

REF:https://juejin.im/post/5c39920b6fb9a049e82bbf94

参考地址

前言

线程池ThreadPoolExecutor在运行的过程中,业务并发量变动,需要不停服务调整线程池的线程数,ThreadPoolExecutor支持动态调整corePoolSizemaximumPoolSize的值。

示例demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ThreadChangeTest {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
10,
10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10));
int count = 0;
while (true) {
Thread.sleep(1000l);
for (int i = 0; i < 9; i++) {
executor.execute(() -> {
/*try {
Thread.sleep(1l);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println("------------core:\t" + executor.getCorePoolSize() + "\tactive:\t" + executor.getActiveCount() + "\tmax:\t" + executor.getMaximumPoolSize());
});
}

count++;
if (count == 20) {
executor.setCorePoolSize(2);
executor.setMaximumPoolSize(9);
System.out.println("----------------------------------------");
}

if (count == 100) {
executor.shutdown();
System.out.println("=============================================");
break;
}
}
Thread.currentThread().join();
}
}

在程序运行中动态修改线程池corePoolSizemaximumPoolSize的值

源码分析

线程池参数调大
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
//核心线程调小,中断空闲任务,否则线程池的当前任务结束,自动调小
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
//核心线程数调大后,从队列取任务
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
//队列大小是否可以取任务
int k = Math.min(delta, workQueue.size());
//队列有任务就取,否则break
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}

public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
//中断空闲任务,否则线程池的当前任务结束,自动调小
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}

源码看出:线程池的调节时直接设置corePoolSizemaximumPoolSize的值

其中

1
workerCountOf(ctl.get())

代表工作任务线程数,参考我的博客JDK8线程池-ThreadPoolExecutor源码解析

调大corePoolSizemaximumPoolSize,线程池运行过程中自动生效,线程池处理逻辑增强。

线程池调小

调小corePoolSizemaximumPoolSize均会执行

1
interruptIdleWorkers();
跟踪interruptIdleWorkers源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//workers是所有已存在的线程,包括空闲线程
for (Worker w : workers) {
Thread t = w.thread;
//这里注意,非常关键,加锁w.tryLock()
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//从上面的参数onlyOne is false
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

这里的workers注意:是一个HashSet,存放规则:

核心线程优先占满,即使核心线程有空闲,新任务来了会优先开启新的线程而不是复用,核心线程仅在占满才会复用,然后使用队列,最后使用max线程,max线程数对应的workers会动态变化,

参考我的博客JDK8线程池-ThreadPoolExecutor源码解析

线程池任务执行源码

我们看ThreadPoolExecutor执行任务的源码,参考我的博客JDK8线程池-ThreadPoolExecutor源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
//这里注意,加锁了,非常关键
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

可以看出在任务拿出来后,立即加锁

包括任务执行的过程都是加锁的。

加锁分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

使用了AQS,自定义了加锁方式CAS模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public abstract class AbstractQueuedSynchronizer 
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
}

可以看出使用tryAcquiretryRelease,均重写方法

1
2
3
4
5
6
7
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

compareAndSetState(0, 1)

使用上面的代码加锁,意味着线程执行过程中都是加锁的,不会被销毁,只会销毁空闲线程,或者当前线程执行结束销毁。

线程池调小corePoolSizemaximumPoolSize对当前正在执行的任务没有影响。

调节队列大小

队列是不可以动态调整的。

1
private final int capacity;

总结

  • 线程池corePoolSizemaximumPoolSize调大注意max线程数不要调过大,计算机资源是有限的。

  • 线程池的队列初始化大小注意,不能动态调节,队列占用的是堆内存,注意JVM的内存大小与GC能力,尽量减小大对象的存在。

  • 线程池corePoolSizemaximumPoolSize和队列调小注意,线程池的处理能力减弱,可能会执行拒绝策略。

参考地址

在数据仓库的建设中,一般都会围绕着星型模型和雪花模型来设计表关系或者结构。下面我们先来理解这两种模型的概念。

星型模型

http://static.cyblogs.com/1345516634_6388.jpg

​ 星型模是一种多维的数据关系,它由一个事实表和一组维表组成。每个维表都有一个维作为主键,所有这些维的主键组合成事实表的主键。强调的是对维度进行预处理,将多个维度集合到一个事实表,形成一个宽表。这也是我们在使用hive时,经常会看到一些大宽表的原因,大宽表一般都是事实表,包含了维度关联的主键和一些度量信息,而维度表则是事实表里面维度的具体信息,使用时候一般通过join来组合数据,相对来说对OLAP的分析比较方便。

雪花模型

http://static.cyblogs.com/1345516734_4305.jpg

​ 当有一个或多个维表没有直接连接到事实表上,而是通过其他维表连接到事实表上时,其图解就像多个雪花连接在一起,故称雪花模型。雪花模型是对星型模型的扩展。它对星型模型的维表进一步层次化,原有的各维表可能被扩展为小的事实表,形成一些局部的 “层次 “ 区域,这些被分解的表都连接到主维度表而不是事实表。雪花模型更加符合数据库范式,减少数据冗余,但是在分析数据的时候,操作比较复杂,需要join的表比较多所以其性能并不一定比星型模型高。

星型模型和雪花模型对比

属性 星型模型 雪花模型
数据总量
可读性 容易
表个数
查询速度
冗余度
对实时表的情况 增加宽度 字段比较少,冗余底
扩展性

应用场景

​ 星型模型的设计方式主要带来的好处是能够提升查询效率,因为生成的事实表已经经过预处理,主要的数据都在事实表里面,所以只要扫描实时表就能够进行大量的查询,而不必进行大量的join,其次维表数据一般比较少,在join可直接放入内存进行join以提升效率,除此之外,星型模型的事实表可读性比较好,不用关联多个表就能获取大部分核心信息,设计维护相对比较简答。

雪花模型的设计方式是比较符合数据库范式的理念,设计方式比较正规,数据冗余少,但在查询的时候可能需要join多张表从而导致查询效率下降,此外规范化操作在后期维护比较复杂。

总结

​ 通过上面的对比,我们可以发现数据仓库大多数时候是比较适合使用星型模型构建底层数据Hive表,通过大量的冗余来提升查询效率,星型模型对OLAP的分析引擎支持比较友好,这一点在Kylin中比较能体现。而雪花模型在关系型数据库中如MySQL,Oracle中非常常见,尤其像电商的数据库表。在数据仓库中雪花模型的应用场景比较少,但也不是没有,所以在具体设计的时候,可以考虑是不是能结合两者的优点参与设计,以此达到设计的最优化目的。

参考地址

聚集(clustered)索引,也叫聚簇索引

定义:数据行的物理顺序与列值(一般是主键的那一列)的逻辑顺序相同,一个表中只能拥有一个聚集索引。

http://static.cyblogs.com/20181225211503670.png

注:第一列的地址表示该行数据在磁盘中的物理地址,后面三列才是我们SQL里面用的表里的列,其中id是主键,建立了聚集索引。

结合上面的表格就可以理解这句话了吧:数据行的物理顺序与列值的顺序相同,如果我们查询id比较靠后的数据,那么这行数据的地址在磁盘中的物理地址也会比较靠后。而且由于物理排列方式与聚集索引的顺序相同,所以也就只能建立一个聚集索引了。

聚集索引实际存放的示意图

从上图可以看出聚集索引的好处了,索引的叶子节点就是对应的数据节点(MySQL的MyISAM除外,此存储引擎的聚集索引和非聚集索引只多了个唯一约束,其他没什么区别),可以直接获取到对应的全部列的数据,而非聚集索引在索引没有覆盖到对应的列的时候需要进行二次查询,后面会详细讲。因此在查询方面,聚集索引的速度往往会更占优势。

创建聚集索引

如果不创建索引,系统会自动创建一个隐含列作为表的聚集索引。

  • 创建表的时候指定主键(注意:SQL Sever默认主键为聚集索引,也可以指定为非聚集索引,而MySQL里主键就是聚集索引)
1
2
3
4
create table t1(
id int primary key,
name nvarchar(255)
)
  • 创建表后添加聚集索引

MySQL

1
alter table table_name add primary key(colum_name)

值得注意的是,最好还是在创建表的时候添加聚集索引,由于聚集索引的物理顺序上的特殊性,因此如果再在上面创建索引的时候会根据索引列的排序移动全部数据行上面的顺序,会非常地耗费时间以及性能。

非聚集(unclustered)索引

定义:该索引中索引的 逻辑顺序与磁盘上行的物理存储顺序不同 ,一个表中可以拥有多个非聚集索引。

其实按照定义,除了聚集索引以外的索引都是非聚集索引,只是人们想细分一下非聚集索引,分成普通索引,唯一索引,全文索引。如果非要把非聚集索引类比成现实生活中的东西,那么非聚集索引就像新华字典的偏旁字典,他结构顺序与实际存放顺序不一定一致。

http://static.cyblogs.com/2018122521150563.png

非聚集索引的二次查询问题

非聚集索引叶节点仍然是索引节点,只是有一个指针指向对应的数据块,此如果使用非聚集索引查询,而查询列中包含了其他该索引没有覆盖的列,那么他还要进行第二次的查询,查询节点上对应的数据行的数据。

有表t1:

http://static.cyblogs.com/20181225211503873.png

其中有 聚集索引clustered index(id),非聚集索引index(username)

使用以下语句进行查询,不需要进行二次查询,直接就可以从非聚集索引的节点里面就可以获取到查询列的数据。

1
2
3
select id, username from t1 where username = '小明'

select username from t1 where username = '小明'

但是使用以下语句进行查询,就需要二次的查询去获取原数据行的score:

1
select username, score from t1 where username = '小明'

SQL Server里面查询效率如下所示,Index Seek就是索引所花费的时间,Key Lookup就是二次查询所花费的时间。可以看的出二次查询所花费的查询开销占比很大,达到50%。

这篇博客有一个简单示例:https://blog.csdn.net/jiadajing267/article/details/54581262

总结如下:

动作描述 使用聚集索引 使用非聚集索引
列经常被分组排序
返回某范围内的数据 不应
一个或极少不同值 不应 不应
小数目的不同值 不应
大数目的不同值 不应
频繁更新的列 不应
外键列
主键列
频繁修改索引列 不应

我们需要搞清楚以下几个问题

第一:聚集索引的约束是唯一性,是否要求字段也是唯一的呢? 不要求唯一!

分析:如果认为是的朋友,可能是受系统默认设置的影响,一般我们指定一个表的主键,如果这个表之前没有聚集索引,同时建立主键时候没有强制指定使用非聚集索引,SQL会默认在此字段上创建一个聚集索引,而主键都是唯一的,所以理所当然的认为创建聚集索引的字段也需要唯一。

结论:聚集索引可以创建在任何一列你想创建的字段上,这是从理论上讲,实际情况并不能随便指定,否则在性能上会是恶梦。

第二:为什么聚集索引可以创建在任何一列上,如果此表没有主键约束,即有可能存在重复行数据呢?

粗一看,这还真是和聚集索引的约束相背,但实际情况真可以创建聚集索引。

分析其原因是:如果未使用 UNIQUE 属性创建聚集索引,数据库引擎将向表自动添加一个四字节 uniqueifier 列。必要时,数据库引擎 将向行自动添加一个 uniqueifier 值,使每个键唯一。此列和列值供内部使用,用户不能查看或访问。

第三:是不是聚集索引就一定要比非聚集索引性能优呢?

如果想查询学分在60-90之间的学生的学分以及姓名,在学分上创建聚集索引是否是最优的呢?

答:否。既然只输出两列,我们可以在学分以及学生姓名上创建联合非聚集索引,此时的索引就形成了覆盖索引,即索引所存储的内容就是最终输出的数据,这种索引在比以学分为聚集索引做查询性能更好。

第四:在数据库中通过什么描述聚集索引与非聚集索引的?

索引是通过二叉树的形式进行描述的,我们可以这样区分聚集与非聚集索引的区别:聚集索引的叶节点就是最终的数据节点,而非聚集索引的叶节仍然是索引节点,但它有一个指向最终数据的指针。

第五:在主键是创建聚集索引的表在数据插入上为什么比主键上创建非聚集索引表速度要慢?

  有了上面第四点的认识,我们分析这个问题就有把握了,在有主键的表中插入数据行,由于有主键唯一性的约束,所以需要保证插入的数据没有重复。我们来比较下主键为聚集索引和非聚集索引的查找情况:聚集索引由于索引叶节点就是数据页,所以如果想检查主键的唯一性,需要遍历所有数据节点才行,但非聚集索引不同,由于非聚集索引上已经包含了主键值,所以查找主键唯一性,只需要遍历所有的索引页就行(索引的存储空间比实际数据要少),这比遍历所有数据行减少了不少IO消耗。这就是为什么主键上创建非聚集索引比主键上创建聚集索引在插入数据时要快的真正原因。

参考地址

前提

近段时间,业务系统架构基本完备,数据层面的建设比较薄弱,因为笔者目前工作重心在于搭建一个小型的数据平台。优先级比较高的一个任务就是需要近实时同步业务系统的数据(包括保存、更新或者软删除)到一个另一个数据源,持久化之前需要清洗数据并且构建一个相对合理的便于后续业务数据统计、标签系统构建等扩展功能的数据模型。基于当前团队的资源和能力,优先调研了Alibaba开源中间件Canal的使用。

http://static.cyblogs.com/m-w-c-1.png

这篇文章简单介绍一下如何快速地搭建一套Canal相关的组件。

关于Canal

简介

下面的简介和下一节的原理均来自于Canal项目的README

img

Canal[kə'næl],译意为水道/管道/沟渠,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务trigger获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务Cache刷新
  • 带业务逻辑的增量数据处理
Canal的工作原理

MySQL主备复制原理:

img

  • MySQLMaster实例将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过show binlog events进行查看)
  • MySQLSlave实例将masterbinary log events拷贝到它的中继日志(relay log
  • MySQLSlave实例重放relay log中的事件,将数据变更反映它到自身的数据

Canal的工作原理如下:

  • Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Master发送dump协议
  • MySQL Master收到dump请求,开始推送binary logSlave(即Canal
  • Canal解析binary log对象(原始为byte流),并且可以通过连接器发送到对应的消息队列等中间件中
关于Canal的版本和部件

截止笔者开始编写本文的时候(2020-03-05),Canal的最新发布版本是v1.1.5-alpha-12019-10-09发布的),最新的正式版是v1.1.42019-09-02发布的)。其中,v1.1.4主要添加了鉴权、监控的功能,并且做了一些列的性能优化,此版本集成的连接器是TcpKafkaRockerMQ。而v1.1.5-alpha-1版本已经新增了RabbitMQ连接器,但是此版本的RabbitMQ连接器暂时不能定义连接RabbitMQ的端口号,不过此问题已经在master分支中修复(具体可以参看源码中的CanalRabbitMQProducer类的提交记录)。换言之,v1.1.4版本中目前能使用的内置连接器只有TcpKafkaRockerMQ三种,如果想尝鲜使用RabbitMQ连接器,可以选用下面的两种方式之一:

  • 选用v1.1.5-alpha-1版本,但是无法修改RabbitMQport属性,默认为5672
  • 基于master分支自行构建Canal

目前,Canal项目的活跃度比较高,但是考虑到功能的稳定性问题,笔者建议选用稳定版本在生产环境中实施,当前可以选用v1.1.4版本,本文的例子用选用的就是v1.1.4版本,配合Kafka连接器使用Canal主要包括三个核心部件:

  • canal-admin:后台管理模块,提供面向WebUICanal管理能力。
  • canal-adapter:适配器,增加客户端数据落地的适配及启动功能,包括REST、日志适配器、关系型数据库的数据同步(表对表同步)、HBase数据同步、ES数据同步等等。
  • canal-deployer:发布器,核心功能所在,包括binlog解析、转换和发送报文到连接器中等等功能都由此模块提供。

一般情况下,canal-deployer部件是必须的,其他两个部件按需选用即可。

部署所需的中间件

搭建一套可以用的组件需要部署MySQLZookeeperKafkaCanal四个中间件的实例,下面简单分析一下部署过程。选用的虚拟机系统是CentOS7

安装MySQL

为了简单起见,选用yum源安装(官方链接是https://dev.mysql.com/downloads/repo/yum):

img

mysql80-community-release-el7-3虽然包名带了mysql80关键字,其实已经集成了MySQL主流版本5.6、5.7和8.x等等的最新安装包仓库

选用的是最新版的MySQL8.x社区版,下载CentOS7适用的rpm包

1
2
3
4
cd /data/mysql
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
// 下载完毕之后
sudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm

此时列举一下yum仓库里面的MySQL相关的包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@localhost mysql]# yum repolist all | grep mysql
mysql-cluster-7.5-community/x86_64 MySQL Cluster 7.5 Community disabled
mysql-cluster-7.5-community-source MySQL Cluster 7.5 Community - disabled
mysql-cluster-7.6-community/x86_64 MySQL Cluster 7.6 Community disabled
mysql-cluster-7.6-community-source MySQL Cluster 7.6 Community - disabled
mysql-cluster-8.0-community/x86_64 MySQL Cluster 8.0 Community disabled
mysql-cluster-8.0-community-source MySQL Cluster 8.0 Community - disabled
mysql-connectors-community/x86_64 MySQL Connectors Community enabled: 141
mysql-connectors-community-source MySQL Connectors Community - disabled
mysql-tools-community/x86_64 MySQL Tools Community enabled: 105
mysql-tools-community-source MySQL Tools Community - Sourc disabled
mysql-tools-preview/x86_64 MySQL Tools Preview disabled
mysql-tools-preview-source MySQL Tools Preview - Source disabled
mysql55-community/x86_64 MySQL 5.5 Community Server disabled
mysql55-community-source MySQL 5.5 Community Server - disabled
mysql56-community/x86_64 MySQL 5.6 Community Server disabled
mysql56-community-source MySQL 5.6 Community Server - disabled
mysql57-community/x86_64 MySQL 5.7 Community Server disabled
mysql57-community-source MySQL 5.7 Community Server - disabled
mysql80-community/x86_64 MySQL 8.0 Community Server enabled: 161
mysql80-community-source MySQL 8.0 Community Server - disabled

编辑/etc/yum.repos.d/mysql-community.repo文件([mysql80-community]块中enabled设置为1,其实默认就是这样子,不用改,如果要选用5.x版本则需要修改对应的块):

1
2
3
4
5
6
[mysql80-community]
name=MySQL 8.0 Community Server
baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql

然后安装MySQL服务:

1
sudo yum install mysql-community-server

这个过程比较漫长,因为需要下载和安装5个rpm安装包(或者是所有安装包组合的压缩包mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar)。如果网络比较差,也可以直接从官网手动下载后安装:

img

1
2
3
4
5
6
7
8
9
10
11
12
13
// 下载下面5个rpm包 common --> libs --> libs-compat --> client --> server
mysql-community-common
mysql-community-libs
mysql-community-libs-compat
mysql-community-client
mysql-community-server

// 强制安装
rpm -ivh mysql-community-common-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-client-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-server-8.0.18-1.el7.x86_64.rpm --force --nodeps

安装完毕之后,启动MySQL服务,然后搜索MySQL服务的root账号的临时密码用于首次登陆(mysql -u root -p):

1
2
3
4
5
6
7
8
9
// 启动服务,关闭服务就是service mysqld stop
service mysqld start
// 查看临时密码 cat /var/log/mysqld.log
[root@localhost log]# cat /var/log/mysqld.log
2020-03-02T06:03:53.996423Z 0 [System] [MY-013169] [Server] /usr/sbin/mysqld (mysqld 8.0.18) initializing of server in progress as process 22780
2020-03-02T06:03:57.321447Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: >kjYaXENK6li
2020-03-02T06:04:00.123845Z 0 [System] [MY-010116] [Server] /usr/sbin/mysqld (mysqld 8.0.18) starting as process 22834
// 登录临时root用户,使用临时密码
[root@localhost log]# mysql -u root -p

接下来做下面的操作:

  • 修改root用户的密码:ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@';(注意密码规则必须包含大小写字母、数字和特殊字符)
  • 更新roothost,切换数据库use mysql;,指定host%以便可以让其他服务器远程访问UPDATE USER SET HOST = '%' WHERE USER = 'root';
  • 赋予'root'@'%'用户,所有权限,执行GRANT ALL PRIVILEGES ON *.* TO 'root'@'%';
  • 改变root'@'%用户的密码校验规则以便可以使用Navicat等工具访问:ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';

http://static.cyblogs.com/m-w-c-6.png

操作完成之后,就可以使用root用户远程访问此虚拟机上的MySQL服务。最后确认是否开启了binlog(注意一点是MySQL8.x默认开启binlogSHOW VARIABLES LIKE '%bin%';

http://static.cyblogs.com/m-w-c-7.png

最后在MySQLShell执行下面的命令,新建一个用户名canal密码为QWqw12!@的新用户,赋予REPLICATION SLAVEREPLICATION CLIENT权限:

1
2
3
4
CREATE USER canal IDENTIFIED BY 'QWqw12!@';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';

切换回去root用户,创建一个数据库test

1
CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;
安装Zookeeper

CanalKafka集群都依赖于Zookeeper做服务协调,为了方便管理,一般会独立部署Zookeeper服务或者Zookeeper集群。笔者这里选用2020-03-04发布的3.6.0版本:

1
2
3
4
5
6
7
8
midkr /data/zk
# 创建数据目录
midkr /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg

zoo.cfg文件中的dataDir设置为/data/zk/data,然后启动Zookeeper

1
2
3
4
5
[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

这里注意一点,要启动此版本的Zookeeper服务必须本地安装好JDK8+,这一点需要自行处理。启动的默认端口是2181,启动成功后的日志如下:

http://static.cyblogs.com/m-w-c-8.png

安装Kafka

Kafka是一个高性能分布式消息队列中间件,它的部署依赖于Zookeeper。笔者在此选用2.4.0并且Scala版本为2.13的安装包:

1
2
3
4
mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz

由于解压后/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中对应的zookeeper.connect=localhost:2181已经符合需要,不必修改,需要修改日志文件的目录log.dirs/data/kafka/data。然后启动Kafka服务:

1
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties

http://static.cyblogs.com/m-w-c-9.png

这样启动一旦退出控制台就会结束Kafka进程,可以添加-daemon参数用于控制Kafka进程后台不挂断运行。

1
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties
安装和使用Canal

终于到了主角登场,这里选用Canalv1.1.4稳定发布版,只需要下载deployer模块:

1
2
3
4
5
mkdir /data/canal
cd /data/canal
# 这里注意一点,Github在国内被墙,下载速度极慢,可以先用其他下载工具下载完再上传到服务器中
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz

解压后的目录如下:

1
2
3
4
5
6
7
8
9
10
11
- bin   # 运维脚本
- conf # 配置文件
canal_local.properties # canal本地配置,一般不需要动
canal.properties # canal服务配置
logback.xml # logback日志配置
metrics # 度量统计配置
spring # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件
example # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹
instance.properties # 实例配置,一般指单个数据库的配置
- lib # 服务依赖包
- logs # 日志文件输出目录

在开发和测试环境建议把logback.xml的日志级别修改为DEBUG方便定位问题。这里需要关注canal.propertiesinstance.properties两个配置文件。canal.properties文件中,需要修改:

  • 去掉canal.instance.parser.parallelThreadSize = 16这个配置项的注释,也就是启用此配置项,和实例解析器的线程数相关,不配置会表现为阻塞或者不进行解析。
  • canal.serverMode配置项指定为kafka,可选值有tcpkafkarocketmqmaster分支或者最新的的v1.1.5-alpha-1版本,可以选用rabbitmq),默认是kafka
  • canal.mq.servers配置需要指定为Kafka服务或者集群Broker的地址,这里配置为127.0.0.1:9092

canal.mq.servers在不同的canal.serverMode有不同的意义。
kafka模式下,指Kafka服务或者集群Broker的地址,也就是bootstrap.servers
rocketmq模式下,指NameServer列表
rabbitmq模式下,指RabbitMQ服务的Host和Port

其他配置项可以参考下面两个官方Wiki的链接:

instance.properties一般指一个数据库实例的配置,Canal架构支持一个Canal服务实例,处理多个数据库实例的binlog异步解析。instance.properties需要修改的配置项主要包括:

  • canal.instance.mysql.slaveId需要配置一个和Master节点的服务ID完全不同的值,这里笔者配置为654321

  • 配置数据源实例,包括地址、用户、密码和目标数据库:

    • canal.instance.master.address,这里指定为127.0.0.1:3306
    • canal.instance.dbUsername,这里指定为canal
    • canal.instance.dbPassword,这里指定为QWqw12!@
    • 新增canal.instance.defaultDatabaseName,这里指定为test(需要在MySQL中建立一个test数据库,见前面的流程)。
  • Kafka相关配置,这里暂时使用静态topic和单个partition

  • canal.mq.topic,这里指定为test也就是解析完的binlog结构化数据会发送到Kafka的命名为testtopic

    • canal.mq.partition,这里指定为0

配置工作做好之后,可以启动Canal服务:

1
2
3
4
5
sh /data/canal/bin/startup.sh 
# 查看服务日志
tail -100f /data/canal/logs/canal/canal
# 查看实例日志 -- 一般情况下,关注实例日志即可
tail -100f /data/canal/logs/example/example.log

启动正常后,见实例日志如下:

http://static.cyblogs.com/m-w-c-10.png

test数据库创建一个订单表,并且执行几个简单的DML

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use `test`;

CREATE TABLE `order`
(
id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键',
order_id VARCHAR(64) NOT NULL COMMENT '订单ID',
amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '订单金额',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
UNIQUE uniq_order_id (`order_id`)
) COMMENT '订单表';

INSERT INTO `order`(order_id, amount) VALUES ('10086', 999);
UPDATE `order` SET amount = 10087 WHERE order_id = '10086';
DELETE FROM `order` WHERE order_id = '10086';

这个时候,可以利用Kafkakafka-console-consumer或者Kafka Tools查看test这个topic的数据:

1
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test

http://static.cyblogs.com/m-w-c-11.png

具体的数据如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// test数据库建库脚本
{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}

// order表建表DDL
{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`\n(\n id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键',\n order_id VARCHAR(64) NOT NULL COMMENT '订单ID',\n amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '订单金额',\n create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',\n UNIQUE uniq_order_id (`order_id`)\n) COMMENT '订单表'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"}

// INSERT
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}

// UPDATE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"}

// DELETE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}

可见Kafka的名为testtopic已经写入了对应的结构化binlog事件数据,可以编写消费者监听Kafka对应的topic然后对获取到的数据进行后续处理。

小结

这篇文章大部分篇幅用于介绍其他中间件是怎么部署的,这个问题侧面说明了Canal本身部署并不复杂,它的配置文件属性项比较多,但是实际上需要自定义和改动的配置项是比较少的,也就是说明了它的运维成本和学习成本并不高。后面会分析基于结构化binlog事件做ELT和持久化相关工作以及Canal的生产环境可用级别HA集群的搭建。

参考地址

1. Kafka简介

Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

  1. 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能
  2. 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输
  3. 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输
  4. 同时支持离线数据处理和实时数据处理
  5. Scale out:支持在线水平扩展

2. Kafka架构

Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition.

Producer:负责发布消息到Kafka broker

Consumer:消息消费者,向Kafka broker读取消息的客户端。

Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

3. Kafka拓扑结构

http://static.cyblogs.com/KafkaArchitecture.png

如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

4. Delivery Guarantee

  有这么几种可能的delivery guarantee:

  • At most once 消息可能会丢,但绝不会重复传输
  • At least one 消息绝不会丢,但可能会重复传输
  • Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。

5. Kafka HA设计解析

5.1 Replica复制算法

​ 为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力,也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。

  1. 将所有Broker(假设共n个Broker)和待分配的Partition排序
  2. 将第i个Partition分配到第(i mod n)个Broker上
  3. 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上
5.2 Broker活着的判定

kafka判定broker是否活着,通过以下2个方式:

  1. 和zk的session没有断(通过心跳来维系)
  2. follower能及时将leader消息复制过来,不能落后太多(例如默认lag超过4000就会踢出ISR)
5.3 所有Replica都不工作的情况

如果所有副本都出问题,一般有两种选择:

  1. 等待ISR中的任一个Replica“活”过来,并且选它作为Leader(一致性好,但是可用性差)
  2. 选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader(一致性差,但是可用性相比第一种方式好)
5.4 Propagate消息

​ Producer在发布消息到某个Partition时,先通过 Metadata (通过 Broker 获取并且缓存在 Producer 内) 找到该 Partition 的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。

5.5 ACK前需要保证有多少个备份

​ 和大部分分布式系统一样,Kafka处理失败需要明确定义一个Broker是否“活着”。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过Zookeeper的Heartbeat机制来实现)。二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。
  Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.max.messages配置,其默认值是4000)或者Follower超过一定时间(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。。
  Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。

6. 分区Leader选举方法

一般比较容易想到的一个方法是:所有Follower都在Zookeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(Zookeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。

该方法会存在3个问题:

  1. split-brain 这是由Zookeeper的特性引起的,虽然Zookeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致
  2. herd effect 如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整
  3. Zookeeper负载过重 每个Replica都要为此在Zookeeper上注册一个Watch,当集群规模增加到几千个Partition时Zookeeper负载会过重。

改进的方法——所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

7. 各组件Failover过程

Broker failover过程
  1. Controller在Zookeeper注册Watch,一旦有Broker宕机(这是用宕机代表任何让系统认为其die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的znode会自动被删除,Zookeeper会fire Controller注册的watch,Controller读取最新的幸存的Broker
  2. Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition
  3. 对set_p中的每一个Partition

  3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR  

​ 3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。  

3.3 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其version在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1

  1. 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。

  Broker failover顺序图如下所示。

http://static.cyblogs.com/kafka_broker_failover.png

Controller failure过程
  1. Controller在Zookeeper的/brokers/ids节点上注册Watch。一旦有Broker宕机(本文用宕机代表任何让Kafka认为其Broker die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的Znode会自动被删除,Zookeeper会fire Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。
  2. Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition。
  3. 对set_p中的每一个Partition:
      3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR。
      3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
      3.3 将新的Leader,ISR和新的leader_epochcontroller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有Controller版本在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1。
  4. 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。
      Broker failover顺序图如下所示。

http://static.cyblogs.com/kafka_broker_failover.png

Partition重新分配

管理工具发出重新分配Partition请求后,会将相应信息写到/admin/reassign_partitions上,而该操作会触发ReassignedPartitionsIsrChangeListener,从而通过执行回调函数KafkaController.onPartitionReassignment来完成以下操作:

  1. 将Zookeeper中的AR(Current Assigned Replicas)更新为OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
  2. 强制更新Zookeeper中的leader epoch,向AR中的每个Replica发送LeaderAndIsrRequest。
  3. 将RAR - OAR中的Replica设置为NewReplica状态。
  4. 等待直到RAR中所有的Replica都与其Leader同步。
  5. 将RAR中所有的Replica都设置为OnlineReplica状态。
  6. 将Cache中的AR设置为RAR。
  7. 若Leader不在RAR中,则从RAR中重新选举出一个新的Leader并发送LeaderAndIsrRequest。若新的Leader不是从RAR中选举而出,则还要增加Zookeeper中的leader epoch。
  8. 将OAR - RAR中的所有Replica设置为OfflineReplica状态,该过程包含两部分。第一,将Zookeeper上ISR中的OAR - RAR移除并向Leader发送LeaderAndIsrRequest从而通知这些Replica已经从ISR中移除;第二,向OAR - RAR中的Replica发送StopReplicaRequest从而停止不再分配给该Partition的Replica。
  9. 将OAR - RAR中的所有Replica设置为NonExistentReplica状态从而将其从磁盘上删除。
  10. 将Zookeeper中的AR设置为RAR。
  11. 删除/admin/reassign_partition

参考文章

0%