杰瑞科技汇

Java多线程插入数据库如何保证线程安全?

下面我将从 为什么用多线程核心挑战常见方案代码示例最佳实践 几个方面进行详细讲解。

Java多线程插入数据库如何保证线程安全?-图1
(图片来源网络,侵删)

为什么使用多线程插入数据库?

单线程插入数据,尤其是在数据量大的情况下,会非常慢,主要瓶颈在于 I/O 操作(网络延迟和磁盘写入),数据库的写入能力往往远超单条 SQL 的处理速度。

使用多线程可以:

  • 提高吞吐量:多个线程可以同时向数据库发送请求,利用数据库的并行处理能力,从而在单位时间内插入更多数据。
  • 减少总耗时:通过并行化,将总任务时间从串行的 N * 单次耗时 降低到接近 单次耗时(受限于数据库连接池大小和并发处理能力)。

核心挑战与风险

直接使用多线程插入,如果不加控制,会遇到几个严重问题:

  1. 数据库连接池耗尽

    Java多线程插入数据库如何保证线程安全?-图2
    (图片来源网络,侵删)
    • 问题:每个线程都需要一个数据库连接,如果线程数远大于连接池的最大连接数,大量线程会一直等待获取连接,导致程序卡死,甚至抛出 SQLException
    • 解决:合理配置连接池大小(如 HikariCP 的 maximum-pool-size),并确保线程数不超过这个限制。
  2. 事务管理混乱

    • 问题:如果每个线程执行自己的事务,那么某个线程的失败不会影响其他线程,但有时我们希望一批数据作为一个整体,要么全部成功,要么全部失败。
    • 解决:明确事务的边界,可以每个线程一个独立的小事务,也可以使用一个主线程来协调一个大的分布式事务(后者非常复杂,不推荐)。
  3. 数据库性能瓶颈

    • 问题:数据库本身能处理的并发请求数是有限的,当并发线程数超过数据库的承受能力时,数据库会变慢,甚至拒绝连接(too many connections),反而导致性能下降。
    • 解决:进行压力测试,找到数据库的最佳并发度,使用“生产者-消费者”模式可以有效控制并发提交的速度。
  4. 数据一致性问题

    • 问题:如果插入的数据有依赖关系(如外键),或者业务逻辑要求插入顺序,多线程会破坏这种顺序,导致数据不一致或插入失败。
    • 解决:评估业务逻辑,如果对顺序有要求,可能需要牺牲部分并行性,或者使用队列来保证顺序。

常见方案与代码示例

这里介绍两种最主流的方案:方案一(简单并行)方案二(生产者-消费者模式,推荐)

Java多线程插入数据库如何保证线程安全?-图3
(图片来源网络,侵删)

准备工作:引入依赖

HikariCP (高性能连接池) 和 MySQL 驱动为例。

Maven (pom.xml):

<dependencies>
    <!-- MySQL Driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>
    <!-- HikariCP Connection Pool -->
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <version>5.0.1</version>
    </dependency>
</dependencies>

数据库配置 (db.properties):

db.url=jdbc:mysql://localhost:3306/test_db?useSSL=false&serverTimezone=UTC
db.username=root
db.password=root
db.pool.size=20  # 连接池大小,根据你的数据库性能调整

简单并行(每个线程一个事务)

这种方案最简单,适用于数据之间没有依赖关系,且允许部分插入失败的场景。

核心思想:创建一个线程池,每个任务(Runnable)负责插入一部分数据,并拥有自己的数据库连接和事务。

代码示例:

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class SimpleBatchInsert {
    // 1. 初始化连接池
    private static HikariDataSource dataSource;
    static {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/test_db?useSSL=false&serverTimezone=UTC");
        config.setUsername("root");
        config.setPassword("root");
        config.setMaximumPoolSize(20); // 设置连接池大小
        dataSource = new HikariDataSource(config);
    }
    // 模拟要插入的数据
    public static List<String> generateData(int count) {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            data.add("User-" + i);
        }
        return data;
    }
    // 插入任务
    static class InsertTask implements Runnable {
        private final List<String> dataList;
        public InsertTask(List<String> dataList) {
            this.dataList = dataList;
        }
        @Override
        public void run() {
            String sql = "INSERT INTO users (name) VALUES (?)";
            try (Connection conn = dataSource.getConnection();
                 PreparedStatement pstmt = conn.prepareStatement(sql)) {
                conn.setAutoCommit(false); // 开启事务
                for (String name : dataList) {
                    pstmt.setString(1, name);
                    pstmt.addBatch(); // 添加到批处理
                }
                pstmt.executeBatch(); // 执行批处理
                conn.commit(); // 提交事务
                System.out.println(Thread.currentThread().getName() + " 插入 " + dataList.size() + " 条数据成功。");
            } catch (SQLException e) {
                System.err.println(Thread.currentThread().getName() + " 插入失败: " + e.getMessage());
                // try-catch 中回滚事务,注意这里 conn 已经在 try-with-resources 中关闭了
                // 更健壮的做法是在 catch 块里手动回滚,但 try-with-resources 会自动关闭连接
                // 事务的回滚通常在连接关闭前由数据库完成,如果事务未提交。
            }
        }
    }
    public static void main(String[] args) {
        int totalRecords = 10000;
        int threadCount = 10;
        int batchSizePerThread = totalRecords / threadCount;
        List<String> allData = generateData(totalRecords);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        long startTime = System.currentTimeMillis();
        // 将数据分割,分配给不同线程
        for (int i = 0; i < threadCount; i++) {
            int fromIndex = i * batchSizePerThread;
            int toIndex = (i == threadCount - 1) ? allData.size() : fromIndex + batchSizePerThread;
            List<String> subList = allData.subList(fromIndex, toIndex);
            executor.execute(new InsertTask(subList));
        }
        // 关闭线程池
        executor.shutdown();
        try {
            // 等待所有任务完成
            executor.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("所有插入任务完成,总耗时: " + (endTime - startTime) + " ms");
    }
}

生产者-消费者模式(推荐)

这种模式更优雅,能有效控制数据库的并发压力,是处理大批量数据插入的最佳实践

核心思想

  • 生产者:一个或多个线程,只负责生成数据并放入一个阻塞队列中。
  • 消费者:一个固定大小的线程池,从阻塞队列中取出数据,然后批量插入数据库。
  • 优势:消费者线程的数量是固定的,可以防止压垮数据库,生产者和消费者解耦,生产者可以快速生成数据,消费者以可控的速度消费。

代码示例:

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ProducerConsumerBatchInsert {
    // 1. 初始化连接池
    private static HikariDataSource dataSource;
    static {
        HikariConfig config = new HaxxerConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/test_db?useSSL=false&serverTimezone=UTC");
        config.setUsername("root");
        config.setPassword("root");
        config.setMaximumPoolSize(20);
        dataSource = new HikariDataSource(config);
    }
    // 2. 创建阻塞队列作为缓冲区
    private static final BlockingQueue<List<String>> dataQueue = new LinkedBlockingQueue<>(100);
    private static final int TOTAL_RECORDS = 10000;
    private static final int BATCH_SIZE = 500; // 每次从队列取多少条数据插入一次
    private static final int CONSUMER_THREADS = 5; // 消费者线程数
    // 3. 生产者
    static class DataProducer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < TOTAL_RECORDS; i++) {
                    String data = "User-PC-" + i;
                    // put() 方法如果队列满了会阻塞
                    dataQueue.put(List.of(data));
                }
                System.out.println("生产者已生成所有数据。");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("生产者被中断。");
            }
        }
    }
    // 4. 消费者
    static class DataConsumer implements Runnable {
        @Override
        public void run() {
            String sql = "INSERT INTO users (name) VALUES (?)";
            List<String> batchData = new ArrayList<>(BATCH_SIZE);
            while (true) {
                try {
                    // drainTo 可以一次性从队列中取出最多 BATCH_SIZE 个元素
                    // 比循环调用 take() 效率更高
                    int drained = dataQueue.drainTo(batchData, BATCH_SIZE);
                    if (drained == 0 && dataQueue.isEmpty()) {
                        // 如果没取到数据,并且队列已空,说明生产者已完成
                        break;
                    }
                    if (drained > 0) {
                        try (Connection conn = dataSource.getConnection();
                             PreparedStatement pstmt = conn.prepareStatement(sql)) {
                            conn.setAutoCommit(false);
                            for (String name : batchData) {
                                pstmt.setString(1, name);
                                pstmt.addBatch();
                            }
                            pstmt.executeBatch();
                            conn.commit();
                            System.out.println(Thread.currentThread().getName() + " 成功插入 " + drained + " 条数据。");
                        } catch (SQLException e) {
                            System.err.println(Thread.currentThread().getName() + " 插入失败: " + e.getMessage());
                        } finally {
                            batchData.clear(); // 清空,准备下一次消费
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("消费者 " + Thread.currentThread().getName() + " 被中断。");
                    break;
                }
            }
            System.out.println("消费者 " + Thread.currentThread().getName() + " 退出。");
        }
    }
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        // 启动生产者
        executor.execute(new DataProducer());
        // 启动消费者
        for (int i = 0; i < CONSUMER_THREADS; i++) {
            executor.execute(new DataConsumer());
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.HOURS);
            System.out.println("所有任务完成!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

最佳实践与总结

  1. 使用连接池:永远不要手动创建和关闭连接,使用 HikariCP 等高性能连接池是标配。
  2. 选择合适的模式
    • 简单并行:适用于数据量不大、无依赖关系、快速实现。
    • 生产者-消费者强烈推荐用于大批量数据导入,它解耦了数据生成和持久化,并能有效控制对数据库的压力。
  3. 控制并发度:消费者线程的数量 (CONSUMER_THREADS) 是一个关键调优参数,它应该小于或等于数据库连接池的大小,通过压力测试找到最佳值。
  4. 使用批处理:无论是哪种方案,都要使用 PreparedStatement.addBatch()executeBatch(),这比单条插入性能高几个数量级。
  5. 管理事务:明确每个事务的范围,在生产者-消费者模式中,每个消费者线程处理一批数据就是一个事务,保证了这批数据的原子性。
  6. 考虑 JDBC 批处理大小BATCH_SIZE 也是一个调优参数,太小了,提交次数多;太大了,占用内存多,且单个事务失败时重试成本高,通常在几百到几千之间。
  7. 处理异常:务必捕获 SQLException,并在失败时记录日志,对于关键业务,可能需要实现失败重试机制。
  8. 终极武器:如果数据量达到百万、千万级别,多线程应用层插入可能仍然不是最优解,应优先考虑数据库原生工具,如:
    • MySQL: LOAD DATA INFILE 命令,速度极快。
    • PostgreSQL: COPY 命令。
    • Oracle: SQL*Loader 工具。

这些工具直接操作文件,绕过了 JDBC 的开销,是大数据量导入的首选,多线程方案通常在这些工具不适用或需要与现有业务逻辑集成时使用。

分享:
扫描分享到社交APP
上一篇
下一篇