- 为什么使用多线程进行数据库插入?
- 核心挑战与风险
- 主流实现方案(从简单到复杂)
- 最佳实践与优化建议
- 完整代码示例
为什么使用多线程进行数据库插入?
主要目的是提高性能,具体体现在:

- 绕过 I/O 等待:数据库插入操作通常是一个 I/O 密集型任务,当一个线程执行
INSERT语句时,它会等待数据库将数据写入磁盘并返回响应,在此期间,CPU 是空闲的,通过多线程,当一个线程等待 I/O 时,其他线程可以继续使用 CPU,从而显著提高吞吐量。 - 并行处理:如果你有大量的数据需要插入,多线程可以将这个大任务分解成多个小任务并行执行,缩短总执行时间。
核心挑战与风险
在决定使用多线程之前,必须清楚地认识到它带来的问题:
- 数据库连接池耗尽:每个线程都需要一个数据库连接,如果线程数过多,可能会耗尽连接池中的所有连接,导致后续线程无法获取连接而挂起。
- 数据库性能瓶颈:数据库本身(如 MySQL、PostgreSQL)有处理并发请求的能力上限,当并发请求数量过大时,数据库服务器可能会因为 CPU、内存或磁盘 I/O 达到瓶颈,反而导致性能下降,甚至拒绝服务。
- 事务管理复杂性:如果多个线程需要协同完成一个“事务”,事务的管理会变得非常复杂,一个线程失败,其他线程是否需要回滚?
- 数据一致性问题:如果插入逻辑不严谨,可能会出现重复数据、数据损坏等问题,多个线程同时插入相同主键的数据,会违反唯一约束。
- 资源竞争与死锁:多个线程同时修改同一张表的不同行,可能会因为数据库锁的机制(如间隙锁)导致死锁。
主流实现方案
根据业务场景和性能要求,可以选择不同的实现方案。
直接使用 ExecutorService + 原生 JDBC
这是最基础的方式,适合简单的批量插入场景,但需要非常小心地管理连接。
核心思想:

- 创建一个固定大小的线程池,防止无限创建线程。
- 创建一个与线程池大小相匹配的数据库连接池。
- 将每个或每批插入任务作为一个
Runnable提交到线程池中。
优点:
- 实现简单,依赖少。
缺点:
- 需要手动管理连接和事务,容易出错。
- 性能受限于线程池和连接池的配置。
使用 Spring @Async + 事务管理
如果你在使用 Spring 框架,这是最推荐、最优雅的方式。
核心思想:

- 在 Spring 配置中启用异步任务支持。
- 创建一个服务方法,并用
@Async注解标记它,表示该方法将在一个独立的线程中执行。 - 在该服务方法上使用
@Transactional注解来管理事务。
优点:
- 声明式事务:
@Transactional自动管理事务的开启、提交和回滚,非常方便。 - 简化代码:无需手动创建和管理线程池,Spring 会为你处理。
- 集成度高:与 Spring 生态完美集成。
缺点:
- 依赖 Spring 框架。
使用专门的批处理框架(如 Spring Batch)
对于超大规模的数据导入任务(从文件导入数百万条数据),Spring Batch 是更专业的选择。
核心思想:
- ItemReader:负责从数据源(如文件、数据库)读取数据。
- ItemProcessor:对读取到的数据进行处理或转换。
- ItemWriter:将处理后的数据批量写入目标数据库。
Chunk处理:框架会读取Chunk Size数量的数据,然后交给ItemProcessor处理,最后再由ItemWriter一次性写入数据库,这个过程在事务中完成。
优点:
- 高性能:通过
Chunk机制,实现了高效的批处理。 - 健壮性:内置了失败重试、跳过记录、作业状态管理等功能。
- 可扩展性:可以处理复杂的批处理流程。
缺点:
- 学习曲线较陡,概念较多。
最佳实践与优化建议
无论选择哪种方案,以下几点都至关重要:
-
使用批处理:这是数据库插入性能优化的第一原则,不要在循环中一条一条地执行
INSERT,应该使用 JDBC 的addBatch()和executeBatch(),或者 MyBatis/JPA 的批处理功能。- 坏例子:
for (Data data : dataList) { jdbcTemplate.update("INSERT INTO ...", params); } - 好例子:
jdbcTemplate.batchUpdate("INSERT INTO my_table (col1, col2) VALUES (?, ?)", dataList.stream() .map(data -> new Object[]{data.getCol1(), data.getCol2()}) .collect(Collectors.toList()));
- 坏例子:
-
合理配置线程池和连接池:
- 线程池大小:通常设置为
CPU核心数 * (1 + 平均等待时间 / 平均计算时间),对于 I/O 密集型任务,可以设置得大一些,但不宜过大,以免压垮数据库,建议先从CPU核心数 * 2开始测试。 - 连接池大小:连接池的最大连接数应该大于或等于线程池的核心线程数,可以设置为
(线程池核心线程数 * 2)或更多,以提供足够的并发连接。
- 线程池大小:通常设置为
-
调整数据库参数:
rewriteBatchedStatements=true(MySQL):这是 MySQL JDBC 驱动的一个关键性能参数,开启后,驱动会将多个INSERT语句合并成一个大的 SQL 语句发送给数据库,极大减少网络开销和数据库解析开销。batchSize:在 MyBatis 或 JPA 中,合理设置每次批处理的大小,太小则效率低,太大则可能占用过多内存或导致数据库事务超时。100到1000之间是不错的选择。
-
事务隔离级别:根据业务需求选择合适的事务隔离级别,如果允许读到“未提交”的数据(脏读),可以提高并发性能,但通常我们会使用
READ_COMMITTED或REPEATABLE_READ。 -
监控与调优:使用数据库的监控工具(如 MySQL 的
SHOW PROCESSLIST,SHOW STATUS)和应用的性能分析工具(如 Arthas, JProfiler)来监控线程状态、SQL 执行时间和数据库负载,不断进行调优。
完整代码示例(推荐方案:Spring @Async)
这是一个使用 Spring Boot 演示多线程插入的完整示例。
项目依赖 (pom.xml)
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
数据库配置 (application.properties)
# DataSource Configuration spring.datasource.url=jdbc:mysql://localhost:3306/test_db?rewriteBatchedStatements=true&useSSL=false&serverTimezone=UTC spring.datasource.username=root spring.datasource.password=password spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver # JPA/Hibernate Configuration spring.jpa.hibernate.ddl-auto=update spring.jpa.show-sql=true
注意:rewriteBatchedStatements=true 是关键!
实体类
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import lombok.Data;
@Entity
@Data
public class User {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String name;
private String email;
}
数据访问层
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface UserRepository extends JpaRepository<User, Long> {
}
服务层(核心)
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import lombok.RequiredArgsConstructor;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Service
@RequiredArgsConstructor
public class UserService {
private final UserRepository userRepository;
/**
* 异步批量插入用户
* @param userList 用户列表
* @return CompletableFuture
*/
@Async
@Transactional // 确保整个方法在一个事务中执行
public CompletableFuture<Void> batchInsertUsersAsync(List<User> userList) {
System.out.println(Thread.currentThread().getName() + " 开始批量插入 " + userList.size() + " 个用户...");
// Spring Data JPA 的 saveAll 方法在内部会进行优化,对于大多数实现(如 Hibernate)它就是批处理。
userRepository.saveAll(userList);
System.out.println(Thread.currentThread().getName() + " 批量插入完成。");
return CompletableFuture.completedFuture(null);
}
}
配置异步任务
创建一个配置类来启用 Spring 的异步功能。
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync // 启用异步任务
public class AsyncConfig {
/**
* 自定义线程池
* @return Executor
*/
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // 核心线程数
executor.setMaxPoolSize(10); // 最大线程数
executor.setQueueCapacity(25); // 队列容量
executor.setThreadNamePrefix("User-Insert-Thread-");
executor.initialize();
return executor;
}
}
控制器层
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@RestController
public class UserController {
@Autowired
private UserService userService;
@PostMapping("/users/batch")
public String batchInsert(@RequestBody List<User> users) throws InterruptedException, ExecutionException {
long start = System.currentTimeMillis();
// 模拟一个大的用户列表
if (users == null || users.isEmpty()) {
users = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
User user = new User();
user.setName("User " + i);
user.setEmail("user" + i + "@example.com");
users.add(user);
}
}
// 提交异步任务
CompletableFuture<Void> future = userService.batchInsertUsersAsync(users);
// 可以在这里做其他事情...
System.out.println("主线程继续执行其他任务...");
// 等待异步任务完成
future.get();
long end = System.currentTimeMillis();
return "批量插入任务已提交并完成!总耗时: " + (end - start) + " ms";
}
}
主启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
如何运行和验证:
-
启动 Spring Boot 应用。
-
使用 Postman 或
curl发送一个POST请求到http://localhost:8080/users/batch,可以发送一个空的 JSON[],它会使用代码中模拟的 1000 个用户。 -
观察控制台输出,你会看到类似下面的日志,证明任务确实是在不同的线程中执行的:
User-Insert-Thread-1 开始批量插入 1000 个用户... 主线程继续执行其他任务... User-Insert-Thread-1 批量插入完成。 批量插入任务已提交并完成!总耗时: 456 ms
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| ExecutorService + JDBC | 简单、无框架依赖的小型任务 | 实现直接,依赖少 | 代码复杂,易出错,需手动管理一切 |
Spring @Async |
(推荐) 大多数基于 Spring 的应用 | 代码简洁,声明式事务,易于集成 | 依赖 Spring 框架 |
| Spring Batch | 超大规模、复杂的批处理作业 | 高性能、高健壮性、功能强大 | 学习曲线陡,重量级 |
对于绝大多数 Java 应用,使用 Spring 的 @Async 注解结合批处理是最佳实践,它在开发效率、代码可维护性和性能之间取得了很好的平衡,请务必记住,“批处理” 是提升数据库插入性能的核心,而“多线程”是利用多核 CPU 并行执行这些批处理的手段。
