杰瑞科技汇

Python MongoDB 封装,如何高效简化操作?

我们将采用 单例模式 来管理 MongoDB 连接,确保在整个应用中只创建一个连接池,提高资源利用率和性能。

Python MongoDB 封装,如何高效简化操作?-图1
(图片来源网络,侵删)

项目结构

为了保持代码的整洁和可维护性,我们建议采用以下结构:

mongo_project/
├── config.py          # 配置文件,存储数据库连接信息
├── mongo_client.py    # 核心 MongoDB 客户端封装类
├── models.py          # 示例数据模型(可选,推荐使用 Pydantic)
└── main.py            # 示例使用脚本

配置文件 (config.py)

将数据库连接信息与业务代码分离,是一个好习惯。

# config.py
MONGO_CONFIG = {
    "host": "localhost",
    "port": 27017,
    "username": "",  # 如果没有用户名,留空
    "password": "",  # 如果没有密码,留空
    "auth_source": "admin",  # 认证数据库,通常是 "admin"
    "database": "test_db",    # 默认数据库名
    "max_pool_size": 100,     # 连接池最大连接数
    "min_pool_size": 0,       # 连接池最小连接数
    "max_idle_time_ms": 30000 # 连接在池中的最大空闲时间 (ms)
}

核心封装类 (mongo_client.py)

这是封装的核心,我们将在这里实现所有功能。

# mongo_client.py
import pymongo
from pymongo import MongoClient, UpdateOne
from pymongo.errors import PyMongoError
from typing import Optional, Dict, List, Any, Union
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MongoDBClient:
    """
    MongoDB 客户端封装类,使用单例模式管理连接。
    提供基本的 CRUD 操作、批量操作、聚合和事务功能。
    """
    _instance = None
    _client = None
    _db = None
    def __new__(cls, *args, **kwargs):
        """单例模式实现"""
        if not cls._instance:
            cls._instance = super(MongoDBClient, cls).__new__(cls)
        return cls._instance
    def __init__(self, config: Dict[str, Any]):
        """
        初始化 MongoDB 客户端。
        :param config: MongoDB 连接配置字典
        """
        if not self._client:
            try:
                # 构建连接字符串
                connection_string = self._build_connection_string(config)
                # 创建 MongoClient
                self._client = MongoClient(
                    connection_string,
                    maxPoolSize=config.get('max_pool_size', 100),
                    minPoolSize=config.get('min_pool_size', 0),
                    maxIdleTimeMS=config.get('max_idle_time_ms', 30000)
                )
                # 选择默认数据库
                self._db = self._client[config.get('database')]
                # 测试连接是否成功
                self._client.admin.command('ping')
                logger.info("MongoDB 连接成功!")
            except PyMongoError as e:
                logger.error(f"MongoDB 连接失败: {e}")
                raise
    def _build_connection_string(self, config: Dict[str, Any]) -> str:
        """根据配置构建 MongoDB 连接字符串"""
        auth_part = ""
        if config.get('username') and config.get('password'):
            auth_part = f"{config['username']}:{config['password']}@"
        host = config.get('host', 'localhost')
        port = config.get('port', 27017)
        return f"mongodb://{auth_part}{host}:{port}/"
    def get_collection(self, collection_name: str):
        """
        获取指定集合的 Collection 对象。
        :param collection_name: 集合名称
        :return: pymongo.Collection 对象
        """
        if not self._db:
            raise RuntimeError("数据库未初始化")
        return self._db[collection_name]
    # --- CRUD 操作 ---
    def insert_one(self, collection_name: str, document: Dict[str, Any]) -> str:
        """
        向集合中插入一个文档。
        :param collection_name: 集合名称
        :param document: 要插入的文档 (字典)
        :return: 插入文档的 _id (str)
        """
        collection = self.get_collection(collection_name)
        try:
            result = collection.insert_one(document)
            logger.info(f"成功插入文档,ID: {result.inserted_id}")
            return str(result.inserted_id)
        except PyMongoError as e:
            logger.error(f"插入文档失败: {e}")
            raise
    def insert_many(self, collection_name: str, documents: List[Dict[str, Any]]) -> List[str]:
        """
        向集合中插入多个文档。
        :param collection_name: 集合名称
        :param documents: 要插入的文档列表
        :return: 插入文档的 _id 列表 (List[str])
        """
        collection = self.get_collection(collection_name)
        try:
            result = collection.insert_many(documents)
            logger.info(f"成功插入 {len(result.inserted_ids)} 个文档")
            return [str(_id) for _id in result.inserted_ids]
        except PyMongoError as e:
            logger.error(f"批量插入文档失败: {e}")
            raise
    def find_one(self, collection_name: str, query: Dict[str, Any], 
                 projection: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
        """
        查找并返回一个文档。
        :param collection_name: 集合名称
        :param query: 查询条件
        :param projection: 投影,指定返回哪些字段
        :return: 匹配的文档,未找到则返回 None
        """
        collection = self.get_collection(collection_name)
        try:
            return collection.find_one(query, projection)
        except PyMongoError as e:
            logger.error(f"查找文档失败: {e}")
            raise
    def find(self, collection_name: str, query: Dict[str, Any], 
             projection: Optional[Dict[str, Any]] = None,
             sort: Optional[List[tuple]] = None,
             limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        查找并返回多个文档。
        :param collection_name: 集合名称
        :param query: 查询条件
        :param projection: 投影
        :param sort: 排序,[('name', 1)] 表示按 name 升序
        :param limit: 限制返回结果数量
        :return: 匹配的文档列表
        """
        collection = self.get_collection(collection_name)
        try:
            cursor = collection.find(query, projection)
            if sort:
                cursor = cursor.sort(sort)
            if limit:
                cursor = cursor.limit(limit)
            return list(cursor)
        except PyMongoError as e:
            logger.error(f"查找文档列表失败: {e}")
            raise
    def update_one(self, collection_name: str, query: Dict[str, Any], 
                   update: Dict[str, Any], upsert: bool = False) -> Dict[str, Any]:
        """
        更新一个文档。
        :param collection_name: 集合名称
        :param query: 查询条件,定位要更新的文档
        :param update: 更新操作,{'$set': {'name': 'New Name'}}
        :param upsert: 如果不存在是否插入新文档
        :return: 更新结果,包含 matched_count, modified_count 等
        """
        collection = self.get_collection(collection_name)
        try:
            result = collection.update_one(query, update, upsert=upsert)
            logger.info(f"更新操作: 匹配 {result.matched_count} 个, 修改 {result.modified_count} 个")
            return {
                "matched_count": result.matched_count,
                "modified_count": result.modified_count,
                "upserted_id": str(result.upserted_id) if result.upserted_id else None
            }
        except PyMongoError as e:
            logger.error(f"更新文档失败: {e}")
            raise
    def update_many(self, collection_name: str, query: Dict[str, Any], 
                    update: Dict[str, Any], upsert: bool = False) -> Dict[str, Any]:
        """
        更新多个文档。
        :param collection_name: 集合名称
        :param query: 查询条件
        :param update: 更新操作
        :param upsert: 如果不存在是否插入新文档
        :return: 更新结果
        """
        collection = self.get_collection(collection_name)
        try:
            result = collection.update_many(query, update, upsert=upsert)
            logger.info(f"批量更新操作: 匹配 {result.matched_count} 个, 修改 {result.modified_count} 个")
            return {
                "matched_count": result.matched_count,
                "modified_count": result.modified_count,
                "upserted_id": str(result.upserted_id) if result.upserted_id else None
            }
        except PyMongoError as e:
            logger.error(f"批量更新文档失败: {e}")
            raise
    def delete_one(self, collection_name: str, query: Dict[str, Any]) -> Dict[str, Any]:
        """
        删除一个文档。
        :param collection_name: 集合名称
        :param query: 查询条件
        :return: 删除结果,包含 deleted_count
        """
        collection = self.get_collection(collection_name)
        try:
            result = collection.delete_one(query)
            logger.info(f"删除操作: 删除了 {result.deleted_count} 个文档")
            return {"deleted_count": result.deleted_count}
        except PyMongoError as e:
            logger.error(f"删除文档失败: {e}")
            raise
    def delete_many(self, collection_name: str, query: Dict[str, Any]) -> Dict[str, Any]:
        """
        删除多个文档。
        :param collection_name: 集合名称
        :param query: 查询条件
        :return: 删除结果
        """
        collection = self.get_collection(collection_name)
        try:
            result = collection.delete_many(query)
            logger.info(f"批量删除操作: 删除了 {result.deleted_count} 个文档")
            return {"deleted_count": result.deleted_count}
        except PyMongoError as e:
            logger.error(f"批量删除文档失败: {e}")
            raise
    # --- 高级操作 ---
    def bulk_write(self, collection_name: str, operations: List[Union[UpdateOne, Dict]]):
        """
        执行批量写入操作(如插入、更新、删除)。
        :param collection_name: 集合名称
        :param operations: 操作列表,可以是 UpdateOne 对象或字典形式的操作
        """
        collection = self.get_collection(collection_name)
        try:
            # 如果传入的是字典列表,需要转换成 pymongo 的操作对象
            if isinstance(operations[0], dict):
                # 示例: [{'update_one': {'filter': {'_id': 1}, 'update': {'$set': {'a': 1}}}}]
                # 这是一个简化的示例,实际应用中需要更复杂的解析
                # 这里为了演示,我们假设传入的是 UpdateOne 对象
                pass
            # 更常见的用法是直接传入 pymongo 的操作对象
            #  [UpdateOne({'_id': 1}, {'$set': {'a': 1}}), ...]
            # 注意:这里我们只演示 UpdateOne,其他操作类似
            # 需要根据传入操作类型创建对应的 BulkWriteOperation 对象
            # 这是一个简化的实现,实际中可能需要更复杂的逻辑来处理不同类型的操作
            # 为了简化,我们假设 operations 已经是 pymongo 的 BulkWriteOperation 对象列表
            # 使用 UpdateOne
            if operations and isinstance(operations[0], UpdateOne):
                 result = collection.bulk_write(operations, ordered=False)
                 logger.info(f"批量写入完成: 插入 {result.inserted_count}, 更新 {result.modified_count}, 删除 {result.deleted_count}")
                 return result
            # 如果是自定义的字典操作,需要转换
            # 这里提供一个转换示例,实际应用中需要完善
            bulk_operations = []
            for op_dict in operations:
                if op_dict.get('type') == 'update_one':
                    bulk_operations.append(UpdateOne(op_dict['filter'], op_dict['update']))
            if bulk_operations:
                result = collection.bulk_write(bulk_operations, ordered=False)
                logger.info(f"批量写入完成: 插入 {result.inserted_count}, 更新 {result.modified_count}, 删除 {result.deleted_count}")
                return result
        except PyMongoError as e:
            logger.error(f"批量写入失败: {e}")
            raise
    def aggregate(self, collection_name: str, pipeline: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        执行聚合操作。
        :param collection_name: 集合名称
        :param pipeline: 聚合管道,[{'$match': {'age': {'$gt': 20}}}, {'$group': {'_id': '$city', 'count': {'$sum': 1}}}]
        :return: 聚合结果列表
        """
        collection = self.get_collection(collection_name)
        try:
            return list(collection.aggregate(pipeline))
        except PyMongoError as e:
            logger.error(f"聚合操作失败: {e}")
            raise
    # --- 事务处理 ---
    def start_session(self):
        """启动一个客户端会话,用于事务"""
        return self._client.start_session()
    def with_transaction(self, callback, session=None):
        """
        执行一个带事务的回调函数。
        :param callback: 一个接受 session 作为参数的函数,包含事务内的操作
        :param session: 可选,传入已存在的会话
        :return: 回调函数的返回值
        """
        if session is None:
            session = self.start_session()
        try:
            with session.start_transaction():
                logger.info("事务已开始。")
                # 执行回调函数,并将 session 传入
                result = callback(session)
                logger.info("事务提交成功。")
                return result
        except Exception as e:
            logger.error(f"事务失败,已回滚: {e}")
            raise
        finally:
            session.end_session()
    def close(self):
        """关闭 MongoDB 连接"""
        if self._client:
            self._client.close()
            logger.info("MongoDB 连接已关闭。")

示例数据模型 (models.py) - 可选但推荐

使用 Pydantic 可以定义数据结构,提供数据验证和类型提示。

Python MongoDB 封装,如何高效简化操作?-图2
(图片来源网络,侵删)
pip install pydantic
# models.py
from pydantic import BaseModel, EmailStr
from typing import Optional, List
from datetime import datetime
class User(BaseModel):
    name: str
    email: EmailStr
    age: int
    is_active: bool = True
    tags: List[str] = []
    created_at: datetime = datetime.utcnow()
class Product(BaseModel):
    name: str
    price: float
    stock: int
    description: Optional[str] = None

使用示例 (main.py)

现在我们来演示如何使用这个封装好的 MongoDBClient

# main.py
import sys
import os
from datetime import datetime
# 将项目根目录添加到 Python 路径,以便导入模块
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from mongo_client import MongoDBClient
from config import MONGO_CONFIG
from models import User, Product
def main():
    # 1. 初始化客户端 (单例模式,多次初始化也只会创建一次连接)
    print("--- 1. 初始化 MongoDB 客户端 ---")
    mongo_client = MongoDBClient(MONGO_CONFIG)
    db = mongo_client._db  # 获取数据库对象 (内部使用,推荐通过 client.get_collection 访问)
    # 2. 准备测试数据
    print("\n--- 2. 准备测试数据 ---")
    users_collection_name = "users"
    products_collection_name = "products"
    # 清空集合,确保每次运行都是干净的
    db[users_collection_name].delete_many({})
    db[products_collection_name].delete_many({})
    user_data = [
        {"name": "Alice", "email": "alice@example.com", "age": 30, "tags": ["python", "mongodb"]},
        {"name": "Bob", "email": "bob@example.com", "age": 25, "tags": ["java", "spring"]},
        {"name": "Charlie", "email": "charlie@example.com", "age": 35, "tags": ["javascript", "react"]},
    ]
    product_data = [
        {"name": "Laptop", "price": 1200.50, "stock": 50},
        {"name": "Mouse", "price": 25.00, "stock": 200},
        {"name": "Keyboard", "price": 75.00, "stock": 150},
    ]
    # 3. 插入操作
    print("\n--- 3. 插入操作 ---")
    mongo_client.insert_many(users_collection_name, user_data)
    mongo_client.insert_many(products_collection_name, product_data)
    print("用户和产品数据插入成功。")
    # 4. 查询操作
    print("\n--- 4. 查询操作 ---")
    # 查找所有用户
    all_users = mongo_client.find(users_collection_name, {})
    print(f"所有用户数量: {len(all_users)}")
    # 查找年龄大于 28 的用户
    older_users = mongo_client.find(users_collection_name, {"age": {"$gt": 28}})
    print(f"年龄大于 28 的用户: {[u['name'] for u in older_users]}")
    # 查找一个特定用户
    alice = mongo_client.find_one(users_collection_name, {"name": "Alice"})
    print(f"Alice 的信息: {alice}")
    # 5. 更新操作
    print("\n--- 5. 更新操作 ---")
    # 更新 Alice 的年龄
    update_result = mongo_client.update_one(
        users_collection_name, 
        {"name": "Alice"}, 
        {"$set": {"age": 31}}
    )
    print(f"更新 Alice 的年龄结果: {update_result}")
    # 再次查找 Alice 验证更新
    updated_alice = mongo_client.find_one(users_collection_name, {"name": "Alice"})
    print(f"更新后的 Alice 信息: {updated_alice}")
    # 6. 删除操作
    print("\n--- 6. 删除操作 ---")
    # 删除 Charlie
    delete_result = mongo_client.delete_one(users_collection_name, {"name": "Charlie"})
    print(f"删除 Charlie 的结果: {delete_result}")
    # 验证 Charlie 是否被删除
    charlie = mongo_client.find_one(users_collection_name, {"name": "Charlie"})
    print(f"查找 Charlie (应不存在): {charlie}")
    # 7. 聚合操作
    print("\n--- 7. 聚合操作 ---")
    # 计算每个年龄段的人数
    age_pipeline = [
        {"$group": {"_id": "$age", "count": {"$sum": 1}}},
        {"$sort": {"_id": 1}}
    ]
    age_counts = mongo_client.aggregate(users_collection_name, age_pipeline)
    print(f"各年龄段人数统计: {age_counts}")
    # 8. 批量操作
    print("\n--- 8. 批量操作 ---")
    from pymongo import UpdateOne
    # 批量更新产品库存
    bulk_operations = [
        UpdateOne({"name": "Laptop"}, {"$inc": {"stock": -10}}),
        UpdateOne({"name": "Mouse"}, {"$inc": {"stock": 50}}),
        UpdateOne({"name": "Keyboard"}, {"$set": {"stock": 140}}),
    ]
    bulk_result = mongo_client.bulk_write(products_collection_name, bulk_operations)
    print(f"批量更新库存结果: {bulk_result}")
    # 9. 事务处理
    print("\n--- 9. 事务处理 ---")
    def transfer_funds(session):
        # 在一个事务中,从一个用户账户扣款,向另一个用户账户加款
        # 这里我们用一个简单的集合来模拟账户
        accounts_collection = db["accounts"]
        # Alice 转账 100 给 Bob
        from_user = "Alice"
        to_user = "Bob"
        amount = 100
        # 扣款
        accounts_collection.update_one(
            {"name": from_user},
            {"$inc": {"balance": -amount}},
            session=session
        )
        # 加款
        accounts_collection.update_one(
            {"name": to_user},
            {"$inc": {"balance": amount}},
            session=session
        )
        print(f"事务内: {from_user} 向 {to_user} 转账 {amount}")
    # 初始化账户
    db["accounts"].delete_many({})
    db["accounts"].insert_many([
        {"name": "Alice", "balance": 500},
        {"name": "Bob", "balance": 300}
    ])
    # 执行事务
    try:
        mongo_client.with_transaction(transfer_funds)
        print("转账事务成功执行。")
    except Exception as e:
        print(f"转账事务失败: {e}")
    # 验证账户余额
    alice_balance = db["accounts"].find_one({"name": "Alice"})
    bob_balance = db["accounts"].find_one({"name": "Bob"})
    print(f"事务后, Alice 余额: {alice_balance['balance']}, Bob 余额: {bob_balance['balance']}")
    # 10. 关闭连接
    print("\n--- 10. 关闭连接 ---")
    mongo_client.close()
if __name__ == "__main__":
    main()

如何运行

  1. 确保你已经安装了 pymongopydantic
    pip install pymongo pydantic
  2. 确保 MongoDB 服务正在运行(通过 Docker docker run -d -p 27017:27017 mongo)。
  3. 将上述所有代码文件保存到 mongo_project 目录下。
  4. 在终端中进入 mongo_project 目录,然后运行:
    python main.py

这个封装方案提供了以下优点:

  1. 易于使用:将复杂的 PyMongo 操作封装成简单的函数调用,如 insert_one, find
  2. 连接管理:使用单例模式,自动管理连接池,避免重复创建和销毁连接。
  3. 配置分离:数据库配置与业务代码分离,便于在不同环境(开发、测试、生产)间切换。
  4. 功能全面:覆盖了 CRUD、批量操作、聚合和事务等核心功能。
  5. 健壮性:内置了错误处理和日志记录,方便调试和监控。
  6. 可扩展性:你可以基于这个基础框架,轻松地添加更多高级功能,如索引管理、分片操作等。

你可以根据自己项目的具体需求,对这个封装进行进一步的定制和优化。

Python MongoDB 封装,如何高效简化操作?-图3
(图片来源网络,侵删)
分享:
扫描分享到社交APP
上一篇
下一篇