杰瑞科技汇

MongoDB Java聚合如何实现?

MongoDB 的聚合管道是一种强大的数据处理工具,它允许你将多个阶段(stage)的数据处理操作链接在一起,像工厂的流水线一样,对文档进行一系列的转换和组合,最终得到你想要的结果。

MongoDB Java聚合如何实现?-图1
(图片来源网络,侵删)

核心概念:聚合管道

聚合管道由一系列的阶段组成,每个阶段都会接收上一阶段的输出,并将其作为自己的输入,你可以将管道想象成一个数据处理的流水线。

  • 文档:每个阶段处理的是文档集合。
  • 阶段:每个阶段对文档执行一个操作,如过滤、分组、排序、计算等。
  • 操作符:在阶段内部,可以使用操作符来定义具体的逻辑,如 $match(匹配)、$group(分组)、$sum(求和)等。

Java 驱动中的核心类

在 MongoDB Java 驱动中,聚合操作主要通过以下几个类来完成:

  • Aggregates: 这是一个工具类,用于创建各种聚合阶段。Aggregates.match(), Aggregates.group() 等,使用它可以使代码更清晰、更易于阅读。
  • Bson: 这是 MongoDB Java 驱动中表示 BSON (Binary JSON) 的核心接口,一个聚合阶段(如 $match)就是一个 Bson 对象,你可以使用 Aggregates 工具类来创建它,也可以手动构建。
  • AggregateIterable<T>: 这是执行聚合操作后返回的结果迭代器,你可以对它进行遍历,获取最终的文档。<T> 指定了你希望结果文档映射到的 Java 类型(Document.class 或你自定义的 POJO 类)。

准备工作:添加依赖

确保你的项目中包含了 MongoDB Java 驱动的依赖,如果你使用 Maven,在 pom.xml 中添加:

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
    <version>4.11.1</version> <!-- 请使用最新稳定版本 -->
</dependency>

聚合操作步骤

  1. 连接到 MongoDB:创建 MongoClientMongoDatabaseMongoCollection 对象。
  2. 构建聚合管道:创建一个 List<Bson>,其中每个元素代表一个聚合阶段。
  3. 执行聚合:在 MongoCollection 对象上调用 aggregate() 方法,并将你的管道列表作为参数传入。
  4. 处理结果:遍历返回的 AggregateIterable,获取处理后的文档。

完整示例

假设我们有一个 orders 集合,存储了订单信息,数据结构如下:

MongoDB Java聚合如何实现?-图2
(图片来源网络,侵删)
// orders 集合中的文档示例
{
  "_id": 1,
  "orderDate": ISODate("2025-10-01T08:00:00Z"),
  "customerId": "c1001",
  "items": [
    { "productId": "p1001", "quantity": 2, "price": 50.00 },
    { "productId": "p1002", "quantity": 1, "price": 120.00 }
  ],
  "status": "completed"
}
{
  "_id": 2,
  "orderDate": ISODate("2025-10-02T09:30:00Z"),
  "customerId": "c1002",
  "items": [
    { "productId": "p1001", "quantity": 1, "price": 50.00 }
  ],
  "status": "completed"
}
{
  "_id": 3,
  "orderDate": ISODate("2025-10-03T10:00:00Z"),
  "customerId": "c1001",
  "items": [
    { "productId": "p1003", "quantity": 3, "price": 25.00 }
  ],
  "status": "shipped"
}

示例 1:计算每个客户的总消费金额

需求:找出每个客户完成的订单总金额。

SQL 类比: SELECT customerId, SUM(totalAmount) FROM orders WHERE status = 'completed' GROUP BY customerId;

Java 代码:

import com.mongodb.client.*;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.Arrays;
public class MongoAggregationExample {
    public static void main(String[] args) {
        // 1. 连接到 MongoDB
        String uri = "mongodb://localhost:27017";
        try (MongoClient mongoClient = MongoClients.create(uri)) {
            MongoDatabase database = mongoClient.getDatabase("testDB");
            MongoCollection<Document> collection = database.getCollection("orders");
            // 2. 构建聚合管道
            // 阶段1: $match - 只筛选状态为 "completed" 的订单
            Bson matchStage = Aggregates.match(Filters.eq("status", "completed"));
            // 阶段2: $project - 重塑文档,计算订单总金额
            // $map 遍历 items 数组,$multiply 计算每个商品的小计
            // $sum 将所有小计相加,得到订单总金额
            Bson projectStage = Aggregates.project(
                new Document("customerId", 1)
                    .append("totalAmount", new Document("$sum",
                        new Document("$map", new Document()
                            .append("input", "$items")
                            .append("in", new Document("$multiply", Arrays.asList("$$this.quantity", "$$this.price")))
                        )
                    ))
            );
            // 阶段3: $group - 按 customerId 分组,并累加总金额
            Bson groupStage = Aggregates.group(
                "$customerId",
                new Document("totalSpent", new Document("$sum", "$totalAmount"))
            );
            // 将所有阶段添加到管道列表中
            List<Bson> pipeline = Arrays.asList(matchStage, projectStage, groupStage);
            // 3. 执行聚合
            System.out.println("聚合查询结果:");
            AggregateIterable<Document> result = collection.aggregate(pipeline);
            // 4. 处理结果
            for (Document doc : result) {
                System.out.println(doc.toJson());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

输出结果:

MongoDB Java聚合如何实现?-图3
(图片来源网络,侵删)
聚合查询结果:
{
  "_id": "c1001",
  "totalSpent": 220.0
}
{
  "_id": "c1002",
  "totalSpent": 50.0
}

示例 2:查找每个客户购买次数最多的商品

需求:对于每个客户,找出他们购买次数最多的那个商品的 ID。

SQL 类比: 这个查询比较复杂,用 SQL 实现起来很繁琐,但用 MongoDB 聚合管道则非常直观。

Java 代码:

import com.mongodb.client.*;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Sorts;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.Arrays;
public class MongoAggregationComplexExample {
    public static void main(String[] args) {
        String uri = "mongodb://localhost:27017";
        try (MongoClient mongoClient = MongoClients.create(uri)) {
            MongoDatabase database = mongoClient.getDatabase("testDB");
            MongoCollection<Document> collection = database.getCollection("orders");
            // 阶段1: $unwind - 解构 items 数组,将每个商品变成一个独立的文档
            Bson unwindStage = Aggregates.unwind("$items");
            // 阶段2: $group - 按 customerId 和 productId 分组,计算每个商品的购买次数
            Bson groupStage1 = Aggregates.group(
                new Document("customerId", "$customerId")
                    .append("productId", "$items.productId"),
                Accumulators.sum("purchaseCount", "$items.quantity")
            );
            // 阶段3: $group - 再次分组,这次只按 customerId,并找出购买次数最多的商品
            // $max 找出最大的 purchaseCount
            // $first 获取该最大值对应的 productId
            Bson groupStage2 = Aggregates.group(
                "$customerId",
                Accumulators.max("maxPurchaseCount", "$purchaseCount"),
                Accumulators.first("mostFrequentProductId", "$productId")
            );
            // 阶段4: $sort - 按购买次数降序排列
            Bson sortStage = Aggregates.sort(Sorts.descending("maxPurchaseCount"));
            List<Bson> pipeline = Arrays.asList(unwindStage, groupStage1, groupStage2, sortStage);
            System.out.println("复杂聚合查询结果:");
            AggregateIterable<Document> result = collection.aggregate(pipeline);
            for (Document doc : result) {
                System.out.println(doc.toJson());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

输出结果:

复杂聚合查询结果:
{
  "_id": "c1001",
  "maxPurchaseCount": 3,
  "mostFrequentProductId": "p1003"
}
{
  "_id": "c1002",
  "maxPurchaseCount": 2,
  "mostFrequentProductId": "p1001"
}

(注意:c1002 的结果取决于 p1001quantity,这里假设 p1001quantity 是 2)


使用 POJO (Plain Old Java Object) 作为结果类型

为了获得类型安全性和更好的代码可读性,你可以将聚合结果映射到自定义的 Java 类中。

步骤:

  1. 创建一个 POJO 类,其字段名与聚合结果文档的键名匹配。
  2. 在调用 aggregate() 方法时,使用 YourPojo.class 作为泛型参数。

示例: 对于第一个例子,我们可以创建一个 CustomerSpending 类。

// CustomerSpending.java
public class CustomerSpending {
    private String customerId;
    private double totalSpent;
    // Getters and Setters (必须要有)
    public String getCustomerId() { return customerId; }
    public void setCustomerId(String customerId) { this.customerId = customerId; }
    public double getTotalSpent() { return totalSpent; }
    public void setTotalSpent(double totalSpent) { this.totalSpent = totalSpent; }
    @Override
    public String toString() {
        return "CustomerSpending{" +
                "customerId='" + customerId + '\'' +
                ", totalSpent=" + totalSpent +
                '}';
    }
}

修改后的 Java 代码:

// ... (连接部分相同)
// 3. 执行聚合,并指定返回类型为 CustomerSpending.class
List<CustomerSpending> resultList = new ArrayList<>();
collection.aggregate(pipeline).into(resultList); // .into() 方法将结果直接填充到 List 中
// 或者使用 forEach 遍历
// collection.aggregate(pipeline).forEach(doc -> resultList.add(mongoClient.getMapper().decode(doc, CustomerSpending.class)));
// 4. 处理结果
System.out.println("使用 POJO 的聚合查询结果:");
for (CustomerSpending spending : resultList) {
    System.out.println(spending);
}
// ... (关闭连接)

输出结果:

使用 POJO 的聚合查询结果:
CustomerSpending{customerId='c1001', totalSpent=220.0}
CustomerSpending{customerId='c1002', totalSpent=50.0}

注意:较新版本的 Java 驱动对 POJO 的支持非常完善,通常会自动进行映射,如果遇到问题,可能需要配置 CodecRegistry

常用聚合阶段和操作符速查

阶段 描述 示例
$match 过滤文档,只让符合条件的文档进入下一阶段,类似 WHERE Aggregates.match(Filters.gt("age", 18))
$group 将文档分组,并对每个组的文档执行聚合操作(如求和、平均值),类似 GROUP BY Aggregates.group("$category", Accumulators.sum("total", "$price"))
$project 重塑文档,可以包含、排除、添加或计算新字段,类似 SELECT Aggregates.project(new Document("name", 1).append("initial", new Document("$substr", Arrays.asList("$name", 0, 1))))
$sort 对文档进行排序。 Aggregates.sort(Sorts.ascending("name"))
$limit 限制通过管道的文档数量。 Aggregates.limit(10)
$skip 跳过指定数量的文档。 Aggregates.skip(5)
$unwind 将数组类型的字段拆分成多个独立的文档。 Aggregates.unwind("tags")
$lookup 执行左外连接,将来自另一个集合的文档合并进来,类似 JOIN Aggregates.lookup("localField", "foreignField", "as")
$facet 在同一个聚合管道中创建多个并行的聚合分支。 复杂,用于同时返回多个不同维度的聚合结果。
  • Aggregates 工具类是你的好朋友,用它来构建管道阶段最方便。
  • 管道是列表List<Bson>,按顺序添加阶段。
  • Bson 是核心:每个阶段都是 Bson 对象。
  • AggregateIterable 是结果:遍历它来获取最终数据。
  • 善用 POJO:对于生产环境,使用自定义 Java 类来接收结果可以大大提高代码的健壮性和可维护性。

掌握 MongoDB 的聚合管道是进行复杂数据分析的关键,而 Java 驱动为它提供了简洁而强大的 API,希望这个详细的指南能帮助你入门!

分享:
扫描分享到社交APP
上一篇
下一篇