杰瑞科技汇

Python updatebyquery: How to Update Data Correctly?

In Python, "update by query" usually means a bulk update: you describe which records to change with a query, and the database applies the same change to every match. The most common tool for this in the Python ecosystem is Elasticsearch, whose API carries that name, but the same idea applies to other databases such as MongoDB and SQL databases.


Let's break down how to do this for the most popular databases.


Elasticsearch (The Most Common Use Case)

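Elasticsearch has a dedicated Update By Query API (_update_by_query) for exactly this. It runs a query to find documents and then applies a script (usually Painless) to every match in a single request, processing the matches in batches. Unlike the single-document Update API, it does not accept a partial "doc" update; all changes are made through the script.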
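To make those semantics concrete, here is a plain-Python model of one update-by-query pass: select the matching documents, run the script once per match with a ctx-like dict, and tally the counters Elasticsearch reports (total, updated, noops). It is an in-memory illustration only, not how Elasticsearch is implemented. The "discounted" marker field is a hypothetical addition showing one way to make a script safe to re-run by setting ctx.op to "noop", which Painless scripts in update by query can also do.

```python
from typing import Callable


def update_by_query(docs: list[dict], matches: Callable[[dict], bool],
                    script: Callable[[dict], None]) -> dict:
    """In-memory model of one update-by-query pass (not the real implementation)."""
    counters = {"total": 0, "updated": 0, "noops": 0}
    for source in docs:
        if not matches(source):
            continue
        counters["total"] += 1
        # Like Painless, the script gets ctx._source and may set ctx.op
        ctx = {"_source": source, "op": "index"}
        script(ctx)
        if ctx["op"] == "noop":
            counters["noops"] += 1
        else:
            counters["updated"] += 1
    return counters


def discount(ctx: dict, factor: float = 0.9) -> None:
    """Apply the discount once; 'discounted' is a hypothetical marker field."""
    src = ctx["_source"]
    if src.get("discounted"):
        ctx["op"] = "noop"  # already discounted: leave the document untouched
        return
    src["price"] = src["price"] * factor
    src["discounted"] = True


def is_electronics(doc: dict) -> bool:
    return doc["category"] == "electronics"


products = [
    {"name": "TV", "category": "electronics", "price": 500.0},
    {"name": "Phone", "category": "electronics", "price": 300.0},
    {"name": "Mug", "category": "kitchen", "price": 10.0},
]
first = update_by_query(products, is_electronics, discount)
second = update_by_query(products, is_electronics, discount)  # an accidental re-run
print(first)   # {'total': 2, 'updated': 2, 'noops': 0}
print(second)  # {'total': 2, 'updated': 0, 'noops': 2}
```

Without the marker check, the second run would multiply the prices by 0.9 again (500 becomes 450, then 405), which is why a script that may be re-run needs a guard like this or a query that excludes already-updated documents.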

Key Libraries:

  • elasticsearch: The official low-level client.
  • elasticsearch-dsl: A higher-level, more "Pythonic" client that is often easier to use.

Scenario:

Let's say we have a product index and we want to give a 10% discount to all products in the "electronics" category.
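Whichever client you use, this scenario becomes one POST /products/_update_by_query request. The sketch below builds that request with only the standard library, which can be handy for trying the update in Kibana Dev Tools or with curl before scripting it. Note that conflicts and refresh are URL query parameters, while query and script go in the JSON body. The helper name build_update_by_query_request is hypothetical, and the category.keyword field assumes a dynamically mapped index.

```python
import json
from urllib.parse import urlencode


def build_update_by_query_request(index: str, field: str, value: str,
                                  factor: float) -> tuple[str, dict]:
    """Return (path with query string, JSON body) for an _update_by_query call."""
    # URL parameters: skip version conflicts instead of aborting, refresh afterwards
    params = {"conflicts": "proceed", "refresh": "true"}
    path = f"/{index}/_update_by_query?{urlencode(params)}"
    body = {
        # term query: exact match on a keyword field
        "query": {"term": {field: value}},
        "script": {
            "source": "ctx._source.price = ctx._source.price * params.discount_factor",
            "lang": "painless",
            "params": {"discount_factor": factor},
        },
    }
    return path, body


path, body = build_update_by_query_request("products", "category.keyword",
                                           "electronics", 0.9)
print("POST", path)  # POST /products/_update_by_query?conflicts=proceed&refresh=true
print(json.dumps(body, indent=2))
```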


Method A: Using the Official elasticsearch Client

This method is more direct and uses the raw Elasticsearch API calls.


Installation:

pip install elasticsearch

Python Code:

from elasticsearch import Elasticsearch
# --- Setup ---
# Replace with your Elasticsearch node's URL
es = Elasticsearch(["http://localhost:9200"])
# Query that selects the documents to update.
# "category.keyword" assumes a dynamically mapped index, where a text field
# gets a .keyword sub-field for exact matches.
query = {
    "term": {
        "category.keyword": "electronics"
    }
}
# Script applied to every matching document:
# it multiplies the current price by the discount factor
script = {
    "source": "ctx._source.price = ctx._source.price * params.discount_factor",
    "lang": "painless",  # Painless is the default scripting language
    "params": {
        "discount_factor": 0.9
    }
}
# --- Execution ---
# query and script are passed as keyword arguments (elasticsearch-py 8.x style)
response = es.update_by_query(
    index="products",        # The index to update
    query=query,             # Which documents to update
    script=script,           # What to do to each of them
    conflicts="proceed",     # Count version conflicts instead of aborting; conflicting docs are skipped
    requests_per_second=-1,  # -1 disables throttling; set e.g. 100 to protect a busy cluster
    refresh=True,            # Refresh affected shards so searches see the new prices
)
# --- Verification ---
print(f"Documents matched by the query: {response['total']}")
print(f"Documents updated: {response['updated']}")
print(f"Version conflicts skipped: {response['version_conflicts']}")
print(f"Failures: {response['failures']}")
# Spot-check one document (replace the placeholder ID with a real one)
updated_product = es.get(index="products", id="some_electronics_product_id")
print(f"New price: {updated_product['_source']['price']}")
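An empty failures list does not by itself mean every matching document was updated: timed_out can be true, and with conflicts="proceed" any documents counted in version_conflicts were skipped rather than updated. The sketch below is a hypothetical helper that checks those documented response fields in one place, so a job can retry or alert instead of only printing. It takes the response as a plain dict; with elasticsearch-py 8.x, response.body should give you that dict.

```python
def check_update_by_query(response: dict) -> tuple[bool, str]:
    """Summarise an _update_by_query response; ok is False if anything was left undone."""
    problems = []
    if response.get("timed_out"):
        problems.append("request timed out")
    if response.get("failures"):
        problems.append(f"{len(response['failures'])} failure(s)")
    if response.get("version_conflicts", 0):
        # With conflicts="proceed" these documents were skipped, not updated
        problems.append(f"{response['version_conflicts']} version conflict(s) skipped")
    summary = (f"total={response.get('total', 0)} "
               f"updated={response.get('updated', 0)} "
               f"noops={response.get('noops', 0)}")
    if problems:
        summary += "; " + ", ".join(problems)
    return not problems, summary


# Hand-written response dict; the numbers are illustrative only
sample = {"timed_out": False, "total": 120, "updated": 118, "noops": 0,
          "version_conflicts": 2, "failures": []}
ok, summary = check_update_by_query(sample)
print(ok, summary)  # False total=120 updated=118 noops=0; 2 version conflict(s) skipped
```

Documents skipped because of a conflict still match the query, so re-running the same update by query is a reasonable way to pick them up, provided the script is safe to apply twice.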

Method B: Using the elasticsearch-dsl Client (Recommended)

This method is more object-oriented and often cleaner to write.

Installation:

pip install elasticsearch-dsl

Python Code:

from elasticsearch_dsl import connections, Document, Text, Keyword, Float, Boolean
from elasticsearch_dsl import UpdateByQuery
# --- Setup ---
# Define the default connection (include scheme and port)
connections.create_connection(hosts=['http://localhost:9200'], request_timeout=60)
# Define a model for your documents (optional but good practice)
class Product(Document):
    name = Text()
    category = Keyword() # Use Keyword for exact matches and aggregations
    price = Float()
    in_stock = Boolean()
    class Index:
        name = 'products'
# (Optional) Create the index and mapping if it doesn't exist yet
# Product.init()
# --- Execution ---
# Create an UpdateByQuery object for the products index
ubq = UpdateByQuery(index='products')
# Set the query filter.
# category is a Keyword field in the Product mapping above; on a dynamically
# mapped index, use category__keyword='electronics' (i.e. category.keyword) instead.
ubq = ubq.query('term', category='electronics')
# Attach the Painless script with the .script() method
ubq = ubq.script(
    source='ctx._source.price = ctx._source.price * params.discount_factor',
    params={'discount_factor': 0.9}
)
# Execute the update
response = ubq.execute()
# --- Verification ---
print(f"Documents matched by the query: {response.total}")
print(f"Documents updated: {response.updated}")
print(f"Failures: {response.failures}")
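Throttling with requests_per_second (as in Method A) does not slow individual writes. According to the Elasticsearch reference, it pads the wait between batches (1,000 documents per batch by default) so that the average rate matches the target: target_time = batch_size / requests_per_second, and wait_time = target_time - write_time. A small sketch of that arithmetic follows; the 0.5 s write time is an illustrative figure, since in practice it is whatever the batch actually took.

```python
def batch_wait_time(requests_per_second: float, write_time: float,
                    batch_size: int = 1000) -> float:
    """Seconds of padding after one batch to honour requests_per_second."""
    if requests_per_second == -1:
        return 0.0  # -1 disables throttling
    target_time = batch_size / requests_per_second
    # Clamped at zero: a batch that was already slow gets no extra wait
    return max(0.0, target_time - write_time)


print(batch_wait_time(500, write_time=0.5))  # 1.5
print(batch_wait_time(-1, write_time=0.5))   # 0.0
```

So requests_per_second=500 with the default batch size targets one batch every 2 seconds, which is a useful way to estimate how long a throttled update of a large index will run.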

MongoDB

In MongoDB, the equivalent of "update by query" is update_many(). It's part of the PyMongo driver and is very straightforward.

Key Library:

  • pymongo: The official Python client for MongoDB.

Scenario:

Update all documents in the users collection where status is "inactive" to set last_login to the current time.

Installation:

pip install pymongo

Python Code:

from pymongo import MongoClient
from datetime import datetime, timezone
# --- Setup ---
# Replace with your MongoDB connection string
client = MongoClient('mongodb://localhost:27017/')
db = client['my_database'] # Your database name
users_collection = db['users'] # Your collection name
# --- Execution ---
# Define the filter to find the documents
filter_query = {'status': 'inactive'}
# Define the update operation using the $set operator
# This will set the 'last_login' field to the current UTC time
# (timezone-aware; datetime.utcnow() is deprecated since Python 3.12)
update_operation = {
    '$set': {
        'last_login': datetime.now(timezone.utc)
    }
}
# Use update_many to apply the operation to all matching documents
result = users_collection.update_many(filter_query, update_operation)
# --- Verification ---
print(f"Documents matched: {result.matched_count}")
print(f"Documents modified: {result.modified_count}")
# Verify by finding one of the updated documents
updated_user = users_collection.find_one({'status': 'inactive'})
if updated_user:
    print(f"Updated user's last_login: {updated_user['last_login']}")
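matched_count and modified_count can differ: MongoDB does not count a document as modified when $set writes a value equal to the one already stored. With the current time as the value every match changes, so this mostly matters for updates that set fixed values such as a status flag. The plain-Python model below (not PyMongo; it handles only equality filters and $set) shows how the two counters diverge; the "notified" field is hypothetical.

```python
def simulate_set_update_many(docs: list[dict], filter_query: dict,
                             set_fields: dict) -> tuple[int, int]:
    """Model of update_many(filter, {'$set': set_fields}); returns (matched, modified)."""
    matched = modified = 0
    for doc in docs:
        # Equality-only filter, e.g. {'status': 'inactive'}
        if all(doc.get(key) == value for key, value in filter_query.items()):
            matched += 1
            # Only documents whose stored values actually change count as modified
            if any(key not in doc or doc[key] != value
                   for key, value in set_fields.items()):
                doc.update(set_fields)
                modified += 1
    return matched, modified


# 'notified' is a hypothetical field used only for this illustration
users = [
    {"name": "a", "status": "inactive", "notified": False},
    {"name": "b", "status": "inactive", "notified": True},
    {"name": "c", "status": "active", "notified": False},
]
result = simulate_set_update_many(users, {"status": "inactive"}, {"notified": True})
print(result)  # (2, 1)
```

When checking whether an update "worked", compare matched_count with what you expected to match; a lower modified_count is normal when some documents already held the target value.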

SQL Databases (e.g., PostgreSQL, MySQL)

For SQL databases, you don't need a special function. You can perform an UPDATE statement with a WHERE clause directly using a database library like psycopg2 (for PostgreSQL) or mysql-connector-python.

Key Libraries:

  • psycopg2: For PostgreSQL.
  • mysql-connector-python: For MySQL.
  • SQLAlchemy: A powerful ORM that can work with many SQL databases.

Scenario:

Update all orders in the orders table where the status is "pending" to change the status to "processing".

Installation (for PostgreSQL):

pip install psycopg2-binary

Python Code:

import psycopg2
from psycopg2 import sql
# --- Setup ---
# Replace with your database connection details
DB_NAME = "mydatabase"
DB_USER = "myuser"
DB_PASS = "mypassword"
DB_HOST = "localhost"
DB_PORT = "5432"
conn = None
cursor = None
# --- Execution ---
try:
    # Establish a connection to the database
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASS,
        host=DB_HOST,
        port=DB_PORT
    )
    cursor = conn.cursor()
    # Build the statement: sql.Identifier safely quotes table and column names,
    # while the values are passed separately as %s parameters and escaped by
    # the driver, which is what prevents SQL injection
    update_query = sql.SQL("UPDATE {} SET {} = %s WHERE {} = %s").format(
        sql.Identifier('orders'),
        sql.Identifier('status'),
        sql.Identifier('status')
    )
    # Execute the query with its parameter values
    cursor.execute(update_query, ('processing', 'pending'))
    # --- Verification ---
    # Number of rows affected by the UPDATE
    rows_updated = cursor.rowcount
    # Commit the transaction to make the changes permanent
    conn.commit()
    print(f"{rows_updated} rows were updated.")
except psycopg2.Error as error:
    print(f"Error while connecting to PostgreSQL or executing query: {error}")
    # If an error occurs, roll back any uncommitted changes
    if conn is not None:
        conn.rollback()
finally:
    # Close the cursor and connection
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()
        print("PostgreSQL connection is closed.")
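The same UPDATE ... WHERE pattern can be tried without a PostgreSQL server by swapping in SQLite from Python's standard library. This is a stand-in, not psycopg2: the DB-API shape (execute, rowcount, commit/rollback) is the same, but sqlite3 uses ? placeholders where psycopg2 uses %s. The in-memory orders table and its rows are made up for the example.

```python
import sqlite3

# In-memory database with a hypothetical orders table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL)")
conn.executemany("INSERT INTO orders (status) VALUES (?)",
                 [("pending",), ("pending",), ("shipped",)])
conn.commit()

try:
    # 'with conn' commits on success and rolls back if an exception is raised
    with conn:
        cur = conn.execute("UPDATE orders SET status = ? WHERE status = ?",
                           ("processing", "pending"))
    updated = cur.rowcount
    print(f"{updated} rows were updated.")  # 2 rows were updated.
    counts = dict(conn.execute(
        "SELECT status, COUNT(*) FROM orders GROUP BY status").fetchall())
    print(counts)
finally:
    conn.close()
```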

Summary and Best Practices

| Feature | Elasticsearch | MongoDB | SQL |
|---|---|---|---|
| Method | update_by_query API | update_many() | UPDATE ... WHERE |
| Key concept | Query + script | Filter + update operator ($set, ...) | WHERE filter + SET clause |
| Use case | Scripted updates across many documents; in-place re-indexing | Updating fields on every document that matches a filter | Standard relational data updates |
| Python library | elasticsearch, elasticsearch-dsl | pymongo | psycopg2, mysql-connector-python, SQLAlchemy |