目录
- 前提条件:理解 MongoDB 集群
- 第一步:Java 项目依赖
- 第二步:连接字符串 的构建
- 第三步:Java 代码实现
- 1 创建 MongoClient
- 2 执行基本操作
- 3 处理集群特有概念(读写分离)
- 第四步:最佳实践与高级配置
- 1 连接池配置
- 2 异常处理
- 3 SSL/TLS 安全连接
- 完整示例代码
- 故障排查
前提条件:理解 MongoDB 集群
在开始编码前,你必须对 MongoDB 集群(副本集 或分片集群)有基本了解,Java 驱动程序通过 连接字符串 来连接集群。

- 副本集: 提供高可用性,至少有 3 个节点(1个Primary, 2个Secondary),客户端会自动连接到 Primary,并在 Primary 宕机时自动切换到新的 Primary。
- 分片集群: 提供高扩展性,由 Config Servers (存储集群元数据)、Shards (数据分片存储节点) 和 Routers (
mongos, 客户端入口点) 组成,客户端通常连接到mongos节点。
关键点:Java 驱动程序足够智能,你只需要提供一个包含所有节点(或至少一个 mongos 节点)地址的列表,它会自动发现集群中的其他节点并进行管理。
第一步:Java 项目依赖
使用 Maven 或 Gradle 来管理 mongodb-driver 依赖。
Maven (pom.xml)
<dependencies>
<!-- MongoDB Java 驱动核心库 -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.11.1</version> <!-- 请使用最新版本 -->
</dependency>
<!-- 如果你需要使用 Reactive Streams 风格的驱动 -->
<!-- <dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
<version>4.11.1</version>
</dependency> -->
<!-- 如果你需要使用异步风格的驱动 -->
<!-- <dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-async</artifactId>
<version>4.11.1</version>
</dependency> -->
</dependencies>
Gradle (build.gradle)
dependencies {
// MongoDB Java 驱动核心库
implementation 'org.mongodb:mongodb-driver-sync:4.11.1' // 请使用最新版本
// 如果你需要使用 Reactive Streams 风格的驱动
// implementation 'org.mongodb:mongodb-driver-reactivestreams:4.11.1'
// 如果你需要使用异步风格的驱动
// implementation 'org.mongodb:mongodb-driver-async:4.11.1'
}
第二步:连接字符串 的构建
这是连接集群的核心,连接字符串定义了如何连接到数据库。
副本集连接字符串示例
假设你的副本集名为 myReplicaSet,节点地址为 node1:27017, node2:27017, node3:27017。
String connectionString = "mongodb://node1:27017,node2:27017,node3:27017/" +
"testDB?replicaSet=myReplicaSet&" +
"readPreference=secondaryPreferred&" +
"maxIdleTimeMS=120000&" +
"connectTimeoutMS=10000&" +
"socketTimeoutMS=30000";
分片集群 连接字符串示例
假设你有一个 mongos 路由器在 mongos1:27017。
String connectionString = "mongodb://mongos1:27017/" +
"testDB?readPreference=primary&" +
"maxIdleTimeMS=120000&" +
"connectTimeoutMS=10000&" +
"socketTimeoutMS=30000";
连接字符串参数解析
mongodb://node1:27017,node2:27017,...: 节点列表,驱动会尝试连接列表中的任意一个,并自动发现整个集群。testDB: 要连接的数据库名称(可选,也可以在代码中指定)。replicaSet=myReplicaSet: 副本集必须指定,告诉驱动这是一个副本集集群。readPreference=...: 读写分离的关键。primary: (默认) 只从 Primary 读取,保证强一致性。secondary: 只从 Secondary 读取,适用于读多写少且对数据一致性要求不高的场景。primaryPreferred: 优先从 Primary 读,Primary 不可用则从 Secondary 读。secondaryPreferred: 优先从 Secondary 读,如果所有 Secondary 不可用则从 Primary 读。这是最常用的读偏好之一。nearest: 从延迟最低的节点(Primary 或 Secondary)读取。
maxIdleTimeMS=120000: 连接在池中的最大空闲时间,超时后将被回收。connectTimeoutMS=10000: 建立连接的超时时间(毫秒)。socketTimeoutMS=30000: 发送/接收数据包的超时时间(毫秒)。
第三步:Java 代码实现
我们将使用同步驱动 mongodb-driver-sync 进行演示。
1 创建 MongoClient
MongoClient 是所有操作的入口,它内部管理了连接池和集群状态。MongoClient 应该是单例的,在你的应用生命周期内只创建一次。
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
public class MongoClusterConnector {
// 1. 定义连接字符串
private static final String connectionString = "mongodb://node1:27017,node2:27017,node3:27017/testDB?replicaSet=myReplicaSet";
public static void main(String[] args) {
// 2. 创建 MongoClient (单例模式)
try (MongoClient mongoClient = MongoClients.create(connectionString)) {
System.out.println("成功连接到 MongoDB 集群!");
// 3. 获取数据库
MongoDatabase database = mongoClient.getDatabase("testDB");
// 4. 执行操作...
performOperations(database);
} catch (Exception e) {
System.err.println("连接 MongoDB 集群失败: " + e.getMessage());
e.printStackTrace();
}
}
// 其他操作方法...
}
2 执行基本操作
获取 MongoCollection 后,操作与单机 MongoDB 完全一样。
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
// 在 performOperations 方法中
public static void performOperations(MongoDatabase database) {
// 获取集合 (Collection)
MongoCollection<Document> collection = database.getCollection("users");
// 插入文档
Document doc1 = new Document("name", "Alice")
.append("age", 30)
.append("city", "New York");
collection.insertOne(doc1);
System.out.println("文档插入成功: " + doc1.toJson());
// 查询文档
System.out.println("--- 查询所有用户 ---");
collection.find().forEach(document -> System.out.println(document.toJson()));
// 更新文档
collection.updateOne(
new Document("name", "Alice"),
new Document("$set", new Document("age", 31))
);
System.out.println("--- 更新后的 Alice ---");
collection.find(new Document("name", "Alice")).forEach(document -> System.out.println(document.toJson()));
// 删除文档
collection.deleteOne(new Document("name", "Alice"));
System.out.println("--- 删除后的所有用户 ---");
collection.find().forEach(document -> System.out.println(document.toJson()));
}
3 处理集群特有概念(读写分离)
通过在连接字符串中设置 readPreference,驱动会自动为你选择正确的读取节点。
// 在连接字符串中设置
String connectionString = "mongodb://.../testDB?replicaSet=myReplicaSet&readPreference=secondaryPreferred";
// 当你执行 find() 操作时,驱动会自动将请求发送到一个 Secondary 节点
MongoCollection<Document> collection = database.getCollection("logs");
collection.find().forEach(doc -> System.out.println(doc.toJson())); // 这个查询会从 Secondary 读
// 写操作 (insert, update, delete) 始终发送到 Primary 节点
collection.insertOne(new Document("log", "New user logged in"));
第四步:最佳实践与高级配置
1 连接池配置
在高并发应用中,必须配置连接池。MongoClientSettings 是配置的核心。
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import java.util.concurrent.TimeUnit;
public class MongoConnectorWithPool {
public static MongoClient createClient() {
String connectionString = "mongodb://...";
// 创建连接字符串对象
ConnectionString connString = new ConnectionString(connectionString);
// 构建客户端设置
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(connString)
.applyToClusterSettings(builder ->
builder.serverSelectionTimeout(5000, TimeUnit.MILLISECONDS)) // 选择服务器的超时时间
.applyToSocketSettings(builder ->
builder.connectTimeout(10, TimeUnit.SECONDS)) // 连接超时
.applyToConnectionPoolSettings(builder ->
// 连接池配置
builder.maxSize(100) // 最大连接数
.minSize(10) // 最小连接数
.maxWaitTime(2, TimeUnit.SECONDS) // 获取连接的最大等待时间
.maxIdleTime(10, TimeUnit.MINUTES) // 连接在池中的最大空闲时间
.maxLifeTime(30, TimeUnit.MINUTES)) // 连接的最大生命周期
.build();
// 使用设置创建客户端
return MongoClients.create(settings);
}
}
2 异常处理
网络不稳定或节点切换时可能会抛出异常。
MongoTimeoutException: 操作超时。MongoCommandException: 命令执行失败(如写冲突、权限问题)。MongoSocketException: 网络问题。MongoException: 所有 MongoDB 相关错误的基类。
策略:
- 重试机制: 对于瞬时网络错误或主节点切换期间的
MongoTimeoutException,可以实现自动重试逻辑。 - 日志记录: 记录所有异常,以便排查问题。
- 优雅降级: 对于非核心业务,可以捕获异常并记录,而不是让整个应用崩溃。
3 SSL/TLS 安全连接
在生产环境中,强烈建议启用 SSL/TLS 加密连接。
String connectionString = "mongodb://node1:27017,node2:27017/testDB?replicaSet=myReplicaSet&ssl=true";
// 或者使用 MongoClientSettings 进行更精细的配置
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(connectionString))
.applyToSslSettings(builder -> builder.enabled(true)) // 启用 SSL
// .applyToSslSettings(builder -> builder.invalidHostNameAllowed(true)) // 如果使用自签名证书且主机名不匹配
.build();
完整示例代码
这是一个结合了上述所有要点的完整、可运行的示例。
import com.mongodb.ConnectionString;
import com.MongoClientSettings;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import java.util.concurrent.TimeUnit;
public class FullMongoClusterExample {
// 建议:将连接字符串放在配置文件中
private static final String CONNECTION_STRING = "mongodb://node1:27017,node2:27017,node3:27017/testClusterDB?replicaSet=myReplicaSet&readPreference=secondaryPreferred&ssl=false";
public static void main(String[] args) {
// 使用 try-with-resources 确保 MongoClient 被正确关闭
try (MongoClient mongoClient = createConfiguredClient()) {
System.out.println("成功连接到 MongoDB 集群!");
// 获取数据库
MongoDatabase database = mongoClient.getDatabase("testClusterDB");
// 执行操作
performCrudOperations(database);
} catch (Exception e) {
System.err.println("发生错误: " + e.getMessage());
e.printStackTrace();
}
}
/**
* 创建一个配置了连接池的 MongoClient
*/
private static MongoClient createConfiguredClient() {
ConnectionString connString = new ConnectionString(CONNECTION_STRING);
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(connString)
.applyToConnectionPoolSettings(builder ->
builder.maxSize(50)
.minSize(10)
.maxWaitTime(30, TimeUnit.SECONDS)
.maxIdleTime(60, TimeUnit.SECONDS))
.applyToSocketSettings(builder ->
builder.connectTimeout(10, TimeUnit.SECONDS))
.build();
return MongoClients.create(settings);
}
/**
* 执行基本的 CRUD 操作
*/
private static void performCrudOperations(MongoDatabase database) {
MongoCollection<Document> collection = database.getCollection("products");
// 清空集合,方便测试
collection.deleteMany(new Document());
// 1. 创建 (Insert)
Document product1 = new Document("name", "Laptop")
.append("price", 1200.50)
.append("category", "Electronics");
collection.insertOne(product1);
System.out.println("插入产品: " + product1.toJson());
Document product2 = new Document("name", "Coffee Mug")
.append("price", 15.00)
.append("category", "Home Goods");
collection.insertOne(product2);
System.out.println("插入产品: " + product2.toJson());
// 2. 读取 (Read)
System.out.println("\n--- 所有产品 ---");
collection.find().forEach(doc -> System.out.println(doc.toJson()));
System.out.println("\n--- 查询电子产品 ---");
collection.find(new Document("category", "Electronics"))
.forEach(doc -> System.out.println(doc.toJson()));
// 3. 更新 (Update)
collection.updateOne(
new Document("name", "Laptop"),
new Document("$set", new Document("price", 1150.00))
);
System.out.println("\n--- 更新后的笔记本电脑价格 ---");
collection.find(new Document("name", "Laptop")).forEach(doc -> System.out.println(doc.toJson()));
// 4. 删除 (Delete)
collection.deleteOne(new Document("name", "Coffee Mug"));
System.out.println("\n--- 删除马克杯后的所有产品 ---");
collection.find().forEach(doc -> System.out.println(doc.toJson()));
}
}
故障排查
-
连接超时 (
MongoTimeoutException):- 原因: 防火墙阻止了端口、网络延迟过高、节点未启动。
- 解决: 检查网络连通性 (
telnet <node_ip> 27017),确认所有节点都在运行,检查防火墙规则,适当增加connectTimeoutMS和socketTimeoutMS。
-
无法连接到副本集 (
MongoCommandException: No replica set primary found):- 原因:
replicaSet参数错误,或者副本集中没有一个节点能成为 Primary (仲裁节点数量不对,或所有节点都处于RECOVERING状态)。 - 解决: 检查
replicaSet名称是否正确,登录到任意一个 MongoDB 节点,使用rs.status()命令查看副本集状态。
- 原因:
-
认证失败:
- 原因: 用户名、密码或认证数据库错误。
- 解决: 确保连接字符串中的认证信息正确,格式为
mongodb://username:password@host/db?...。
-
驱动版本与 MongoDB 版本不兼容:
- 原因: 你使用的 Java 驱动版本可能太旧,不支持你 MongoDB 版本的新特性,或者存在已修复的 Bug。
- 解决: 检查 MongoDB 官方文档 中推荐的驱动版本。
希望这份详细的指南能帮助你顺利地在 Java 中使用 MongoDB 集群!
