简栈文化

Java技术人的成长之路~

实现Reactor模型可分为以下三种:

  • 单线程模型
  • 单Reactor多线程模型
  • 主从Reactor多线程模型。

单线程模型

Reactor单线程模型,指的是所有的IO操作都在同一个线程上面完成,线程的职责如下:

  • 作为NIO服务端,接收客户端的TCP连接;

  • 作为NIO客户端,向服务端发起TCP连接;

  • 读取通信对端的请求或者应答消息;

  • 向通信对端发送消息请求或者应答消息。

由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处理所有IO相关的操作。从架构层面看,一个NIO线程确实可以完成其承担的职责。例如,通过Acceptor接收客户端的TCP连接请求消息,链路建立成功之后,通过Dispatch将对应的ByteBuffer派发到指定的Handler上进行消息解码。用户线程可以通过消息编码通过NIO线程将消息发送给客户端。

Server端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Reactor1 {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("accept from "+socketChannel.socket().getInetAddress().toString());
// LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable() && key.isValid()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(buffer);
if (count <= 0) {
socketChannel.close();
key.cancel();
System.out.println("Received invalide data, close the connection");
//LOGGER.info("Received invalide data, close the connection");
continue;
}
System.out.println("Received message"+new String(buffer.array()));
//LOGGER.info("Received message {}", new String(buffer.array()));
}
keys.remove(key);
}
}
}
}
Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Client1 {

public static void main(String[] args) throws IOException, InterruptedException {
SocketChannel socketChannel;
socketChannel = SocketChannel.open();
//socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("localhost", 1234));
Date now = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");//可以方便地修改日期格式
String str = dateFormat.format( now );
byte[] requst = str.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(requst.length);
buffer.put(requst);
buffer.flip();
try {
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
}catch (IOException e) {
e.printStackTrace();
}
socketChannel.close();
}
}

对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发的应用场景却不合适,主要原因如下:

一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;
当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;
可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
为了解决这些问题,演进出了Reactor多线程模型。

单Reactor多线程模型

经典Reactor模式中,尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理。当获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。
http://static.cyblogs.com/20180720174300266.png

Reactor多线程模型的特点:

  1. 有专门一个NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求;

  2. 网络IO操作-读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;

  3. 1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。

在绝大多数场景下,Reactor多线程模型都可以满足性能需求;

服务端的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Reactor2 {
private static ExecutorService pool = Executors.newFixedThreadPool(100);
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
if(selector.selectNow() < 0){
continue;
}
Set<SelectionKey> sets = selector.selectedKeys();
Iterator<SelectionKey> keys = sets.iterator();
while(keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if(key.isAcceptable()) {
ServerSocketChannel Serverchannel = (ServerSocketChannel) key.channel();
SocketChannel channel = Serverchannel.accept();
channel.configureBlocking(false);
System.out.println("accept from "+channel.socket().getInetAddress().toString());
channel.register(selector, SelectionKey.OP_READ);
}else if(key.isValid()&&key.isReadable()) {
pool.submit(new Processor(key));
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Processor implements Callable {
SelectionKey key;

public Processor(SelectionKey key) {
this.key = key;
}

@Override
public Object call() throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) key.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
key.cancel();
socketChannel.close();

System.out.println("Received invalide data, close the connection");
return null;
}else if(count==0) {
return null;
}
System.out.println("Received message"+new String(buffer.array()));
System.out.println("current thread"+Thread.currentThread().toString());
return null;
}
}

在极个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。

多个Reactor模式(主从Reactor)

Netty中使用的Reactor模式,引入了多Reactor,也即一个主Reactor负责监控所有的连接请求,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟。并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。

多个Reactor模式架构图
http://static.cyblogs.com/2018072113140256.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MainReactor {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

int coreNum = Runtime.getRuntime().availableProcessors();
FollowerReactor[] followers = new FollowerReactor[coreNum];
for(int i=0; i<coreNum; i++) {
followers[i] = new FollowerReactor();
}

int index = 0;
while(selector.select()>0) {
Set<SelectionKey> keys = selector.selectedKeys();
for(SelectionKey key:keys) {
keys.remove(key);
if(key.isValid()&&key.isAcceptable()) {
ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel1.accept();
socketChannel.configureBlocking(false);
System.out.println("Accept request:" + socketChannel.socket().getInetAddress());
FollowerReactor follower = followers[++index%coreNum];
follower.register(socketChannel);
//follower.wakeUp();
}
}
}

}
}

上面的代码是主Reactor,子Reactor根据前机器可用核数的两倍(与Netty默认的子Reactor个数一致)。对于每个成功连接的SocketChannel,通过round robin的方式交给不同的子Reactor。子Reactor的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class FollowerReactor {
private Selector selector;
private static ExecutorService service =Executors.newFixedThreadPool(
2*Runtime.getRuntime().availableProcessors());
public void register(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(selector, SelectionKey.OP_READ);
}
public void wakeUp() {
}
public FollowerReactor() throws IOException {
selector = Selector.open();
select();
}
public void wakeup() {
this.selector.wakeup();
}
public void select() {
service.submit(() -> {
while(true) {
if(selector.select(500)<=0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if(key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel channel = (SocketChannel) key.channel();
int count = channel.read(buffer);
if(count<0) {
channel.close();
key.cancel();
System.out.println(channel+"->red end !");
continue;
}else if(count==0) {
System.out.println(channel+",size is 0 !");
continue;
}else{
System.out.println(channel+",message is :"+new String(buffer.array()));

}
}
}
}
});

}
}

在子Reactor中创建了一个静态的线程池,且线程池的大小为机器核数的两倍,每个字Reactor包换一个Selector实例,同事每次创建一个子Reactor都提交一个任务到线程池,阻塞到selector方法,直到新的channel注册到该Selector上,才继续执行。

参考地址

MySQL是现在最流行的关系型数据库(RDB)的选择,创建一个应用时,无论是用户数据还是订单数据,使用关系型数据库存储是最可靠稳定的选择,借助RDB提供的可靠性、事务等功能,为应用提供完善的支持。MySQL是开源软件,可以免费使用,MySQL在发展多年后越来越成熟,成为大部分公司的数据库首选。MySQL采用插件式的存储引擎架构,5.5版本后默认使用InnoDB存储引擎。

MySQL架构

MySQL从概念上可以分为四层:

  • 第一层是接入层: 不同语言的客户端通过mysql的协议与mysql服务器进行连接通信,接入层进行权限验证、连接池管理、线程管理等。
  • 第二层是服务层: 包括sql解析器、sql优化器、数据缓冲、缓存等。
  • 第三层是存储引擎层: mysql中存储引擎是基于表的。
  • 第四层是系统文件层: 保存数据、索引、日志等。
    http://static.cyblogs.com/mysql-arch.png

MVCC

MVCC是Multi Version Concurrency Control的简称,代表多版本并发控制。为什么需要MVCC,还要从数据库事务的ACID特性说起。
相信很多朋友都了解ACID,它们分别代表了

  • Atomicity(原子性)
  • Consistency(一致性)
  • Isolation(隔离性)
  • Durability(持久性)

原子性表示一个事务的操作结果要么全部执行要么全部不执行。

一致性表示事务总是从一个一致的状态转换到另一个一致的状态。

隔离性表示一个事务的修改结果在什么时间能够被其他事务看到,SQL1992规范中对隔离性定义了不同的隔离级别,分为读未提交(READ UNCOMMITED),事务能够看到其他事务没有提及的修改,当另一个事务又回滚了修改后的情况又被称为脏读dirty read。
读已提交(READ COMMITTED),事务能够看到其他事务提交后的修改,这时会出现一个事务内两次读取数据可能因为其他事务提交的修改导致不一致的情况,称为不可重复读。 可重复读(REPEATABLE READ),在两次读取时读取到的数据的状态是一致的,和序列化(SERIALIZABLE)可重复读中可能出现第二次读读到第一次没有读到的数据,也就是被其他事务插入的数据,这种情况称为幻读phantom read,序列化级别中不能出现幻读。
隔离级别依次增强,但是导致的问题是并发能力的减弱。各种数据库厂商会对各个隔离级别进行实现。和Java中的多线程问题相同,数据库通常使用锁来实现隔离性。
最原生的锁,锁住一个资源后会禁止其他任何线程访问同一个资源。但是很多应用的一个特点都是读多写少的场景,很多数据的读取次数远大于修改的次数,而读取数据间互相排斥显得不是很必要。所以就使用了一种读写锁的方法,读锁和读锁之间不互斥,而写锁和写锁、读锁都互斥。这样就很大提升了系统的并发能力。之后人们发现并发读还是不够,又提出了能不能让读写之间也不冲突的方法,就是读取数据时通过一种类似快照的方式将数据保存下来,这样读锁就和写锁不冲突了,不同的事务session会看到自己特定版本的数据。当然快照是一种概念模型,不同的数据库可能用不同的方式来实现这种功能。

InnoDB与MVCC

MySQL中的InnoDB存储引擎的特性有,默认隔离级别REPEATABLE READ,行级锁,实现了MVCC,Consistent nonlocking read(默认读不加锁,一致性非锁定读),Insert Buffer,Adaptive Hash Index,DoubleWrite,Cluster Index。
上面列举了这么多,表示InnoDB有很多特性、很快。
InnoDB中通过UndoLog实现了数据的多版本,而并发控制通过锁来实现。
Undo Log除了实现MVCC外,还用于事务的回滚。

Redo log,bin log,Undo log

MySQL Innodb中存在多种日志,除了错误日志、查询日志外,还有很多和数据持久性、一致性有关的日志。
binlog,是mysql服务层产生的日志,常用来进行数据恢复、数据库复制,常见的mysql主从架构,就是采用slave同步master的binlog实现的,另外通过解析binlog能够实现mysql到其他数据源(如ElasticSearch)的数据复制。
redo log记录了数据操作在物理层面的修改,mysql中使用了大量缓存,缓存存在于内存中,修改操作时会直接修改内存,而不是立刻修改磁盘,当内存和磁盘的数据不一致时,称内存中的数据为脏页(dirty page)。为了保证数据的安全性,事务进行中时会不断的产生redo log,在事务提交时进行一次flush操作,保存到磁盘中,redo log是按照顺序写入的,磁盘的顺序读写的速度远大于随机读写。当数据库或主机失效重启时,会根据redo log进行数据的恢复,如果redo log中有事务提交,则进行事务提交修改数据。这样实现了事务的原子性、一致性和持久性。

Undo Log: 除了记录redo log外,当进行数据修改时还会记录undo log,undo log用于数据的撤回操作,它记录了修改的反向操作,比如,插入对应删除,修改对应修改为原来的数据,通过undo log可以实现事务回滚,并且可以根据undo log回溯到某个特定的版本的数据,实现MVCC。

redo log 和binlog的一致性,为了防止写完binlog但是redo log的事务还没提交导致的不一致,innodb 使用了两阶段提交
大致执行序列为

1
2
3
InnoDB prepare  (持有prepare_commit_mutex);
write/sync Binlog;
InnoDB commit (写入COMMIT标记后释放prepare_commit_mutex)

MVCC实现

innodb中通过B+树作为索引的数据结构,并且主键所在的索引为ClusterIndex(聚簇索引),ClusterIndex中的叶子节点中保存了对应的数据内容。一个表只能有一个主键,所以只能有一个聚簇索引,如果表没有定义主键,则选择第一个非NULL唯一索引作为聚簇索引,如果还没有则生成一个隐藏id列作为聚簇索引。
除了Cluster Index外的索引是Secondary Index(辅助索引)。辅助索引中的叶子节点保存的是聚簇索引的叶子节点的值。
InnoDB行记录中除了刚才提到的rowid外,还有trx_iddb_roll_ptrtrx_id表示最近修改的事务的id,db_roll_ptr指向undo segment中的undo log
新增一个事务时事务id会增加,trx_id能够表示事务开始的先后顺序。

Undo log分为InsertUpdate两种,delete可以看做是一种特殊的update,即在记录上修改删除标记。
update undo log记录了数据之前的数据信息,通过这些信息可以还原到之前版本的状态。
当进行插入操作时,生成的Insert undo log在事务提交后即可删除,因为其他事务不需要这个undo log
进行删除修改操作时,会生成对应的undo log,并将当前数据记录中的db_roll_ptr指向新的undo log
undolog

数据可见性判断

1
2
3
4
5
6
7
8
CREATE TABLE `testunique` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`uid` int(11) DEFAULT NULL
`ukey` int(11) DEFAULT NULL
PRIMARY KEY (`id`),
KEY `id_uid` (`uid`),
KEY `index_key` (`ukey`)
) ENGINE=InnoDB AUTO_INCREMENT=70 DEFAULT CHARSET=utf8;

隔离级别REPEATABLE READ

session1 session2
begin;
select * from testunique;
insert into testunique values(NULL,NULL,1);
select * from testunique;
commit
select * from testunique;
commit
select * from testunique;

只有当session2 commit之后的查询才能查到session1插入的数据

事务可见性的处理过程:
undo-view
RR级别下一个事务开始后第一个snapshot read的时候,会将当期活动的事务id记录下来,记录到read view中。RC级别则是每次snapshot read都会创建一个新的read view
假设当前,read view中最大的事务id为tmax,最小为tmin。则判断一个数据是否可见以及对应的版本的方法为。
如果该行中的trx_id,赋值给tid,如果tid和当前事务id相等或小于tmin,说明是事务内发生的或开启前的修改,则直接返回该版本数据; 如果
trx_id大于tmax,则查看该版本的db_roll_ptr中的trx_id,赋值给tid并从头开始判断。如果tid小于tmax并且不在read view中,则返回,否则中回滚段中找出undo logtrx_id,赋值给tid从头判断。

所以可见性是,只有当第一次读之前提交的修改和自己的修改可见,其他的均不可见。

参考地址:

Mongodb分片概括

  • 分片在多台服务器上分布数据的方法, Mongodb使用分片来支持具有非常大的数据集和高吞吐量的操作的部署
  • 具有大数据集和高吞吐量应用程序的数据库系统,可以挑战单台服务器的容量。
    例如,高查询率可以耗尽服务器的cpu容量,工作集大小大于系统的RAM强制磁盘驱动器的I/O容量,
  • 有两种方法来解决系统增长:垂直和水平缩放。
    • 垂直缩放 涉及增加的单个服务器的容量,例如使用更强大的CPU,加入更多的RAM,或增加的存储空间量。可用技术中的限制可能限制单个机器对于给定工作负载足够强大。此外,基于云的提供商具有基于可用硬件配置的硬上限。因此,对于垂直缩放存在实际的最大值。
    • 包括将系统数据和负载在多个服务器,添加额外的服务器,需要增加容量。虽然单个机器的总速度或容量可能不高,但是每个机器处理整个工作负载的子集,潜在地提供比单个高速大容量服务器更好的效率。扩展部署的容量仅需要根据需要添加额外的服务器,这可以是比单个机器的高端硬件低的总体成本。权衡是基础设施的复杂性和部署的维护。
  • Mongodb的支持水平扩展,分片。
分片目的

对于单台数据库服务器,庞大的数据量及高吞吐量的应用程序对它而言无疑是个巨大的挑战。频繁的CRUD操作能够耗尽服务器的CPU资源,快速的数据增长也会让硬盘存储无能为力,最终内存无法满足数据需要导致大量的I/O,主机负载严重。为了解决这种问题,对于数据库系统一般有两种方法:垂直扩展分片(水平扩展)。

【垂直扩展】:添加更多的CPU和存储资源来增加系统性能。这种方式缺点是:拥有大量CPU和RAM资源的高端机器比普通PC机器昂贵得太多,而且单点故障会影响整个系统的服务。

【分片】:相反地,分片将大的数据集分配到多台主机上,每个分片是一个独立的数据库,这些分片整体上构成一个完整的逻辑数据库。分片减少了每台服务器上的数据操作量,随着集群的增长,每台分片处理越来越少的数据,结果,增加了系统整体服务能力。另外,分片还减少了每台服务器需要存储的数据量。

MongoDB中的分片

MongoDB通过配置分片集群来支持分片,一个分片集群包括以下几个组件:分片,查询路由,配置服务器

  • **分片:**用来存储数据,为了提供系统可用性和数据一致性,一个生产环境的分片集群,通常每个分片是一个副本集。
  • 查询路由:指客户端应用访问每个分片的路径。
  • 配置服务器:存储集群的元数据,这些数据包含了集群数据集到各分片的映射关系。查询路由就是通过这些元数据到特定的分片上执行指定的数据操作。(从v3.2开始,配置服务器也可以作为副本集,但是必须使用WiredTiger存储引擎,反对使用3个镜像实例作为配置服务器)
数据划分

MongoDB的数据划分,是以集合级别为标准。分片通过shard key来划分集合数据。

  • shard key:

为了对集合分片,你需要指定一个shard key。shard key既可以是集合的每个文档的索引字段也可以是集合中每个文档都有的组合索引字段。MongoDB将shard keys值按照块(chunks)划分,并且均匀的将这些chunks分配到各个分片上。MongoDB使用基于范围划分基于散列划分来划分chunks的。

  • 基于范围划分

MongoDB通过shard key值将数据集划分到不同的范围就称为基于范围划分。对于数值型的shard key:你可以虚构一条从负无穷到正无穷的直线(理解为x轴),每个shard key 值都落在这条直线的某个点上,然后MongoDB把这条线划分为许多更小的没有重复的范围成为块(chunks),一个chunk就是就某些最小值到最大值的范围。

  • 基于散列划分:

MongoDB计算每个字段的hash值,然后用这些hash值建立chunks。

  • 基于范围和基于散列划分的性能比较:

基于范围划分对于范围查询比较高效。假设在shard key上进行范围查询,查询路由很容易能够知道哪些块与这个范围重叠,然后把相关查询按照这个路线发送到仅仅包含这些chunks的分片。但是基于范围划分很容易导致数据不均匀分布,这样会削弱分片集群的功能。例如当shard key是个成直线上升的字段,如时间。那么,所有在给定时间范围内的请求都会映射到相同的chunk,也就是相同的分片上。这种情况下,小部分的分片将会承受大多数的请求,那么系统整体扩展并不理想。

相反的,基于散列划分是以牺牲高效范围查询为代价,它能够均匀的分布数据,散列值能够保证数据随机分布到各个分片上。

  • 使用标签来自定义数据分布

MongoDB允许DBA们通过标签标记分片的方式直接平衡数据分布策略,DBA可以创建标签并且将它们与shard key值的范围进行关联,然后分配这些标签到各个分片上,最终平衡器转移带有标签标记的数据到对应的分片上,确保集群总是按标签描述的那样进行数据分布。标签是控制平衡器行为及集群中块分布的主要方法

维持数据分布平衡

新加入的数据及服务器都会导致集群数据分布不平衡,MongoDB采用两种方式确保数据分布的平衡:

  • 拆分

拆分是一个后台进程,防止块变得太大。当一个块增长到指定块大小的时候,拆分进程就会块一分为二,整个拆分过程是高效的。不会涉及到数据的迁移等操作。

  • 平衡

平衡器是一个后台进程,管理块的迁移。平衡器能够运行在集群任何的mongd实例上。当集群中数据分布不均匀时,平衡器就会将某个分片中比较多的块迁移到拥有块较少的分片中,直到数据分片平衡为止。举个例子:如果集合users有100个块在分片1里,50个块在分片2中,那么平衡器就会将分片1中的块迁移到分片2中,直到维持平衡。

分片采用后台操作的方式管理着源分片和目标分片之间块的迁移。在迁移的过程中,源分片中的块会将所有文档发送到目标分片中,然后目标分片会获取并应用这些变化。最后,更新配置服务器上关于块位置元数据。

  • 从集群中增加和删除分片

添加新分片到集群中会产生数据不平衡,因为新分片中没有块,当MongoDB开始迁移数据到新分片中时,等到数据分片平衡恐怕需要点时间。

当删除一个分片时,平衡器将会把分片中所有块迁移到另一个分片中,在完成这些迁移并更新元数据后,你就可以安全的删除分片了。

分片集群

  • 一个mongodb分片集群由以下几部分组成

    img

  • shard 每个shard包含分片数据的子集,每个shard可以部署一个副本集
    一台机器的一个数据表 Collection1 存储了 1T 数据,压力太大了!在分给4个机器后,每个机器都是256G,则分摊了集中在一台机器的压力。也许有人问一台机器硬盘加大一点不就可以了,为什么要分给四台机器呢?不要光想到存储空间,实际运行的数据库还有硬盘的读写、网络的IO、CPU和内存的瓶颈。在mongodb集群只要设置好了分片规则,通过mongos操作数据库就能自动把对应的数据操作请求转发到对应的分片机器上。在生产环境中分片的片键可要好好设置,这个影响到了怎么把数据均匀分到多个分片机器上,不要出现其中一台机器分了1T,其他机器没有分到的情况,这样还不如不分片!

img

  • mongos MongoS充当一个查询的路由器,提供客户端应用程序和所述分片簇之间的接口,mongos作为数据库集群请求的入口,所有的请求都是通过mongos来进行协调的,不需要在应用程序添加一个路由选择器,mongos自己就是一个请求分发中心,它负责把对应的数据请求转发到对应的shard服务器上,在生产环境中通常有多个monogs作为请求的入口,防止其中一个挂掉所有mongos请求都没有办法操作
  • config servers 为集群配置的服务器存储元数据和配置设置,从Mongodb3.4开始,配置服务器必须部署为复制集,mongos本身没有物理存储分片服务器和数据路由信息,只是缓存在内存当中,配置服务器则实际存储这些数据,mongos第一次启动或者关掉重启会从configserver中加载配置信息,以后如果配置信息有变化会通过所有的mongos更新自己的状态,这样mongs就能继续准确路由,在生产环境中通常有多个config server配置服务器,因为它存储了分片路由的元数据,如果就一个如果挂掉一个,整个mongodb基础就会挂掉。
片键
  • 片键
    1、在分发集合中文件时,mongodb的分区使用的收集片键关键,在片键由存在目标集合中的每个文档中的一个不可变或多个字段
    2、在分割集合的时候选择片键,分片键完成之后是不能更改的,分片集合只能有1个片键,到片键的非空集合,集合必须有一个索引,与片键启动,对于空空集合,如果集合尚未具有指定分片键的相关索引,则Mongodb会创建索引
    3、分片键的选择会影响分片集群的性能和效率以及可伸缩性,具有最佳可能的硬件可以通过分片达到瓶颈,片键和其支持指数的选择也可以影响数据的拆分,但集群可以使用
    4、片键决定了集群中一个集合的文件咋不同的片键中的分布,片键字段必须被索引,且在集合中的每条记录都不能为空,可以是单个字段或者是复合字段
    5、Mongodb使用片键的范围是吧数据分布在分片中,每个范围,又称为数据块,定义了一个不重叠的片键范围Mongodb把数据块与他们存储的文档分布到集群中的不同分布中,当一个数据块的大小超过数据块最大大小的时候,Mongodb会宜聚片键的范围将数据块分裂为更小的数据块

    img

  • 片键的使用语法
    1、在分片集合,必须制定目标集合和片键的sh.shardCollection()

1
sh.shardCollection(namespace, key)

2、哈希片键使用单字段的哈希索引进行数据在分片之间的平均分发,除数取余一致性哈希
3、被选为片键的字段必须有足够大的基数,或者有足够多的不同的值,对于单调的递增的字段如果ObjectID或是时间戳,哈希索引效果更好
4、如果在一个空集合创建哈希片键,Mongodb会自动创建并迁移数据块,以保证每个分片上都有两个数据块,也可以执行shardCollection指定numInitialChunks参数以控制初始化时Mongodb创建数据块数目,或者手动调用split命令在分片上分裂数据块
5、对使用了哈希片键分片的集合进行请求时,Mongodb会自动计算哈希值,应用不需要解析哈希值

shard集群部署

  • 部署ip规划

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    172.17.237.33:30001    config1
    172.17.237.34:30002 config2
    172.17.237.36:30003 config3
    172.17.237.37:40000 mongos
    172.17.237.38:50000 shard1
    172.17.237.39:50001 shard2
    172.17.237.40:50002 shard3
    172.17.237.41:60000 sha1
    172.17.237.42:60001 sha2
    172.17.237.43:60002 sha3
配置config server 副本集
  • 配置confi1配置文件
1
2
3
4
5
6
7
8
9
10
11
[root@My-Dev db2]# vim config1.conf 
[root@My-Dev db1]# vim configsvr.conf
logpath=/home/mongodb/test/db1/log/db1.log
pidfilepath=/home/mongodb/test/db1/db1.pid
logappend=true
port=30000
fork=true
dbpath=/home/mongodb/test/db1/data
configsvr=true # 在配置文件添加此项就行
oplogSize=512
replSet=config
  • 配置confi2配置文件
1
2
3
4
5
6
7
8
9
10
[root@My-Dev db2]# vim config2.conf 
logpath=/home/mongodb/test/db2/log/db2.log
pidfilepath=/home/mongodb/test/db2/db2.pid
logappend=true
port=30001
fork=true
dbpath=/home/mongodb/test/db2/data
oplogSize=512
replSet=config
configsvr=true
  • 配置confi3配置文件
1
2
3
4
5
6
7
8
9
10
[root@My-Dev db2]# vim config3.conf
logpath=/home/mongodb/test/db3/log/db3.log
pidfilepath=/home/mongodb/test/db3/db3.pid
logappend=true
port=30002
fork=true
dbpath=/home/mongodb/test/db3/data
oplogSize=512
replSet=config
configsvr=true
  • 启动config server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@My-Dev bin]# ./mongod -f /home/mongodb/test/db1/config1.conf 
about to fork child process, waiting until server is ready for connections.
forked process: 5260
child process started successfully, parent exiting

[root@My-Dev bin]# ./mongod -f /home/mongodb/test/db2/config2.conf
about to fork child process, waiting until server is ready for connections.
forked process: 5202
child process started successfully, parent exiting

[root@My-Dev bin]# ./mongod -f /home/mongodb/test/db3/config3.conf
about to fork child process, waiting until server is ready for connections.
forked process: 4260
child process started successfully, parent exiting
  • 配置config副本集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
> use admin
switched to db admin

> config = { _id:"config",members:[ {_id:0,host:"conf1:30000"}, {_id:1,host:"conf2:30001"}, {_id:2,host:"conf3:30002"}] } #定义副本集
{
"_id" : "config",
"members" : [
{
"_id" : 0,
"host" : "conf1:30000"
},
{
"_id" : 1,
"host" : "conf2:30001"
},
{
"_id" : 2,
"host" : "conf3:30002"
}
]
}
> rs.initiate(config) #初始化副本集
{ "ok" : 1 }
配置mongos
  • 添加配置mongos配置文件
    遇到坑了,在启动mongos的时候启动失败,结果是mongodb3.0以后的版本config server必须是复制集才行,结果我的版本是3.4最新的版本,所以说还需要添加两台confi server
1
2
3
4
5
6
7
8
[root@My-Dev db4]# vim  mongos.conf 

logpath=/home/mongodb/test/db4/log/db4.log
pidfilepath=/home/mongodb/test/db4/db4.pid
logappend=true
port=40004
fork=true
configdb=mongos/172.17.237.33:30000,172.17.237.34:30001,172.17.237.36:30002 #如果有多个mongo confi的话就用逗号分隔开
  • 启动mongos
1
2
3
4
[root@My-Dev bin]# ./mongos -f /home/mongodb/test/db4/mongos.conf 
about to fork child process, waiting until server is ready for connections.
forked process: 6268
child process started successfully, parent exiting
shard2副本集集群部署
  • 配置sha配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
[root@My-Dev db8]# more shard21.conf 
logpath=/home/mongodb/test/db8/log/db8.log
pidfilepath=/home/mongodb/test/db8/db8.pid
directoryperdb=true
logappend=true
port=60000
fork=true
dbpath=/home/mongodb/test/db8/data
oplogSize=512
replSet=sha
shardsvr=true



[root@My-Dev db9]# more shard22.conf
logpath=/home/mongodb/test/db9/log/db9.log
pidfilepath=/home/mongodb/test/db9/db9.pid
directoryperdb=true
logappend=true
port=60001
fork=true
dbpath=/home/mongodb/test/db9/data
oplogSize=512
replSet=sha
shardsvr=true


[root@My-Dev db10]# more shard23.conf
logpath=/home/mongodb/test/db10/log/db10.log
pidfilepath=/home/mongodb/test/db10/db10.pid
directoryperdb=true
logappend=true
port=60002
fork=true
dbpath=/home/mongodb/test/db10/data
oplogSize=512
replSet=sha
shardsvr=true
  • 启动shard
1
2
3
[root@My-Dev bin]# ./mongod -f /home/mongodb/test/db8/shard21.conf 
[root@My-Dev bin]# ./mongod -f /home/mongodb/test/db9/shard22.conf
[root@My-Dev bin]# ./mongod -f /home/mongodb/test/db10/shard23.conf
  • 配置shard2副本集集群
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
> use admin 
switched to db admin
> sha = { _id:"sha",members:[ {_id:0,host:"sha1:60000"}, {_id:1,host:"sha2:60001"}, {_id:2,host:"sha3:60002"}]}
{
"_id" : "sha",
"members" : [
{
"_id" : 0,
"host" : "sha1:60000"
},
{
"_id" : 1,
"host" : "sha2:60001"
},
{
"_id" : 2,
"host" : "sha3:60002"
}
]
}
> rs.initiate(sha)
{ "ok" : 1 }
shard1副本集集群部署
  • 配置shard配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
[root@My-Dev db5]# vim shard1.conf 
logpath=/home/mongodb/test/db5/log/db5.log
pidfilepath=/home/mongodb/test/db5/db5.pid
directoryperdb=true
logappend=true
port=50000
fork=true
dbpath=/home/mongodb/test/db5/data
oplogSize=512
replSet=shard
shardsvr=true


[root@My-Dev db6]# vim shard2.conf
logpath=/home/mongodb/test/db6/log/db6.log
pidfilepath=/home/mongodb/test/db6/db6.pid
directoryperdb=true
logappend=true
port=50001
fork=true
dbpath=/home/mongodb/test/db6/data
oplogSize=512
replSet=shard
shardsvr=true


[root@My-Dev db7]# vim shard3.conf
logpath=/home/mongodb/test/db7/log/db7.log
pidfilepath=/home/mongodb/test/db7/db7.pid
directoryperdb=true
logappend=true
port=50002
fork=true
dbpath=/home/mongodb/test/db7/data
oplogSize=512
replSet=shard
shardsvr=true
  • 启动shard
1
2
3
[root@My-Dev bin]# ./mongod -f /home/mongodb/test/db7/shard1.conf 
[root@My-Dev bin]# ./mongod -f /home/mongodb/test/db7/shard2.conf
[root@My-Dev bin]# ./mongod -f /home/mongodb/test/db7/shard3.conf
  • 配置shard2副本集集群
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
> use admin
switched to db admin
> shard = { _id:"shard",members:[ {_id:0,host:"shard1:50000"}, {_id:1,host:"shard2:50001"}, {_id:2,host:"shard3:50002"}] }
{
"_id" : "shard",
"members" : [
{
"_id" : 0,
"host" : "shard1:50000"
},
{
"_id" : 1,
"host" : "shard2:50001"
},
{
"_id" : 2,
"host" : "shard3:50002"
}
]
}
> rs.initiate(shard)
{ "ok" : 1 }
分片配置
  • 分片集合中是否有数据
    默认第一个添加的shard就是主shard,存放没有被分割的shard就是主shard
    在创建分片的时,必须在索引中创建的,如果这个集合中有数据,则首先自己先创建索引,然后进行分片,如果是分片集合中没有数据的话,则就不需要创建索引,就可以分片
  • 登陆mongos配置分片,向分区集群中添加shard服务器和副本集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@My-Dev bin]# ./mongo mongos:40004    #登陆到mongos中

mongos> sh.status() #查看分片状态
--- Sharding Status ---
sharding version: {
"_id" : 1,
"minCompatibleVersion" : 5,
"currentVersion" : 6,
"clusterId" : ObjectId("589b0cff36b0915841e2a0a2")
}
shards:
active mongoses:
"3.4.1" : 1
autosplit:
Currently enabled: yes
balancer:
Currently enabled: yes
Currently running: no
Balancer lock taken at Wed Feb 08 2017 20:20:16 GMT+0800 (CST) by ConfigServer:Balancer
Failed balancer rounds in last 5 attempts: 0
Migration Results for the last 24 hours:
No recent migrations
databases:
  • 添加shard副本集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#首先要登陆到shard副本集中查看那个是主节点,本次实验室使用了两个shard副本集 sh.addShard("<replSetName>/主节点IP/port") 
mongos> sh.addShard("shard/shard1:50000")
{ "shardAdded" : "shard", "ok" : 1 }

mongos> sh.addShard("sha/sha:60000")
{ "shardAdded" : "shard", "ok" : 1 }

mongos> sh.status() #查看分片集群已经成功把shard加入分片中
--- Sharding Status ---
sharding version: {
"_id" : 1,
"minCompatibleVersion" : 5,
"currentVersion" : 6,
"clusterId" : ObjectId("589b0cff36b0915841e2a0a2")
}
shards:
{ "_id" : "sha", "host" : "sha/sha1:60000,sha2:60001,sha3:60002", "state" : 1 }
{ "_id" : "shard", "host" : "shard/shard1:50000,shard2:50001,shard3:50002", "state" : 1 }
active mongoses:
"3.4.1" : 1
autosplit:
Currently enabled: yes
balancer:
Currently enabled: yes
Currently running: no
Balancer lock taken at Wed Feb 08 2017 20:20:16 GMT+0800 (CST) by ConfigServer:Balancer
Failed balancer rounds in last 5 attempts: 5
Last reported error: Cannot accept sharding commands if not started with --shardsvr
Time of Reported error: Thu Feb 09 2017 17:42:21 GMT+0800 (CST)
Migration Results for the last 24 hours:
No recent migrations
databa
  • 指定那个数据库使用分片,创建片键
1
2
3
4
5
6
mongos> sh.enableSharding("zhao")  #指定zhao数据库中使用分片
{ "ok" : 1 }

mongos> sh.shardCollection("zhao.call",{name:1,age:1}) #在zhao数据库和call集合中创建了name和age为升序的片键
{ "collectionsharded" : "zhao.call", "ok" : 1 }

  • 查看sh.status()信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
mongos> sh.status()
--- Sharding Status ---
sharding version: {
"_id" : 1,
"minCompatibleVersion" : 5,
"currentVersion" : 6,
"clusterId" : ObjectId("589b0cff36b0915841e2a0a2")
}
shards:
{ "_id" : "sha", "host" : "sha/sha1:60000,sha2:60001,sha3:60002", "state" : 1 }
{ "_id" : "shard", "host" : "shard/shard1:50000,shard2:50001,shard3:50002", "state" : 1 }
active mongoses:
"3.4.1" : 1
autosplit:
Currently enabled: yes
balancer:
Currently enabled: yes
Currently running: no
Balancer lock taken at Wed Feb 08 2017 20:20:16 GMT+0800 (CST) by ConfigServer:Balancer
Failed balancer rounds in last 5 attempts: 5
Last reported error: Cannot accept sharding commands if not started with --shardsvr
Time of Reported error: Thu Feb 09 2017 17:56:02 GMT+0800 (CST)
Migration Results for the last 24 hours:
No recent migrations
databases:
{ "_id" : "zhao", "primary" : "shard", "partitioned" : true }
zhao.call
shard key: { "name" : 1, "age" : 1 }
unique: false
balancing: true
chunks:
shard 1
{ "name" : { "$minKey" : 1 }, "age" : { "$minKey" : 1 } } -->> { "name" : { "$maxKey" : 1 }, "age" : { "$maxKey" : 1 } } on : shard Timestamp(1, 0)
  • 测试批量插入数据验证
1
mongos> for ( var i=1;i<10000000;i++){db.call.insert({"name":"user"+i,age:i})};
  • 查看当前是否已经分片到两个shard中去了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
mongos> sh.status()
--- Sharding Status ---
sharding version: {
"_id" : 1,
"minCompatibleVersion" : 5,
"currentVersion" : 6,
"clusterId" : ObjectId("589b0cff36b0915841e2a0a2")
}
shards:
{ "_id" : "sha", "host" : "sha/sha1:60000,sha2:60001,sha3:60002", "state" : 1 }
{ "_id" : "shard", "host" : "shard/shard1:50000,shard2:50001,shard3:50002", "state" : 1 }
active mongoses:
"3.4.1" : 1
autosplit:
Currently enabled: yes
balancer:
Currently enabled: yes
Currently running: no
Balancer lock taken at Wed Feb 08 2017 20:20:16 GMT+0800 (CST) by ConfigServer:Balancer
Failed balancer rounds in last 5 attempts: 5
Last reported error: Cannot accept sharding commands if not started with --shardsvr
Time of Reported error: Thu Feb 09 2017 17:56:02 GMT+0800 (CST)
Migration Results for the last 24 hours:
4 : Success
databases:
{ "_id" : "zhao", "primary" : "shard", "partitioned" : true }
zhao.call
shard key: { "name" : 1, "age" : 1 }
unique: false
balancing: true
chunks: #数据已经分片到两个chunks里面了
sha 4
shard 5
{ "name" : { "$minKey" : 1 }, "age" : { "$minKey" : 1 } } -->> { "name" : "user1", "age" : 1 } on : sha Timestamp(4, 1)
{ "name" : "user1", "age" : 1 } -->> { "name" : "user1", "age" : 21 } on : shard Timestamp(5, 1)
{ "name" : "user1", "age" : 21 } -->> { "name" : "user1", "age" : 164503 } on : shard Timestamp(2, 2)
{ "name" : "user1", "age" : 164503 } -->> { "name" : "user1", "age" : 355309 } on : shard Timestamp(2, 3)
{ "name" : "user1", "age" : 355309 } -->> { "name" : "user1", "age" : 523081 } on : sha Timestamp(3, 2)
{ "name" : "user1", "age" : 523081 } -->> { "name" : "user1", "age" : 710594 } on : sha Timestamp(3, 3)
{ "name" : "user1", "age" : 710594 } -->> { "name" : "user1", "age" : 875076 } on : shard Timestamp(4, 2)
{ "name" : "user1", "age" : 875076 } -->> { "name" : "user1", "age" : 1056645 } on : shard Timestamp(4, 3)
{ "name" : "user1", "age" : 1056645 } -->> { "name" : { "$maxKey" : 1 }, "age" : { "$maxKey" : 1 } } on : sha Timestamp(5, 0)
  • 查看当前分片中是否均匀的分配到连个shard当中,true是均匀的
    false不是均匀的
1
2
mongos> sh.getBalancerState()
true
选择sharing kes注意点
  • 考虑应该在哪里储存数据?
  • 应该在哪里读取数据?
  • sharding key 应该是主键
  • sharding key 应该你能尽量保证避免分片查询

sharing 进级

  • 如果sharing 分片不均匀没有分片均匀
  • sharding : 新增shard和移除shard
1
mongos> sh.addShard("sha4/192.168.2.10:21001")
Balancer
  • 开启Balncer
    开启Balancer之后,chunks之后会自动均分
1
mongos> sh.startBalancer()
  • 设置Balancer进程运行时间窗口
    默认情况ixaBalancing进程在运行时为降低Balancing进程对系统的影响,可以设置Balancer进程的运行时间窗口,让Balancer进程在指定时间窗口操作
1
2
#设置时间窗口
db.settings.update({ _id : "balancer" }, { $set : { activeWindow : { start : "23:00", stop : "6:00" } } }, true )
  • 查看Balancer运行时间窗口
1
2
3
4
5
6
# 查看Balancer时间窗口
mongos> db.settings.find();
{ "_id" : "balancer", "activeWindow" : { "start" : "23:00", "stop" : "6:00" }, "stopped" : false }

mongos> sh.getBalancerWindow()
{ "start" : "23:00", "stop" : "6:00" }
  • 删除Balancer进程运行时间窗口
1
2
3
4
 mongos> db.settings.update({ "_id" : "balancer" }, { $unset : { activeWindow : 1 }});
mongos> db.settings.find();
{ "_id" : "chunksize", "value" : 10 }
{ "_id" : "balancer", "stopped" : false }
在shell脚本中执行mongodb
1
[root@My-Dev ~]# echo  -e "use zhao \n  db.call.find()" |mongo --port 60001 
Mongodb片键的添加
  • 首先进入mongos的的admin数据库中
1
2
3
4
5
mongos> use admin
switched to db admin
mongos> db.runCommand({"enablesharding":"zl"}) #创建zl库中
{ "ok" : 1 }
mongos> db.runCommand(db.runCommand({"shardcollection":"$ent.t_srvappraise_back","key")
  • 分片脚本
1
2
3
4
5
6
7
8
9
10
#!/bin/bash
url=10.241.96.155
port=30000
ent=test1

./mongo $url:$port/admin <<EOF
db.runCommand({"enablesharding":"$ent"});
db.runCommand({"shardcollection":"$ent.t_srvappraise_back","key":{"sa_seid":"hashed"}})
exit;
EOF

参考地址

Java堆(Java Heap)是JVM所管理的最大内存区域,也是所有线程共享的一块区域,在JVM启动时创建。

此内存区域存放的都是对象的实例和数组。JVM规范中说到:”所有的对象实例以及数组都要在堆上分配”。

Java堆是垃圾回收器管理的主要区域,百分之九十九的垃圾回收发生在Java堆,另外百分之一发生在方法区,因此又称之为”GC堆”。根据JVM规范规定的内容,Java堆可以处于物理上不连续的内存空间中。

当前JVM对于堆的垃圾回收,采用分代收集的策略。根据堆中对象的存活周期将堆内存分为新生代和老年代。在新生代中,每次垃圾回收都有大批对象死去,只有少量存活。而老年代中存放的对象存活率高。

这样划分的目的是为了使 JVM 能够更好的管理堆内存中的对象,包括内存的分配以及回收。

标记清除算法

标记清除算法是最基础的回收算法,分为标记和清除两个部分:首先标记出所有需要回收的对象,这一过程在可达性分析过程中进行。在标记完之后统一回收所有被标记的对象。

标记清除算法有如下不足

  • 效率问题

标记和清除这两个过程的效率不高

  • 空间问题

清除之后会产生大量不连续的内存碎片,内存碎片太多会导致以后的程序运行中无法分配出较大的内存,从内不得不触发另外的垃圾回收。

http://static.cyblogs.com/20180617105859782.png

如上图中,经过标记清除之后,假设有了100M空间,但是这100M是不连续的,最大的一块连续空间可能才10M,所以导致之后程序需要一块20M内存空间时就不得不再进行一次GC来继续清理空间,效率极低。

鉴于标记清除算法有如上的缺陷,所以现在一般是用的是其的变种算法。

复制算法(新生代算法)

复制算法概念

复制算法是针对Java堆中的新生代内存垃圾回收所使用的回收策略,解决了”标记-清理”的效率问题。

复制算法将堆中可用的新生代内存按容量划分成大小相等的两块内存区域,每次只使用其中的一块区域。当其中一块内存区域需要进行垃圾回收时,会将此区域内还存活着的对象复制到另一块上面,然后再把此内存区域一次性清理掉。

这样做的好处是每次都是对整个新生代一半的内存区域进行内存回收,内存分配时也就不需要考虑内存碎片等复杂情况,只需要移动堆顶指针,按顺序分配即可。此算法实现简单,运行高效。算法的执行流程如下图 :

http://static.cyblogs.com/2018061711175251.png

现在主流的虚拟机,包括HotSpot都是采用的这种回收策略进行新生代内存的回收。

新生代内存划分

新生代中98%的对象都是”朝生夕死”的,所以并不需要按照1 : 1的比例来划分内存空间,而是将内存(新生代内存)分为一块较大的Eden(伊甸园)空间和两块较小的Survivor(幸存者)空间,每次使用Eden和其中一块Survivor(两个Survivor区域一个称为From区,另一个称为To区域)。

当进行垃圾回收时,将Eden和Survivor中还存活的对象一次性复制到另一块Survivor空间上,最后清理掉Eden和刚才用过的Survivor空间。

当Survivor空间不够用时,则需要依赖其他内存(老年代)进行分配担保。

HotSpot默认Eden与Survivor的大小比例是8 : 1,也就是说Eden:Survivor From : Survivor To = 8:1:1。所以每次新生代可用内存空间为整个新生代容量的90%,而剩下的10%用来存放回收后存活的对象。

HotSpot实现的复制算法流程如下:

 1. 当Eden区满的时候,会触发第一次Minor gc,把还活着的对象拷贝到Survivor From区;当Eden区再次触发Minor gc的时候,会扫描Eden区和From区域,对两个区域进行垃圾回收,经过这次回收后还存活的对象,则直接复制到To区域,并将Eden和From区域清空。

 2. 当后续Eden又发生Minor gc的时候,会对Eden和To区域进行垃圾回收,存活的对象复制到From区域,并将Eden和To区域清空。

 3. 部分对象会在From和To区域中复制来复制去,如此交换15次(由JVM参数MaxTenuringThreshold决定,这个参数默认是15),最终如果还是存活,就存入到老年代。

http://static.cyblogs.com/20180617112302206.png

发生在新生代的垃圾回收成为Minor GC,Minor GC又称为新生代GC,因为新生代对象大多都具备朝生夕灭的特性,因此Minor GC(采用复制算法)非常频繁,一般回收速度也比较快。

  1. 标记整理算法(老年代回收算法)
     复制算法在对象存活率较高的老年代会进行很多次的复制操作,效率很低,所以在栈的老年代不适用复制算法。

 针对老年代对象存活率高的特点,提出了一种称之为”标记-整理算法”。标记过程仍与”标记-清除”过程一致,但后续步骤不是直接对可回收对象进行清理,而是让所有存活对象都向一端移动,然后直接清理掉端边界以外的内存。流程图如下:

http://static.cyblogs.com/20180617112627331.png

发生在老年代的GC称为Full GC,又称为Major GC,其经常会伴随至少一次的Minor GC(并非绝对,在Parallel Scavenge收集器中就有直接进行Full GC的策略选择过程)。Major GC的速度一般会比Minor GC慢10倍以上。

参考地址

Java线程与Linux内核线程的映射关系Linux从内核2.6开始使用NPTL (Native POSIX Thread Library)支持,但这时线程本质上还轻量级进程。

Java里的线程是由JVM来管理的,它如何对应到操作系统的线程是由JVM的实现来确定的。Linux 2.6上的HotSpot使用了NPTL机制,JVM线程跟内核轻量级进程有一一对应的关系。线程的调度完全交给了操作系统内核,当然jvm还保留一些策略足以影响到其内部的线程调度,举个例子,在linux下,只要一个Thread.run就会调用一个fork产生一个线程。

Java线程在WindowsLinux平台上的实现方式,现在看来,是内核线程的实现方式。**这种方式实现的线程,是直接由操作系统内核支持的——由内核完成线程切换,内核通过操纵调度器(Thread Scheduler)实现线程调度,并将线程任务反映到各个处理器上。**内核线程是内核的一个分身。程序一般不直接使用该内核线程,而是使用其高级接口,即轻量级进程(LWP),也即线程。这看起来可能很拗口。看图:

Java线程与Linux内核线程的映射关系

(说明:KLT即内核线程Kernel Thread,是“内核分身”。每一个KLT对应到进程P中的某一个轻量级进程LWP(也即线程),期间要经过用户态、内核态的切换,并在Thread Scheduler 下反应到处理器CPU上。)

​ 这种线程实现的方式也有它的缺陷:在程序面上使用内核线程,必然在操作系统上多次来回切换用户态及内核态;另外,因为是一对一的线程模型,LWP的支持数是有限的。

对于一个大型程序,我们可以开辟的线程数量至少等于运行机器的cpu内核数量。java程序里我们可以通过下面的一行代码得到这个数量:

1
Runtime.getRuntime().availableProcessors();

所以最小线程数量即时cpu内核数量。如果所有的任务都是计算密集型的,这个最小线程数量就是我们需要的线程数。开辟更多的线程只会影响程序的性能,因为线程之间的切换工作,会消耗额外的资源。如果任务是IO密集型的任务,我们可以开辟更多的线程执行任务。当一个任务执行IO操作的时候,线程将会被阻塞,处理器立刻会切换到另外一个合适的线程去执行。如果我们只拥有与内核数量一样多的线程,即使我们有任务要执行,他们也不能执行,因为处理器没有可以用来调度的线程。

​ **如果线程有50%的时间被阻塞,线程的数量就应该是内核数量的2倍。**如果更少的比例被阻塞,那么它们就是计算密集型的,则需要开辟较少的线程。如果有更多的时间被阻塞,那么就是IO密集型的程序,则可以开辟更多的线程。于是我们可以得到下面的线程数量计算公式:线程数量=内核数量 / (1 – 阻塞率)

我们可以通过相应的分析工具或者javamanagement包来得到阻塞率的数值。

参考地址

Java中线程的状态分为6种。

  1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
  2. 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。
    线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。
  3. 阻塞(BLOCKED):表示线程阻塞于锁。
  4. 等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
  5. 超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。
  6. 终止(TERMINATED):表示该线程已经执行完毕。

这6种状态定义在Thread类的State枚举中,可查看源码进行一一对应。

线程的状态图

http://static.cyblogs.com/20181120173640764.jpg

初始状态

实现Runnable接口和继承Thread可以得到一个线程类,new一个实例出来,线程就进入了初始状态。

就绪状态

就绪状态只是说你资格运行,调度程序没有挑选到你,你就永远是就绪状态。
调用线程的start()方法,此线程进入就绪状态。
当前线程sleep()方法结束,其他线程join()结束,等待用户输入完毕,某个线程拿到对象锁,这些线程也将进入就绪状态。
当前线程时间片用完了,调用当前线程的yield()方法,当前线程进入就绪状态。
锁池里的线程拿到对象锁后,进入就绪状态。

运行中状态

线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。这也是线程进入运行状态的唯一一种方式。

阻塞状态

阻塞状态是线程阻塞在进入synchronized关键字修饰的方法或代码块(获取锁)时的状态。

等待

处于这种状态的线程不会被分配CPU执行时间,它们要等待被显式地唤醒,否则会处于无限期等待的状态。

超时等待

处于这种状态的线程不会被分配CPU执行时间,不过无须无限期等待被其他线程显示地唤醒,在达到一定时间后它们会自动唤醒。

终止状态

当线程的run()方法完成时,或者主线程的main()方法完成时,我们就认为它终止了。这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦终止了,就不能复生。
在一个终止的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。

等待队列

调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj) 代码段内。
与等待队列相关的步骤和图

http://static.cyblogs.com/20180701221233161.jpg

1、线程1获取对象A的锁,正在使用对象A。
2、线程1调用对象A的wait()方法。
3、线程1释放对象A的锁,并马上进入等待队列。
4、锁池里面的对象争抢对象A的锁。
5、线程5获得对象A的锁,进入synchronized块,使用对象A。
6、线程5调用对象A的notifyAll()方法,唤醒所有线程,所有线程进入同步队列。若线程5调用对象A的notify()方法,则唤醒一个线程,不知道会唤醒谁,被唤醒的那个线程进入同步队列。
7、notifyAll()方法所在synchronized结束,线程5释放对象A的锁。
8、同步队列的线程争抢对象锁,但线程1什么时候能抢到就不知道了。

同步队列状态

  • 当前线程想调用对象A的同步方法时,发现对象A的锁被别的线程占有,此时当前线程进入同步队列。简言之,同步队列里面放的都是想争夺对象锁的线程。
  • 当一个线程1被另外一个线程2唤醒时,1线程进入同步队列,去争夺对象锁。
  • 同步队列是在同步的环境下才有的概念,一个对象对应一个同步队列。
  • 线程等待时间到了或被notify/notifyAll唤醒后,会进入同步队列竞争锁,如果获得锁,进入RUNNABLE状态,否则进入BLOCKED状态等待获取锁。

几个方法的比较

1、Thread.sleep(long millis),一定是当前线程调用此方法,当前线程进入TIMED_WAITING状态,但不释放对象锁,millis后线程自动苏醒进入就绪状态。作用:给其它线程执行机会的最佳方式。
2、Thread.yield(),一定是当前线程调用此方法,当前线程放弃获取的CPU时间片,但不释放锁资源,由运行状态变为就绪状态,让OS再次选择线程。作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。Thread.yield()不会导致阻塞。该方法与sleep()类似,只是不能由用户指定暂停多长时间。
3、thread.join()/thread.join(long millis),当前线程里调用其它线程t的join方法,当前线程进入WAITING/TIMED_WAITING状态,当前线程不会释放已经持有的对象锁。线程t执行完毕或者millis时间到,当前线程一般情况下进入RUNNABLE状态,也有可能进入BLOCKED状态(因为join是基于wait实现的)。
4、obj.wait(),当前线程调用对象的wait()方法,当前线程释放对象锁,进入等待队列。依靠notify()/notifyAll()唤醒或者wait(long timeout) timeout时间到自动唤醒。
5、obj.notify()唤醒在此对象监视器上等待的单个线程,选择是任意性的。notifyAll()唤醒在此对象监视器上等待的所有线程。
6、LockSupport.park()/LockSupport.parkNanos(long nanos),LockSupport.parkUntil(long deadlines), 当前线程进入WAITING/TIMED_WAITING状态。对比wait方法,不需要获得锁就可以让线程进入WAITING/TIMED_WAITING状态,需要通过LockSupport.unpark(Thread thread)唤醒。

疑问

  • 等待队列里许许多多的线程都wait()在一个对象上,此时某一线程调用了对象的notify()方法,那唤醒的到底是哪个线程?随机?队列FIFO?or sth else?Java文档就简单的写了句:选择是任意性的(The choice is arbitrary and occurs at the discretion of the implementation)。

参考地址

简介

ReentrantLocksynchronized都是提供了同步的功能,JDK1.6之后对synchronized性能进行了优化,所以两者的性能上几乎没什么区别,但是ReentrantLock提供了了一些高级功能。

  • 等待可中断:在synchronized中,如果一个线程在等待锁,他只用两种结果,要么获得锁执行完,要么一直保持等待。可中断的等待是通知正在等待的线程,告诉他没必要再等待后。

  • 实现公平锁:公平锁:会按照时间的先后顺序,保证先到先得。特点是它不会产生饥饿现象。而synchroized关键字进行所控制时,锁是非公平的。而重入锁可以设置为公平锁。 public ReetranLock(boolean fair)fairtrue时,表示锁是公平的。实现公平锁必然要求系统维护一个有序队列,因此公平锁的成本比较高,性能也非常低向。默认情况下锁是非公平的。

  • 绑定多个条件:类似于Object类的waitnotify方法,它是与ReentrantLock绑定的条件,可以绑定多个条件。

一个简单的例子

注意:退出临界区要释放锁,否则其他线程就没有机回访问临界区了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TestReentrantLock implements Runnable {
public static ReentrantLock rlock = new ReentrantLock();
public static int i=0;
@Override
public void run(){
for(int j=0;j<1000000;j++){
rlock.lock();
try{
i++;
}finally {
rlock.unlock();
}
}
}

public static void main(String args[]) throws InterruptedException {
TestReentrantLock tl = new TestReentrantLock();
Thread t1 = new Thread(tl);
Thread t2 = new Thread(tl);
t1.start();
t2.start();
//表示当前线程等待t1执行完
t1.join();
t2.join();
System.out.println(i);
}
}

注意:退出临界区要释放锁,否则其他线程就没有机回访问临界区了。

Lock接口

Lock接口是JDK1.5新加的同步工具接口,它的实现类有ReentrantLockWriteLock等,接口中定义了通用的方法:

1
2
3
4
5
6
7
8
9
10
11
12
void lock();
void unlock();
// 可中断获取锁,与lock()不同之处在于可响应中断操作,即在获取锁的过程中可中断
// synchronized在获取锁时是不可中断的
void lockInterruptibly() throws InterruptedException;
//尝试非阻塞获取锁,调用该方法后立即返回结果,如果能够获取则返回true,否则返回false
boolean tryLock();
//根据传入的时间段获取锁,在指定时间内没有获取锁则返回false,如果在指定时间内当前线程未被中并断获取到锁则返回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//获取等待通知组件,该组件与当前锁绑定,当前线程只有获得了锁
//才能调用该组件的wait()方法,而调用后,当前线程将释放锁。
Condition newCondition();

其中lockunlock方法提供了synchronized的功能,其他方法使得同步过程更加的灵活。

什么叫重入锁

一个线程可以多次进入,当然必须多次释放锁。

1
2
3
4
5
6
7
8
9
10
rlock.lock();
rlock.lock();
try{
i++;
}finally {
rlock.unlock();
rlock.unlock();
//如果释放次数多,则回抛出java.lang.IllegalMonitorStateException异常
//rlock.unlock();
}

下面根据案例主要介绍ReentrantLock的用法,在后面的文章中介绍它的实现原理。

中断响应

如果一个线程正在等待锁,那么它可以收到一个通知,被告知无序再等待,可以停止工作了。在synchronized中,
如果一个线程在等待锁,他只用两种结果,要么获得锁执行完,要么一直保持等待。

下面通过一个死锁的例子,介绍中断响应的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class DeathLock implements Runnable{
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public DeathLock(int lock){
this.lock = lock;
}
@Override
public void run() {
try{
if(lock==1){
lock1.lockInterruptibly();
System.out.println(Thread.currentThread().getName()+" get lock1");
try{
Thread.sleep(1000);
}catch(Exception e){}
lock2.lockInterruptibly();
System.out.println(Thread.currentThread().getName()+" get lock2");
}else{
lock2.lockInterruptibly();
System.out.println(Thread.currentThread().getName()+" get lock2");
try{
Thread.sleep(1000);
}catch(InterruptedException e){}
lock1.lockInterruptibly();
System.out.println(Thread.currentThread().getName()+" get lock1");
}
}catch(InterruptedException e){
e.printStackTrace();
}finally {
//判断当前线程是否拥有该锁
if(lock1.isHeldByCurrentThread())
lock1.unlock();
if(lock2.isHeldByCurrentThread())
lock2.unlock();
System.out.println(Thread.currentThread().getName()+" 退出!");
}
}
public static void main(String args[]) throws InterruptedException{
//这里new出两个实现Runnable的对象是因为为了传进去不同的lock值
Thread thread1 = new Thread(new DeathLock(1),"thread1");
Thread thread2 = new Thread(new DeathLock(1),"thread2");
thread1.start();
thread2.start();
Thread.sleep(1000);
thread2.interrupt();
}
}

执行过程是thread1占用lock1,休眠500毫秒,然后想占用lock2,与此同时,thread2占用lock2,休眠1000毫秒后在请求lock1。可是当thread1,想请求lock2时,已经被thread2占用,因此只能进入阻塞状态,thread2也同理进入阻塞状态。因此进入死锁。但是这里使用了lockInterruptibly()方法。这是一个可以对中断进行响应的锁申请动作,即在等待锁的过程中可以响应中断。在thred2调用interrupt()方法,thread2线程被中断,thread2放弃对lock的申请,同时释放已获得的lock2,所以thread1可以得到lock2继续执行下去。

结果为:

http://static.cyblogs.com/20170809094510953.png

thread2先中断,抛出异常,跳入finally块,释放资源,最终退出。

锁申请等待限时

如果给定一个等待时间,超过时间,让线程自动放弃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TimeLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
try {
if(lock.tryLock(2, TimeUnit.SECONDS)){
Thread.sleep(5000);
}else{
System.out.println(Thread.currentThread().getName()+" get lock failed");
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
if(lock.isHeldByCurrentThread())
lock.unlock();
}
}
public static void main(String args[]) {
TimeLock t = new TimeLock();
Thread thread1 = new Thread(t,"thread1");
Thread thread2 = new Thread(t,"thread2");
thread1.start();
thread2.start();
}
}

tryLock()两个参数分别表示等待时长和计时单位,表示线程在请求锁的过程中,最多等待5秒,如果超过改时间则返回false,如果成果获得锁,则返回true
该程序中首先任意一个线程先获得锁,然后休眠5秒,然而它一直占有锁,因此另一个线程无法再2秒内获得锁,因此失败。

tryLock()方法也可以不带参数,这种情况下,当前线程会尝试获得锁,如果锁未被其他线程占用,则申请锁会成功,把那个返回true,如果锁被其他线程占用,则当前线程不会等待,而是立即返回false。这种模式下不会引起线程等待,因此也不会产生死锁。

公平锁

大多数情况下,锁的申请都是非公平的,也就是说,线程1首先申请锁A,接着线程2也请求了锁A,当锁A可用时,线程1,2都有可能获得锁,系统只是在等待队列中随机挑选一个,因此不能保证公平性。
所以有了公平锁,公平锁:会按照时间的先后顺序,保证先到先得。特点是它不会产生饥饿现象。而synchroized关键字进行所控制时,锁是非公平的。而重入锁可以设置为公平锁。
public ReetranLock(boolean fair)
fairtrue时,表示锁是公平的。实现公平锁必然要求系统维护一个有序队列,因此公平锁的成本比较高,性能也非常低向。默认情况下锁是非公平的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FairLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock(true);
@Override
public void run() {
while(true)
try {
lock.lock();
System.out.println(Thread.currentThread().getName()+" get lock !");
}finally{
lock.unlock();
}
}
public static void main(String args[]) {
FairLock f = new FairLock();
Thread thread1 = new Thread(f,"thread1");
Thread thread2 = new Thread(f,"thread2");
thread1.start();
thread2.start();
}
}

部分结果为:

http://static.cyblogs.com/20170809094651573.png

可以看出两个线程基本上是交替获得锁。

Condition条件(搭配重入锁使用)

Condition类似于wait()notify()的功能,它是与重入锁关联使用的。Lock接口中提供了newCondition()方法,该方法可以返回绑定到此LockCondition实例。

方法解释:
await方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()signalAll()方法时,线程会重新获得锁并继续执行,当线程被中断时,也能跳出等待。与Objectwait()方法相似。
singal()方法用于唤醒一个在等待中的线程。
注意:以上连个方法调用之前必须当前线程拥有锁。否则抛出IllegalMonitorStateException异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ConditionDemo implements Runnable {
public static ReentrantLock lock = new ReentrantLock(true);
public static Condition condition = lock.newCondition();
@Override
public void run() {
try {
lock.lock();
//当前线程释放锁,进入等待状态
condition.await();
System.out.println(Thread.currentThread().getName()+" get lock !");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
public static void main(String args[]) throws InterruptedException {
ConditionDemo t = new ConditionDemo();
Thread thread1 = new Thread(t,"thread1");
thread1.start();
Thread.sleep(2000);
//当thread1进入处于等待状态,main线程获得锁
lock.lock();
System.out.println(Thread.currentThread().getName() + " get lock !");
condition.signal();
lock.unlock();
}
}
/* 结果:
main get lock !
thread1 get lock !
*/

thread1线程调用await时,要求线程持有相关的重入锁,调用后,线程释放这把锁,同理signal方法调用时,也要求线程先获得相关的锁,在signal方法调用后,系统会从当前的Condition对象的等待队列中唤醒一个线程,一旦线程唤醒,它会重新尝试获得之前绑定的锁,一旦成功获取await方法返回,继续执行。在调用signal后先睡眠2秒,并且保持了锁,释放了锁之后,await方法获取锁后才得以返回继续执行。因此打印出来的时间差为2000毫秒。

最后

上面结合例子介绍了ReentrantLock主要的用法,还有一些很有意思的用法,比如正在等待锁的线程,当前线程是否拥有锁等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//查询当前线程保持此锁的次数。
int getHoldCount()
//返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。
protected Thread getOwner();
//返回一个 collection,它包含可能正等待获取此锁的线程,其内部维持一个队列,这点稍后会分析。
protected Collection<Thread> getQueuedThreads();
//返回正等待获取此锁的线程估计数。
int getQueueLength();
// 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。
protected Collection<Thread> getWaitingThreads(Condition condition);
//返回等待与此锁相关的给定条件的线程估计数。
int getWaitQueueLength(Condition condition);
// 查询给定线程是否正在等待获取此锁。
boolean hasQueuedThread(Thread thread);
//查询是否有些线程正在等待获取此锁。
boolean hasQueuedThreads();
//查询是否有些线程正在等待与此锁有关的给定条件。
boolean hasWaiters(Condition condition);
//如果此锁的公平设置为 true,则返回 true。
boolean isFair()
//查询当前线程是否保持此锁。
boolean isHeldByCurrentThread()
//查询此锁是否由任意线程保持。
boolean isLocked()

进入ReentrantLock的源码发现,ReentrantLock类的绝大部分功能是通过它的内部类Sync来实现的,而Sync又继承了AbstractQueuedSynchronizer类。这就是大名鼎鼎的AQSDoug Lea最著名的作品,后面的文章分析它的精华所在。

参考地址

前言

任务和线程的启动很容易。在大多数时候,我们都会让它们运行直到结束,或者让它们自行停止。然而,有时候我们希望提前结束任务或线程,或许是因为用户取消了操作,或者应用程序需要被快速关闭。

要使任务和线程能安全、快速、可靠地停止下来,并不是一件容易的事。Java 没有提供任何机制来安全的终止线程。但它提供了中断,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

这种协作式的方法是必要的,我们很少希望某个任务、线程或服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态。相反,在编写任务和服务时可以使用一种协作的方式:当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。这提供了更好的灵活性,因为任务本身的代码比发出取消请求的代码更清除如何执行清除工作。

正题

在开始文章前,有几个问题需要思考一下:

  • 取消任务的方式由哪几种?
  • 中断的策略是什么?
  • 如何响应中断?
取消任务的方式有哪几种

取消任务的方式大体上有一下两种:

  • 设置取消标志位
  • 中断
设置取消标志位

设置某个“已请求取消”标志,而任务将定期地查看该标志。如果设置了这个标记,那么任务将提前结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class PrimeGenerator implements Runnable {
private static ExecutorService exec = Executors.newCachedThreadPool();
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;

public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}

public void cancel() {
cancelled = true;
}

public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}

static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
exec.execute(generator);
try {
SECONDS.sleep(1);
} finally {
generator.cancel();
}
return generator.get();
}
}

上面代码使用了这项技术,其中的 PrimeGenerator 持续地枚举素数,知道它被取消。cancel 方法将设置 cancelled 标志,并且主循环在搜索下一个素数之前会首先检查这个标志(为了使这个过程能可靠的工作,标志 cancelled 必须为 volatile 类型)。

PrimeGenerator 使用了一种简单的取消策略:客户代码通过调用 cancel 来请求取消,PrimeGenerator 在每次搜索素数前首先检查是否存在取消请求,如果存在则退出。

一个可取消的任务必须拥有取消策略,在这个策略中将详细地定义取消操作的“How”“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作。

中断

PrimeGenerator 中的取消机制最终会使得搜索素数的任务退出,但在退出过程中需要花费一定的时间。然而,如果使用这种方法的任务调用了一个阻塞方法,例如 BlockingQueue.put,那么可能会产生一个更严重的问题——任务可能永远不会检查取消标志位,因此永远不会结束。

接下来的代码说明了这个问题。生产者线程生成素数,并将它们放入一个阻塞队列。如果生产者的速度超过了消费者的处理速度,队列将被填满,put 方法也会阻塞。当生产者在 put 方法中阻塞时,如果消费者希望取消生产者任务,那么将发生什么情况?它可以调用 cancel 方法设置 cancelled 标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的 put 方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以 put 方法将一直保持阻塞状态)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class BrokenPrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
private volatile boolean needMoreStatus = false;

BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
queue.put(p = p.nextProbablePrime());
}
} catch (InterruptedException consumed) {
}
}

public void cancel() {
cancelled = true;
}

public synchronized BlockingQueue<BigInteger> get() {
return queue;
}

static BlockingQueue<BigInteger> aSecondOfPrimes() throws InterruptedException {
BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(10);
BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
producer.start();
try {
SECONDS.sleep(1);
} finally {
producer.cancel();
}
return producer.get();
}
}

BrokenPrimeProducer 说明了一些自定义的取消机制无法与可阻塞的库函数实现良好交互的原因。如果任务代码能够响应中断,那么可以使用中断作为取消机制,并且利用许多库类中提供的中断支持。通常,中断是实现取消的最合理方式。

线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。

JavaAPI 或语言规范中,并没有将中断与任何取消语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。

每个线程都有一个 boolean 类型的中断状态。当中断线程时,这个线程的中断状态将被设置为 true。在 Thread 中包含了中断线程以及查询线程中断状态的方法。。interrupt 方法能中断目标线程,而 isInterrupted 方法能返回目标线程的中断状态。静态的 interrupted 方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。

1
2
3
4
5
6
7
8
public class Thread{
// 中断线程
public void interrupt() { ... }
// 中断状态
public boolean isInterrupted() { ... }
// 清除中断状态
public static boolean interrupted(){ ... }
}

阻塞库方法,例如 Thread.sleepObject.wait 等,都会检查线程何时中断,并且在发现中断时提前返回。它们在响应中断时执行的操作包括:清除中断状态,抛出 InterruptedException,表示阻塞操作由于中断而提前结束。JVM 并不能保证阻塞方法检测到中断的速度,但在实际情况中响应速度还是非常快的。

当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。通过这样的方法,中断操作将变得“有黏性”——如果不触发 InterruptedException,那么中断状态一直保持,直到明确地清除中断状态。

调用 interrupt 并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

对中断操作的正确理解是:它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己(这些时刻也被称为取消点)。有些方法,例如 waitsleepjoin 等,将严格地处理这种请求,当它们收到中断请求或者在开始执行时发现某个已经被设置好的中断状态时,将抛出一个异常。设计良好的方法可以完全忽略这种请求,只要它们能使调用代码对中断请求进行某种处理。设计槽糕的方法可能会屏蔽中断请求,从而导致调用栈中的其他代码无法对中断请求做出响应。

在使用静态的 interrupted 时应该小心,因为它会清除当前线程的中断状态。如果调用 interrupted 时返回了 true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出 InterruptedException,或者通过再次调用 interrupt 来恢复中断状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;

PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted()) {
queue.put(p = p.nextProbablePrime());
}
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}

public void cancel() {
interrupt();
}

public synchronized BlockingQueue<BigInteger> get() {
return queue;
}
}

在上面代码中,在每次迭代循环中,有两个位置可以检测出中断:在阻塞的 put 方法调用中,以及在循环开始处查询中断状态时。由于调用了阻塞的 put 方法,因此这里并不一定需要进行显式的检测,但执行检测却会使 PrimeProducer 对中断具有更高的响应性,因为它是在启动寻找素数任务之前检查中断的,而不是在任务完成之后。如果可中断的阻塞方法的调用频率并不高,不足以获得足够的响应性,那么显式的检测中断状态能起到一定的帮助作用。

中断策略是什么

正如任务中应该包含取消策略一样,线程同样应该包含中断策略中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),哪些工作单元对于中断来说是原子操作,以及以多块的速度来响应中断。

最合理的中断策略是某种形式的线程级取消操作或服务级取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。此外还可以建立其他的中断策略,例如暂停服务或重新开始服务,但对于那些包含非标准终端策略的线程或线程池,只能用于能知道这些策略的任务中。

区分任务和线程对中断的反应是很重要的。一个中断请求可以有一个或多个接收者——中断线程池中的某个工作者线程,同时意味着“取消当前任务”和“关闭工作者线程”。

任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心的保存中断状态,这样拥有线程的代码才能对中断做出响应,即使“非所有者”代码也可以做出响应。

这就是为什么大多数可阻塞的库函数都只是抛出 InterruptedException 作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因为它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以取消进一步的操作。

当检查到中断请求时,任务并不需要放弃所有的操作——它可以推迟处理中断请求,并直到某个更合适的时刻。因此需要记住中断请求,并在完成当前任务后抛出 InterruptedException 或者表示已经收到中断请求。这项技术能够确保在更新过程中发生中断时,数据结构不会被破坏。

任务不应该对执行该任务的线程的中断策略做出任何假设,除非该任务被专门设计为在服务中运行,并且在这些服务中心包含特定的中断策略。无论任务把中断视为取消,还是其他某个中断响应操作,都应该小心地保存执行线程的中断状态。如果除了将 InterruptedException 传递给调用者外还需要执行其他操作,那么应该在捕获 InterruptedException 之后恢复中断状态:

1
Thread.currentThread().interrupt();

正如任务代码不应该对其执行所在的线程的中断策略做出假设,执行取消操作的代码也不应该对线程的中断策略做出假设。线程应该只能由其所有者中断,所有者可以将线程的中断策略信息封装到某个合适的取消机制中,例如关闭(shutdown)方法。

由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

如何响应中断

当调用可中断的阻塞库函数时,例如 Thread.sleepBlockingQueue.put 等,有两种使用策略可用来处理 InterruptedException

传递异常(可能在执行某个特定于任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。
恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。
传递 InterruptedException 与将 InterruptedException 添加到 throws 字句中一样容易,如下代码清单:

1
2
3
4
5
6
private BlockingQueue<BigInteger> queue;
...

public BigInteger getNextInteger() throws InterruptedException {
return queue.take();
}

如果不想或无法传递 InterruptedException(或许通过 Runnable 来定义任务),那么需要寻找另一种方式来保存中断请求。一种标准的方法就是再次调用 interrupt 来恢复中断状态。你不能屏蔽 InterruptedException,例如在 catch 块中捕获到异常却不做任何处理,除非在你的代码中实现了线程的中断策略。虽然 PrimeProducer 屏蔽了中断,但这是因为它已经知道线程将要结束,因此在调用栈中已经没有上层代码需要知道中断信息。由于大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。

只有实现了线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是在捕获 InterruptedException 时恢复状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public BigInteger getNextInteger(BlockingQueue<BigInteger> queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// 重新尝试
}
}
} finally {
if(interrupted)
Thread.currentThread().interrupt();
}
}

如上代码,如果过早的设置中断状态,就可能引起无限循环,因为大多数可中断的阻塞方法都会在入口处检查中断状态,并且当发现该状态已经被设置时会立即抛出 InterruptedException(通常,可中断的方法会在阻塞或进行重要的工作前首先检查中断,从而尽快地响应中断)。

如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。要选择合适的轮询频率,就需要在效率和响应性之间进行权衡。如果响应性要求较高,那么不应该调用那些执行时间较长并且不响应中断的方法,从而对可调用的库代码进行一些限制。

在取消过程中可能涉及除了中断状态之外的其他状态。中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步指示(当访问这些信息时,要确保使用同步)。

参考地址

1、G1垃圾收集器介绍

G1垃圾收集器针对具有大量内存的多处理器机器。它试图以很高的概率满足GC停顿时间目标,同时实现高吞吐量且几乎不需要配置。G1旨在在延迟和吞吐量之间提供最佳平衡,应用场景包括如下环境特征:

  • 堆大小可达10 GB或更大,超过50%的Java堆占用实时数据。
  • 随着时间的推移,对象分配速度和晋升(从新生代到老年代的晋升)速度会发生显著变化。
  • 堆中大量的碎片。
  • 可预测的时间停顿目标不超过几百毫秒,避免长时间垃圾收集停顿。

G1取了CMS,G1也是默认的收集器(JVM9、JVM10)。

G1收集器有很高的性能,并尝试通过以下几节所述的几种方式来满足停顿时间的目标。

2、启用G1收集器

G1是默认收集器,因此通常不需要执行任何其他操作。您可以通过在命令行上提供-XX:+ UseG1GC来显式启用它。

3、基本概念

G1是分代的、增量的、并行、大部分、并发的、stop-the-word、以及疏散(将活着的对象从一个区域(youngor young + old)拷贝到另一个区域)的垃圾收集器,用于监视每个stop-the-word停顿的停顿时间目标。与其他收集器类似,G1将堆分成(虚拟)新生代和老年代。空间回收的主要集中在年轻代,因为这样做最有效率,在老年代偶尔会有空间回收。

一些操作总是在stop-the-word停顿中执行以提高吞吐量。其他需要更多时间停止应用程序的操作(例如全局标记等全堆操作)将与应用程序同时并行并发执行。为了让空间回收stop-the-word停顿短,G1逐步地并行执行空间回收。G1通过跟踪以前的应用程序行为信息和垃圾收集停顿的信息来建立相关成本的模型,以此实现可预测性。它使用这些信息来调整在停顿中完成的工作。例如,G1首先在最有效率的区域回收空间(即大部分填充垃圾的区域,因此是名称)。

G1主要通过疏散回收空间:回收时,在选定的内存区域内发现的活动对象被复制到新的内存区域,在此过程中压缩它们。疏散完成后,由先前活动对象占据的空间被重新用于应用程序的分配。

垃圾收集器不是实时收集器。 它试图在较长的时间内,以很高的概率满足设定的停顿时间的目标,但对于给定的停顿,并不总是具有绝对的确定性。

3.1、堆布局

G1将堆分成一组相同大小的region,每个region占有一个连续的虚拟内存地址,如图1所示。 region是内存分配和内存回收的基本单位。 在任何给定的时间,这些region中的每一个都可以是空的(浅灰色),或者已经分配给特定的年轻代或老年代。 当内存请求出现时,内存管理器会拿出空闲区域。 内存管理器将它们分配给某一代,然后将它们作为空闲空间返回给应用程序,在该空间中,它可以分配自身。

图1-G1堆内存布局

[图1-G1堆内存布局]

年轻代包含eden区(红色)和幸存区(包含“S”的红色)。 这些region提供与其他收集器中相应的连续空间相同的功能,区别在于,在G1中,这些region通常在内存中以非连续模式布局。老region(浅蓝色)组成了老年代。对于跨越多个区域的物体,老年代region可能会变得很大(包含“H”的浅蓝色)。

应用程序总是将内存分配到新生代,即eden region,但是除了那些被直接分配到老年代的大对象。

G1 GC停顿可以回收整个新生代的空间,并且在任何收集停顿时,任何附加的老年代region都可以回收。停顿期间,G1将此收集集合中的对象复制到堆中的一个或多个不同region。对象的目的region域取决于该对象的源region:整个新生代被复制到幸存者或老年代region,老年代region的对象复制到其他不同老年代region。

3.2、GC周期

在较高的水平上,G1收集器在两个阶段之间交替。 young-only阶段包含垃圾收集,这些垃圾收集逐渐用老年代的对象来填充当前可用的内存。space-reclamation阶段是G1除了处理年轻一代之外,逐步回收老一代的空间。然后,循环以young-only阶段重新开始。

图2以一个可能发生的垃圾收集停顿序列为例说明了这个周期。

img

[图2垃圾收集周期概述]

以下列表详细介绍了G1垃圾回收周期的这两个阶段,以及它们的停顿和转换过程:

1、young-only阶段:这一阶段从对象晋升到老年代的收集开始。当老年代占用率达到某一阈值(Initiating Heap Occupancy threshold)时,young-only阶段和space-reclamation阶段之间的转换就开始了。在这个时候,G1安排了一个初始标记的young-only收集,而不是一个普通的young-only收集。

  • 初始标记:除了执行常规的young-only的收集之外,这种类型的收集开始标记过程。并发标记确定老年代region中的所有当前可到达(实时)对象将保留到以下space-reclamation阶段。标记尚未完全结束时,可能会发生常规新生代收集。标记结束了两个特殊的stop-the-word停顿:重新标注和清理。
  • 重新标记:此停顿完成标记本身,并执行全局引用处理和类卸载。在重新标记和清理阶段之间G1并发计算对象活跃度概要信息,并将在清理停顿中用于更新内部数据结构。
  • 清理:此停顿也回收完全空白的区域,并确定space-reclamation阶段是否会实际执行。如果有space-reclamation阶段,那么young-only阶段完成一次young-only收集。

2、space-reclamation阶段:这一阶段由多个混合收集组成,除了新生代region之外,还会疏散老年代region存活对象。当G1确定疏散更多老一代的地区不会产生足够的可用空间时,space-reclamation阶段结束。

在space-reclamation之后,收集周期将以另一个young-only阶段重新开始。 作为备选,如果应用程序在收集对象存活信息时内存溢出,G1像其他收集器一样就地执行stop-the-word全局堆压缩(Full GC)。

4、G1内部

本节介绍G1 GC的一些重要细节。

4.1、确定Initiating HeapOccupancy

The Initiating Heap Occupancy Percent(IHOP)是触发初始标记收集的阈值,它被定义为老年代大小的百分比。

默认情况下,G1通过观察标记周期中标记需要多长时间以及老年代通常分配多少内存来自动确定最佳IHOP。这个功能称为Adaptive IHOP。如果此功能处于活动状态,在没有足够的观察值来很好地预测Initiating Heap Occupancy阈值的情况下,选项-XX:InitiatingHeapOccupancyPercent会以当前老年代的大小的百分比来确定初始值。 使用选项-XX:-G1UseAdaptiveIHOP关闭G1的这种行为。 在这种情况下,-XX:InitiatingHeapOccupancyPercent的值始终确定此阈值。

在内部,AdaptiveIHOP尝试设置Initiating Heap Occupancy,以便在老年代占有率处于当前最大老年代大小减去作为额外缓冲区的-XX:G1HeapReservePercent值时,开始空间回收阶段的第一个混合GC。

4.2、标记

G1标记使用称为 Snapshot-At-The-Beginning (SATB)的算法。 它在初始标记停顿时获取堆的虚拟快照,当标记开始时处于活动状态的所有对象,在标记剩余部分也被认为是活动的对象。这意味着标记期间变为死亡(无法访问)的对象,对space-reclamation阶段仍然被认为是存活的(有一些例外)。与其他收集器相比,这可能会导致一些额外的内存被错误保留。但是,在重新标记停顿期间,SATB可能会提供更好的延迟。在该标记过程中,过于保守考虑的存活对象,将在下一次标记过程中回收。

4.3、在堆内存紧张下的行为

当应用程序保持如此多的内存,疏散过程无法找到足够的空间进行复制时,会发生疏散失败。疏散失败意味着G1试图通过以下方式来完成当前GC,保留任何已经移动到新位置的对象,不复制任何尚未移动的对象,只调整对象之间的引用。疏散失败可能会带来一些额外开销,但通常应该像其他年轻代收集一样快。在疏散失败的GC之后,G1将照常恢复应用程序,无需任何其他措施。G1假定疏散失败发生在GC结束附近; 也就是说,大多数对象已经移动并且有足够的空间继续运行应用程序,直到标记完成并开始space-reclamation。

如果这个假设不成立,那么G1最终将安排一个fullGC。 这种类型的收集就地执行整个堆的压缩。 这可能非常缓慢。

4.4、大对象

大对象是大于或等于半个region大小的对象。 除非使用-XX:G1HeapRegionSize选项进行设置,否则当前region的尺寸按照人体工程学设计确定,参考“G1 GC人体工程学默认值”章节所述。

这些大对象有时以特殊的方式进行处理:

  • 每一个大对象都被分配为老年代的一系列连续region。对象的开始位置始终位于该序列中第一个region的开始位置。该序列中最后一个region的剩余空间将丢失,直到整个对象被回收。
  • 一般来说,只有在清理停顿期间的标记结束时,或者在Full GC期间,如果大对象变得无法到达,则可以被回收。但是,对于原始类型数组的大对象(例如bool,各种整数和浮点值)有一个特殊规定。如果G1在任何类型的GC停顿时都没有被任何对象引用,那么G1会尝试回收大对象。此行为默认启用,但可以使用选项-XX:G1EagerReclaimHumongousObjects将其禁用。
  • 大对象的分配可能会导致GC停顿过早发生。 G1会在每个大对象分配中检查InitiatingHeap Occupancy阈值,如果当前占用率超过该阈值,可能会立即强制新生代收集初始标记。
  • 大对象不会移动,即使在fullGC中也不会移动。 这可能会导致过早执行缓慢的fullGC或意外的内存溢出情况,尽管region空间碎片留下大量剩余空间。
4.5、young-only阶段代大小设置

在young-only阶段,要收集的region集合(收集集合)只包括新生代region。G1一直在young-only 收集结束时设置新生代大小。这样,G1根据实际暂停时间的长期观察值,就可以满足使用-XX:MaxGCPauseTimeMillis和-XX:PauseTimeIntervalMillis设置的停顿时间目标。它考虑了新生代同样规模的疏散需要多长时间。这包括诸如在收集期间需要复制多少个对象以及这些对象之间是如何相互关联的信息。

如果没有其他约束,则G1适应性地将年轻代大小设定为-XX:G1NewSizePercent和-XX:G1MaxNewSizePercent确定的值以满足停顿时间。

4.6、Space-reclamation阶段代大小设置

在space-reclamation阶段,G1试图在单个垃圾收集停顿中最大回收化老年代空间。新生代的大小设置为允许的最小值,通常由-XX:G1NeSizeSizePercent确定,并且任何老年代region回收空间都会被添加,直到G1确定添加更多region将超过停顿时间的目标。在特定的GC停顿中,G1按其回收效率、剩余可用时间顺序添加老年代region,以获得最终收集集合。

要收集的潜在候选老年代region(收集集合候选region)的低端数量除以由-XX:G1MixedGCCountTarget确定的space-reclamation阶段的长度,得到商X,每次GC所采用的老年代region数量由商X确定下界。收集集合候选region:在本阶段开始时所占用的(小于-XX:G1MixedGCLiveThresholdPercent)老年代region

当收集组候选region中可回收的剩余空间量小于-XX:G1HeapWastePercent设置的百分比时,space-reclamation阶段结束。

5、G1 GC的人机工程学默认值

本主题概述了G1的最重要的参数及其默认值。他们给出了对预期行为和资源使用情况粗略概述,没有任何其他选项。

img
**

注意:意味着实际的值是由人体工程学决定的,这取决于环境。

6、与其他收集器对比

这是对G1和其他收集器之间主要区别的总结:

  • Parallel GC在老年代中全部压缩和回收空间。G1逐渐将这个工作分布在多个更短的收集中。这显著缩短了暂停时间,但可能会降低吞吐量。
  • 与CMS相似,G1并发执行部分老年代space-reclamation。然而,CMS不能将老年代堆碎片整理出来,最终会运行长时间的full GC。
  • G1的开销可能比其他收集器高,因为它的并发特性影响了吞吐量。

由于它的工作原理,G1有一些独特的机制来提高垃圾收集效率:

  • 在任何收集过程中,G1都可以回收老年代的一些完全空的、大的区域。这可以避免许多不必要的垃圾收集,在不费多大力气的情况下释放大量的空间。
  • G1可以选择性地尝试同时在Java堆上删除重复的字符串。

从老年代中回收空的大型对象总是启用的。您可以使用选项-XX:-G1EagerReclaimHumongousObjects来禁用该特性。在默认情况下,字符串重复删除是禁用的。你可以使用选项-XX:+ G1EnableStringDeduplication来启用它。

由于水平有限,翻译的不好,欢迎批评指正。

参考地址

前言

BlockingQueue即阻塞队列,它算是一种将ReentrantLock用得非常精彩的一种表现,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:

http://static.cyblogs.com/20161108212521456.png

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueueDelayQueueLinkedBlockingDequeLinkedBlockingQueuePriorityBlockingQueueSynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。下面的源码以ArrayBlockingQueue为例。

分析

BlockingQueue内部有一个ReentrantLock,其生成了两个Condition,在ArrayBlockingQueue的属性声明中可以看见:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

...

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

而如果能把notEmptynotFullput线程、take线程拟人的话,那么我想puttake操作可能会是下面这种流程:

put(e)

http://static.cyblogs.com/20161108212418173.png

take()

http://static.cyblogs.com/20161108212452384.png

其中ArrayBlockingQueue.put(E e)源码如下(其中中文注释为自定义注释,下同):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // 如果队列已满,则等待
insert(e);
} finally {
lock.unlock();
}
}

/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal(); // 有新的元素被插入,通知等待中的取走元素线程
}

ArrayBlockingQueue.take()源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 如果队列为空,则等待
return extract();
} finally {
lock.unlock();
}
}

/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal(); // 有新的元素被取走,通知等待中的插入元素线程
return x;
}

可以看见,put(E)与take()是同步的,在put操作中,当队列满了,会阻塞put操作,直到队列中有空闲的位置。而在take操作中,当队列为空时,会阻塞take操作,直到队列中有新的元素。

而这里使用两个Condition,则可以避免调用signal()时,会唤醒相同的put或take操作。

参考地址

0%