杰瑞科技汇

Python memcache CAS如何实现乐观锁?

Of course! Let's dive into cas (Compare-And-Swap) in Python with the pymemcache library, which is the most popular and robust Memcached client for Python.

Python memcache CAS如何实现乐观锁?-图1
(图片来源网络,侵删)

What is CAS and Why Do You Need It?

Imagine a classic web application scenario: a user profile page.

  1. Initial Load: The server fetches the user's profile data from the database and stores it in Memcache for fast subsequent access. Let's say the user's name is "Alice".
  2. Concurrent Requests: Two different parts of your application (or two different servers) need to update the user's name.
    • Process A reads the data from Memcache: {"name": "Alice", "email": "alice@example.com"}.
    • Process B also reads the data from Memcache: {"name": "Alice", "email": "alice@example.com"}.
  3. The Problem: Both processes now have the "old" data. They both decide the name should be "Alice Smith".
    • Process A sets the new data: {"name": "Alice Smith", "email": "alice@example.com"}.
    • Process B also sets the new data: {"name": "Alice Smith", "email": "alice@example.com"}.

The result is a lost update. The fact that the email was changed by another process between the read and the write is completely lost. This is a classic race condition.

CAS solves this.

CAS provides a mechanism for atomic operations. When you read a value from Memcache, the server also gives you a unique token (a 64-bit number). When you write the value back, you must provide that same token. The write will only succeed if the token in the cache has not been changed since you read the value.

Python memcache CAS如何实现乐观锁?-图2
(图片来源网络,侵删)

This ensures that you are only overwriting data that you know hasn't been modified by another process in the meantime.


How to Use CAS with pymemcache

The pymemcache library provides a cas() method to perform this operation. The key is to use the gets() method for reading, which returns both the value and the CAS token.

Step 1: Installation

First, make sure you have pymemcache installed. If not, install it via pip:

pip install pymemcache

You'll also need a running Memcached server.

Python memcache CAS如何实现乐观锁?-图3
(图片来源网络,侵删)

Step 2: The Basic gets() and cas() Workflow

Here is a complete, runnable example demonstrating the CAS mechanism.

from pymemcache.client.base import Client
import time
import threading
# --- Setup ---
# Connect to your Memcached server
client = Client(('localhost', 11211))
# A key to work with
USER_PROFILE_KEY = 'user:1001'
def run_cas_operation(thread_id):
    """
    A function that simulates a thread trying to update a user's profile.
    It will retry until it successfully performs a CAS operation.
    """
    print(f"--- Thread {thread_id} starting ---")
    retries = 0
    max_retries = 5
    while retries < max_retries:
        retries += 1
        print(f"Thread {thread_id}: Attempt {retries}")
        # 1. GET the value and the CAS token using gets()
        # This is the crucial step. 'gets' returns (value, cas_token)
        current_value, cas_token = client.gets(USER_PROFILE_KEY)
        if current_value is None:
            # Key doesn't exist, we can set it directly
            print(f"Thread {thread_id}: Key not found, setting initial value.")
            client.set(USER_PROFILE_KEY, b'{"name": "Initial User"}')
            time.sleep(1) # Let the other thread have a chance
            continue
        # Decode the value for easy manipulation
        profile = current_value.decode('utf-8')
        print(f"Thread {thread_id}: Read profile: {profile}")
        # 2. Modify the value locally
        # Let's pretend this thread wants to add a 'last_updated' field
        import json
        data = json.loads(profile)
        data['last_updated_by'] = f'thread-{thread_id}'
        data['timestamp'] = int(time.time())
        new_value = json.dumps(data).encode('utf-8')
        print(f"Thread {thread_id}: Preparing to write: {new_value.decode('utf-8')}")
        # 3. CAS the new value back
        # The 'cas' method returns True on success, False on failure (token mismatch)
        success = client.cas(USER_PROFILE_KEY, new_value, cas_token)
        if success:
            print(f"Thread {thread_id}: CAS SUCCESS! Profile updated.")
            break  # Exit the loop on success
        else:
            print(f"Thread {thread_id}: CAS FAILED! Value was modified by another thread. Retrying...")
            time.sleep(0.5) # Wait a bit before retrying
    if retries >= max_retries:
        print(f"Thread {thread_id}: Max retries reached. Giving up.")
# --- Main Execution ---
# Clean up the key to start fresh
client.delete(USER_PROFILE_KEY)
# Create two threads that will try to update the same key
thread1 = threading.Thread(target=run_cas_operation, args=(1,))
thread2 = threading.Thread(target=run_cas_operation, args=(2,))
# Start both threads
thread1.start()
thread2.start()
# Wait for both threads to complete
thread1.join()
thread2.join()
# --- Final Check ---
# See the final state of the key
final_value, final_cas = client.gets(USER_PROFILE_KEY)
print("\n--- Final Result ---")
print(f"Final profile in cache: {final_value.decode('utf-8')}")
print(f"Final CAS token: {final_cas}")
# Clean up
client.delete(USER_PROFILE_KEY)

How to Run the Example

  1. Save the code as cas_example.py.
  2. Make sure your Memcached server is running (memcached -d).
  3. Run the script: python cas_example.py.

You will see output similar to this (the exact timing and order may vary):

--- Thread 1 starting ---
Thread 1: Attempt 1
Thread 1: Key not found, setting initial value.
--- Thread 2 starting ---
Thread 2: Attempt 1
Thread 2: Key not found, setting initial value.
Thread 1: Read profile: {"name": "Initial User"}
Thread 1: Preparing to write: {"name": "Initial User", "last_updated_by": "thread-1", "timestamp": 167...}
Thread 1: CAS SUCCESS! Profile updated.
Thread 2: Read profile: {"name": "Initial User", "last_updated_by": "thread-1", "timestamp": 167...}
Thread 2: Preparing to write: {"name": "Initial User", "last_updated_by": "thread-2", "timestamp": 167...}
Thread 2: CAS FAILED! Value was modified by another thread. Retrying...
Thread 2: Attempt 2
Thread 2: Read profile: {"name": "Initial User", "last_updated_by": "thread-1", "timestamp": 167...}
Thread 2: Preparing to write: {"name": "Initial User", "last_updated_by": "thread-2", "timestamp": 167...}
Thread 2: CAS FAILED! Value was modified by another thread. Retrying...
Thread 2: Attempt 3
Thread 2: Read profile: {"name": "Initial User", "last_updated_by": "thread-1", "timestamp": 167...}
Thread 2: Preparing to write: {"name": "Initial User", "last_updated_by": "thread-2", "timestamp": 167...}
Thread 2: CAS FAILED! Value was modified by another thread. Retrying...
... (retries continue until one wins) ...
--- Final Result ---
Final profile in cache: {"name": "Initial User", "last_updated_by": "thread-1", "timestamp": 167...}
Final CAS token: 123456789

Notice how Thread 2's first attempt failed because the token it had was no longer valid after Thread 1 successfully updated the value. Thread 2 had to re-read the data (getting a new token) and try again.


Key pymemcache Methods for CAS

Method Description Return Value
gets(key) Retrieves a value and its corresponding CAS token. (value, cas_token) or (None, None) if the key doesn't exist. The value is bytes.
cas(key, value, cas_token) Sets a value only if the cas_token matches the one in the cache. True on success, False on failure (the token was stale).
set(key, value, ...) A standard unconditional set. It does not use a CAS token and will always overwrite the value. True on success.

Important Considerations

  1. Token is a Number, Not a String: The cas_token returned by gets() is a 64-bit integer. You don't need to worry about its format, just pass it back to cas() as is.
  2. No CAS for add or replace: The add command only sets a key if it doesn't exist. The replace command only sets a key if it does exist. Neither of these use a CAS token to check for concurrent modifications. cas is the right tool for the "read-modify-write" pattern.
  3. Retries are Essential: A cas operation can fail. Your application logic must be prepared to handle this failure by re-reading the data, re-applying its changes, and trying the cas operation again. This is often called an "optimistic locking" pattern.
  4. Performance: gets and cas are very fast, but they do involve an extra network round-trip compared to a simple get/set. The performance gain from avoiding race conditions and database hits almost always outweighs this cost.
  5. Alternatives: For more complex atomic operations, Memcached also supports incr (increment) and decr (decrement), which are atomic and don't require CAS tokens. Use them for counters. For everything else, cas is your best friend.
分享:
扫描分享到社交APP
上一篇
下一篇