MongoDB 的聚合管道是一种强大的数据处理工具,它允许你将多个阶段(stage)的数据处理操作链接在一起,像工厂的流水线一样,对文档进行一系列的转换和组合,最终得到你想要的结果。

核心概念:聚合管道
聚合管道由一系列的阶段组成,每个阶段都会接收上一阶段的输出,并将其作为自己的输入,你可以将管道想象成一个数据处理的流水线。
- 文档:每个阶段处理的是文档集合。
- 阶段:每个阶段对文档执行一个操作,如过滤、分组、排序、计算等。
- 操作符:在阶段内部,可以使用操作符来定义具体的逻辑,如
$match(匹配)、$group(分组)、$sum(求和)等。
Java 驱动中的核心类
在 MongoDB Java 驱动中,聚合操作主要通过以下几个类来完成:
Aggregates: 这是一个工具类,用于创建各种聚合阶段。Aggregates.match(),Aggregates.group()等,使用它可以使代码更清晰、更易于阅读。Bson: 这是 MongoDB Java 驱动中表示 BSON (Binary JSON) 的核心接口,一个聚合阶段(如$match)就是一个Bson对象,你可以使用Aggregates工具类来创建它,也可以手动构建。AggregateIterable<T>: 这是执行聚合操作后返回的结果迭代器,你可以对它进行遍历,获取最终的文档。<T>指定了你希望结果文档映射到的 Java 类型(Document.class或你自定义的 POJO 类)。
准备工作:添加依赖
确保你的项目中包含了 MongoDB Java 驱动的依赖,如果你使用 Maven,在 pom.xml 中添加:
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.11.1</version> <!-- 请使用最新稳定版本 -->
</dependency>
聚合操作步骤
- 连接到 MongoDB:创建
MongoClient、MongoDatabase和MongoCollection对象。 - 构建聚合管道:创建一个
List<Bson>,其中每个元素代表一个聚合阶段。 - 执行聚合:在
MongoCollection对象上调用aggregate()方法,并将你的管道列表作为参数传入。 - 处理结果:遍历返回的
AggregateIterable,获取处理后的文档。
完整示例
假设我们有一个 orders 集合,存储了订单信息,数据结构如下:

// orders 集合中的文档示例
{
"_id": 1,
"orderDate": ISODate("2025-10-01T08:00:00Z"),
"customerId": "c1001",
"items": [
{ "productId": "p1001", "quantity": 2, "price": 50.00 },
{ "productId": "p1002", "quantity": 1, "price": 120.00 }
],
"status": "completed"
}
{
"_id": 2,
"orderDate": ISODate("2025-10-02T09:30:00Z"),
"customerId": "c1002",
"items": [
{ "productId": "p1001", "quantity": 1, "price": 50.00 }
],
"status": "completed"
}
{
"_id": 3,
"orderDate": ISODate("2025-10-03T10:00:00Z"),
"customerId": "c1001",
"items": [
{ "productId": "p1003", "quantity": 3, "price": 25.00 }
],
"status": "shipped"
}
示例 1:计算每个客户的总消费金额
需求:找出每个客户完成的订单总金额。
SQL 类比: SELECT customerId, SUM(totalAmount) FROM orders WHERE status = 'completed' GROUP BY customerId;
Java 代码:
import com.mongodb.client.*;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.Arrays;
public class MongoAggregationExample {
public static void main(String[] args) {
// 1. 连接到 MongoDB
String uri = "mongodb://localhost:27017";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("testDB");
MongoCollection<Document> collection = database.getCollection("orders");
// 2. 构建聚合管道
// 阶段1: $match - 只筛选状态为 "completed" 的订单
Bson matchStage = Aggregates.match(Filters.eq("status", "completed"));
// 阶段2: $project - 重塑文档,计算订单总金额
// $map 遍历 items 数组,$multiply 计算每个商品的小计
// $sum 将所有小计相加,得到订单总金额
Bson projectStage = Aggregates.project(
new Document("customerId", 1)
.append("totalAmount", new Document("$sum",
new Document("$map", new Document()
.append("input", "$items")
.append("in", new Document("$multiply", Arrays.asList("$$this.quantity", "$$this.price")))
)
))
);
// 阶段3: $group - 按 customerId 分组,并累加总金额
Bson groupStage = Aggregates.group(
"$customerId",
new Document("totalSpent", new Document("$sum", "$totalAmount"))
);
// 将所有阶段添加到管道列表中
List<Bson> pipeline = Arrays.asList(matchStage, projectStage, groupStage);
// 3. 执行聚合
System.out.println("聚合查询结果:");
AggregateIterable<Document> result = collection.aggregate(pipeline);
// 4. 处理结果
for (Document doc : result) {
System.out.println(doc.toJson());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
输出结果:

聚合查询结果:
{
"_id": "c1001",
"totalSpent": 220.0
}
{
"_id": "c1002",
"totalSpent": 50.0
}
示例 2:查找每个客户购买次数最多的商品
需求:对于每个客户,找出他们购买次数最多的那个商品的 ID。
SQL 类比: 这个查询比较复杂,用 SQL 实现起来很繁琐,但用 MongoDB 聚合管道则非常直观。
Java 代码:
import com.mongodb.client.*;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Sorts;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.Arrays;
public class MongoAggregationComplexExample {
public static void main(String[] args) {
String uri = "mongodb://localhost:27017";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("testDB");
MongoCollection<Document> collection = database.getCollection("orders");
// 阶段1: $unwind - 解构 items 数组,将每个商品变成一个独立的文档
Bson unwindStage = Aggregates.unwind("$items");
// 阶段2: $group - 按 customerId 和 productId 分组,计算每个商品的购买次数
Bson groupStage1 = Aggregates.group(
new Document("customerId", "$customerId")
.append("productId", "$items.productId"),
Accumulators.sum("purchaseCount", "$items.quantity")
);
// 阶段3: $group - 再次分组,这次只按 customerId,并找出购买次数最多的商品
// $max 找出最大的 purchaseCount
// $first 获取该最大值对应的 productId
Bson groupStage2 = Aggregates.group(
"$customerId",
Accumulators.max("maxPurchaseCount", "$purchaseCount"),
Accumulators.first("mostFrequentProductId", "$productId")
);
// 阶段4: $sort - 按购买次数降序排列
Bson sortStage = Aggregates.sort(Sorts.descending("maxPurchaseCount"));
List<Bson> pipeline = Arrays.asList(unwindStage, groupStage1, groupStage2, sortStage);
System.out.println("复杂聚合查询结果:");
AggregateIterable<Document> result = collection.aggregate(pipeline);
for (Document doc : result) {
System.out.println(doc.toJson());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
输出结果:
复杂聚合查询结果:
{
"_id": "c1001",
"maxPurchaseCount": 3,
"mostFrequentProductId": "p1003"
}
{
"_id": "c1002",
"maxPurchaseCount": 2,
"mostFrequentProductId": "p1001"
}
(注意:c1002 的结果取决于 p1001 的 quantity,这里假设 p1001 的 quantity 是 2)
使用 POJO (Plain Old Java Object) 作为结果类型
为了获得类型安全性和更好的代码可读性,你可以将聚合结果映射到自定义的 Java 类中。
步骤:
- 创建一个 POJO 类,其字段名与聚合结果文档的键名匹配。
- 在调用
aggregate()方法时,使用YourPojo.class作为泛型参数。
示例:
对于第一个例子,我们可以创建一个 CustomerSpending 类。
// CustomerSpending.java
public class CustomerSpending {
private String customerId;
private double totalSpent;
// Getters and Setters (必须要有)
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) { this.customerId = customerId; }
public double getTotalSpent() { return totalSpent; }
public void setTotalSpent(double totalSpent) { this.totalSpent = totalSpent; }
@Override
public String toString() {
return "CustomerSpending{" +
"customerId='" + customerId + '\'' +
", totalSpent=" + totalSpent +
'}';
}
}
修改后的 Java 代码:
// ... (连接部分相同)
// 3. 执行聚合,并指定返回类型为 CustomerSpending.class
List<CustomerSpending> resultList = new ArrayList<>();
collection.aggregate(pipeline).into(resultList); // .into() 方法将结果直接填充到 List 中
// 或者使用 forEach 遍历
// collection.aggregate(pipeline).forEach(doc -> resultList.add(mongoClient.getMapper().decode(doc, CustomerSpending.class)));
// 4. 处理结果
System.out.println("使用 POJO 的聚合查询结果:");
for (CustomerSpending spending : resultList) {
System.out.println(spending);
}
// ... (关闭连接)
输出结果:
使用 POJO 的聚合查询结果:
CustomerSpending{customerId='c1001', totalSpent=220.0}
CustomerSpending{customerId='c1002', totalSpent=50.0}
注意:较新版本的 Java 驱动对 POJO 的支持非常完善,通常会自动进行映射,如果遇到问题,可能需要配置 CodecRegistry。
常用聚合阶段和操作符速查
| 阶段 | 描述 | 示例 |
|---|---|---|
$match |
过滤文档,只让符合条件的文档进入下一阶段,类似 WHERE。 |
Aggregates.match(Filters.gt("age", 18)) |
$group |
将文档分组,并对每个组的文档执行聚合操作(如求和、平均值),类似 GROUP BY。 |
Aggregates.group("$category", Accumulators.sum("total", "$price")) |
$project |
重塑文档,可以包含、排除、添加或计算新字段,类似 SELECT。 |
Aggregates.project(new Document("name", 1).append("initial", new Document("$substr", Arrays.asList("$name", 0, 1)))) |
$sort |
对文档进行排序。 | Aggregates.sort(Sorts.ascending("name")) |
$limit |
限制通过管道的文档数量。 | Aggregates.limit(10) |
$skip |
跳过指定数量的文档。 | Aggregates.skip(5) |
$unwind |
将数组类型的字段拆分成多个独立的文档。 | Aggregates.unwind("tags") |
$lookup |
执行左外连接,将来自另一个集合的文档合并进来,类似 JOIN。 |
Aggregates.lookup("localField", "foreignField", "as") |
$facet |
在同一个聚合管道中创建多个并行的聚合分支。 | 复杂,用于同时返回多个不同维度的聚合结果。 |
Aggregates工具类是你的好朋友,用它来构建管道阶段最方便。- 管道是列表:
List<Bson>,按顺序添加阶段。 Bson是核心:每个阶段都是Bson对象。AggregateIterable是结果:遍历它来获取最终数据。- 善用 POJO:对于生产环境,使用自定义 Java 类来接收结果可以大大提高代码的健壮性和可维护性。
掌握 MongoDB 的聚合管道是进行复杂数据分析的关键,而 Java 驱动为它提供了简洁而强大的 API,希望这个详细的指南能帮助你入门!
