简栈

拥抱AI,持续成长

前言

线程池ThreadPoolExecutor在运行的过程中,业务并发量变动,需要不停服务调整线程池的线程数,ThreadPoolExecutor支持动态调整corePoolSizemaximumPoolSize的值。

示例demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ThreadChangeTest {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
10,
10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10));
int count = 0;
while (true) {
Thread.sleep(1000l);
for (int i = 0; i < 9; i++) {
executor.execute(() -> {
/*try {
Thread.sleep(1l);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println("------------core:\t" + executor.getCorePoolSize() + "\tactive:\t" + executor.getActiveCount() + "\tmax:\t" + executor.getMaximumPoolSize());
});
}

count++;
if (count == 20) {
executor.setCorePoolSize(2);
executor.setMaximumPoolSize(9);
System.out.println("----------------------------------------");
}

if (count == 100) {
executor.shutdown();
System.out.println("=============================================");
break;
}
}
Thread.currentThread().join();
}
}

在程序运行中动态修改线程池corePoolSizemaximumPoolSize的值

源码分析

线程池参数调大
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
//核心线程调小,中断空闲任务,否则线程池的当前任务结束,自动调小
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
//核心线程数调大后,从队列取任务
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
//队列大小是否可以取任务
int k = Math.min(delta, workQueue.size());
//队列有任务就取,否则break
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}

public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
//中断空闲任务,否则线程池的当前任务结束,自动调小
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}

源码看出:线程池的调节时直接设置corePoolSizemaximumPoolSize的值

其中

阅读全文 »

在数据仓库的建设中,一般都会围绕着星型模型和雪花模型来设计表关系或者结构。下面我们先来理解这两种模型的概念。

星型模型

http://static.cyblogs.com/1345516634_6388.jpg

​ 星型模是一种多维的数据关系,它由一个事实表和一组维表组成。每个维表都有一个维作为主键,所有这些维的主键组合成事实表的主键。强调的是对维度进行预处理,将多个维度集合到一个事实表,形成一个宽表。这也是我们在使用hive时,经常会看到一些大宽表的原因,大宽表一般都是事实表,包含了维度关联的主键和一些度量信息,而维度表则是事实表里面维度的具体信息,使用时候一般通过join来组合数据,相对来说对OLAP的分析比较方便。

雪花模型

http://static.cyblogs.com/1345516734_4305.jpg

​ 当有一个或多个维表没有直接连接到事实表上,而是通过其他维表连接到事实表上时,其图解就像多个雪花连接在一起,故称雪花模型。雪花模型是对星型模型的扩展。它对星型模型的维表进一步层次化,原有的各维表可能被扩展为小的事实表,形成一些局部的 “层次 “ 区域,这些被分解的表都连接到主维度表而不是事实表。雪花模型更加符合数据库范式,减少数据冗余,但是在分析数据的时候,操作比较复杂,需要join的表比较多所以其性能并不一定比星型模型高。

星型模型和雪花模型对比

属性 星型模型 雪花模型
数据总量
可读性 容易
表个数
查询速度
冗余度
对实时表的情况 增加宽度 字段比较少,冗余底
扩展性

应用场景

阅读全文 »

聚集(clustered)索引,也叫聚簇索引

定义:数据行的物理顺序与列值(一般是主键的那一列)的逻辑顺序相同,一个表中只能拥有一个聚集索引。

http://static.cyblogs.com/20181225211503670.png

注:第一列的地址表示该行数据在磁盘中的物理地址,后面三列才是我们SQL里面用的表里的列,其中id是主键,建立了聚集索引。

结合上面的表格就可以理解这句话了吧:数据行的物理顺序与列值的顺序相同,如果我们查询id比较靠后的数据,那么这行数据的地址在磁盘中的物理地址也会比较靠后。而且由于物理排列方式与聚集索引的顺序相同,所以也就只能建立一个聚集索引了。

聚集索引实际存放的示意图

从上图可以看出聚集索引的好处了,索引的叶子节点就是对应的数据节点(MySQL的MyISAM除外,此存储引擎的聚集索引和非聚集索引只多了个唯一约束,其他没什么区别),可以直接获取到对应的全部列的数据,而非聚集索引在索引没有覆盖到对应的列的时候需要进行二次查询,后面会详细讲。因此在查询方面,聚集索引的速度往往会更占优势。

创建聚集索引

如果不创建索引,系统会自动创建一个隐含列作为表的聚集索引。

  • 创建表的时候指定主键(注意:SQL Sever默认主键为聚集索引,也可以指定为非聚集索引,而MySQL里主键就是聚集索引)
阅读全文 »

前提

近段时间,业务系统架构基本完备,数据层面的建设比较薄弱,因为笔者目前工作重心在于搭建一个小型的数据平台。优先级比较高的一个任务就是需要近实时同步业务系统的数据(包括保存、更新或者软删除)到一个另一个数据源,持久化之前需要清洗数据并且构建一个相对合理的便于后续业务数据统计、标签系统构建等扩展功能的数据模型。基于当前团队的资源和能力,优先调研了Alibaba开源中间件Canal的使用。

http://static.cyblogs.com/m-w-c-1.png

这篇文章简单介绍一下如何快速地搭建一套Canal相关的组件。

关于Canal

简介

下面的简介和下一节的原理均来自于Canal项目的README

img

Canal[kə'næl],译意为水道/管道/沟渠,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务trigger获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括:

阅读全文 »

1. Kafka简介

Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

  1. 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能
  2. 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输
  3. 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输
  4. 同时支持离线数据处理和实时数据处理
  5. Scale out:支持在线水平扩展

2. Kafka架构

Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition.

Producer:负责发布消息到Kafka broker

Consumer:消息消费者,向Kafka broker读取消息的客户端。

Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

阅读全文 »

背景

为了后续的沟通方便,在20200213的早上创建了一个**《简栈-Java技术交流群》**,也方便大家通过扫二维码积极的参与进来。

http://static.cyblogs.com/简栈-Java技术交流群.JPG

如果是二维码已经过期,大家可以添加简栈文化-小助手的微信号(lastpass4u),然后让他拉大家进群进群。我们保持着小而美的精神,宁缺毋滥。

http://static.cyblogs.com/简栈文化-小助手.jpg

然后早上群里就有人提了一个问题:

执行计划里面的扫描函数跟执行时间不匹配,比如查询优化器发现,扫描a索引行数更多,所以更慢,因此优化器选择了索引b, 但实际上走b索引的时候比a更慢,走a索引大概是4秒左右,b是8秒。

这个问题激发起了大家的讨论,有的人建议说:

1、这种可以强制指定索引执行的吧

2、这个扫描行数都是预估的不一定准的,能操作shell的话执行analyse table看看。

3、看一下你的index,DDL,explain等等

但提问者明显这些都是已经自己搞清楚了的,他关心的是底层的优化器成本规则等。这类我才意识到EXPLAIN出来的是结果,其实数据库底层本身是有优化器的,而最终选择谁,是否过索引等都是有它的规则的。这其中都涉及到效率与成本问题。

阅读全文 »

作者:吃饭睡觉撸代码

来源:https://fangjian0423.github.io/2017/05/10/springboot-context-refresh/

前言

Spring容器创建之后,会调用它的refresh方法,refresh的时候会做很多事情:比如完成配置类的解析、各种BeanFactoryPostProcessor和BeanPostProcessor的注册、国际化配置的初始化、web内置容器的构造等等。

我们来分析一下这个refresh过程。

还是以web程序为例,那么对应的Spring容器为AnnotationConfigEmbeddedWebApplicationContext。它的refresh方法调用了父类AbstractApplicationContext的refresh方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void refresh() throws BeansException, IllegalStateException {
// refresh过程只能一个线程处理,不允许并发执行
synchronized (this.startupShutdownMonitor) {
prepareRefresh();
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
prepareBeanFactory(beanFactory);
try {
postProcessBeanFactory(beanFactory);
invokeBeanFactoryPostProcessors(beanFactory);
registerBeanPostProcessors(beanFactory);
initMessageSource();
initApplicationEventMulticaster();
onRefresh();
registerListeners();
finishBeanFactoryInitialization(beanFactory);
finishRefresh();
}
catch (BeansException ex) {
if (logger.isWarnEnabled()) {
logger.warn("Exception encountered during context initialization - " +
"cancelling refresh attempt: " + ex);
}
destroyBeans();
cancelRefresh(ex);
throw ex;
}
finally {
resetCommonCaches();
}
}
}

prepareRefresh方法

表示在真正做refresh操作之前需要准备做的事情:

  1. 设置Spring容器的启动时间,撤销关闭状态,开启活跃状态。
  2. 初始化属性源信息(Property)
  3. 验证环境信息里一些必须存在的属性
阅读全文 »

1. 背景

1.1. 惊人的性能数据

最近一个圈内朋友通过私信告诉我,通过使用 Netty4 + Thrift 压缩二进制编解码技术,他们实现了 10W TPS(1K 的复杂 POJO 对象)的跨节点远程服务调用。相比于传统基于 Java 序列化 +BIO(同步阻塞 IO)的通信框架,性能提升了 8 倍多。

事实上,我对这个数据并不感到惊讶,根据我 5 年多的 NIO 编程经验,通过选择合适的 NIO 框架,加上高性能的压缩二进制编解码技术,精心的设计 Reactor 线程模型,达到上述性能指标是完全有可能的。

下面我们就一起来看下 Netty 是如何支持 10W TPS 的跨节点远程服务调用的,在正式开始讲解之前,我们先简单介绍下 Netty。

1.2. Netty 基础入门

Netty 是一个高性能、异步事件驱动的 NIO 框架,它提供了对 TCP、UDP 和文件传输的支持,作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞的,通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。

作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于 Netty 的 NIO 框架构建。

2. Netty 高性能之道

2.1. RPC 调用的性能模型分析

阅读全文 »

Shadowsocks PAC规则

ShadowSocks默认使用GFWList规则和使用adblock plus的引擎。要想自己添加自定义的用户规则,最好熟悉一下其规则:

中文版:Adblock Plus过滤规则

自定义代理规则的设置语法与GFWlist相同,语法规则如下:

  • 通配符支持。
    • 比如 *.example.com/*
    • 实际书写时可省略 * , 如.example.com/*.example.com/* 效果一样
  • 正则表达式支持。
    • \ 开始和结束, 如 \[\w]+:\/\/example.com\
  • 例外规则 @@
    • @@*.example.com/* 满足 @@ 后规则的地址不使用代理
  • 匹配地址开始和结尾 |
    • |http://example.comexample.com| 分别表示以 http://example.com 开始和以 example.com 结束的地址
  • ||标记
    • ||example.comhttp://example.comhttps://example.comftp://example.com 等地址址满足条件。
  • 注释 !
    • !我是注释
  • 分隔符^
    • 表示除了字母、数字或者 _ - . % 之外的任何字符。如 http://example.com^http://example.com/http://example.com:8000/ 均满足条件,而 http://example.com.ar/ 不满足条件。

什么是PAC

  • 维基百科摘录的关于PAC的解释:
    • 代理自动配置(英语:Proxy auto-config,简称PAC)是一种网页浏览器技术,用于定义浏览器该如何自动选择适当的代理服务器来访问一个网址。
    • 一个PAC文件包含一个JavaScript形式的函数FindProxyForURL(url, host)
    • 这个函数返回一个包含一个或多个访问规则的字符串。
    • 用户代理根据这些规则适用一个特定的代理其或者直接访问。
    • 当一个代理服务器无法响应的时候,多个访问规则提供了其他的后备访问方法。
    • 浏览器在访问其他页面以前,首先访问这个PAC文件。
    • PAC文件中的URL可能是手工配置的,也可能是是通过网页的网络代理自发现协议(Web Proxy Autodiscovery Protocol)自动配置的。
  • 源自网络的图解:

http://static.cyblogs.com/PAC.png

简单说来,PAC就是一种配置规则,它能让你的浏览器智能判断哪些网站走代理,哪些不需要走代理。

pac.txt

阅读全文 »

init-method方法

init-method方法,初始化bean的时候执行,可以针对某个具体的bean进行配置。init-method需要在applicationContext.xml配置文档中bean的定义里头写明。例如:

1
<bean id="TestBean" class="nju.software.xkxt.util.TestBean" init-method="init"></bean>

这样,当TestBean在初始化的时候会执行TestBean中定义的init方法。

afterPropertiesSet方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface InitializingBean {

/**
* Invoked by the containing {@code BeanFactory} after it has set all bean properties
* and satisfied {@link BeanFactoryAware}, {@code ApplicationContextAware} etc.
* <p>This method allows the bean instance to perform validation of its overall
* configuration and final initialization when all bean properties have been set.
* @throws Exception in the event of misconfiguration (such as failure to set an
* essential property) or if initialization fails for any other reason
*/
void afterPropertiesSet() throws Exception;

}

afterPropertiesSet方法,初始化bean的时候执行,可以针对某个具体的bean进行配置。afterPropertiesSet 必须实现 InitializingBean接口。实现 InitializingBean接口必须实现afterPropertiesSet方法。

BeanPostProcessor类

BeanPostProcessor,针对所有Spring上下文中所有的bean,可以在配置文档applicationContext.xml中配置一个BeanPostProcessor,然后对所有的bean进行一个初始化之前和之后的代理。BeanPostProcessor接口中有两个方法: postProcessBeforeInitializationpostProcessAfterInitializationpostProcessBeforeInitialization方法在bean初始化之前执行, postProcessAfterInitialization方法在bean初始化之后执行。

前置后置处理器
阅读全文 »
0%