杰瑞科技汇

Java如何连接MongoDB集群?

目录

  1. 前提条件:理解 MongoDB 集群
  2. 第一步:Java 项目依赖
  3. 第二步:连接字符串 的构建
  4. 第三步:Java 代码实现
    • 1 创建 MongoClient
    • 2 执行基本操作
    • 3 处理集群特有概念(读写分离)
  5. 第四步:最佳实践与高级配置
    • 1 连接池配置
    • 2 异常处理
    • 3 SSL/TLS 安全连接
  6. 完整示例代码
  7. 故障排查

前提条件:理解 MongoDB 集群

在开始编码前,你必须对 MongoDB 集群(副本集 或分片集群)有基本了解,Java 驱动程序通过 连接字符串 来连接集群。

Java如何连接MongoDB集群?-图1
(图片来源网络,侵删)
  • 副本集: 提供高可用性,至少有 3 个节点(1个Primary, 2个Secondary),客户端会自动连接到 Primary,并在 Primary 宕机时自动切换到新的 Primary。
  • 分片集群: 提供高扩展性,由 Config Servers (存储集群元数据)、Shards (数据分片存储节点) 和 Routers (mongos, 客户端入口点) 组成,客户端通常连接到 mongos 节点。

关键点:Java 驱动程序足够智能,你只需要提供一个包含所有节点(或至少一个 mongos 节点)地址的列表,它会自动发现集群中的其他节点并进行管理。


第一步:Java 项目依赖

使用 Maven 或 Gradle 来管理 mongodb-driver 依赖。

Maven (pom.xml)

<dependencies>
    <!-- MongoDB Java 驱动核心库 -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-sync</artifactId>
        <version>4.11.1</version> <!-- 请使用最新版本 -->
    </dependency>
    <!-- 如果你需要使用 Reactive Streams 风格的驱动 -->
    <!-- <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-reactivestreams</artifactId>
        <version>4.11.1</version>
    </dependency> -->
    <!-- 如果你需要使用异步风格的驱动 -->
    <!-- <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-async</artifactId>
        <version>4.11.1</version>
    </dependency> -->
</dependencies>

Gradle (build.gradle)

dependencies {
    // MongoDB Java 驱动核心库
    implementation 'org.mongodb:mongodb-driver-sync:4.11.1' // 请使用最新版本
    // 如果你需要使用 Reactive Streams 风格的驱动
    // implementation 'org.mongodb:mongodb-driver-reactivestreams:4.11.1'
    // 如果你需要使用异步风格的驱动
    // implementation 'org.mongodb:mongodb-driver-async:4.11.1'
}

第二步:连接字符串 的构建

这是连接集群的核心,连接字符串定义了如何连接到数据库。

副本集连接字符串示例

假设你的副本集名为 myReplicaSet,节点地址为 node1:27017, node2:27017, node3:27017

String connectionString = "mongodb://node1:27017,node2:27017,node3:27017/" +
                          "testDB?replicaSet=myReplicaSet&" +
                          "readPreference=secondaryPreferred&" +
                          "maxIdleTimeMS=120000&" +
                          "connectTimeoutMS=10000&" +
                          "socketTimeoutMS=30000";

分片集群 连接字符串示例

假设你有一个 mongos 路由器在 mongos1:27017

String connectionString = "mongodb://mongos1:27017/" +
                          "testDB?readPreference=primary&" +
                          "maxIdleTimeMS=120000&" +
                          "connectTimeoutMS=10000&" +
                          "socketTimeoutMS=30000";

连接字符串参数解析

  • mongodb://node1:27017,node2:27017,...: 节点列表,驱动会尝试连接列表中的任意一个,并自动发现整个集群。
  • testDB: 要连接的数据库名称(可选,也可以在代码中指定)。
  • replicaSet=myReplicaSet: 副本集必须指定,告诉驱动这是一个副本集集群。
  • readPreference=...: 读写分离的关键。
    • primary: (默认) 只从 Primary 读取,保证强一致性。
    • secondary: 只从 Secondary 读取,适用于读多写少且对数据一致性要求不高的场景。
    • primaryPreferred: 优先从 Primary 读,Primary 不可用则从 Secondary 读。
    • secondaryPreferred: 优先从 Secondary 读,如果所有 Secondary 不可用则从 Primary 读。这是最常用的读偏好之一
    • nearest: 从延迟最低的节点(Primary 或 Secondary)读取。
  • maxIdleTimeMS=120000: 连接在池中的最大空闲时间,超时后将被回收。
  • connectTimeoutMS=10000: 建立连接的超时时间(毫秒)。
  • socketTimeoutMS=30000: 发送/接收数据包的超时时间(毫秒)。

第三步:Java 代码实现

我们将使用同步驱动 mongodb-driver-sync 进行演示。

1 创建 MongoClient

MongoClient 是所有操作的入口,它内部管理了连接池和集群状态。MongoClient 应该是单例的,在你的应用生命周期内只创建一次。

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
public class MongoClusterConnector {
    // 1. 定义连接字符串
    private static final String connectionString = "mongodb://node1:27017,node2:27017,node3:27017/testDB?replicaSet=myReplicaSet";
    public static void main(String[] args) {
        // 2. 创建 MongoClient (单例模式)
        try (MongoClient mongoClient = MongoClients.create(connectionString)) {
            System.out.println("成功连接到 MongoDB 集群!");
            // 3. 获取数据库
            MongoDatabase database = mongoClient.getDatabase("testDB");
            // 4. 执行操作...
            performOperations(database);
        } catch (Exception e) {
            System.err.println("连接 MongoDB 集群失败: " + e.getMessage());
            e.printStackTrace();
        }
    }
    // 其他操作方法...
}

2 执行基本操作

获取 MongoCollection 后,操作与单机 MongoDB 完全一样。

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
// 在 performOperations 方法中
public static void performOperations(MongoDatabase database) {
    // 获取集合 (Collection)
    MongoCollection<Document> collection = database.getCollection("users");
    // 插入文档
    Document doc1 = new Document("name", "Alice")
                     .append("age", 30)
                     .append("city", "New York");
    collection.insertOne(doc1);
    System.out.println("文档插入成功: " + doc1.toJson());
    // 查询文档
    System.out.println("--- 查询所有用户 ---");
    collection.find().forEach(document -> System.out.println(document.toJson()));
    // 更新文档
    collection.updateOne(
        new Document("name", "Alice"),
        new Document("$set", new Document("age", 31))
    );
    System.out.println("--- 更新后的 Alice ---");
    collection.find(new Document("name", "Alice")).forEach(document -> System.out.println(document.toJson()));
    // 删除文档
    collection.deleteOne(new Document("name", "Alice"));
    System.out.println("--- 删除后的所有用户 ---");
    collection.find().forEach(document -> System.out.println(document.toJson()));
}

3 处理集群特有概念(读写分离)

通过在连接字符串中设置 readPreference,驱动会自动为你选择正确的读取节点。

// 在连接字符串中设置
String connectionString = "mongodb://.../testDB?replicaSet=myReplicaSet&readPreference=secondaryPreferred";
// 当你执行 find() 操作时,驱动会自动将请求发送到一个 Secondary 节点
MongoCollection<Document> collection = database.getCollection("logs");
collection.find().forEach(doc -> System.out.println(doc.toJson())); // 这个查询会从 Secondary 读
// 写操作 (insert, update, delete) 始终发送到 Primary 节点
collection.insertOne(new Document("log", "New user logged in"));

第四步:最佳实践与高级配置

1 连接池配置

在高并发应用中,必须配置连接池。MongoClientSettings 是配置的核心。

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import java.util.concurrent.TimeUnit;
public class MongoConnectorWithPool {
    public static MongoClient createClient() {
        String connectionString = "mongodb://...";
        // 创建连接字符串对象
        ConnectionString connString = new ConnectionString(connectionString);
        // 构建客户端设置
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyConnectionString(connString)
                .applyToClusterSettings(builder ->
                        builder.serverSelectionTimeout(5000, TimeUnit.MILLISECONDS)) // 选择服务器的超时时间
                .applyToSocketSettings(builder ->
                        builder.connectTimeout(10, TimeUnit.SECONDS)) // 连接超时
                .applyToConnectionPoolSettings(builder ->
                        // 连接池配置
                        builder.maxSize(100) // 最大连接数
                               .minSize(10)  // 最小连接数
                               .maxWaitTime(2, TimeUnit.SECONDS) // 获取连接的最大等待时间
                               .maxIdleTime(10, TimeUnit.MINUTES) // 连接在池中的最大空闲时间
                               .maxLifeTime(30, TimeUnit.MINUTES)) // 连接的最大生命周期
                .build();
        // 使用设置创建客户端
        return MongoClients.create(settings);
    }
}

2 异常处理

网络不稳定或节点切换时可能会抛出异常。

  • MongoTimeoutException: 操作超时。
  • MongoCommandException: 命令执行失败(如写冲突、权限问题)。
  • MongoSocketException: 网络问题。
  • MongoException: 所有 MongoDB 相关错误的基类。

策略

  • 重试机制: 对于瞬时网络错误或主节点切换期间的 MongoTimeoutException,可以实现自动重试逻辑。
  • 日志记录: 记录所有异常,以便排查问题。
  • 优雅降级: 对于非核心业务,可以捕获异常并记录,而不是让整个应用崩溃。

3 SSL/TLS 安全连接

在生产环境中,强烈建议启用 SSL/TLS 加密连接。

String connectionString = "mongodb://node1:27017,node2:27017/testDB?replicaSet=myReplicaSet&ssl=true";
// 或者使用 MongoClientSettings 进行更精细的配置
MongoClientSettings settings = MongoClientSettings.builder()
        .applyConnectionString(new ConnectionString(connectionString))
        .applyToSslSettings(builder -> builder.enabled(true)) // 启用 SSL
        // .applyToSslSettings(builder -> builder.invalidHostNameAllowed(true)) // 如果使用自签名证书且主机名不匹配
        .build();

完整示例代码

这是一个结合了上述所有要点的完整、可运行的示例。

import com.mongodb.ConnectionString;
import com.MongoClientSettings;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import java.util.concurrent.TimeUnit;
public class FullMongoClusterExample {
    // 建议:将连接字符串放在配置文件中
    private static final String CONNECTION_STRING = "mongodb://node1:27017,node2:27017,node3:27017/testClusterDB?replicaSet=myReplicaSet&readPreference=secondaryPreferred&ssl=false";
    public static void main(String[] args) {
        // 使用 try-with-resources 确保 MongoClient 被正确关闭
        try (MongoClient mongoClient = createConfiguredClient()) {
            System.out.println("成功连接到 MongoDB 集群!");
            // 获取数据库
            MongoDatabase database = mongoClient.getDatabase("testClusterDB");
            // 执行操作
            performCrudOperations(database);
        } catch (Exception e) {
            System.err.println("发生错误: " + e.getMessage());
            e.printStackTrace();
        }
    }
    /**
     * 创建一个配置了连接池的 MongoClient
     */
    private static MongoClient createConfiguredClient() {
        ConnectionString connString = new ConnectionString(CONNECTION_STRING);
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyConnectionString(connString)
                .applyToConnectionPoolSettings(builder ->
                        builder.maxSize(50)
                               .minSize(10)
                               .maxWaitTime(30, TimeUnit.SECONDS)
                               .maxIdleTime(60, TimeUnit.SECONDS))
                .applyToSocketSettings(builder ->
                        builder.connectTimeout(10, TimeUnit.SECONDS))
                .build();
        return MongoClients.create(settings);
    }
    /**
     * 执行基本的 CRUD 操作
     */
    private static void performCrudOperations(MongoDatabase database) {
        MongoCollection<Document> collection = database.getCollection("products");
        // 清空集合,方便测试
        collection.deleteMany(new Document());
        // 1. 创建 (Insert)
        Document product1 = new Document("name", "Laptop")
                .append("price", 1200.50)
                .append("category", "Electronics");
        collection.insertOne(product1);
        System.out.println("插入产品: " + product1.toJson());
        Document product2 = new Document("name", "Coffee Mug")
                .append("price", 15.00)
                .append("category", "Home Goods");
        collection.insertOne(product2);
        System.out.println("插入产品: " + product2.toJson());
        // 2. 读取 (Read)
        System.out.println("\n--- 所有产品 ---");
        collection.find().forEach(doc -> System.out.println(doc.toJson()));
        System.out.println("\n--- 查询电子产品 ---");
        collection.find(new Document("category", "Electronics"))
                .forEach(doc -> System.out.println(doc.toJson()));
        // 3. 更新 (Update)
        collection.updateOne(
                new Document("name", "Laptop"),
                new Document("$set", new Document("price", 1150.00))
        );
        System.out.println("\n--- 更新后的笔记本电脑价格 ---");
        collection.find(new Document("name", "Laptop")).forEach(doc -> System.out.println(doc.toJson()));
        // 4. 删除 (Delete)
        collection.deleteOne(new Document("name", "Coffee Mug"));
        System.out.println("\n--- 删除马克杯后的所有产品 ---");
        collection.find().forEach(doc -> System.out.println(doc.toJson()));
    }
}

故障排查

  1. 连接超时 (MongoTimeoutException):

    • 原因: 防火墙阻止了端口、网络延迟过高、节点未启动。
    • 解决: 检查网络连通性 (telnet <node_ip> 27017),确认所有节点都在运行,检查防火墙规则,适当增加 connectTimeoutMSsocketTimeoutMS
  2. 无法连接到副本集 (MongoCommandException: No replica set primary found):

    • 原因: replicaSet 参数错误,或者副本集中没有一个节点能成为 Primary (仲裁节点数量不对,或所有节点都处于 RECOVERING 状态)。
    • 解决: 检查 replicaSet 名称是否正确,登录到任意一个 MongoDB 节点,使用 rs.status() 命令查看副本集状态。
  3. 认证失败:

    • 原因: 用户名、密码或认证数据库错误。
    • 解决: 确保连接字符串中的认证信息正确,格式为 mongodb://username:password@host/db?...
  4. 驱动版本与 MongoDB 版本不兼容:

    • 原因: 你使用的 Java 驱动版本可能太旧,不支持你 MongoDB 版本的新特性,或者存在已修复的 Bug。
    • 解决: 检查 MongoDB 官方文档 中推荐的驱动版本。

希望这份详细的指南能帮助你顺利地在 Java 中使用 MongoDB 集群!

分享:
扫描分享到社交APP
上一篇
下一篇