多线程

多线程

spring boot 使用线程池

方式一:通过@Async注解调用

第一步:在Application启动类上面加上@EnableAsync

@SpringBootApplication
@EnableAsync
public class WorkApplication {
    public static void main(String[] args) {
        SpringApplication.run(WorkApplication.class, args);
    }
}

第二步:在需要异步执行的方法上加上@Async注解

@Service
@Slf4j
public class AsyncTest {
    @Async
    public void hello(String name){
    	//这里使用logger 方便查看执行的线程是什么
        log.info("异步线程启动 started."+name);
    }
}

第三步:测试类进行测试验证


@SpringBootTest
@Slf4j
class ThreadPoolApplicationTests {

    @Autowired
    AsyncTest asyncTest;
    @Test
    void contextLoads() throws InterruptedException {
        asyncTest.hello("afsasfasf");
        //一定要休眠 不然主线程关闭了,子线程还没有启动
        Thread.sleep(1000);
    }
}

可以清楚的看到新开了一个task-1的线程执行任务。

2021-08-30 15:19:44.046 INFO 17264 --- [ main] c.e.work.ThreadPoolApplicationTests : Started ThreadPoolApplicationTests in 9.485 seconds (JVM running for 13.84)
2021-08-30 15:19:45.183 INFO 17264 --- [ task-1] com.example.work.service.AsyncTest : 异步线程启动 started.afsasfasf

方式二:直接调用ThreadPoolTaskExecutor

修改上面测试类,直接注入ThreadPoolTaskExecutor

@SpringBootTest
@Slf4j
class ThreadPoolApplicationTests {

    @Autowired
    AsyncTest asyncTest;
    @Autowired
    ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Test
    void contextLoads() throws InterruptedException {
        asyncTest.hello("async注解创建");
        threadPoolTaskExecutor.submit(new Thread(()-> log.info("threadPoolTaskExecutor 创建线程")));
        //一定要休眠 不然主线程关闭了,子线程还没有启动
        Thread.sleep(1000);
    }
}

查看打印的日志发现都成功创建线程!!!:

2021-08-30 15:22:24.069 INFO 21868 --- [ task-2] c.e.work.ThreadPoolApplicationTests : threadPoolTaskExecutor 创建线程
2021-08-30 15:22:24.112 INFO 21868 --- [ task-1] com.example.work.service.AsyncTest : 异步线程启动 started.async注解创建

备注1:如果只使用ThreadPoolTaskExecutor, 是可以不用在Application启动类上面加上@EnableAsync注解的哦!!!

备注2:多次测试发现ThreadPoolTaskExecutor执行比@Async要快!!!

线程池默认配置信息

以下是springboot默认的线程池配置

spring:
    task:
        execution:
            pool:
                # 是否允许核心线程超时
                allow-core-thread-timeout: true
                # 核心线程数
                core-size: 8
                # 空闲线程存活时间
                keep-alive: 60s
                # 最大线程数
                max-size: 16
                # 线程队列数量
                queue-capacity: 100
            shutdown:
                # 线程关闭等待
                await-termination: false
                await-termination-period:
                # 线程名称前缀
            thread-name-prefix: myTask-

深入springboot默认的线程池

根据官方文档的说明,Spring Boot auto-configures a ThreadPoolTaskExecutor 。最终找到springboot的线程池自动装配类:TaskExecutionAutoConfiguration

@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) {
    Pool pool = properties.getPool();
    TaskExecutorBuilder builder = new TaskExecutorBuilder();
    builder = builder.queueCapacity(pool.getQueueCapacity());
    builder = builder.corePoolSize(pool.getCoreSize());
    builder = builder.maxPoolSize(pool.getMaxSize());
    builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
    builder = builder.keepAlive(pool.getKeepAlive());
    Shutdown shutdown = properties.getShutdown();
    builder = builder.awaitTermination(shutdown.isAwaitTermination());
    builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
    builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
    Stream var10001 = taskExecutorCustomizers.orderedStream();
    var10001.getClass();
    builder = builder.customizers(var10001::iterator);
    builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique());
    return builder;
}

同时在ThreadPoolTaskExecutor源码当中可以看到线程池的初始化方式是直接调用的ThreadPoolExecutor进行的初始化。

protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
    ThreadPoolExecutor executor;
    if (this.taskDecorator != null) {
        executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
            public void execute(Runnable command) {
                Runnable decorated = ThreadPoolTaskExecutor.this.taskDecorator.decorate(command);
                if (decorated != command) {
                    ThreadPoolTaskExecutor.this.decoratedTaskMap.put(decorated, command);
                }
                                    super.execute(decorated);
                }
            };
        } else {
            executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
        }

        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }

        this.threadPoolExecutor = executor;
        return executor;
    }

同时会发现默认的线程池拒绝策略是: AbortPolicy 直接抛出异常!!!

private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy();

自定义线程池

在默认配置信息里面是没有线程池的拒绝策略设置的方法的,如果需要更换拒绝策略就需要自定义线程池,并且如果项目当中需要多个自定义的线程池,又要如何进行管理呢?
第一步:创建一个ThreadPoolConfig 先只配置一个线程池,并设置拒绝策略为CallerRunsPolicy

@Configuration
public class ThreadPoolConfig {

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }
}

然后执行之前写的测试代码发现,使用的线程池已经变成自定义的线程池了。

2021-08-30 15:33:28.528 INFO 6624 --- [ myExecutor--2] c.e.work.ThreadPoolApplicationTests : threadPoolTaskExecutor 创建线程
2021-08-30 15:33:28.536 INFO 6624 --- [ myExecutor--1] com.example.work.service.AsyncTest : 异步线程启动 started.async注解创建

第二步:如果配置有多个线程池

@Configuration
public class ThreadPoolConfig {

@Configuration
public class ThreadPoolConfig {

       @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean("poolExecutor")
    public Executor poolExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor2--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean("taskPoolExecutor")
    public Executor taskPoolExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor3--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }
}

执行测试类,直接报错说找到多个类,不知道加载哪个类

No qualifying bean of type 'org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor' available: expected single matching bean but found 3: taskExecutor,taskPoolExecutor

由于测试类当中是这样自动注入的:

@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor; 

考虑到@Autowired 以及@Resource两个注入时的存在多个类如何匹配问题,然后发现只要我们在注入时指定具体的bean就会调用对应的线程池!!!

即修改测试类如下:

@Autowired
ThreadPoolTaskExecutor poolExecutor; //会去匹配 @Bean("poolExecutor") 这个线程池

备注1:如果是使用的@Async注解,只需要在注解里面指定bean的名称就可以切换到对应的线程池去了。如下所示:

@Async("taskPoolExecutor")
public void hello(String name){
    logger.info("异步线程启动 started."+name);
}

备注2:如果有多个线程池,但是在@Async注解里面没有指定的话,会默认加载第一个配置的线程池

线程池核心参数

corePoolSize 核心线程池的大小

maximumPoolSize 线程池中允许的最多线程数

keepAliveTime 当线程的数量大于核心线程数时,空闲线程等待任务的最长时间

unit keepAliveTime的时间单位,可选的有天、小时、分钟、毫秒、微秒、纳秒

workQueue 用来存储还没有执行的任务

threadFactory 线程工厂用于创建线程

handler 饱和策略

线程池饱和策略

CallerRunsPolicy 调用者运行策略,实现了一种调节机制 。它不会抛弃任务,也不会抛出异常。 而是将任务回退到调用者。它不会在线程池中执行任务,而是在一个调用了Executor的线程中执行该任务

AbortPolicy 默认策略(中止策略)新任务提交时直接抛出未检查的异常RejectedExecutionException,该异常可由调用者捕获。

DiscardPolicy 丢弃策略,丢弃尾部,即最新的线程

DiscardOldestPolicy 丢弃策略,丢弃头部,即最旧的线程

线程池类型

newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

线程生命周期

初始(NEW) 新创建了一个线程对象,但还没有调用start()方法。

运行(RUNNABLE) 当线程对象调用了start()方法之后,该线程处于就绪状态。Java虚拟机会为其创建方法调用栈和程序计数器,等待调度运行

阻塞(BLOCKED) 当处于运行状态的线程失去所占用资源之后,便进入阻塞状态

等待(WAITING) 如果处于就绪状态的线程获得了CPU,开始执行run()方法的线程执行体÷

超时等待状态(TIMED_WAITING) 线程等待一个具体的时间,时间到后会被自动唤醒。

线程死亡(DEAD)