Of course! In Python, "update by query" usually means updating many records at once, selected by a search query. The term itself comes from Elasticsearch's Update By Query API, but the same concept applies to other databases such as MongoDB and SQL databases.

Let's break down how to do this for the most popular databases.
Elasticsearch (The Most Common Use Case)
Elasticsearch has a dedicated API called Update By Query that is built for this. It runs a query to find documents and then applies a script to every match in a single server-side operation. Unlike the single-document Update API, it does not accept a partial document; changes are expressed in the script.
Key Libraries:
elasticsearch: The official low-level client.
elasticsearch-dsl: A higher-level, more "Pythonic" client that is often easier to use.
Scenario:
Let's say we have a product index and we want to give a 10% discount to all products in the "electronics" category.
Method A: Using the Official elasticsearch Client
This method is more direct and uses the raw Elasticsearch API calls.

Installation:
pip install elasticsearch
Python Code:
from elasticsearch import Elasticsearch

# --- Setup ---
# Replace with your Elasticsearch node's URL
es = Elasticsearch(["http://localhost:9200"])

# Request body: the query selects the documents, the script updates each one
body = {
    "query": {
        "term": {
            # .keyword gives an exact match when the field uses the default dynamic text mapping
            "category.keyword": "electronics"
        }
    },
    # The script reads the current price and multiplies it by the discount factor
    "script": {
        "source": "ctx._source.price = ctx._source.price * params.discount_factor",
        "lang": "painless",  # The default scripting language
        "params": {
            "discount_factor": 0.9
        }
    }
}

# --- Execution ---
# update_by_query runs the query and applies the script to every match.
# On a busy cluster, set requests_per_second to a positive value to throttle the operation.
# Note: 8.x clients also accept query= and script= keyword arguments in place of body.
response = es.update_by_query(
    index="products",        # The index to update
    body=body,               # The query and the script
    conflicts="proceed",     # Skip version conflicts instead of aborting (optional)
    requests_per_second=-1   # -1 disables throttling; use a value such as 100 to limit the rate
)

# --- Verification ---
print(f"Total hits found for update: {response['total']}")
print(f"Updated successfully: {response['updated']}")
print(f"Errors: {response['failures']}")

# You can also verify by fetching a single product (replace the ID with a real one)
updated_product = es.get(index="products", id="some_electronics_product_id")
print(f"New price: {updated_product['_source']['price']}")
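Because conflicts="proceed" lets the operation skip documents that changed while it was running, it can be worth checking more than the updated count. The following is a minimal sketch of a hypothetical helper (summarize_update_response is not part of the client) that reads the counters returned in the Update By Query response. The sample response values are invented for illustration.

```python
def summarize_update_response(response):
    """Condense an Update By Query response into a few checks.

    `response` is expected to behave like the dict returned by
    es.update_by_query(); counters that are missing default to zero.
    """
    conflicts = response.get("version_conflicts", 0)
    failures = response.get("failures", [])
    timed_out = response.get("timed_out", False)
    return {
        "total": response.get("total", 0),
        "updated": response.get("updated", 0),
        "skipped_conflicts": conflicts,
        "failure_count": len(failures),
        # Treat the run as complete only if nothing was skipped, failed, or timed out
        "complete": conflicts == 0 and not failures and not timed_out,
    }


# Illustrative response; the numbers are made up for this example
sample_response = {
    "took": 147,
    "timed_out": False,
    "total": 120,
    "updated": 118,
    "version_conflicts": 2,
    "failures": [],
}

summary = summarize_update_response(sample_response)
print(summary)
```

If skipped_conflicts is non-zero, one option is to run the same update again so that the skipped documents are picked up. That is only safe when the query no longer matches documents that were already updated, which is not the case for the discount example as written.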
Method B: Using the elasticsearch-dsl Client (Recommended)
This method is more object-oriented and often cleaner to write.
Installation:

pip install elasticsearch-dsl
Python Code:
from elasticsearch_dsl import (
    Boolean, Document, Float, Keyword, Text, UpdateByQuery, connections
)

# --- Setup ---
# Define the default connection
connections.create_connection(hosts=['http://localhost:9200'], timeout=60)

# Define a model for your documents (optional but good practice)
class Product(Document):
    name = Text()
    category = Keyword()  # Use Keyword for exact matches and aggregations
    price = Float()
    in_stock = Boolean()

    class Index:
        name = 'products'

# (Optional) You would need to initialize the index and mapping if it doesn't exist
# Product.init()

# --- Execution ---
# Create an UpdateByQuery object bound to the products index
ubq = UpdateByQuery(index='products')
# Set the query filter. category is mapped as Keyword above, so no .keyword subfield is needed
ubq = ubq.query('term', category='electronics')
# Attach the update script with the .script() method
ubq = ubq.script(
    source='ctx._source.price = ctx._source.price * params.discount_factor',
    params={'discount_factor': 0.9}
)
# Execute the update
response = ubq.execute()

# --- Verification ---
print(f"Total hits found for update: {response.total}")
print(f"Updated successfully: {response.updated}")
print(f"Errors: {response.failures}")
MongoDB
In MongoDB, the equivalent of "update by query" is update_many(). It's part of the PyMongo driver and is very straightforward.
Key Library:
pymongo: The official Python client for MongoDB.
Scenario:
Update all documents in the users collection where status is "inactive" to set last_login to the current time.
Installation:
pip install pymongo
Python Code:
from datetime import datetime, timezone

from pymongo import MongoClient

# --- Setup ---
# Replace with your MongoDB connection string
client = MongoClient('mongodb://localhost:27017/')
db = client['my_database']      # Your database name
users_collection = db['users']  # Your collection name

# --- Execution ---
# Define the filter to find the documents
filter_query = {'status': 'inactive'}
# Define the update operation using the $set operator
# This will set the 'last_login' field to the current UTC time
update_operation = {
    '$set': {
        'last_login': datetime.now(timezone.utc)
    }
}
# Use update_many to apply the operation to all matching documents
result = users_collection.update_many(filter_query, update_operation)

# --- Verification ---
print(f"Documents matched: {result.matched_count}")
print(f"Documents modified: {result.modified_count}")
# Verify by finding one of the updated documents
updated_user = users_collection.find_one({'status': 'inactive'})
if updated_user:
    print(f"Updated user's last_login: {updated_user['last_login']}")
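For comparison with the Elasticsearch scenario, the same 10% discount can be expressed in MongoDB with the $mul update operator, which multiplies a field in place. The sketch below only builds the filter and update documents, so it runs without a database. The category and price field names are assumptions carried over from the earlier example, and build_discount_update is a hypothetical helper.

```python
def build_discount_update(category, discount_factor):
    """Return (filter, update) documents for a bulk price change."""
    if not 0 < discount_factor <= 1:
        raise ValueError("discount_factor must be greater than 0 and at most 1")
    # Select every document in the given category
    filter_query = {"category": category}
    # $mul multiplies the stored field value by the given number
    update_operation = {"$mul": {"price": discount_factor}}
    return filter_query, update_operation


filter_query, update_operation = build_discount_update("electronics", 0.9)
print(filter_query)
print(update_operation)
```

With a PyMongo collection in hand, the pair can be passed straight to update_many, for example db['products'].update_many(filter_query, update_operation), where the products collection name is again an assumption.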
SQL Databases (e.g., PostgreSQL, MySQL)
For SQL databases, you don't need a special function. You can perform an UPDATE statement with a WHERE clause directly using a database library like psycopg2 (for PostgreSQL) or mysql-connector-python.
Key Libraries:
psycopg2: For PostgreSQL.
mysql-connector-python: For MySQL.
SQLAlchemy: A powerful ORM that can work with many SQL databases.
Scenario:
Update all orders in the orders table where the status is "pending" to change the status to "processing".
Installation (for PostgreSQL):
pip install psycopg2-binary
Python Code:
import psycopg2
from psycopg2 import sql

# --- Setup ---
# Replace with your database connection details
DB_NAME = "mydatabase"
DB_USER = "myuser"
DB_PASS = "mypassword"
DB_HOST = "localhost"
DB_PORT = "5432"

conn = None
cursor = None

# --- Execution ---
try:
    # Establish a connection to the database
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASS,
        host=DB_HOST,
        port=DB_PORT
    )
    cursor = conn.cursor()
    # Define the SQL query
    # sql.Identifier quotes table and column names; sql.Placeholder leaves the
    # values to be passed as parameters, which prevents SQL injection
    update_query = sql.SQL("UPDATE {} SET {} = {} WHERE {} = {}").format(
        sql.Identifier('orders'),
        sql.Identifier('status'),
        sql.Placeholder(),
        sql.Identifier('status'),
        sql.Placeholder()
    )
    # Execute the query with the new and old status values
    cursor.execute(update_query, ('processing', 'pending'))
    # Commit the transaction to make the changes permanent
    conn.commit()
    # --- Verification ---
    # Get the number of rows affected
    rows_updated = cursor.rowcount
    print(f"{rows_updated} rows were updated.")
except psycopg2.Error as error:
    print(f"Error while connecting to PostgreSQL or executing query: {error}")
    # If an error occurs, roll back any changes
    if conn is not None:
        conn.rollback()
finally:
    # Close the cursor and connection
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()
        print("PostgreSQL connection is closed.")
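To try the UPDATE ... WHERE pattern without a PostgreSQL server, the sketch below swaps in Python's built-in sqlite3 module and an in-memory database. The table layout and sample rows are invented for illustration. Note that sqlite3 uses ? as its parameter placeholder, whereas psycopg2 uses %s.

```python
import sqlite3

# In-memory database, so nothing is written to disk
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO orders (status) VALUES (?)",
    [("pending",), ("pending",), ("shipped",)],
)

# Values are passed as parameters rather than formatted into the SQL string
cursor = conn.execute(
    "UPDATE orders SET status = ? WHERE status = ?",
    ("processing", "pending"),
)
rows_updated = cursor.rowcount  # Number of rows changed by the UPDATE
conn.commit()
print(f"{rows_updated} rows were updated.")

# Read the table back to confirm which rows changed
statuses = [row[0] for row in conn.execute("SELECT status FROM orders ORDER BY id")]
print(statuses)
conn.close()
```

Only the two pending rows match the WHERE clause, so the shipped row is left untouched.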
Summary and Best Practices
| Feature | Elasticsearch | MongoDB | SQL |
|---|---|---|---|
| Method | update_by_query API | update_many() | UPDATE ... WHERE |
| Key Concept | Query + Script | Filter + Update Operator | Filter + Set Clause |
| Use Case | Complex updates across many documents, re-indexing. | Updating fields based on a match query. | Standard relational data updates. |
| Python Lib | elasticsearch, elasticsearch-dsl | pymongo | psycopg2, mysql-connector-python, SQLAlchemy |
