简栈文化

Java技术人的成长之路~

我们用Spring就是因为它能帮我们很好的管理Bean,如果我们能充分的理解Bean的生命周期,就能在想要的环节去做想做的事情。

周期只有四个!

是的,Spring Bean的生命周期只有这四个阶段。把这四个阶段和每个阶段对应的扩展点糅合在一起虽然没有问题,但是这样非常凌乱,难以记忆。要彻底搞清楚Spring的生命周期,首先要把这四个阶段牢牢记住。实例化和属性赋值对应构造方法和setter方法的注入,初始化和销毁是用户能自定义扩展的两个阶段。在这四步之间穿插的各种扩展点,稍后会讲。

    1. 实例化 Instantiation
    1. 属性赋值 Populate
    1. 初始化 Initialization
    1. 销毁 Destruction

实例化 -> 属性赋值 -> 初始化 -> 销毁

主要逻辑都在doCreate()方法中,逻辑很清晰,就是顺序调用以下三个方法,这三个方法与三个生命周期阶段一一对应,非常重要,在后续扩展接口分析中也会涉及。

    1. createBeanInstance() -> 实例化
    1. populateBean() -> 属性赋值
    1. initializeBean() -> 初始化

源码如下,能证明实例化,属性赋值和初始化这三个生命周期的存在。关于本文的Spring源码都将忽略无关部分,便于理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 忽略了无关代码
protected Object doCreateBean(final String beanName, final RootBeanDefinition mbd, final @Nullable Object[] args)
throws BeanCreationException {

// Instantiate the bean.
BeanWrapper instanceWrapper = null;
if (instanceWrapper == null) {
// 实例化阶段!
instanceWrapper = createBeanInstance(beanName, mbd, args);
}

// Initialize the bean instance.
Object exposedObject = bean;
try {
// 属性赋值阶段!
populateBean(beanName, mbd, instanceWrapper);
// 初始化阶段!
exposedObject = initializeBean(beanName, exposedObject, mbd);
}
}

至于销毁,是在容器关闭时调用的,详见ConfigurableApplicationContext#close()

常用扩展点

Spring生命周期相关的常用扩展点非常多,所以问题不是不知道,而是记不住或者记不牢。其实记不住的根本原因还是不够了解,这里通过源码+分类的方式帮大家记忆。

第一大类:影响多个Bean的接口

实现了这些接口的Bean会切入到多个Bean的生命周期中。正因为如此,这些接口的功能非常强大,Spring内部扩展也经常使用这些接口,例如自动注入以及AOP的实现都和他们有关。

  • BeanPostProcessor
  • InstantiationAwareBeanPostProcessor

这两兄弟可能是Spring扩展中最重要的两个接口!InstantiationAwareBeanPostProcessor作用于实例化阶段的前后,BeanPostProcessor作用于初始化阶段的前后。正好和第一、第三个生命周期阶段对应。通过图能更好理解:

http://static.cyblogs.com/SpringBean的生命周期.jpg

InstantiationAwareBeanPostProcessor实际上继承了BeanPostProcessor接口,严格意义上来看他们不是两兄弟,而是两父子。但是从生命周期角度我们重点关注其特有的对实例化阶段的影响,图中省略了从BeanPostProcessor继承的方法。

1
InstantiationAwareBeanPostProcessor extends BeanPostProcessor
InstantiationAwareBeanPostProcessor源码分析:
  • postProcessBeforeInstantiation调用点,忽略无关代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
protected Object createBean(String beanName, RootBeanDefinition mbd, @Nullable Object[] args)
throws BeanCreationException {

try {
// Give BeanPostProcessors a chance to return a proxy instead of the target bean instance.
// postProcessBeforeInstantiation方法调用点,这里就不跟进了,
// 有兴趣的同学可以自己看下,就是for循环调用所有的InstantiationAwareBeanPostProcessor
Object bean = resolveBeforeInstantiation(beanName, mbdToUse);
if (bean != null) {
return bean;
}
}

try {
// 上文提到的doCreateBean方法,可以看到
// postProcessBeforeInstantiation方法在创建Bean之前调用
Object beanInstance = doCreateBean(beanName, mbdToUse, args);
if (logger.isTraceEnabled()) {
logger.trace("Finished creating instance of bean '" + beanName + "'");
}
return beanInstance;
}
}

可以看到,postProcessBeforeInstantiationdoCreateBean之前调用,也就是在bean实例化之前调用的,英文源码注释解释道该方法的返回值会替换原本的Bean作为代理,这也是Aop等功能实现的关键点。

  • postProcessAfterInstantiation调用点,忽略无关代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected void populateBean(String beanName, RootBeanDefinition mbd, @Nullable BeanWrapper bw) {

// Give any InstantiationAwareBeanPostProcessors the opportunity to modify the
// state of the bean before properties are set. This can be used, for example,
// to support styles of field injection.
boolean continueWithPropertyPopulation = true;
// InstantiationAwareBeanPostProcessor#postProcessAfterInstantiation()
// 方法作为属性赋值的前置检查条件,在属性赋值之前执行,能够影响是否进行属性赋值!
if (!mbd.isSynthetic() && hasInstantiationAwareBeanPostProcessors()) {
for (BeanPostProcessor bp : getBeanPostProcessors()) {
if (bp instanceof InstantiationAwareBeanPostProcessor) {
InstantiationAwareBeanPostProcessor ibp = (InstantiationAwareBeanPostProcessor) bp;
if (!ibp.postProcessAfterInstantiation(bw.getWrappedInstance(), beanName)) {
continueWithPropertyPopulation = false;
break;
}
}
}
}
// 忽略后续的属性赋值操作代码
}

可以看到该方法在属性赋值方法内,但是在真正执行赋值操作之前。其返回值为boolean,返回false时可以阻断属性赋值阶段(continueWithPropertyPopulation = false;)。

关于BeanPostProcessor执行阶段的源码穿插在下文Aware接口的调用时机分析中,因为部分Aware功能的就是通过他实现的!只需要先记住BeanPostProcessor在初始化前后调用就可以了。

第二大类:只调用一次的接口

这一大类接口的特点是功能丰富,常用于用户自定义扩展。
第二大类中又可以分为两类:

  1. Aware类型的接口
  2. 生命周期接口
无所不知的Aware

Aware类型的接口的作用就是让我们能够拿到Spring容器中的一些资源。基本都能够见名知意,Aware之前的名字就是可以拿到什么资源,例如BeanNameAware可以拿到BeanName,以此类推。调用时机需要注意:所有的Aware方法都是在初始化阶段之前调用的!
Aware接口众多,这里同样通过分类的方式帮助大家记忆。
Aware接口具体可以分为两组,至于为什么这么分,详见下面的源码分析。如下排列顺序同样也是Aware接口的执行顺序,能够见名知意的接口不再解释。

Aware Group1

  1. BeanNameAware
  2. BeanClassLoaderAware
  3. BeanFactoryAware

Aware Group2

  1. EnvironmentAware
  2. EmbeddedValueResolverAware 这个知道的人可能不多,实现该接口能够获取Spring EL解析器,用户的自定义注解需要支持spel表达式的时候可以使用,非常方便。
  3. ApplicationContextAware(ResourceLoaderAware\ApplicationEventPublisherAware\MessageSourceAware) 这几个接口可能让人有点懵,实际上这几个接口可以一起记,其返回值实质上都是当前的ApplicationContext对象,因为ApplicationContext是一个复合接口,如下:
1
2
3
4
public interface ApplicationContext extends EnvironmentCapable, ListableBeanFactory, HierarchicalBeanFactory,
MessageSource, ApplicationEventPublisher, ResourcePatternResolver {

}

这里涉及到另一道面试题,ApplicationContextBeanFactory的区别,可以从ApplicationContext继承的这几个接口入手,除去BeanFactory相关的两个接口就是ApplicationContext独有的功能,这里不详细说明。

Aware调用时机源码分析

详情如下,忽略了部分无关代码。代码位置就是我们上文提到的initializeBean方法详情,这也说明了Aware都是在初始化阶段之前调用的!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 见名知意,初始化阶段调用的方法
protected Object initializeBean(final String beanName, final Object bean, @Nullable RootBeanDefinition mbd) {

// 这里调用的是Group1中的三个Bean开头的Aware
invokeAwareMethods(beanName, bean);

Object wrappedBean = bean;

// 这里调用的是Group2中的几个Aware,
// 而实质上这里就是前面所说的BeanPostProcessor的调用点!
// 也就是说与Group1中的Aware不同,这里是通过BeanPostProcessor(ApplicationContextAwareProcessor)实现的。
wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
// 下文即将介绍的InitializingBean调用点
invokeInitMethods(beanName, wrappedBean, mbd);
// BeanPostProcessor的另一个调用点
wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);

return wrappedBean;
}

可以看到并不是所有的Aware接口都使用同样的方式调用。Bean××Aware都是在代码中直接调用的,而ApplicationContext相关的Aware都是通过BeanPostProcessor#postProcessBeforeInitialization()实现的。感兴趣的可以自己看一下ApplicationContextAwareProcessor这个类的源码,就是判断当前创建的Bean是否实现了相关的Aware方法,如果实现了会调用回调方法将资源传递给Bean
至于Spring为什么这么实现,应该没什么特殊的考量。也许和Spring的版本升级有关。基于对修改关闭,对扩展开放的原则,Spring对一些新的Aware采用了扩展的方式添加。

BeanPostProcessor的调用时机也能在这里体现,包围住invokeInitMethods方法,也就说明了在初始化阶段的前后执行。

关于Aware接口的执行顺序,其实只需要记住第一组在第二组执行之前就行了。每组中各个Aware方法的调用顺序其实没有必要记,有需要的时候点进源码一看便知。

简单的两个生命周期接口

至于剩下的两个生命周期接口就很简单了,实例化和属性赋值都是Spring帮助我们做的,能够自己实现的有初始化和销毁两个生命周期阶段。

  1. InitializingBean 对应生命周期的初始化阶段,在上面源码的invokeInitMethods(beanName, wrappedBean, mbd);方法中调用。
    有一点需要注意,因为Aware方法都是执行在初始化方法之前,所以可以在初始化方法中放心大胆的使用Aware接口获取的资源,这也是我们自定义扩展Spring的常用方式。
    除了实现InitializingBean接口之外还能通过注解或者xml配置的方式指定初始化方法,至于这几种定义方式的调用顺序其实没有必要记。因为这几个方法对应的都是同一个生命周期,只是实现方式不同,我们一般只采用其中一种方式。
  2. DisposableBean 类似于InitializingBean,对应生命周期的销毁阶段,以ConfigurableApplicationContext#close()方法作为入口,实现是通过循环取所有实现了DisposableBean接口的Bean然后调用其destroy()方法 。感兴趣的可以自行跟一下源码。

扩展阅读: BeanPostProcessor 注册时机与执行顺序

Spring所做的事情就是把各种方式定义的Java类变成它的BeanDefinition,然后通过Bean工厂变成Bean放入到它的各种容器中,这样子就被Spring所管理了。

http://static.cyblogs.com/Spring%20Bean定义.jpg

注册时机

我们知道BeanPostProcessor也会注册为Bean,那么Spring是如何保证BeanPostProcessor在我们的业务Bean之前初始化完成呢?
请看我们熟悉的refresh()方法的源码,省略部分无关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {

try {
// Allows post-processing of the bean factory in context subclasses.
postProcessBeanFactory(beanFactory);

// Invoke factory processors registered as beans in the context.
invokeBeanFactoryPostProcessors(beanFactory);

// Register bean processors that intercept bean creation.
// 所有BeanPostProcesser初始化的调用点
registerBeanPostProcessors(beanFactory);

// Initialize message source for this context.
initMessageSource();

// Initialize event multicaster for this context.
initApplicationEventMulticaster();

// Initialize other special beans in specific context subclasses.
onRefresh();

// Check for listener beans and register them.
registerListeners();

// Instantiate all remaining (non-lazy-init) singletons.
// 所有单例非懒加载Bean的调用点
finishBeanFactoryInitialization(beanFactory);

// Last step: publish corresponding event.
finishRefresh();
}

}

可以看出,Spring是先执行registerBeanPostProcessors()进行BeanPostProcessors的注册,然后再执行finishBeanFactoryInitialization初始化我们的单例非懒加载的Bean

执行顺序

BeanPostProcessor有很多个,而且每个BeanPostProcessor都影响多个Bean,其执行顺序至关重要,必须能够控制其执行顺序才行。关于执行顺序这里需要引入两个排序相关的接口:PriorityOrdered、Ordered

  • PriorityOrdered是一等公民,首先被执行,PriorityOrdered公民之间通过接口返回值排序
  • Ordered是二等公民,然后执行,Ordered公民之间通过接口返回值排序
  • 都没有实现是三等公民,最后执行

在以下源码中,可以很清晰的看到Spring注册各种类型BeanPostProcessor的逻辑,根据实现不同排序接口进行分组。优先级高的先加入,优先级低的后加入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// First, invoke the BeanDefinitionRegistryPostProcessors that implement PriorityOrdered.
// 首先,加入实现了PriorityOrdered接口的BeanPostProcessors,顺便根据PriorityOrdered排了序
String[] postProcessorNames =
beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
for (String ppName : postProcessorNames) {
if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
processedBeans.add(ppName);
}
}
sortPostProcessors(currentRegistryProcessors, beanFactory);
registryProcessors.addAll(currentRegistryProcessors);
invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
currentRegistryProcessors.clear();

// Next, invoke the BeanDefinitionRegistryPostProcessors that implement Ordered.
// 然后,加入实现了Ordered接口的BeanPostProcessors,顺便根据Ordered排了序
postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
for (String ppName : postProcessorNames) {
if (!processedBeans.contains(ppName) && beanFactory.isTypeMatch(ppName, Ordered.class)) {
currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
processedBeans.add(ppName);
}
}
sortPostProcessors(currentRegistryProcessors, beanFactory);
registryProcessors.addAll(currentRegistryProcessors);
invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
currentRegistryProcessors.clear();

// Finally, invoke all other BeanDefinitionRegistryPostProcessors until no further ones appear.
// 最后加入其他常规的BeanPostProcessors
boolean reiterate = true;
while (reiterate) {
reiterate = false;
postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
for (String ppName : postProcessorNames) {
if (!processedBeans.contains(ppName)) {
currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
processedBeans.add(ppName);
reiterate = true;
}
}
sortPostProcessors(currentRegistryProcessors, beanFactory);
registryProcessors.addAll(currentRegistryProcessors);
invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
currentRegistryProcessors.clear();
}

根据排序接口返回值排序,默认升序排序,返回值越低优先级越高。

1
2
3
4
5
6
7
8
9
10
11
/**
* Useful constant for the highest precedence value.
* @see java.lang.Integer#MIN_VALUE
*/
int HIGHEST_PRECEDENCE = Integer.MIN_VALUE;

/**
* Useful constant for the lowest precedence value.
* @see java.lang.Integer#MAX_VALUE
*/
int LOWEST_PRECEDENCE = Integer.MAX_VALUE;

PriorityOrderedOrdered接口作为Spring整个框架通用的排序接口,在Spring中应用广泛,也是非常重要的接口。

总结

Spring Bean的生命周期分为四个阶段多个扩展点。扩展点又可以分为影响多个Bean影响单个Bean。整理如下:
四个阶段

  • 实例化 Instantiation
  • 属性赋值 Populate
  • 初始化 Initialization
  • 销毁 Destruction

多个扩展点

  • 影响多个Bean
    • BeanPostProcessor
    • InstantiationAwareBeanPostProcessor
  • 影响单个Bean
    • Aware
      • Aware Group1
        • BeanNameAware
        • BeanClassLoaderAware
        • BeanFactoryAware
      • Aware Group2
        • EnvironmentAware
        • EmbeddedValueResolverAware
        • ApplicationContextAware(ResourceLoaderAware\ApplicationEventPublisherAware\MessageSourceAware)
    • 生命周期
      • InitializingBean
      • DisposableBean

至此,Spring Bean的生命周期介绍完毕,由于作者水平有限难免有疏漏,欢迎留言纠错。

参考地址

数据库使用锁是为了支持更好的并发,提供数据的完整性和一致性。InnoDB是一个支持行锁的存储引擎,锁的类型有:

  • 共享锁(S)

  • 排他锁(X)

  • 意向共享(IS)

  • 意向排他(IX)

为了提供更好的并发,InnoDB提供了非锁定读:不需要等待访问行上的锁释放,读取行的一个快照。该方法是通过InnoDB的一个特性:MVCC来实现的。

InnoDB有三种行锁的算法:

1、Record Lock:单个行记录上的锁。

2、Gap Lock:间隙锁,锁定一个范围,但不包括记录本身。GAP锁的目的,是为了防止同一事务的两次当前读,出现幻读的情况。

3、Next-Key Lock:1+2,锁定一个范围,并且锁定记录本身。对于行的查询,都是采用该方法,主要目的是解决幻读的问题。

http://static.cyblogs.com/QQ20200529-221058@2x.jpg

为什么section B上面的插入语句会出现锁等待的情况InnoDB是行锁,在section A里面锁住了a=8的行,其他应该不受影响。why?

因为InnoDB对于行的查询都是采用了Next-Key Lock的算法,锁定的不是单个值,而是一个范围(GAP)。上面索引值有1,3,5,8,11,其记录的GAP的区间如下:是一个左开右闭的空间(原因是默认主键的有序自增的特性,结合后面的例子说明)

1
(-∞,1],(1,3],(3,5],(5,8],(8,11],(11,+∞)

特别需要注意的是,InnoDB存储引擎还会对辅助索引下一个键值加上gap lock。如上面分析,那就可以解释了。

1
2
3
4
5
6
7
mysql> select * from t where a = 8 for update;
+------+
| a |
+------+
| 8 |
+------+
1 row in set (0.01 sec)

SQL语句锁定的范围是(5,8],下个下个键值范围是(8,11],所以插入5~11之间的值的时候都会被锁定,要求等待。即:插入5,6,7,8,9,10 会被锁住。插入非这个范围内的值都正常。

如果是范围的排它,那是非常的影响性能的,是否可以只让它锁一行呢?

http://static.cyblogs.com/QQ20200529-223123@2x.jpg

**分析:**因为会话1已经对id=8的记录加了一个X锁,由于是RR隔离级别,INNODB要防止幻读需要加GAP锁:即id=5(8的左边),id=11(8的右边)之间需要加间隙锁(GAP)。这样[5,e][8,g][8,g][11,j]之间的数据都要被锁。上面测试已经验证了这一点,根据索引的有序性,数据按照主键(name)排序,后面写入的[5,cz][5,e]的左边)和[11,ja][11,j]的右边)不属于上面的范围从而可以写入。

超时时间的参数:innodb_lock_wait_timeout ,默认是50秒。
超时是否回滚参数:innodb_rollback_on_timeout 默认是OFF。

http://static.cyblogs.com/QQ20200529-223803@2x.jpg

经过测试,不会回滚超时引发的异常,当参数innodb_rollback_on_timeout 设置成ON时,则可以回滚,会把插进去的12回滚掉。

Record Lock什么时候用?还是用上面的列子,把辅助索引改成唯一属性的索引。

http://static.cyblogs.com/QQ20200529-224150@2x.jpg

为什么section B上面的插入语句可以正常,和测试一不一样?

分析:

因为InnoDB对于行的查询都是采用了Next-Key Lock的算法,锁定的不是单个值,而是一个范围,按照这个方法是会和第一次测试结果一样。但是,当查询的索引含有唯一属性的时候,Next-Key Lock 会进行优化,将其降级为Record Lock,即仅锁住索引本身,不是范围。

注意:通过主键或则唯一索引来锁定不存在的值,也会产生GAP锁定。

参考地址

简介

Elasticsearch是一个高度可扩展的开源的分布式Restful全文搜索和分析引擎。它允许用户快速的(近实时的)存储、搜索和分析海量数据。它通常用作底层引擎技术,为具有复杂搜索功能和要求的应用程序提供支持。
以下是ES可用于的一些场景:

  1. 电商网站提供搜索功能:可使用ES来存储产品的目录和库存,并为它们提供搜索和自动填充建议。
  2. 收集日志和交易数据,并进行分析:可使用Logstash来收集、聚合和解析数据, 然后让Logstash将此数据提供给ES。然后可在ES中搜索和聚合开发者感兴趣的信息。
  3. 需要快速调查、分析、可视化查询大量数据的特定问题:可以使用ES存储数据,然后使用Kibana构建自定义仪表板,来可视化展示数据。还可以使用ES的聚合功能针对这些数据进行复杂的商业分析。

我们要认识一个人Doug Cutting

为什么要提Doug Cutting,因为Elasticsearch的底层是Lucene,而Lucene就是Doug Cutting大神写的。

引用来自于: 鲜枣课堂

1998年9月4日,Google公司在美国硅谷成立。正如大家所知,它是一家做搜索引擎起家的公司。

http://static.cyblogs.com/640.jpg

无独有偶,一位名叫Doug Cutting的美国工程师,也迷上了搜索引擎。他做了一个用于文本搜索的函数库(姑且理解为软件的功能组件),命名为Lucene

http://static.cyblogs.com/pUm6Hxkd434Mk1VTAruKa8.jpg

左为Doug Cutting,右为Lucene的LOGO

Lucene是用JAVA写成的,目标是为各种中小型应用软件加入全文检索功能。因为好用而且开源(代码公开),非常受程序员们的欢迎。

早期的时候,这个项目被发布在Doug Cutting的个人网站和SourceForge(一个开源软件网站)。后来,2001年底,Lucene成为Apache软件基金会jakarta项目的一个子项目。

http://static.cyblogs.com/iaqYeVeXiaLwMxssVyfyV0f69tfVMod6.jpg

Apache软件基金会,搞IT的应该都认识

2004年,Doug Cutting再接再励,在Lucene的基础上,和Apache开源伙伴Mike Cafarella合作,开发了一款可以代替当时的主流搜索的开源搜索引擎,命名为Nutch

http://static.cyblogs.com/aqYeVeXiaLwMxssV.png

Nutch是一个建立在Lucene核心之上的网页搜索应用程序,可以下载下来直接使用。它在Lucene的基础上加了网络爬虫和一些网页相关的功能,目的就是从一个简单的站内检索推广到全球网络的搜索上,就像Google一样。

Nutch在业界的影响力比Lucene更大。

大批网站采用了Nutch平台,大大降低了技术门槛,使低成本的普通计算机取代高价的Web服务器成为可能。甚至有一段时间,在硅谷有了一股用Nutch低成本创业的潮流。

随着时间的推移,无论是Google还是Nutch,都面临搜索对象“体积”不断增大的问题。

尤其是Google,作为互联网搜索引擎,需要存储大量的网页,并不断优化自己的搜索算法,提升搜索效率。

http://static.cyblogs.com/TAruKa8WKbr3qDia9ba.jpg

Google搜索栏

在这个过程中,Google确实找到了不少好办法,并且无私地分享了出来。

2003年,Google发表了一篇技术学术论文,公开介绍了自己的谷歌文件系统GFS(Google File System)。这是Google公司为了存储海量搜索数据而设计的专用文件系统。

第二年,也就是2004年,Doug Cutting基于Google的GFS论文,实现了分布式文件存储系统,并将它命名为NDFS(Nutch Distributed File System)

http://static.cyblogs.com/google_gfs_ndfs.jpg

还是2004年,Google又发表了一篇技术学术论文,介绍自己的MapReduce编程模型。这个编程模型,用于大规模数据集(大于1TB)的并行分析运算。

第二年(2005年),Doug Cutting又基于MapReduce,在Nutch搜索引擎实现了该功能。

http://static.cyblogs.com/goole_mapreduce.jpg

2006年,当时依然很厉害的Yahoo(雅虎)公司,招安了Doug Cutting。

http://static.cyblogs.com/goole_mapreduce_002.jpg

这里要补充说明一下雅虎招安Doug的背景:2004年之前,作为互联网开拓者的雅虎,是使用Google搜索引擎作为自家搜索服务的。在2004年开始,雅虎放弃了Google,开始自己研发搜索引擎。所以。。。

加盟Yahoo之后,Doug Cutting将NDFS和MapReduce进行了升级改造,并重新命名为Hadoop(NDFS也改名为HDFS,Hadoop Distributed File System)。

这个,就是后来大名鼎鼎的大数据框架系统——Hadoop的由来。而Doug Cutting,则被人们称为Hadoop之父

http://static.cyblogs.com/goole_mapreduce_003.jpg

Hadoop这个名字,实际上是Doug Cutting他儿子的黄色玩具大象的名字。所以,Hadoop的Logo,就是一只奔跑的黄色大象。

http://static.cyblogs.com/goole_mapreduce_004.jpg

我们继续往下说。

还是2006年,Google又发论文了。

这次,它们介绍了自己的BigTable。这是一种分布式数据存储系统,一种用来处理海量数据的非关系型数据库。

Doug Cutting当然没有放过,在自己的hadoop系统里面,引入了BigTable,并命名为HBase

http://static.cyblogs.com/goole_mapreduce_005.jpg

好吧,反正就是紧跟Google时代步伐,你出什么,我学什么。

所以,Hadoop的核心部分,基本上都有Google的影子。

http://static.cyblogs.com/goole_mapreduce_006.png

其实从这里也能看到,站在巨人肩膀上或者仿照强者,也可以走出一条属于自己的道路。

安装Elasticsearch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
➜  Tools  brew search elasticsearch
==> Formulae
elasticsearch elasticsearch@2.4 elasticsearch@5.6

➜ Tools brew install elasticsearch@5.6
==> Downloading https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.16.tar.gz
######################################################################## 100.0%
Warning: elasticsearch@5.6 has been deprecated!
==> Caveats
Data: /usr/local/var/elasticsearch/elasticsearch_chenyuan/
Logs: /usr/local/var/log/elasticsearch/elasticsearch_chenyuan.log
Plugins: /usr/local/opt/elasticsearch@5.6/libexec/plugins/
Config: /usr/local/etc/elasticsearch/
plugin script: /usr/local/opt/elasticsearch@5.6/libexec/bin/elasticsearch-plugin

elasticsearch@5.6 is keg-only, which means it was not symlinked into /usr/local,
because this is an alternate version of another formula.

If you need to have elasticsearch@5.6 first in your PATH run:
echo 'export PATH="/usr/local/opt/elasticsearch@5.6/bin:$PATH"' >> ~/.zshrc


To have launchd start elasticsearch@5.6 now and restart at login:
brew services start elasticsearch@5.6
Or, if you don't want/need a background service you can just run:
/usr/local/opt/elasticsearch@5.6/bin/elasticsearch
==> Summary
/usr/local/Cellar/elasticsearch@5.6/5.6.16: 106 files, 36.0MB, built in 10 seconds
==> `brew cleanup` has not been run in 30 days, running now...
Removing: /Users/chenyuan/Library/Caches/Homebrew/erlang--22.1.2.mojave.bottle.tar.gz... (77.3MB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/gettext--0.20.1.catalina.bottle.tar.gz... (8.3MB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/icu4c--64.2.catalina.bottle.tar.gz... (26.1MB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/jpeg--9c.mojave.bottle.tar.gz... (300.8KB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/libpng--1.6.37.mojave.bottle.tar.gz... (442.2KB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/libtiff--4.0.10_1.mojave.bottle.tar.gz... (1MB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/node--12.11.1.catalina.bottle.tar.gz... (14.8MB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/openssl@1.1--1.1.1d.mojave.bottle.tar.gz... (5.2MB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/perl--5.30.0.catalina.bottle.tar.gz... (16.3MB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/rabbitmq--3.8.0.tar.xz... (11MB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/subversion--1.12.2_1.catalina.bottle.1.tar.gz... (10MB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/utf8proc--2.4.0.catalina.bottle.tar.gz... (152.2KB)
Removing: /Users/chenyuan/Library/Caches/Homebrew/wxmac--3.0.4_2.mojave.bottle.tar.gz... (7.4MB)
Removing: /Users/chenyuan/Library/Logs/Homebrew/icu4c... (64B)
Removing: /Users/chenyuan/Library/Logs/Homebrew/node... (64B)
Pruned 0 symbolic links and 2 directories from /usr/local

查看一下版本号,结果没有结果。

1
2
➜  ~  elasticsearch --version
zsh: command not found: elasticsearch

然后看之前的日志,需要你手动配置一下环境变量。

1
2
3
4
5
6
echo 'export PATH="/usr/local/opt/elasticsearch@5.6/bin:$PATH"' >> ~/.zshrc

# 重新加载环境变量
➜ ~ source ~/.zshrc
➜ ~ elasticsearch --version
Version: 5.6.16, Build: 3a740d1/2019-03-13T15:33:36.565Z, JVM: 1.8.0_162

启动ElasticSearch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
➜  ~  elasticsearch
[2020-05-14T21:47:06,301][INFO ][o.e.n.Node ] [] initializing ...
[2020-05-14T21:47:06,403][INFO ][o.e.e.NodeEnvironment ] [vXW29Yn] using [1] data paths, mounts [[/ (/dev/disk1s5)]], net usable_space [42.5gb], net total_space [465.7gb], spins? [unknown], types [apfs]
[2020-05-14T21:47:06,404][INFO ][o.e.e.NodeEnvironment ] [vXW29Yn] heap size [1.9gb], compressed ordinary object pointers [true]
[2020-05-14T21:47:06,406][INFO ][o.e.n.Node ] node name [vXW29Yn] derived from node ID [vXW29YnkRDaIb8XuGeKRxQ]; set [node.name] to override
[2020-05-14T21:47:06,406][INFO ][o.e.n.Node ] version[5.6.16], pid[75858], build[3a740d1/2019-03-13T15:33:36.565Z], OS[Mac OS X/10.15.4/x86_64], JVM[Oracle Corporation/Java HotSpot(TM) 64-Bit Server VM/1.8.0_162/25.162-b12]
[2020-05-14T21:47:06,406][INFO ][o.e.n.Node ] JVM arguments [-Xms2g, -Xmx2g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -XX:+AlwaysPreTouch, -Xss1m, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djna.nosys=true, -Djdk.io.permissionsUseCanonicalPath=true, -Dio.netty.noUnsafe=true, -Dio.netty.noKeySetOptimization=true, -Dio.netty.recycler.maxCapacityPerThread=0, -Dlog4j.shutdownHookEnabled=false, -Dlog4j2.disable.jmx=true, -Dlog4j.skipJansi=true, -XX:+HeapDumpOnOutOfMemoryError, -Des.path.home=/usr/local/Cellar/elasticsearch@5.6/5.6.16/libexec]
[2020-05-14T21:47:07,237][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [aggs-matrix-stats]
[2020-05-14T21:47:07,237][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [ingest-common]
[2020-05-14T21:47:07,237][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [lang-expression]
[2020-05-14T21:47:07,238][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [lang-groovy]
[2020-05-14T21:47:07,238][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [lang-mustache]
[2020-05-14T21:47:07,238][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [lang-painless]
[2020-05-14T21:47:07,238][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [parent-join]
[2020-05-14T21:47:07,238][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [percolator]
[2020-05-14T21:47:07,238][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [reindex]
[2020-05-14T21:47:07,238][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [transport-netty3]
[2020-05-14T21:47:07,238][INFO ][o.e.p.PluginsService ] [vXW29Yn] loaded module [transport-netty4]
[2020-05-14T21:47:07,239][INFO ][o.e.p.PluginsService ] [vXW29Yn] no plugins loaded
[2020-05-14T21:47:08,643][INFO ][o.e.d.DiscoveryModule ] [vXW29Yn] using discovery type [zen]
[2020-05-14T21:47:09,099][INFO ][o.e.n.Node ] initialized
[2020-05-14T21:47:09,099][INFO ][o.e.n.Node ] [vXW29Yn] starting ...
[2020-05-14T21:47:09,347][INFO ][o.e.t.TransportService ] [vXW29Yn] publish_address {127.0.0.1:9300}, bound_addresses {[::1]:9300}, {127.0.0.1:9300}
[2020-05-14T21:47:12,405][INFO ][o.e.c.s.ClusterService ] [vXW29Yn] new_master {vXW29Yn}{vXW29YnkRDaIb8XuGeKRxQ}{0aNOjaAGSGGLSXHQUS-lyg}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-elected-as-master ([0] nodes joined)
[2020-05-14T21:47:12,425][INFO ][o.e.h.n.Netty4HttpServerTransport] [vXW29Yn] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200}
[2020-05-14T21:47:12,425][INFO ][o.e.n.Node ] [vXW29Yn] started
[2020-05-14T21:47:12,431][INFO ][o.e.g.GatewayService ] [vXW29Yn] recovered [0] indices into cluster_state

直接在浏览器输入:http://localhost:9200/

http://static.cyblogs.com/QQ20200514-214940@2x.jpg

安装Kibana

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
➜  ~  brew search kibana
==> Formulae
kibana kibana@5.6
➜ ~ brew install kibana@5.6
==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/bottles/kibana%405.6-5.6.16.catalina.bottle.1.tar.gz
==> Downloading from https://akamai.bintray.com/f4/f451a8784dc52182670152d040f6533d4dc2f1b251ef3797eed6c6ff565db8af?__gda__=exp=1589465020~hm
######################################################################## 100.0%
Warning: kibana@5.6 has been deprecated!
==> Pouring kibana@5.6-5.6.16.catalina.bottle.1.tar.gz
==> Caveats
Config: /usr/local/etc/kibana/
If you wish to preserve your plugins upon upgrade, make a copy of
/usr/local/opt/kibana@5.6/plugins before upgrading, and copy it into the
new keg location after upgrading.

kibana@5.6 is keg-only, which means it was not symlinked into /usr/local,
because this is an alternate version of another formula.

If you need to have kibana@5.6 first in your PATH run:
echo 'export PATH="/usr/local/opt/kibana@5.6/bin:$PATH"' >> ~/.zshrc


To have launchd start kibana@5.6 now and restart at login:
brew services start kibana@5.6
Or, if you don't want/need a background service you can just run:
/usr/local/opt/kibana@5.6/bin/kibana
==> Summary
/usr/local/Cellar/kibana@5.6/5.6.16: 37,391 files, 200MB

同样的道理,配置好环境变量。

启动kibana

1
2
3
4
5
6
7
8
9
10
11
➜  ~  kibana
log [13:56:55.842] [info][status][plugin:kibana@5.6.16] Status changed from uninitialized to green - Ready
log [13:56:55.901] [info][status][plugin:elasticsearch@5.6.16] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [13:56:55.923] [info][status][plugin:console@5.6.16] Status changed from uninitialized to green - Ready
log [13:56:55.952] [info][status][plugin:metrics@5.6.16] Status changed from uninitialized to green - Ready
log [13:56:56.158] [info][status][plugin:timelion@5.6.16] Status changed from uninitialized to green - Ready
log [13:56:56.162] [info][listening] Server running at http://localhost:5601
log [13:56:56.163] [info][status][ui settings] Status changed from uninitialized to yellow - Elasticsearch plugin is yellow
log [13:57:01.165] [info][status][plugin:elasticsearch@5.6.16] Status changed from yellow to yellow - No existing Kibana index found
log [13:57:02.456] [info][status][plugin:elasticsearch@5.6.16] Status changed from yellow to green - Kibana index ready
log [13:57:02.457] [info][status][ui settings] Status changed from yellow to green - Ready

直接在浏览器输入:http://localhost:5601/

http://static.cyblogs.com/QQ20200514-215814@2x.jpg

添加数据

1
2
3
4
5
6
7
8
PUT /megacorp/employee/1
{
"first_name" : "John",
"last_name" : "Smith",
"age" : 25,
"about" : "I love to go rock climbing",
"interests": [ "sports", "music" ]
}

http://static.cyblogs.com/QQ20200514-220241@2x.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"_index": "megacorp",
"_type": "employee",
"_id": "2",
"_version": 1, // 版本
"result": "created", // 是新增还是修改
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}

查询数据

对着官方文档一一的研究了一下它的一些语法与介绍。我个人觉得ElasticSearch的官方文档还算比较过关的:https://www.elastic.co/guide/cn/elasticsearch/guide/current/foreword_id.html 学习起来基本毫无障碍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
POST /my_store/products/_bulk
{ "index": { "_id": 1 }}
{ "price" : 10, "productID" : "XHDK-A-1293-#fJ3" }
{ "index": { "_id": 2 }}
{ "price" : 20, "productID" : "KDKE-B-9947-#kL5" }
{ "index": { "_id": 3 }}
{ "price" : 30, "productID" : "JODL-X-1937-#pV7" }
{ "index": { "_id": 4 }}
{ "price" : 30, "productID" : "QQPX-R-3956-#aD8" }


GET /my_store/products/_search
{
"query" : {
"constant_score" : {
"filter" : {
"term" : {
"price" : 20
}
}
}
}
}

GET /my_store/products/_search
{
"query" : {
"constant_score" : {
"filter" : {
"term" : {
"productID" : "XHDK-A-1293-#fJ3"
}
}
}
}
}

GET /my_store/_analyze
{
"field": "productID",
"text": "XHDK-A-1293-#fJ3"
}


DELETE /my_store

PUT /my_store
{
"mappings" : {
"products" : {
"properties" : {
"productID" : {
"type" : "string",
"index" : "not_analyzed"
}
}
}
}

}


GET /my_store/products/_search
{
"query" : {
"filtered" : {
"filter" : {
"bool" : {
"should" : [
{ "term" : {"price" : 20}},
{ "term" : {"productID" : "XHDK-A-1293-#fJ3"}}
],
"must_not" : {
"term" : {"price" : 30}
}
}
}
}
}
}


GET /my_store/products/_search
{
"query" : {
"constant_score" : {
"filter" : {
"terms" : {
"price" : [20, 30]
}
}
}
}
}


GET /my_store/products/_search
{
"query" : {
"constant_score" : {
"filter" : {
"range" : {
"price" : {
"gte" : 20,
"lt" : 40
}
}
}
}
}
}


POST /my_index/posts/_bulk
{ "index": { "_id": "1" }}
{ "tags" : ["search"] }
{ "index": { "_id": "2" }}
{ "tags" : ["search", "open_source"] }
{ "index": { "_id": "3" }}
{ "other_field" : "some data" }
{ "index": { "_id": "4" }}
{ "tags" : null }
{ "index": { "_id": "5" }}
{ "tags" : ["search", null] }


GET /my_index/posts/_search
{
"query" : {
"constant_score" : {
"filter" : {
"exists" : { "field" : "tags" }
}
}
}
}


GET /my_index/posts/_search
{
"query" : {
"constant_score" : {
"filter": {
"missing" : { "field" : "tags" }
}
}
}
}



POST /cars/transactions/_bulk
{ "index": {}}
{ "price" : 10000, "color" : "red", "make" : "honda", "sold" : "2014-10-28" }
{ "index": {}}
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" }
{ "index": {}}
{ "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18" }
{ "index": {}}
{ "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-02" }
{ "index": {}}
{ "price" : 12000, "color" : "green", "make" : "toyota", "sold" : "2014-08-19" }
{ "index": {}}
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" }
{ "index": {}}
{ "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01" }
{ "index": {}}
{ "price" : 25000, "color" : "blue", "make" : "ford", "sold" : "2014-02-12" }


GET /cars/transactions/_search
{
"size" : 0,
"aggs" : {
"popular_colors111" : {
"terms" : {
"field" : "color.keyword"
}
}
}
}


GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"colors": {
"terms": {
"field": "color.keyword"
},
"aggs": {
"avg_price": {
"avg": {
"field": "price"
}
}
}
}
}
}


GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"colors": {
"terms": {
"field": "color.keyword"
},
"aggs": {
"avg_price": {
"avg": {
"field": "price"
}
},
"make": {
"terms": {
"field": "make.keyword"
}
}
}
}
}
}


GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"colors": {
"terms": {
"field": "color.keyword"
},
"aggs": {
"avg_price": { "avg": { "field": "price" }
},
"make" : {
"terms" : {
"field" : "make.keyword"
},
"aggs" : {
"min_price" : { "min": { "field": "price"} },
"max_price" : { "max": { "field": "price"} }
}
}
}
}
}
}

GET /cars/transactions/_search
{
"size" : 0,
"aggs":{
"price":{
"histogram":{
"field": "price",
"interval": 20000
},
"aggs":{
"revenue": {
"sum": {
"field" : "price"
}
}
}
}
}
}

GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"makes": {
"terms": {
"field": "make.keyword",
"size": 10
},
"aggs": {
"stats": {
"extended_stats": {
"field": "price"
}
}
}
}
}
}



GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"sales": {
"date_histogram": {
"field": "sold",
"interval": "month",
"format": "yyyy-MM-dd",
"min_doc_count" : 0,
"extended_bounds" : {
"min" : "2014-01-01",
"max" : "2014-12-31"
}
}
}
}
}


GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"sales": {
"date_histogram": {
"field": "sold",
"interval": "quarter",
"format": "yyyy-MM-dd",
"min_doc_count" : 0,
"extended_bounds" : {
"min" : "2014-01-01",
"max" : "2014-12-31"
}
},
"aggs": {
"per_make_sum": {
"terms": {
"field": "make.keyword"
},
"aggs": {
"sum_price": {
"sum": { "field": "price" }
}
}
},
"total_sum": {
"sum": { "field": "price" }
}
}
}
}
}


GET /cars/transactions/_search
{
"query" : {
"match" : {
"make" : "ford"
}
},
"aggs" : {
"colors" : {
"terms" : {
"field" : "color.keyword"
}
}
}
}



GET /cars/transactions/_search
{
"size" : 0,
"query" : {
"match" : {
"make" : "ford"
}
},
"aggs" : {
"single_avg_price": {
"avg" : { "field" : "price" }
},
"all": {
"global" : {},
"aggs" : {
"avg_price": {
"avg" : { "field" : "price" }
}

}
}
}
}
布尔过滤器

一个 bool 过滤器由三部分组成:

1
2
3
4
5
6
7
{
"bool" : {
"must" : [],
"should" : [],
"must_not" : [],
}
}
  • must

    所有的语句都 必须(must) 匹配,与 AND 等价。

  • must_not

    所有的语句都 不能(must not) 匹配,与 NOT 等价。

  • should

    至少有一个语句要匹配,与 OR 等价。

就这么简单! 当我们需要多个过滤器时,只须将它们置入 bool 过滤器的不同部分即可。

几个核心概念

  • 集群(Cluster)一组拥有共同的 cluster name 的节点。
  • 节点(Node) 集群中的一个 Elasticearch 实例。
  • 索引(Index) 相当于关系数据库中的database概念,一个集群中可以包含多个索引。这个是个逻辑概念。
  • 主分片(Primary shard) 索引的子集,索引可以切分成多个分片,分布到不同的集群节点上。分片对应的是 Lucene 中的索引。
  • 副本分片(Replica shard)每个主分片可以有一个或者多个副本。
  • 类型(Type)相当于数据库中的table概念,mapping是针对 Type 的。同一个索引里可以包含多个 Type。
  • Mapping 相当于数据库中的schema,用来约束字段的类型,不过 Elasticsearch 的 mapping 可以自动根据数据创建。
  • 文档(Document) 相当于数据库中的row。
  • 字段(Field)相当于数据库中的column。
  • 分配(Allocation) 将分片分配给某个节点的过程,包括分配主分片或者副本。如果是副本,还包含从主分片复制数据的过程。
  • gateway: 代表es索引快照的存储方式,es默认是先把索引存放到内存中,当内存满了时再持久化到本地硬盘。gateway对索引快照进行存储,当这个es集群关闭再重新启动时就会从gateway中读取索引备份数据。es支持多种类型的gateway,有本地文件系统(默认),分布式文件系统,Hadoop的HDFS和amazon的s3云存储服务。

索引数据结构

传统数据库为特定列增加一个索引,例如B-Tree索引来加速检索。Elasticsearch和Lucene使用倒排索引(inverted index)来达到相同目的,倒排索引中用到的数据结构是FST树。

它的优点

  • Elasticsearch主要优势是:速度快,使用方便,分布式的,检索,功能强大。

  • ES官方的想做的是ELK结合起来做日志分析等工作。估计这也是它最多的应用场景。

  • Elasticsearch 现在的主要目标市场已经从站内搜索转移到了监控与日志数据的收集存储和分析,也就是大家常谈论的ELK。

  • Elasticsearch 现在主要的应用场景有三块。站内搜索,主要和 Solr 竞争,属于后起之秀。NoSQL json文档数据库,主要抢占 Mongo 的市场,它在读写性能上优于 Mongo,同时也支持地理位置查询,还方便地理位置和文本混合查询,属于歪打正着。监控,统计以及日志类时间序的数据的存储和分析以及可视化,这方面是引领者。

如何实现Master选举的?

Elasticsearch的选举是ZenDiscovery模块负责的,通过多播或单播技术来发现同一个集群中的其他节点并与它们连接。

一个节点如何选取它自己认为的master节点?

它会对所有可以成为master的节点(node.master: true)根据nodeId字典排序,,然后选出第一个(第0位)节点,暂且认为它是master节点。

如果对某个节点的投票数达到一定的值(可以成为master节点数n/2+1)并且该节点自己也选举自己,那这个节点就是master。否则重新选举一直到满足上述条件。

集群分片的读写操作流程

集群分片的读写操作流程

第一:路由计算(routing)和副本一致性(replica)

  • routing

Elasticsearch针对路由计算选择了一个很简单的方法,计算如下:

routing = hash(routing) % number_of_primary_shards

每个数据都有一个routing参数,默认情况下,就使用其_id值,将其_id值计算hash后,对索引的主分片数取余,就是数据实际应该存储到的分片ID

由于取余这个计算,完全依赖于分母,所以导致Elasticsearch索引有一个限制,索引的主分片数,不可以随意修改。因为一旦主分片数不一样,索引数据不可读。

  • 副本一致性(replica)

作为分布式系统,数据副本可算是一个标配。Elasticsearch数据写入流程。自然涉及副本,在有副本配置的情况下,数据从发向Elasticsearch节点,到接到Elasticsearch节点响应返回,流向如下

  • 客户端请求发送给master Node1节点,这里也可以发送给其他节点

  • Node1节点用数据的_id计算出数据应该存储在shard0上,通过cluster state信息发现shard0的主分片在Node3节点上,Node1转发请求数据给Node3,Node3完成数据的索引,索引过程在上篇博客中详细介绍了。

  • Node3并行转发数据给分配有shard0的副本分片Node1和Node2上。当收到任一节点汇报副本分片数据写入成功以后,Node3即返回给初始的接受节点Node1,宣布数据写入成功。Node1成功返回给客户端。

第二:shard的allocate配置

上文介绍了分片的索引过程,通过路由计算可以确定文本所在的分片id,那么分片在集群中的分配策略是如何确定的?

一般来说,某个shard分配在哪个节点上,是由Elasticsearch自动决定的。以下几种情况会触发分配动作:

  • 新索引生成
  • 索引的删除
  • 新增副本分片
  • 节点增减引发的数据均衡

如何集成Bboss+Echart?

如何更高效的集成一些已经成型的开源框架呢?推荐一个比较好用的es+spring的框架,而且是基于Restful方式的,支持像mybatis的写法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!-- ES start -->
<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-elasticsearch-spring-boot-starter</artifactId>
<version>6.1.1</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- ES end -->

<!-- 让你轻松的搞定ECharts各种域对象 -->
<dependency>
<groupId>com.github.abel533</groupId>
<artifactId>ECharts</artifactId>
<version>3.0.0.6</version>
</dependency>

后面专门来用一篇描述对于bboss-elasticsearch-spring-boot-starter的集成以及改造。

1
2
# ElasticSearch 配置
spring.elasticsearch.bboss.elasticsearch.rest.hostNames=elasticsearch-test.za.net:9200
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<properties>
<property name="pieLoanSuccessAndGroupByProduct">
<![CDATA[
{
"query": {
"bool": {
"filter": {
"range": {
"gmt_created": {
"include_lower": true,
"include_upper": true,
"from": #[from],
"to": #[to]
}
}
},
"must": {
"match": {
"status": 5
}
}
}
},
"size": 0,
"aggs": {
"list": {
"terms": {
"field": "product_code"
}
}
}
}
]]>
</property>
</properties>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void testSearchAgg() throws Exception {
String mappath = "esmapper/LoanApply.xml";
//创建加载配置文件的客户端工具,用来检索文档,单实例多线程安全
ClientInterface clientInterface = bbossESStarter.getConfigRestClient(mappath);
Map<String, Object> params = new HashMap<String, Object>();
params.put("from", "2017-01-14 12:14:09");
params.put("to", "2018-05-14 12:14:09");
String path = index + "/" + type + "/_search";
ESAggDatas<LongAggHit> response = clientInterface.searchAgg(path,
"pieLoanSuccessAndGroupByProduct",
params,
LongAggHit.class,
"list");
log.info("response={}", JSONUtils.toFormatJsonString(response));
}

上面就是一个简单的例子,后面可以专门为这个开源项目做一个详细的介绍,不过人家的文档写的也是非常的Nice的。https://esdoc.bbossgroups.com/#/quickstart

参考地址

看一个最简单的CGLIB的例子,感受一下AOP是如何做到的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* Created with vernon-test
* Description:
* User: chenyuan
* Date: 16/4/25
* Time: 上午9:25
*/
public class Target {
public String execute() {
String message = "----------test()----------";
System.out.println(message);
return message;
}
}

/**
* Created with vernon-test
* Description:
* User: chenyuan
* Date: 16/4/25
* Time: 上午9:25
*/
public class MyMethodInterceptor implements MethodInterceptor {

@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
System.out.println(">>>MethodInterceptor start...");
Object result = proxy.invokeSuper(obj, args);
System.out.println(">>>MethodInterceptor ending...");
return "haha";
}
}

/**
* Created with vernon-test
* Description:
* User: chenyuan
* Date: 16/4/25
* Time: 上午9:28
*/
public class CglibProxyTest {

public Object createProxy(Class targetClass) {
// 第一步
Enhancer enhancer = new Enhancer();
// 第二步
enhancer.setSuperclass(targetClass);
// 第三步
enhancer.setCallback(new MyMethodInterceptor());
// 第四步
return enhancer.create();
}

public static void main(String rags[]) {
CglibProxyTest cglibProxyTest = new CglibProxyTest();
Target proxyTarget = (Target) cglibProxyTest.createProxy(Target.class);
String res = proxyTarget.execute();
System.out.println(res);
}

}

执行后的结果显示

1
2
3
4
5
6
Connected to the target VM, address: '127.0.0.1:55868', transport: 'socket'
>>>MethodInterceptor start...
----------test()----------
>>>MethodInterceptor ending...
haha
Disconnected from the target VM, address: '127.0.0.1:55868', transport: 'socket'

实际上在执行execute()的前后就各自做了自己想要的操作。其实这个就是Spring AOP对简单的一个原型。

@Transaction的工作原理

SpringTransactionInterceptorPlatformTransactionManager这两个类是整个事务模块的核心,TransactionInterceptor负责拦截方法执行,进行判断是否需要提交或者回滚事务。PlatformTransactionManager是Spring 中的事务管理接口,真正定义了事务如何回滚和提交。我们重点研究下这两个类的源码。

TransactionInterceptor类中的代码有很多,我简化一下逻辑,方便说明:

1
2
3
4
5
6
7
//以下代码省略部分内容
public Object invoke(MethodInvocation invocation) throws Throwable {
//获取事务调用的目标方法
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
//执行带事务调用
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

其实,这里也就是因为用到了动态代理。在事务回滚这个动作的前前后后可以做自己想要的东西。这是一个非常重要的设计思想。如果我们自己要写框架,这个模式可以作为你的第一参考。

基于注解的实现机制

  • 调用注解方法
  • 生成代理对象 - CglibAopProxy 调用内部类的方法DynamicAdvisedInterceptor.intercept()
  • TransactionInterceptor.invoke()拦截器拦截,在目标方法执行之前创建并加入事务
  • AbstractPlatformTransactionManager抽象事务管理器操作数据源DataSource提交或回滚事务

我们看了解一下它是如何仿照最上面的code来写的?

仿照上面Demo的第一步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public Object getProxy(@Nullable ClassLoader classLoader) {
if (logger.isTraceEnabled()) {
logger.trace("Creating CGLIB proxy: " + this.advised.getTargetSource());
}

try {
Class<?> rootClass = this.advised.getTargetClass();
Assert.state(rootClass != null, "Target class must be available for creating a CGLIB proxy");

Class<?> proxySuperClass = rootClass;
if (ClassUtils.isCglibProxyClass(rootClass)) {
proxySuperClass = rootClass.getSuperclass();
Class<?>[] additionalInterfaces = rootClass.getInterfaces();
for (Class<?> additionalInterface : additionalInterfaces) {
this.advised.addInterface(additionalInterface);
}
}

// Validate the class, writing log messages as necessary.
validateClassIfNecessary(proxySuperClass, classLoader);

// 第一点:Configure CGLIB Enhancer...
Enhancer enhancer = createEnhancer();
if (classLoader != null) {
enhancer.setClassLoader(classLoader);
if (classLoader instanceof SmartClassLoader &&
((SmartClassLoader) classLoader).isClassReloadable(proxySuperClass)) {
enhancer.setUseCache(false);
}
}
// 第二点:setSuperclass
enhancer.setSuperclass(proxySuperClass);
enhancer.setInterfaces(AopProxyUtils.completeProxiedInterfaces(this.advised));
enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE);
enhancer.setStrategy(new ClassLoaderAwareUndeclaredThrowableStrategy(classLoader));
// 第三点:获取Callback
Callback[] callbacks = getCallbacks(rootClass);
Class<?>[] types = new Class<?>[callbacks.length];
for (int x = 0; x < types.length; x++) {
types[x] = callbacks[x].getClass();
}
// fixedInterceptorMap only populated at this point, after getCallbacks call above
enhancer.setCallbackFilter(new ProxyCallbackFilter(
this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset));
enhancer.setCallbackTypes(types);

// Generate the proxy class and create a proxy instance.
return createProxyClassAndInstance(enhancer, callbacks);
}
catch (CodeGenerationException | IllegalArgumentException ex) {
throw new AopConfigException("Could not generate CGLIB subclass of " + this.advised.getTargetClass() +
": Common causes of this problem include using a final class or a non-visible class",
ex);
}
catch (Throwable ex) {
// TargetSource.getTarget() failed
throw new AopConfigException("Unexpected AOP exception", ex);
}
}

仿照上面Demo的第三步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private Callback[] getCallbacks(Class<?> rootClass) throws Exception {
// Parameters used for optimization choices...
boolean exposeProxy = this.advised.isExposeProxy();
boolean isFrozen = this.advised.isFrozen();
boolean isStatic = this.advised.getTargetSource().isStatic();

// Choose an "aop" interceptor (used for AOP calls).
Callback aopInterceptor = new DynamicAdvisedInterceptor(this.advised);

// Choose a "straight to target" interceptor. (used for calls that are
// unadvised but can return this). May be required to expose the proxy.
Callback targetInterceptor;
if (exposeProxy) {
targetInterceptor = (isStatic ?
new StaticUnadvisedExposedInterceptor(this.advised.getTargetSource().getTarget()) :
new DynamicUnadvisedExposedInterceptor(this.advised.getTargetSource()));
}
else {
targetInterceptor = (isStatic ?
new StaticUnadvisedInterceptor(this.advised.getTargetSource().getTarget()) :
new DynamicUnadvisedInterceptor(this.advised.getTargetSource()));
}

// Choose a "direct to target" dispatcher (used for
// unadvised calls to static targets that cannot return this).
Callback targetDispatcher = (isStatic ?
new StaticDispatcher(this.advised.getTargetSource().getTarget()) : new SerializableNoOp());

Callback[] mainCallbacks = new Callback[] {
aopInterceptor, // for normal advice
targetInterceptor, // invoke target without considering advice, if optimized
new SerializableNoOp(), // no override for methods mapped to this
targetDispatcher, this.advisedDispatcher,
new EqualsInterceptor(this.advised),
new HashCodeInterceptor(this.advised)
};

Callback[] callbacks;

// If the target is a static one and the advice chain is frozen,
// then we can make some optimizations by sending the AOP calls
// direct to the target using the fixed chain for that method.
if (isStatic && isFrozen) {
Method[] methods = rootClass.getMethods();
Callback[] fixedCallbacks = new Callback[methods.length];
this.fixedInterceptorMap = new HashMap<>(methods.length);

// TODO: small memory optimization here (can skip creation for methods with no advice)
for (int x = 0; x < methods.length; x++) {
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(methods[x], rootClass);
fixedCallbacks[x] = new FixedChainStaticTargetInterceptor(
chain, this.advised.getTargetSource().getTarget(), this.advised.getTargetClass());
this.fixedInterceptorMap.put(methods[x].toString(), x);
}

// Now copy both the callbacks from mainCallbacks
// and fixedCallbacks into the callbacks array.
callbacks = new Callback[mainCallbacks.length + fixedCallbacks.length];
System.arraycopy(mainCallbacks, 0, callbacks, 0, mainCallbacks.length);
System.arraycopy(fixedCallbacks, 0, callbacks, mainCallbacks.length, fixedCallbacks.length);
this.fixedInterceptorOffset = mainCallbacks.length;
}
else {
callbacks = mainCallbacks;
}
return callbacks;
}

仿照上面Demo的第四步:

1
2
3
4
5
6
7
protected Object createProxyClassAndInstance(Enhancer enhancer, Callback[] callbacks) {
enhancer.setInterceptDuringConstruction(false);
enhancer.setCallbacks(callbacks);
return (this.constructorArgs != null && this.constructorArgTypes != null ?
enhancer.create(this.constructorArgTypes, this.constructorArgs) :
enhancer.create());
}

上面的几步就完成了一个动态代理的流程,就只需要真的发生调用的时候去执行动态代理类了。

哪些场景事物会失效?

  • 1、只对public修饰方法才起作用
  • 2、@Transaction默认检测异常为RuntimeException及其子类 如果有其他异常需要回滚事务的需要自己手动配置,例如:@Transactional(rollbackFor = Exception.class)
  • 3、确保异常没有被try-catch{}catch以后也不会回滚
  • 4、检查下自己的数据库是否支持事务,如mysqlmylsam
  • 5、SpringBoot项目默认已经支持事务,不用配置;其他类型项目需要在xml中配置是否开启事务
  • 6、如果在同一个类中,一个非@Transaction的方法调用有@Transaction的方法不会生效,因为代理问题

这里说下在同一个类中,一个非@Transaction的方法调用有@Transaction的方法不会生效。如果是在同一个类中的方法调用,则不会被方法拦截器拦截到,因此事务不会起作用,必须将方法放入另外一个类中,并且该类通过Spring注入。

Spring 采用动态代理(AOP)实现对Bean的管理和切片,它为我们的每个class生成一个代理对象,只有在代理对象之间进行调用时,可以触发切面逻辑。

而在同一个类中,方法B调用A,调用的事元对象的方法,而不是通过代理对象,所以Spring无法切到这次调用,也就是无法通过注解保证事务性。

参考地址

1. 概述

一直在用SpringBoot中的@Transactional来做事务管理,但是很少没想过SpringBoot是如何实现事务管理的,今天从源码入手,看看@Transactional是如何实现事务的,最后我们结合源码的理解,自己动手写一个类似的注解来实现事务管理,帮助我们加深理解。

阅读说明:本文假设你具备Java基础,同时对事务有基本的了解和使用。

2. 事务的相关知识

开始看源码之前,我们先回顾下事务的相关知识。

2.1 事务的隔离级别

事务为什么需要隔离级别呢?这是因为在并发事务情况下,如果没有隔离级别会导致如下问题:

  • 脏读(Dirty Read) :当A事务对数据进行修改,但是这种修改还没有提交到数据库中,B事务同时在访问这个数据,由于没有隔离,B获取的数据有可能被A事务回滚,这就导致了数据不一致的问题。
  • 丢失修改(Lost To Modify): 当A事务访问数据100,并且修改为100-1=99,同时B事务读取数据也是100,修改数据100-1=99,最终两个事务的修改结果为99,但是实际是98。事务A修改的数据被丢失了。
  • 不可重复读(Unrepeatable Read):指A事务在读取数据X=100的时候,B事务把数据X=100修改为X=200,这个时候A事务第二次读取数据X的时候,发现X=200了,导致了在整个A事务期间,两次读取数据X不一致了,这就是不可重复读。
  • 幻读(Phantom Read):幻读和不可重复读类似。幻读表现在,当A事务读取表数据时候,只有3条数据,这个时候B事务插入了2条数据,当A事务再次读取的时候,发现有5条记录了,平白无故多了2条记录,就像幻觉一样。

不可重复读 VS 幻读

不可重复读的重点是修改 : 同样的条件 , 你读取过的数据 , 再次读取出来发现值不一样了,重点在更新操作。 幻读的重点在于新增或者删除:同样的条件 , 第 1 次和第 2 次读出来的记录数不一样,重点在增删操作。

所以,为了避免上述的问题,事务中就有了隔离级别的概念,在Spring中定义了五种表示隔离级别的常量:

常量 说明
TransactionDefinition.ISOLATION_DEFAULT 数据库默认的隔离级别,MySQL默认采用的 REPEATABLE_READ隔离级别
TransactionDefinition.ISOLATION_READ_UNCOMMITTED 最低的隔离级别,允许读取未提交的数据变更,可能会导致脏读、幻读或不可重复读
TransactionDefinition.ISOLATION_READ_COMMITTED 允许读取并发事务已经提交的数据,可以阻止脏读,但是幻读或不可重复读仍有可能发生
TransactionDefinition.ISOLATION_REPEATABLE_READ 对同一字段的多次读取结果都是一致的,除非数据是被本身事务自己所修改,**可以阻止脏读和不可重复读,但幻读仍有可能发生。**MySQL中通过MVCC解决了该隔离级别下出现幻读的可能。
TransactionDefinition.ISOLATION_SERIALIZABLE 串行化隔离级别,该级别可以防止脏读、不可重复读以及幻读,但是串行化会影响性能。

2.2 Spring中事务的传播机制

为什么Spring中要搞一套事务的传播机制呢?这是Spring给我们提供的事务增强工具,主要是解决方法之间调用,事务如何处理的问题。比如有方法A、方法B和方法C,在A中调用了方法B和方法C。伪代码如下:

1
2
3
4
5
6
7
8
9
10
MethodA{
MethodB;
MethodC;
}
MethodB{

}
MethodC{

}

假设三个方法中都开启了自己的事务,那么他们之间是什么关系呢?MethodA的回滚会影响MethodBMethodC吗?Spring中的事务传播机制就是解决这个问题的。

Spring中定义了七种事务传播行为:

类型 说明
PROPAGATION_REQUIRED 如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。这是最常见的选择
PROPAGATION_SUPPORTS 支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY 使用当前的事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW 新建事务,如果当前存在事务,把当前事务挂起。
PROPAGATION_NOT_SUPPORTED 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。

这七种传播机制是如何影响事务的,感兴趣的同学可以阅读这篇文章

3. 如何实现异常回滚的

回顾完了事务的相关知识,接下来我们正式来研究下Spring Boot中如何通过@Transactional来管理事务的,我们重点看看它是如何实现回滚的。

在Spring中TransactionInterceptorPlatformTransactionManager这两个类是整个事务模块的核心,TransactionInterceptor负责拦截方法执行,进行判断是否需要提交或者回滚事务。PlatformTransactionManager是Spring 中的事务管理接口,真正定义了事务如何回滚和提交。我们重点研究下这两个类的源码。

TransactionInterceptor类中的代码有很多,我简化一下逻辑,方便说明:

1
2
3
4
5
6
7
//以下代码省略部分内容
public Object invoke(MethodInvocation invocation) throws Throwable {
//获取事务调用的目标方法
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
//执行带事务调用
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

invokeWithinTransaction 简化逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//TransactionAspectSupport.class
//省略了部分代码
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
Object retVal;
try {
//调用真正的方法体
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 如果出现异常,执行事务异常处理
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//最后做一下清理工作,主要是缓存和状态等
cleanupTransactionInfo(txInfo);
}
//如果没有异常,直接提交事务。
commitTransactionAfterReturning(txInfo);
return retVal;

}

事务出现异常回滚的逻辑completeTransactionAfterThrowing如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//省略部分代码
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
//判断是否需要回滚,判断的逻辑就是看有没有声明事务属性,同时判断是不是在目前的这个异常中执行回滚。
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
//执行回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
else {
//否则不需要回滚,直接提交即可。
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());

}
}
}

上面的代码已经把Spring的事务的基本原理说清楚了,如何进行判断执行事务,如何回滚。下面到了真正执行回滚逻辑的代码中PlatformTransactionManager接口的子类,我们以JDBC的事务为例,DataSourceTransactionManager就是jdbc的事务管理类。跟踪上面的代码rollback(txInfo.getTransactionStatus())可以发现最终执行的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
//调用jdbc的 rollback进行回滚事务。
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}

3.1 小结

这里小结下Spring 中事务的实现思路,Spring 主要依靠 TransactionInterceptor 来拦截执行方法体,判断是否开启事务,然后执行事务方法体,方法体中catch住异常,接着判断是否需要回滚,如果需要回滚就委托真正的TransactionManager 比如JDBC中的DataSourceTransactionManager来执行回滚逻辑。提交事务也是同样的道理。

这里用个流程图展示下思路:

http://static.cyblogs.com/1712beda531a889a.png

4. 手写一个注解实现事务回滚

我们弄清楚了Spring的事务执行流程,那我们可以模仿着自己写一个注解,实现遇到指定异常就回滚的功能。这里持久层就以最简单的JDBC为例。我们先梳理下需求,首先注解我们可以基于Spring 的AOP来实现,接着既然是JDBC,那么我们需要一个类来帮我们管理连接,用来判断异常是否回滚或者提交。梳理完就开干吧。

4.1 首先加入依赖

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

4.2 新增一个注解

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @description:
* @author: luozhou
* @create: 2020-03-29 17:05
**/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MyTransaction {
//指定异常回滚
Class<? extends Throwable>[] rollbackFor() default {};
}

4.3 新增连接管理器

该类帮助我们管理连接,该类的核心功能是把取出的连接对象绑定到线程上,方便在AOP处理中取出,进行提交或者回滚操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* @description:
* @author: luozhou
* @create: 2020-03-29 21:14
**/
@Component
public class DataSourceConnectHolder {
@Autowired
DataSource dataSource;
/**
* 线程绑定对象
*/
ThreadLocal<Connection> resources = new NamedThreadLocal<>("Transactional resources");

public Connection getConnection() {
Connection con = resources.get();
if (con != null) {
return con;
}
try {
con = dataSource.getConnection();
//为了体现事务,全部设置为手动提交事务
con.setAutoCommit(false);
} catch (SQLException e) {
e.printStackTrace();
}
resources.set(con);
return con;
}

public void cleanHolder() {
Connection con = resources.get();
if (con != null) {
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
resources.remove();
}
}

4.4 新增一个切面

这部分是事务处理的核心,先获取注解上的异常类,然后捕获住执行的异常,判断异常是不是注解上的异常或者其子类,如果是就回滚,否则就提交。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* @description:
* @author: luozhou
* @create: 2020-03-29 17:08
**/
@Aspect
@Component
public class MyTransactionAopHandler {
@Autowired
DataSourceConnectHolder connectHolder;
Class<? extends Throwable>[] es;

//拦截所有MyTransaction注解的方法
@org.aspectj.lang.annotation.Pointcut("@annotation(luozhou.top.annotion.MyTransaction)")
public void Transaction() {

}

@Around("Transaction()")
public Object TransactionProceed(ProceedingJoinPoint proceed) throws Throwable {
Object result = null;
Signature signature = proceed.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
Method method = methodSignature.getMethod();
if (method == null) {
return result;
}
MyTransaction transaction = method.getAnnotation(MyTransaction.class);
if (transaction != null) {
es = transaction.rollbackFor();
}
try {
result = proceed.proceed();
} catch (Throwable throwable) {
//异常处理
completeTransactionAfterThrowing(throwable);
throw throwable;
}
//直接提交
doCommit();
return result;
}
/**
* 执行回滚,最后关闭连接和清理线程绑定
*/
private void doRollBack() {
try {
connectHolder.getConnection().rollback();
} catch (SQLException e) {
e.printStackTrace();
} finally {
connectHolder.cleanHolder();
}

}
/**
*执行提交,最后关闭连接和清理线程绑定
*/
private void doCommit() {
try {
connectHolder.getConnection().commit();
} catch (SQLException e) {
e.printStackTrace();
} finally {
connectHolder.cleanHolder();
}
}
/**
*异常处理,捕获的异常是目标异常或者其子类,就进行回滚,否则就提交事务。
*/
private void completeTransactionAfterThrowing(Throwable throwable) {
if (es != null && es.length > 0) {
for (Class<? extends Throwable> e : es) {
if (e.isAssignableFrom(throwable.getClass())) {
doRollBack();
}
}
}
doCommit();
}
}

4.5 测试验证

创建一个tb_test表,表结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for tb_test
-- ----------------------------
DROP TABLE IF EXISTS `tb_test`;
CREATE TABLE `tb_test` (
`id` int(11) NOT NULL,
`email` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

SET FOREIGN_KEY_CHECKS = 1;

4.5.1 编写一个Service

saveTest方法调用了2个插入语句,同时声明了@MyTransaction事务注解,遇到NullPointerException就进行回滚,最后我们执行了除以0操作,会抛出ArithmeticException。我们用单元测试看看数据是否会回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* @description:
* @author: luozhou kinglaw1204@gmail.com
* @create: 2020-03-29 22:05
**/
@Service
public class MyTransactionTest implements TestService {
@Autowired
DataSourceConnectHolder holder;
//一个事务中执行两个sql插入
@MyTransaction(rollbackFor = NullPointerException.class)
@Override
public void saveTest(int id) {
saveWitharamters(id, "luozhou@gmail.com");
saveWitharamters(id + 10, "luozhou@gmail.com");
int aa = id / 0;
}
//执行sql
private void saveWitharamters(int id, String email) {
String sql = "insert into tb_test values(?,?)";
Connection connection = holder.getConnection();
PreparedStatement stmt = null;
try {
stmt = connection.prepareStatement(sql);
stmt.setInt(1, id);
stmt.setString(2, email);
stmt.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
}

}

4.5.2 单元测试

1
2
3
4
5
6
7
8
9
10
11
12
@SpringBootTest
@RunWith(SpringRunner.class)
class SpringTransactionApplicationTests {
@Autowired
private TestService service;

@Test
void contextLoads() throws SQLException {
service.saveTest(1);
}

}

http://static.cyblogs.com/1712e8cd12a0b64e.jpg

上图代码声明了事务对NullPointerException异常进行回滚,运行中遇到了ArithmeticException异常,所以是不会回滚的,我们在右边的数据库中刷新发现数据正常插入成功了,说明并没有回滚。

http://static.cyblogs.com/1712e8cd12cd8755.jpg

我们把回滚的异常类改为ArithmeticException,把原数据清空再执行一次,出现了ArithmeticException异常,这个时候查看数据库是没有记录新增成功了,这说明事物进行回滚了,表明我们的注解起作用了。

5. 总结

本文最开始回顾了事务的相关知识,并发事务会导致脏读丢失修改不可重复读幻读,为了解决这些问题,数据库中就引入了事务的隔离级别,隔离级别包括:读未提交读提交可重复读串行化

Spring中增强了事务的概念,为了解决方法A、方法B和方法C之间的事务关系,引入了事务传播机制的概念。

Spring中的@Transactional注解的事务实现主要通过TransactionInterceptor拦截器来进行实现的,拦截目标方法,然后判断异常是不是目标异常,如果是目标异常就行进行回滚,否则就进行事务提交。

最后我们自己通过JDBC结合Spring的AOP自己写了个@MyTransactional的注解,实现了遇到指定异常回滚的功能。

参考地址

简介

Lombok是一款好用顺手的工具,就像Google Guava一样,在此予以强烈推荐,每一个Java工程师都应该使用它。Lombok是一种Java™实用工具,可用来帮助开发人员消除Java的冗长代码,尤其是对于简单的Java对象(POJO)。它通过注释实现这一目的。通过在开发环境中实现Lombok,开发人员可以节省构建诸如hashCode()和equals()这样的方法以及以往用来分类各种accessor和mutator的大量时间。

IntelliJ安装Lombok

通过IntelliJ的插件中心安装

http://static.cyblogs.com/QQ20200425-164854@2x.jpg

最后需要注意的是,在使用lombok注解的时候记得要导入lombok.jar包到工程,如果使用的是Maven Project,要在pom.xml中添加依赖。

1
2
3
4
5
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.8</version>
</dependency>

Lombok用法

Lombok注解说明
  • val:用在局部变量前面,相当于将变量声明为final

  • @NonNull:给方法参数增加这个注解会自动在方法内对该参数进行是否为空的校验,如果为空,则抛出NPE(NullPointerException)

  • @Cleanup:自动管理资源,用在局部变量之前,在当前变量范围内即将执行完毕退出之前会自动清理资源,自动生成try-finally这样的代码来关闭流

  • @Getter/@Setter:用在属性上,再也不用自己手写setter和getter方法了,还可以指定访问范围

  • @ToString:用在类上,可以自动覆写toString方法,当然还可以加其他参数,例如@ToString(exclude=”id”)排除id属性,或者@ToString(callSuper=true, includeFieldNames=true)调用父类的toString方法,包含所有属性

  • @EqualsAndHashCode:用在类上,自动生成equals方法和hashCode方法

  • @NoArgsConstructor, @RequiredArgsConstructor and @AllArgsConstructor:用在类上,自动生成无参构造和使用所有参数的构造函数以及把所有@NonNull属性作为参数的构造函数,如果指定staticName = “of”参数,同时还会生成一个返回类对象的静态工厂方法,比使用构造函数方便很多

  • @Data:注解在类上,相当于同时使用了@ToString@EqualsAndHashCode@Getter@Setter@RequiredArgsConstrutor这些注解,对于POJO类十分有用

  • @Value:用在类上,是@Data的不可变形式,相当于为属性添加final声明,只提供getter方法,而不提供setter方法

  • @Builder:用在类、构造器、方法上,为你提供复杂的builder APIs,让你可以像如下方式一样调用Person.builder().name("Adam Savage").city("San Francisco").job("Mythbusters").job("Unchained Reaction").build();更多说明参考Builder

  • @SneakyThrows:自动抛受检异常,而无需显式在方法上使用throws语句

  • @Synchronized:用在方法上,将方法声明为同步的,并自动加锁,而锁对象是一个私有的属性$lock$LOCK,而java中的synchronized关键字锁对象是this,锁在this或者自己的类对象上存在副作用,就是你不能阻止非受控代码去锁this或者类对象,这可能会导致竞争条件或者其它线程错误

  • @Getter(lazy=true):可以替代经典的Double Check Lock样板代码

  • @Log:根据不同的注解生成不同类型的log对象,但是实例名称都是log,有六种可选实现类

    • @CommonsLog Creates log = org.apache.commons.logging.LogFactory.getLog(LogExample.class);
    • @Log Creates log = java.util.logging.Logger.getLogger(LogExample.class.getName());
  • @Log4j Creates log = org.apache.log4j.Logger.getLogger(LogExample.class);

    • @Log4j2 Creates log = org.apache.logging.log4j.LogManager.getLogger(LogExample.class);
  • @Slf4j Creates log = org.slf4j.LoggerFactory.getLogger(LogExample.class);

    • @XSlf4j Creates log = org.slf4j.ext.XLoggerFactory.getXLogger(LogExample.class);
Lombok代码示例
val示例
1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
val sets = new HashSet<String>();
val lists = new ArrayList<String>();
val maps = new HashMap<String, String>();
//=>相当于如下
final Set<String> sets2 = new HashSet<>();
final List<String> lists2 = new ArrayList<>();
final Map<String, String> maps2 = new HashMap<>();
}
@NonNull示例
1
2
3
4
5
6
7
8
9
10
11
public void notNullExample(@NonNull String string) {
string.length();
}
//=>相当于
public void notNullExample(String string) {
if (string != null) {
string.length();
} else {
throw new NullPointerException("null");
}
}
@Cleanup示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
try {
@Cleanup InputStream inputStream = new FileInputStream(args[0]);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
//=>相当于
InputStream inputStream = null;
try {
inputStream = new FileInputStream(args[0]);
} catch (FileNotFoundException e) {
e.printStackTrace();
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Getter/@Setter示例
1
2
3
4
@Setter(AccessLevel.PUBLIC)
@Getter(AccessLevel.PROTECTED)
private int id;
private String shap;
@ToString示例
1
2
3
4
5
6
7
8
9
10
@ToString(exclude = "id", callSuper = true, includeFieldNames = true)
public class LombokDemo {
private int id;
private String name;
private int age;
public static void main(String[] args) {
//输出LombokDemo(super=LombokDemo@48524010, name=null, age=0)
System.out.println(new LombokDemo());
}
}
@EqualsAndHashCode示例
1
2
3
4
5
@EqualsAndHashCode(exclude = {"id", "shape"}, callSuper = false)
public class LombokDemo {
private int id;
private String shap;
}
@NoArgsConstructor/@RequiredArgsConstructor/@AllArgsConstructor示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@NoArgsConstructor
@RequiredArgsConstructor(staticName = "of")
@AllArgsConstructor
public class LombokDemo {
@NonNull
private int id;
@NonNull
private String shap;
private int age;
public static void main(String[] args) {
new LombokDemo(1, "circle");
//使用静态工厂方法
LombokDemo.of(2, "circle");
//无参构造
new LombokDemo();
//包含所有参数
new LombokDemo(1, "circle", 2);
}
}
@Data示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import lombok.Data;
@Data
public class Menu {
private String shopId;
private String skuMenuId;
private String skuName;
private String normalizeSkuName;
private String dishMenuId;
private String dishName;
private String dishNum;
//默认阈值
private float thresHold = 0;
//新阈值
private float newThresHold = 0;
//总得分
private float totalScore = 0;
}
@Value示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Value
public class LombokDemo {
@NonNull
private int id;
@NonNull
private String shap;
private int age;
//相当于
private final int id;
public int getId() {
return this.id;
}
...
}
@Builder示例
1
2
3
4
5
6
7
8
9
10
@Builder
public class BuilderExample {
private String name;
private int age;
@Singular
private Set<String> occupations;
public static void main(String[] args) {
BuilderExample test = BuilderExample.builder().age(11).name("test").build();
}
}
@SneakyThrows示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import lombok.SneakyThrows;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
public class Test {
@SneakyThrows()
public void read() {
InputStream inputStream = new FileInputStream("");
}
@SneakyThrows
public void write() {
throw new UnsupportedEncodingException();
}
//相当于
public void read() throws FileNotFoundException {
InputStream inputStream = new FileInputStream("");
}
public void write() throws UnsupportedEncodingException {
throw new UnsupportedEncodingException();
}
}

@Synchronized示例

1
2
3
4
5
6
7
8
9
10
11
12
13
public class SynchronizedDemo {
@Synchronized
public static void hello() {
System.out.println("world");
}
//相当于
private static final Object $LOCK = new Object[0];
public static void hello() {
synchronized ($LOCK) {
System.out.println("world");
}
}
}

@Getter(lazy = true)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class GetterLazyExample {
@Getter(lazy = true)
private final double[] cached = expensive();
private double[] expensive() {
double[] result = new double[1000000];
for (int i = 0; i < result.length; i++) {
result[i] = Math.asin(i);
}
return result;
}
}

// 相当于如下所示:
import java.util.concurrent.atomic.AtomicReference;
public class GetterLazyExample {
private final AtomicReference<java.lang.Object> cached = new AtomicReference<>();
public double[] getCached() {
java.lang.Object value = this.cached.get();
if (value == null) {
synchronized (this.cached) {
value = this.cached.get();
if (value == null) {
final double[] actualValue = expensive();
value = actualValue == null ? this.cached : actualValue;
this.cached.set(value);
}
}
}
return (double[]) (value == this.cached ? null : value);
}
private double[] expensive() {
double[] result = new double[1000000];
for (int i = 0; i < result.length; i++) {
result[i] = Math.asin(i);
}
return result;
}
}

Lombok注解原理

说道 Lombok,我们就得去提到 JSR 269: Pluggable Annotation Processing API (www.jcp.org/en/jsr/deta…) 。JSR 269 之前我们也有注解这样的神器,可是我们比如想要做什么必须使用反射,反射的方法局限性较大。首先,它必须定义@Retention为RetentionPolicy.RUNTIME,只能在运行时通过反射来获取注解值,使得运行时代码效率降低。其次,如果想在编译阶段利用注解来进行一些检查,对用户的某些不合理代码给出错误报告,反射的使用方法就无能为力了。而 JSR 269 之后我们可以在 Javac的编译期利用注解做这些事情。所以我们发现核心的区分是在 运行期 还是 编译期

http://static.cyblogs.com/16140d77d8166720.png

从上图可知,Annotation Processing 是在解析和生成之间的一个步骤。具体详细步骤如下:

http://static.cyblogs.com/16140d77d8050b6c.png

上图是 Lombok 处理流程,在Javac 解析成抽象语法树之后(AST), Lombok 根据自己的注解处理器,动态的修改 AST,增加新的节点(所谓代码),最终通过分析和生成字节码

自从Java 6起,javac就支持“JSR 269 Pluggable Annotation Processing API”规范,只要程序实现了该API,就能在javac运行的时候得到调用

  1. 常用的项目管理工具Maven所使用的java编译工具来源于配置的第三方工具,如果我们配置这个第三方工具为Oracle javac的话,那么Maven也就直接支持lombok了;
  2. Intellij Idea配置的编译工具为Oracle javac的话,也就直接支持lombok了;

IDE工具问题解决:

现在有一个A类,其中有一些字段,没有创建它们的setter和getter方法,使用了lombok的@Data注解,另外有一个B类,它调用了A类实例的相应字段的setter和getter方法

编译A类和B类所在的项目,并不会报错,因为最终生成的A类字节码文件中存在相应字段的setter和getter方法

但是,IDE发现B类源代码中所使用的A类实例的setter和getter方法在A类源代码中找不到定义,IDE会认为这是错误

要解决以上这个不是真正错误的错误,可以下载安装Intellij Idea中的”Lombok plugin”。

自定义支持JSR269的注解

一般javac的编译过程,java文件首先通过进行解析构建出一个AST,然后执行注解处理,最后经过分析优化生成二进制的.class文件。我们能做到的是,在注解处理阶段进行一些相应处理。首先我们在META-INF.services下创建如下文件:

http://static.cyblogs.com/16140d77d82abad8.png

文件中指定我们的注解处理器:com.alipay.kris.other.lombok.MyAnnotaionProcessor,然后我们接可以编写自己的注解处理器,一个简单的实例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SupportedSourceVersion(SourceVersion.RELEASE_8)
@SupportedAnnotationTypes("com.alipay.kris.other.lombok.*")
public class MyAnnotaionProcessor extends AbstractProcessor {
public MyAnnotaionProcessor() {
super();
}
@Override
public boolean process(Set<? extends TypeElement> annotations,RoundEnvironment roundEnv) {
for (Element elem : roundEnv.getElementsAnnotatedWith(MyAnnotation.class)) {
MyAnnotation annotation = elem.getAnnotation(MyAnnotation.class);
String message = "annotation found in " + elem.getSimpleName()
+ " with " + annotation.value();
addToString(elem);
processingEnv.getMessager().printMessage(Diagnostic.Kind.NOTE, message);
}
return true; // no further processing of this annotation type
}
}

参考地址

最近我们自己在重构项目,系统为了符合82原则(希望是80%的业务能通过穷举的方式固定下来,只有20%的允许特殊的定义),那么在固定一些标准流程以后,比如我们放大了原子服务的能力,当放大原子服务能力的时候,你就会发现,虽然抽象上看做的事情是一个意思,但是到实际去实现的时候发现还是各不相同。

在这里为了解决一个实现不同,但流程相同的问题,以及团队协作上的问题。我们引入的SPI (Service Provider Interface)

使用案例

通常情况下,使用ServiceLoader来实现SPI机制。 SPI 全称为 (Service Provider Interface) ,是JDK内置的一种服务提供发现机制。SPI是一种动态替换发现的机制, 比如有个接口,想运行时动态的给它添加实现,你只需要添加一个实现。

SPI机制可以归纳为如下的图:

http://static.cyblogs.com/20191111203102234.png

如果大家看过源代码或者说看过一些博客文章大概都清楚,在一些开源项目中大量的使用了SPI的方式,比如:mysql-connector-javadubbo等。

我们大概看眼MySQL的一个SPI实现

http://static.cyblogs.com/WechatIMG450.png

JDBC中的接口即为:java.sql.Driver

SPI机制的实现核心类为:java.util.ServiceLoader

Provider则为:com.mysql.jdbc.Driver

1
2
com.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver

简单写个SPI

代码部分,接口与实现类定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.vernon.test.spi;

/**
* Created with vernon-test
*
* @description:
* @author: chenyuan
* @date: 2020/4/2
* @time: 11:08 上午
*/
public interface IRepository {
void save(String data);
}

package com.vernon.test.spi.impl;

import com.vernon.test.spi.IRepository;

/**
* Created with vernon-test
*
* @description:
* @author: chenyuan
* @date: 2020/4/2
* @time: 11:09 上午
*/
public class MongoRepository implements IRepository {
@Override
public void save(String data) {
System.out.println("Save " + data + " to Mongo");
}
}

package com.vernon.test.spi.impl;

import com.vernon.test.spi.IRepository;

/**
* Created with vernon-test
*
* @description:
* @author: chenyuan
* @date: 2020/4/2
* @time: 11:08 上午
*/
public class MysqlRepository implements IRepository {

@Override
public void save(String data) {
System.out.println("Save " + data + " to Mysql");
}

}

调用函数定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.vernon.test.spi;

import java.util.Iterator;
import java.util.ServiceLoader;

/**
* Created with vernon-test
*
* @description:
* @author: chenyuan
* @date: 2020/4/2
* @time: 11:12 上午
*/
public class SPIMain {

public static void main(String[] args) {
ServiceLoader<IRepository> serviceLoader = ServiceLoader.load(IRepository.class);
Iterator<IRepository> it = serviceLoader.iterator();
while (it != null && it.hasNext()) {
IRepository demoService = it.next();
System.out.println("class:" + demoService.getClass().getName());
demoService.save("tom");
}
}

}

执行结果:

1
2
3
4
5
6
Connected to the target VM, address: '127.0.0.1:58517', transport: 'socket'
class:com.vernon.test.spi.impl.MongoRepository
Save tom to Mongo
class:com.vernon.test.spi.impl.MysqlRepository
Save tom to Mysql
Disconnected from the target VM, address: '127.0.0.1:58517', transport: 'socket'

ServiceLoader类的内部实现逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static <S> ServiceLoader<S> load(Class<S> service) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return ServiceLoader.load(service, cl);
}

public static <S> ServiceLoader<S> load(Class<S> service, ClassLoader loader) {
return new ServiceLoader<>(service, loader);
}

private ServiceLoader(Class<S> svc, ClassLoader cl) {
service = Objects.requireNonNull(svc, "Service interface cannot be null");
loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
reload();
}

public void reload() {
providers.clear();
lookupIterator = new LazyIterator(service, loader);
}

SercviceLoader的初始化跑完如上代码就结束了。但是实际上联系待实现接口和实现接口的类之间的关系并不只是在构造ServiceLoader类的过程中完成的,而是在迭代器的方法hasNext()中实现的。

动态调用的实现

在使用案例中写的forEach语句内部逻辑就是迭代器,迭代器的重要方法就是hasNext()

ServiceLoader是一个实现了接口Iterable接口的类。

hasNext()方法的源代码:

1
2
3
4
5
6
7
8
9
10
public boolean hasNext() {
if (acc == null) {
return hasNextService();
} else {
PrivilegedAction<Boolean> action = new PrivilegedAction<Boolean>() {
public Boolean run() { return hasNextService(); }
};
return AccessController.doPrivileged(action, acc);
}
}

抛出复杂的确保安全的操作,可以将上述代码看作就是调用了方法:hasNextService.

hasNextService()方法的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private boolean hasNextService() {
if (nextName != null) {
return true;
}
if (configs == null) {
try {
String fullName = PREFIX + service.getName();
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
pending = parse(service, configs.nextElement());
}
nextName = pending.next();
return true;
}

上述代码中比较重要的代码块是:

1
2
3
String fullName = PREFIX + service.getName();
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);

此处PREFIX(前缀)是一个常量字符串(用于规定配置文件放置的目录,使用相对路径,说明其上层目录为以项目名为名的文件夹):

1
private static final String PREFIX = "META-INF/services/";

那么fullName会被赋值为:META-INF/services/com.vernon.test.spi.IRepository

然后调用方法getSystemResourcesgetResources将fullName参数视作为URL,返回配置文件的URL集合 。

1
pending = parse(service, configs.nextElement());

parse方法是凭借 参数1:接口的Class对象 和 参数2:配置文件的URL来解析配置文件,返回值是含有配置文件里面的内容,也就是实现类的全名(包名+类名)字符串的迭代器;

最后调用下面的代码,得到下面要加载的类的完成类路径字符串,相对路径。在使用案例中,此值就可以为:

1
2
com.vernon.test.spi.impl.MongoRepository
com.vernon.test.spi.impl.MysqlRepository

这仅仅是迭代器判断是否还有下一个迭代元素的方法,而获取每轮迭代元素的方法为:nextService()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public S next() {
if (acc == null) {
return nextService();
} else {
PrivilegedAction<S> action = new PrivilegedAction<S>() {
public S run() { return nextService(); }
};
return AccessController.doPrivileged(action, acc);
}
}

private S nextService() {
if (!hasNextService())
throw new NoSuchElementException();
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) {
fail(service,
"Provider " + cn + " not a subtype");
}
try {
// 实例化
S p = service.cast(c.newInstance());
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated",
x);
}
throw new Error(); // This cannot happen
}

总结

1、SPI的理念:通过动态加载机制实现面向接口编程,提高了框架和底层实现的分离;
2、ServiceLoader 类提供的 SPI 实现方法只能通过遍历迭代的方法实现获得Provider的实例对象,如果要注册了多个接口的实现类,那么显得效率不高;
3、虽然通过静态方法返回,但是每一次Service.load方法的调用都会产生一个ServiceLoader实例,不属于单例设计模式;
4、ServiceLoaderClassLoader是类似的,都可以负责一定的类加载工作,但是前者只是单纯地加载特定的类,即要求实现了Service接口的特定实现类;而后者几乎是可以加载所有Java类;
5、对于SPi机制的理解有两个要点:

  • 理解动态加载的过程,知道配置文件是如何被利用,最终找到相关路径下的类文件,并加载的;
  • 理解 SPI 的设计模式:接口框架 和底层实现代码分离;

6、之所以将ServiceLoader类内部的迭代器对象称为LazyInterator,是因为在ServiceLoader对象创建完毕时,迭代器内部并没有相关元素引用,只有真正迭代的时候,才会去解析、加载、最终返回相关类(迭代的元素);

参考地址

技术架构

http://static.cyblogs.com/rocketmq_architecture_1.jpgRocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServerNameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后ProducerConumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
  • BrokerServerBroker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
  • Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
  • Client Manager:负责管理客户端(Producer/Consumer)和维护ConsumerTopic订阅信息
  • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  • HA Service:高可用服务,提供Master BrokerSlave Broker之间的数据同步功能。
  • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

RocketMQ 网络部署特点

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为MasterSlave,一个Master可以对应多个Slave,但是一个Slave只能对应一个MasterMasterSlave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示SlaveMaster也可以部署多个。每个BrokerNameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一MasterSlave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • ProducerNameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • ConsumerNameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的MasterSlave建立长连接,且定时向MasterSlave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServerNameServer起来后监听端口,等待BrokerProducerConsumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有TopicBroker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • ConsumerProducer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

事务消息

Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

http://static.cyblogs.com/rocketmq_design_10.png

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1.事务消息发送及提交:

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2.补偿流程:

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

RocketMQ事务消息设计

1.事务消息在一阶段对用户不可见

在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

在RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通过ConsumeQueue这个二级索引来读取消息实体内容,其流程如下:

http://static.cyblogs.com/rocketmq_design_11.png

RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。

2.Commit和Rollback操作以及Op消息的引入

在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

3.Op消息的存储和对应关系

RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—TransactionalMessageUtil.buildOpTopic();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。

http://static.cyblogs.com/rocketmq_design_12.png

4.Half消息的索引构建

在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

5.如何处理二阶段失败的消息?

如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。

值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。

回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在吐 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障, 恢复后需要重新消费 1 小时前的数据,那么Broker 要提供一种机制,可以按照时间维度来回退消费进度。

RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

消息堆积

消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:

(1). 消息堆积在内存 Buffer,一旦超过内存 Buffer,可以根据一定的丢弃策略来丢弃消息,如 CORBA Notification 规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存 Buffer 大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。

Broker 的 Buffer 通常指的是 Broker 中一个队列的内存 Buffer 大小,这类 Buffer 通常大小有限,如果 Buffer 满 了以后怎么办? 下面是 CORBA Notification 规范中处理方式:

  • RejectNewEvents 拒绝新来的消息,向Producer返回RejectNewEvents错误码。

  • 按照特定策略丢弃已有消息

    • a) AnyOrder - Any event may be discarded on overflow. This is the default setting for this property.
    • b) FifoOrder - The first event received will be the first discarded.
    • c) LifoOrder - The last event received will be the first discarded.
    • d) PriorityOrder - Events should be discarded in priority order, such that lower priority

(2). 消息堆积到持久化存储系统中,例如 DB,KV 存储,文件记录形式。 当消息不能在内存 Cache 命中时,要不可避免的访问磁盘,会产生大量读 IO,读 IO 的吞吏量直接决定了消息堆积后的访问能力。 评估消息堆积能力主要有以下四点:

  • 消息能堆积多少条,多少字节?即消息的堆积容量。

  • 消息堆积后,收消息的吞吏量大小,是否会受堆积影响?

  • 消息堆积后,正常消费的 Consumer 是否会受影响?

  • 消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大?

零拷贝原理

Consumer 消费消息过程,使用了零拷贝,零拷贝包含以下两种方式 :

使用 mmap + write 方式

http://static.cyblogs.com/浅析零拷贝技术-2.png

优点:即使频繁调用,使用小块文件传输,效率也很高

缺点:不能很好的利用 DMA 方式,会比 sendfile 多消耗 CPU,内存安全性控制复杂,需要避免 JVM Crash 问题。

使用 sendfile 方式

http://static.cyblogs.com/浅析零拷贝-03.jpg

优点:可以利用 DMA 方式,消耗 CPU 较少,大块文件传输效率高,无内存安全新问题。

缺点:小块文件效率低于mmap 方式,只能是BIO方式传输,不能使用 NIO。 RocketMQ 选择了第一种方式,mmap+write 方式,因为有小块数据传输的需求,效果会比 sendfile 更好。

参考地址

第一句话:时间戳

时间不分东西南北、在地球的每一个角落都是相同的。他们都有一个相同的名字,叫时间戳时间戳 指的就是Unix时间戳(Unix timestamp)。它也被称为Unix时间(Unix time)、POSIX时间(POSIX time),是一种时间表示方式,定义为从格林威治时间1970年01月01日00时00分00秒起至现在的总秒数。

关于 时间戳, 你可以看在线时间戳 http://tool.chinaz.com/Tools/unixtime.aspx

第二句话:时区

时间戳 在地球的每一个角落都是相同的,但是在相同的时间点会有不同的表达方式,所以有了另外一个时间概念,叫时区。这里的时区地区不是同一个概念,例如我们所在的时区东八区
在设备中,可以自己手动的切换当前的系统时区:

http://static.cyblogs.com/1198135-c33bb659a21ae7d7.jpg

你会发现:当你选在不同的时区,你的当前时间是不一样的。

第三句话:时间戳与时区在Code中应用

格林威治标准时间GMT

十七世纪,格林威治皇家天文台为了海上霸权的扩张计画而进行天体观测。1675年旧皇家观测所(Old Royal Observatory) 正式成立,到了1884年决定以通过格林威治的子午线作为划分地球东西两半球的经度零度。观测所门口墙上有一个标志24小时的时钟,显示当下的时间,对全球而言,这里所设定的时间是世界时间参考点,全球都以格林威治的时间作为标准来设定时间,这就是我们耳熟能详的「格林威治标准时间(Greenwich Mean Time,简称G.M.T.)的由来,标示在手表上,则代表此表具有两地时间功能,也就是同时可以显示原居地和另一个国度的时间。

世界协调时间UTC

多数的两地时间表都以GMT来表示,但也有些两地时间表上看不到GMT字样,出现的反而是UTC这3个英文字母,究竟何谓UTC?事实上,UTC指的是Coordinated Universal Time-世界协调时间(又称世界标准时间、世界统一时间),是经过平均太阳时(以格林威治时间GMT为准)、地轴运动修正后的新时标以及以「秒」为单位的国际原子时所综合精算而成的时间,计算过程相当严谨精密,因此若以「世界标准时间」的角度来说,UTC比GMT来得更加精准。其误差值必须保持在0.9秒以内,若大于0.9秒则由位于巴黎的国际地球自转事务中央局发布闰秒,使UTC与地球自转周期一致。所以基本上UTC的本质强调的是比GMT更为精确的世界时间标准,不过对于现行表款来说,GMT与UTC的功能与精确度是没有差别的。

代码展示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.vernon.test.time;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.TimeZone;

public class UTCTimecase {

private static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");

/**
* 得到UTC时间,类型为字符串,格式为"yyyy-MM-dd HH:mm"<br />
* 如果获取失败,返回null
*
* @return
*/
public static String getUTCTimeStr() {
StringBuffer UTCTimeBuffer = new StringBuffer();
// 1、取得本地时间:
Calendar cal = Calendar.getInstance();
// 2、取得时间偏移量:
int zoneOffset = cal.get(java.util.Calendar.ZONE_OFFSET);
// 3、取得夏令时差:
int dstOffset = cal.get(java.util.Calendar.DST_OFFSET);
// 4、从本地时间里扣除这些差量,即可以取得UTC时间:
cal.add(java.util.Calendar.MILLISECOND, -(zoneOffset + dstOffset));
int year = cal.get(Calendar.YEAR);
int month = cal.get(Calendar.MONTH) + 1;
int day = cal.get(Calendar.DAY_OF_MONTH);
int hour = cal.get(Calendar.HOUR_OF_DAY);
int minute = cal.get(Calendar.MINUTE);
UTCTimeBuffer.append(year).append("-").append(month).append("-").append(day);
UTCTimeBuffer.append(" ").append(hour).append(":").append(minute);
try {
format.parse(UTCTimeBuffer.toString());
return UTCTimeBuffer.toString();
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}

/**
* 将UTC时间转换为东八区时间
*
* @param UTCTime
* @return
*/
public static String getLocalTimeFromUTC(String UTCTime) {
java.util.Date UTCDate = null;
String localTimeStr = null;
try {
UTCDate = format.parse(UTCTime);
format.setTimeZone(TimeZone.getTimeZone("GMT-8"));
localTimeStr = format.format(UTCDate);
} catch (ParseException e) {
e.printStackTrace();
}

return localTimeStr;
}

public static void main(String[] args) {
String UTCTimeStr = getUTCTimeStr();
System.out.println(UTCTimeStr);
System.out.println(getLocalTimeFromUTC(UTCTimeStr));
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.vernon.test.time;

import lombok.extern.slf4j.Slf4j;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;

@Slf4j
public class DateTimeCase {

public static final String FORMAT_DATETIME = "yyyy-MM-dd HH:mm:ss";

public static String getOffsetTime(int offset, Date date) {
TimeZone timeZone = TimeZone.getTimeZone("GMT-" + offset + ":00");
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
simpleDateFormat.setTimeZone(timeZone);
return simpleDateFormat.format(date);
}

public static void main(String[] args) {
// 取得本地时间:
Calendar cal1 = Calendar.getInstance();
log.info("cal1={}", new SimpleDateFormat(FORMAT_DATETIME).format(cal1.getTime()));
log.info("cal1={}", cal1.getTime().getTime());

log.info("cal2={}", getOffsetTime(9, cal1.getTime()));
}
}

总结

如果想实现国际化,数据库可以采用存放时间戳的方式,因为没有时区的影响,都是从1970年01月01日00时00分00秒计算的时间。然后根据当前的地区来显示。

参考地址

背景

2019年我们经历了一整年的各种迁移,其中包括了一项RPC框架的切换。以前我们用的HSF RPC框架,它是来自于阿里巴巴,经过了多年的双11高并发的洗礼,高性能这块儿毫无疑问没有任何的问题,而且它还同时支持TCPHTTP的方式,唯一不太好的就是它不开源,如果出现问题定位起来确实有一些问题与风险。

所以,我们为了拥抱开源,全部采用SpringCloud,系统与系统之间调用是通过FeignClient的方式来调用的,但是由于底层的部分系统由于时间、人力、历史等原因,无法在短时间内都像我们一样能积极响应。所以就出现了SpringCloud与HSF服务同时存在的情况,为了大家再编码过程中都能像本地调用(TCPFeignClient),所以就写了一个代理工具。

交互图

http://static.cyblogs.com/QQ截图20200406181706.png

如果是上面的方式,我们还是能感受到每次都是通过HttpClient等方式发起一次Http请求,写代码时候的体验不是很好。

http://static.cyblogs.com/QQ截图20200406182159.png

为了解决这个问题,那么我们的任务就是来写一个这个代理封装。

分析功能点

了解一下FeignClient

我们参考一下FeignClient的功能一个解析过程,如图:

http://static.cyblogs.com/14126519-4cc483cb15b9dc6d.png

  • 生成动态代理类
  • 解析出等的MethodHandler
  • 动态生成Request
  • Encoder
  • 拦截器处理
  • 日志处理
  • 重试机制
代理需要考虑什么?

http://static.cyblogs.com/QQ截图20200406193343.png

那我们不用说写那么完善,我们的第一个目标就是实现扫描 → 代理 → 发送请求。

因为HSF的参数与标准的Http方式不太一致,所以在发起Http请求的时候,需要特殊的构造一下报文的格式

1
2
curl -d "ArgsTypes=[\"com.cyblogs..QueryConfigReq\"]&ArgsObjects=[{\"relationCode\":\"ABCD\"}]" 
http://127.0.0.1:8083/com.cyblogs.api.ConfigServiceV2Api:1.0.0/queryNewConfig

代码框架实现

SpringBoot总入口,打开@EnableHsfClients注解

1
2
3
4
5
6
7
8
@SpringBootApplication
@EnableHsfClients(basePackages = "com.cyblogs.client.hsf")
public class App {

public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}

这里定义好需要扫描的包,具体的类等

1
2
3
4
5
6
7
8
9
10
11
12
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import({ HsfClientsRegistrar.class })
public @interface EnableHsfClients {
String[] value() default {};

String[] basePackages() default {};

Class<?>[] clients() default {};

}

利用SpirngImportBeanDefinitionRegistrar来进行自动注入生成Bean。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
public class HsfClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, BeanClassLoaderAware {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
registerHsfClient(importingClassMetadata, registry);
}

public void registerHsfClient(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);

Set<String> basePackages;

Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableHsfClients.class.getName());
AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(HsfClient.class);
final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients");
if (clients == null || clients.length == 0) {
scanner.addIncludeFilter(annotationTypeFilter);
basePackages = getBasePackages(metadata);
} else {
basePackages = new HashSet<>();
for (Class<?> clazz : clients) {
basePackages.add(ClassUtils.getPackageName(clazz));
}
}

for (String basePackage : basePackages) {
Set<BeanDefinition> candidateComponents = scanner.findCandidateComponents(basePackage);
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(), "@HsfClient can only be specified on an interface");
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(HsfClient.class.getCanonicalName());
registerHsfClient(registry, annotationMetadata, attributes);
}
}
}
}

protected ClassPathScanningCandidateComponentProvider getScanner() {
return new ClassPathScanningCandidateComponentProvider(false) {

@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
if (beanDefinition.getMetadata().isIndependent()) {
if (beanDefinition.getMetadata().isInterface()
&& beanDefinition.getMetadata().getInterfaceNames().length == 1
&& Annotation.class.getName().equals(beanDefinition.getMetadata().getInterfaceNames()[0])) {
try {
Class<?> target = ClassUtils.forName(beanDefinition.getMetadata().getClassName(),
HsfClientsRegistrar.this.classLoader);
return !target.isAnnotation();
} catch (Exception ex) {
log.error("Could not load target class: " + beanDefinition.getMetadata().getClassName(),
ex);

}
}
return true;
}
return false;

}
};
}

protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
Map<String, Object> attributes = importingClassMetadata
.getAnnotationAttributes(EnableHsfClients.class.getCanonicalName());

Set<String> basePackages = new HashSet<>();
for (String pkg : (String[]) attributes.get("value")) {
if (StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
}
for (String pkg : (String[]) attributes.get("basePackages")) {
if (StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
}

if (basePackages.isEmpty()) {
basePackages.add(ClassUtils.getPackageName(importingClassMetadata.getClassName()));
}
return basePackages;
}

private void registerHsfClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(HsfClientFactoryBean.class);
String version = resolve((String) attributes.get("version"));
String interfaceName = resolve((String) attributes.get("interfaceName"));
if (interfaceName.length() == 0) {
interfaceName = className;
}
definition.addPropertyValue("url", String.format(FORMAT, getUrl(attributes), interfaceName, version));
definition.addPropertyValue("type", className);
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_NAME);

String alias = interfaceName + "HsfClient";
AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
beanDefinition.setPrimary(true);
BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[] { alias });
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}

private String getUrl(Map<String, Object> attributes) {
String url = resolve((String) attributes.get("url"));
boolean secure = false;
Object securePlaceHolder = attributes.get("secure");
if (securePlaceHolder instanceof Boolean) {
secure = ((Boolean) securePlaceHolder).booleanValue();
} else {
Boolean.parseBoolean(resolve((String) attributes.get("secure")));
}
String protocol = secure ? "https" : "http";
if (!url.contains("://")) {
url = protocol + "://" + url;
}
if (url.endsWith("/")) {//避免设置的url为'schema:ip:port/'格式
url = url.substring(0, url.length() - 1);
}
try {
new URL(url);
} catch (MalformedURLException e) {
throw new IllegalArgumentException(url + " is malformed", e);
}
return url;
}
}

HsfClientFactoryBean定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Setter
@Getter
public class HsfClientFactoryBean implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {
private ApplicationContext applicationContext;
private Class<?> type;
private String url;
private RestTemplate restTemplate;

@Override
public void afterPropertiesSet() throws Exception {
Assert.hasText(url, "url must be set");
Assert.notNull(type, "type must be set");
if (restTemplate == null) {
restTemplate = new RestTemplate();
restTemplate.getMessageConverters().clear();
restTemplate.getMessageConverters().add(new StringHttpMessageConverter(Charset.forName("UTF-8")));//write application/x-www-form-urlencoded request
restTemplate.getMessageConverters().add(new FastJsonHttpMessageConverter());//read and write application/json
}
}

public Object getObject() throws Exception {
Map<Method, HsfMethodHandler> methodToHandler = new LinkedHashMap<Method, HsfMethodHandler>();
for (Method method : type.getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if (isDefaultMethod(method)) {
continue;//TODO 暂时忽略
} else {
methodToHandler.put(method, new HsfMethodHandler(restTemplate, type, method, url));
}
}
InvocationHandler handler = new HsfInvocationHandler(methodToHandler);
return Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[] { type }, handler);
}

@Override
public Class<?> getObjectType() {
return type;
}

@Override
public boolean isSingleton() {
return true;
}

private boolean isDefaultMethod(Method method) {
final int SYNTHETIC = 0x00001000;
return ((method.getModifiers()
& (Modifier.ABSTRACT | Modifier.PUBLIC | Modifier.STATIC | SYNTHETIC)) == Modifier.PUBLIC)
&& method.getDeclaringClass().isInterface();
}
}

代理类的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class HsfInvocationHandler implements InvocationHandler {

private final Map<Method, HsfMethodHandler> handlers;

public HsfInvocationHandler(Map<Method, HsfMethodHandler> handlers) {
this.handlers = handlers;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("equals".equals(method.getName())) {
try {
Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
log.error(e.getMessage(), e);
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
return handlers.get(method).invoke(args);
}

@Override
public boolean equals(Object obj) {
if (obj instanceof HsfInvocationHandler) {
Map<Method, HsfMethodHandler> other = ((HsfInvocationHandler) obj).handlers;
return other.equals(handlers);
}
return false;
}

@Override
public int hashCode() {
return handlers.hashCode();
}

@Override
public String toString() {
return handlers.toString();
}

}

最后就是就是HsfMethodHandler的一个具体实现,包括上面所提到的Request参数的构造,一个invoke方法的调用。

总结

  • 其实通过HttpClient的方式去调用也不是不行,只是说如果通过参考别人的代码,做一个RPC调用底层原理的一个分析,我们是可以做到一些系统层面的封装的,而且这个jar包是可以做成plugin的方式去提供给别人用的。
  • 了解动态代理的原理,可以做到对代码项目无感知或者少感知的作用。
  • 通过该过程举一反三,其他的场景都可以复制该思路去做事情。
0%