杰瑞科技汇

How Do Python Thread Mutexes Avoid Deadlock?

This article breaks down Python threads and mutexes (mutual exclusion locks) in a clear, practical way.


The Problem: Why Do We Need Mutexes?

Imagine you and a friend are sharing a single notebook. You both want to add a line to the end of a story.

  • Without a rule: You both grab the pen at the same time. You write "The dragon...", and at the exact same time, your friend writes "ate the cake...". The result is a garbled mess: "The dragoneat the cake...". This is a race condition.
  • With a rule (Mutex): You agree on a system. Before writing, you must say "I'm writing!". If the other person hears that, they wait until you say "I'm done!". This ensures that only one person can write to the notebook at a time. This "rule" is a mutex.

In programming, a mutex is a lock that protects a shared resource (like a variable, a file, or a database connection) to prevent race conditions when multiple threads try to access it simultaneously.


Python's threading Module

Python provides a built-in threading module to create and manage threads. Here's a simple example to demonstrate the problem.

Example 1: The Race Condition

Let's create a shared counter and have multiple threads increment it. We expect the final value to be 1,000,000 (10 threads * 100,000 increments each). However, because of the race condition, the result can be unpredictable and lower than 1,000,000, depending on the interpreter version and on how the threads happen to be scheduled.

import threading
# A shared resource that multiple threads will try to modify
shared_counter = 0
# The number of times each thread will increment the counter
COUNT = 100000
def worker():
    """A function that increments the shared counter."""
    global shared_counter
    for _ in range(COUNT):
        # This is NOT an atomic operation. It involves:
        # 1. Read shared_counter
        # 2. Add 1 to it
        # 3. Write the new value back to shared_counter
        shared_counter += 1
# Create and start 10 threads
threads = []
for i in range(10):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()
# Wait for all threads to complete
for thread in threads:
    thread.join()
print(f"Final counter value: {shared_counter}")
# Expected: 1000000
# Actual: often less than 1000000 and different from run to run
# (some newer CPython versions may happen to hide the race in a loop this simple)

Why does this happen? Even though shared_counter += 1 looks like a single line, Python has to perform several steps to execute it. If one thread is interrupted after reading the value but before writing it back, another thread can read the old value, leading to a lost update.
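
To make the "several steps" visible, here is a small sketch that uses the standard dis module to list the bytecode instructions behind counter += 1. The names counter and increment are illustrative and not part of the examples above, and the exact instruction names differ between CPython versions, so no fixed output is shown.

```python
import dis

counter = 0

def increment():
    """Same read-modify-write pattern as the worker in Example 1."""
    global counter
    counter += 1

# Collect the names of the bytecode instructions the function compiles to.
op_names = [ins.opname for ins in dis.get_instructions(increment)]
print(op_names)

# The read (LOAD_GLOBAL) and the write (STORE_GLOBAL) are separate
# instructions, so a thread switch between them can lose an update.
print("read before write:",
      op_names.index("LOAD_GLOBAL") < op_names.index("STORE_GLOBAL"))
```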


The Solution: The threading.Lock

A Lock is the Python implementation of a mutex. It ensures that only one thread can execute a specific block of code at a time.

How it works:

  1. lock.acquire(): A thread calls this to request the lock.
    • If the lock is unlocked, the thread acquires it and continues into the critical section (the body of the with statement, or the try block of a try...finally).
    • If the lock is already locked by another thread, the calling thread will block (wait) until the lock is released.
  2. lock.release(): When the thread is done with the critical section, it releases the lock. This allows another waiting thread to acquire it.

The best and safest way to use a lock is with the with statement, as it automatically releases the lock when the block is exited, even if an error occurs.
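
As a minimal sketch of that claim, the two functions below do the same thing, once with explicit acquire()/release() in try...finally and once with the with statement. The function names and the simulated ValueError are assumptions made for illustration only.

```python
import threading

lock = threading.Lock()
log = []

def update_with_try_finally():
    lock.acquire()
    try:
        log.append("try/finally")
        raise ValueError("simulated failure inside the critical section")
    finally:
        lock.release()  # runs even though the block raised

def update_with_with():
    with lock:  # acquire on entry, release on exit, even on error
        log.append("with")
        raise ValueError("simulated failure inside the critical section")

for fn in (update_with_try_finally, update_with_with):
    try:
        fn()
    except ValueError:
        pass
    # locked() reports whether the lock is currently held
    print(fn.__name__, "left the lock held:", lock.locked())
```

In both cases the lock is free again after the exception, which is why other threads waiting on it are not stuck forever.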


Example 2: Using a Lock to Fix the Race Condition

Now, let's wrap the critical section (shared_counter += 1) in a with lock: block.

import threading
shared_counter = 0
COUNT = 100000
# Create a Lock object
lock = threading.Lock()
def worker_with_lock():
    """A function that increments the shared counter safely using a lock."""
    global shared_counter
    for _ in range(COUNT):
        with lock:  # Acquire the lock, execute, then release it automatically
            shared_counter += 1
# Create and start 10 threads
threads = []
for i in range(10):
    thread = threading.Thread(target=worker_with_lock)
    threads.append(thread)
    thread.start()
# Wait for all threads to complete
for thread in threads:
    thread.join()
print(f"Final counter value: {shared_counter}")
# Output: Final counter value: 1000000

Now, the result is always correct! The lock ensures that only one thread can execute shared_counter += 1 at any given moment, preventing the race condition.


Important Concepts and Best Practices

Deadlock

A deadlock is a situation where two or more threads are blocked forever, each waiting for the other to release a resource.

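Classic Example: Two Locks Acquired in Opposite Order (the Dining Philosophers problem is the well-known generalization of this to more threads). Imagine two threads, Thread A and Thread B.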

  1. Thread A acquires Lock 1.
  2. Thread B acquires Lock 2.
  3. Thread A now tries to acquire Lock 2 (but it's held by Thread B), so it blocks.
  4. Thread B now tries to acquire Lock 1 (but it's held by Thread A), so it blocks.

Now both threads are waiting for each other, and the program is stuck.
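
The four steps above can be reproduced in a sketch that does not actually hang. It assumes two helpers that are not part of the original scenario: a threading.Barrier to force the bad interleaving, and acquire(timeout=...) in place of a plain acquire() so each thread gives up instead of blocking forever.

```python
import threading

lock_1 = threading.Lock()
lock_2 = threading.Lock()
# Illustrative helper: a two-party barrier used only to force the interleaving.
sync = threading.Barrier(2)
got_second = {}

def worker(name, first, second):
    with first:
        sync.wait()  # both threads now hold their first lock
        # A plain second.acquire() would block forever at this point.
        acquired = second.acquire(timeout=0.5)
        got_second[name] = acquired
        if acquired:
            second.release()
        sync.wait()  # keep holding `first` until the other attempt is over

a = threading.Thread(target=worker, args=("A", lock_1, lock_2))
b = threading.Thread(target=worker, args=("B", lock_2, lock_1))
a.start(); b.start()
a.join(); b.join()

# Neither thread could obtain its second lock.
print(got_second)
```

Without the timeout, both threads would stay blocked and join() would never return.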

How to Avoid Deadlocks:

  • Lock in a consistent order: Always acquire locks in the same order across all threads.
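  • Use try...finally: If you can't use with, always release the lock in a finally block, so an exception cannot leave the lock held and block every other thread forever.
  • Use RLock (Reentrant Lock): A thread can re-acquire a lock it already holds. This is useful if a function that needs a lock calls another function that also needs the same lock. Note that it only prevents a thread from deadlocking on itself; it does not help when two threads take different locks in opposite order.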
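
A minimal sketch of consistent lock ordering, assuming a hypothetical Account class and transfer function invented for this illustration. Both locks are always taken in name order, no matter which direction the money moves, so the opposite-order deadlock cannot occur.

```python
import threading

class Account:
    """Hypothetical example class: a balance guarded by its own lock."""
    def __init__(self, name, balance):
        self.name = name
        self.balance = balance
        self.lock = threading.Lock()

def transfer(src, dst, amount):
    # Sort by name so every thread acquires the two locks in the same order.
    first, second = sorted((src, dst), key=lambda acct: acct.name)
    with first.lock:
        with second.lock:
            src.balance -= amount
            dst.balance += amount

def move_many(src, dst):
    for _ in range(1000):
        transfer(src, dst, 1)

a = Account("a", 1000)
b = Account("b", 1000)

# Two threads transfer in opposite directions at the same time.
t1 = threading.Thread(target=move_many, args=(a, b))
t2 = threading.Thread(target=move_many, args=(b, a))
t1.start(); t2.start()
t1.join(); t2.join()

print(a.balance, b.balance)  # 1000 1000
```

If transfer locked src first and dst second instead, the two threads would take the locks in opposite order and could deadlock.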

RLock (Reentrant Lock)

An RLock allows a thread to acquire the same lock multiple times without causing a deadlock. The lock is only released when the thread has called release() the same number of times it called acquire().

import threading
lock = threading.RLock()
def outer_function():
    with lock:
        print("Outer function acquired the lock")
        inner_function()
def inner_function():
    with lock:
        print("Inner function acquired the lock")
# This will work without a deadlock
outer_function()
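
To see the difference from a plain Lock without hanging the program, the sketch below uses acquire(blocking=False), which returns immediately with True or False instead of waiting. The variable names are illustrative.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

with plain:
    # Same thread, second attempt: a plain Lock refuses.
    # A blocking acquire() here would deadlock the thread on itself.
    plain_again = plain.acquire(blocking=False)

with reentrant:
    # Same thread, second attempt: an RLock allows it.
    reentrant_again = reentrant.acquire(blocking=False)
    if reentrant_again:
        reentrant.release()  # one release() per successful acquire()

print("Lock re-acquired by same thread: ", plain_again)      # False
print("RLock re-acquired by same thread:", reentrant_again)  # True
```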

Performance Considerations

Locks are powerful but can slow down your program.

  • Lock Contention: If many threads are frequently trying to acquire the same lock, they will spend most of their time waiting, not working. This is called "contention."
  • Granularity: Be careful not to make your critical sections too large. If you lock a large block of code, you might be unnecessarily blocking other threads that could be working on unrelated parts of the data. Strive for fine-grained locking.
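
One way to reduce contention, sketched under the assumption that the work can be accumulated per thread first: each thread counts in a local variable and takes the lock once at the end, instead of once per increment. The names total, total_lock, and worker are illustrative.

```python
import threading

total = 0
total_lock = threading.Lock()

def worker(n):
    global total
    local_count = 0
    for _ in range(n):
        local_count += 1  # thread-local work, no lock needed
    with total_lock:      # a single short critical section per thread
        total += local_count

threads = [threading.Thread(target=worker, args=(100000,)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final total: {total}")  # Final total: 1000000
```

The lock is acquired 10 times in total rather than 1,000,000 times, so the threads spend far less time waiting on each other.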

Summary: Lock vs. RLock

| Feature | threading.Lock | threading.RLock (Reentrant Lock) |
|---|---|---|
| Basic Use | Protects a shared resource from concurrent access. | Protects a resource, but allows the same thread to re-acquire it. |
| Re-entry | No. If a thread tries to acquire a lock it already holds, it will deadlock. | Yes. A thread can acquire the lock multiple times. |
| Release | Requires one release() for each acquire(). | Requires one release() for each acquire(). The lock is only released when the count reaches zero. |
| When to Use | Most common cases. When a thread just needs exclusive access to a block of code. | When a function that is already protected by a lock calls another function that also needs to use the same lock. |

Final Example: A Thread-Safe Queue

Here's a more practical example: a bounded, thread-safe queue built from one Lock and two Condition objects that share it. One condition signals "not empty" to consumers and the other signals "not full" to producers.

import threading
import time
import random
class ThreadSafeQueue:
    def __init__(self, max_size=5):
        self.items = []
        self.max_size = max_size
        self.lock = threading.Lock()
        # A Condition variable is useful for waiting for a specific state
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)
    def put(self, item):
        with self.not_full: # Use the 'not_full' condition
            while len(self.items) >= self.max_size:
                print(f"Queue full. {threading.current_thread().name} waiting...")
                self.not_full.wait() # Wait until the queue is not full
            self.items.append(item)
            print(f"{threading.current_thread().name} put: {item}")
            self.not_empty.notify() # Notify a waiting consumer that the queue is not empty
    def get(self):
        with self.not_empty: # Use the 'not_empty' condition
            while not self.items:
                print(f"Queue empty. {threading.current_thread().name} waiting...")
                self.not_empty.wait() # Wait until the queue is not empty
            item = self.items.pop(0)
            print(f"{threading.current_thread().name} got: {item}")
            self.not_full.notify() # Notify a waiting producer that the queue is not full
            return item
# --- Producer and Consumer Threads ---
queue = ThreadSafeQueue()
def producer():
    for i in range(10):
        time.sleep(random.uniform(0.1, 0.5))
        queue.put(f"Item-{i}")
def consumer():
    for _ in range(10):
        time.sleep(random.uniform(0.2, 0.7))
        item = queue.get()
        # Simulate work
        time.sleep(random.uniform(0.1, 0.3))
p1 = threading.Thread(target=producer, name="Producer-1")
c1 = threading.Thread(target=consumer, name="Consumer-1")
p2 = threading.Thread(target=producer, name="Producer-2")
c2 = threading.Thread(target=consumer, name="Consumer-2")
p1.start(); c1.start(); p2.start(); c2.start()
p1.join(); c1.join(); p2.join(); c2.join()
print("All threads finished.")

This example shows how locks and conditions work together to coordinate threads safely, preventing race conditions and managing wait states efficiently.
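
For comparison, the standard library's queue.Queue already provides a bounded, thread-safe queue with blocking put() and get(). The sketch below mirrors the producer/consumer layout above without the sleeps; the consumed list and its lock are assumptions added only to collect the results.

```python
import queue
import threading

q = queue.Queue(maxsize=5)  # put() blocks while 5 items are waiting
consumed = []
consumed_lock = threading.Lock()

def producer(start):
    for i in range(start, start + 10):
        q.put(i)  # blocks while the queue is full

def consumer():
    for _ in range(10):
        item = q.get()  # blocks while the queue is empty
        with consumed_lock:
            consumed.append(item)

threads = [
    threading.Thread(target=producer, args=(0,)),
    threading.Thread(target=producer, args=(10,)),
    threading.Thread(target=consumer),
    threading.Thread(target=consumer),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(consumed) == list(range(20)))  # True
```

Writing the queue by hand is useful for understanding locks and conditions; in production code the built-in class is usually the simpler choice.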
