杰瑞科技汇

python threading下载

核心思想

单线程下载文件时,程序会按顺序一个字节一个字节地读取并写入,直到文件下载完成,这效率较低,尤其是在网络延迟较高或服务器限速的情况下。

python threading下载-图1
(图片来源网络,侵删)

多线程下载的核心思想是 “分而治之”

  1. 获取文件总大小:向服务器发送一个 HTTP 请求,只获取文件的头信息(HEAD 请求),从而知道文件的总大小(100MB)。
  2. 分割任务:根据你想要启动的线程数(4 个),将文件总大小分成几份,每个线程负责下载文件的一个特定部分。
    • 线程 1:下载 0MB - 25MB
    • 线程 2:下载 25MB - 50MB
    • 线程 3:下载 50MB - 75MB
    • 线程 4:下载 75MB - 100MB
  3. 并发下载:创建多个线程,每个线程负责下载自己分配到的那个文件片段,所有线程同时运行,从服务器的不同位置下载数据。
  4. 合并文件:当所有线程都下载完成后,将所有下载好的文件片段按照正确的顺序合并成一个完整的文件。

实现步骤与代码

下面是一个完整的多线程下载器实现,包含了详细的注释。

第 1 步:获取文件大小

我们需要使用 requests 库来发送 HEAD 请求,这比下载整个文件要快得多。

import requests
def get_file_size(url):
    """获取文件的总大小"""
    try:
        # 发送 HEAD 请求,只获取头部信息,不下载文件内容
        response = requests.head(url, allow_redirects=True, timeout=10)
        # 检查响应状态码
        response.raise_for_status()
        # 从响应头中获取 Content-Length,即文件大小(字节)
        file_size = int(response.headers.get('Content-Length', 0))
        return file_size
    except requests.exceptions.RequestException as e:
        print(f"获取文件大小失败: {e}")
        return 0

第 2 步:创建下载线程

这是核心部分,我们将创建一个 DownloadThread 类,每个实例代表一个下载线程。

python threading下载-图2
(图片来源网络,侵删)
import threading
import os
import requests
class DownloadThread(threading.Thread):
    def __init__(self, url, start_byte, end_byte, file_name, part_num):
        """
        初始化下载线程
        :param url: 文件URL
        :param start_byte: 该线程下载的起始字节位置
        :param end_byte: 该线程下载的结束字节位置
        :param file_name: 最终要保存的文件名
        :param part_num: 该线程的编号,用于生成临时文件名
        """
        super().__init__()
        self.url = url
        self.start_byte = start_byte
        self.end_byte = end_byte
        self.file_name = file_name
        self.part_num = part_num
        self.temp_file_name = f"{file_name}.part{part_num}"
        self.downloaded_bytes = 0
    def run(self):
        """线程执行的核心下载逻辑"""
        print(f"线程 {self.part_num} 开始下载,范围: {self.start_byte} - {self.end_byte}")
        try:
            # 使用 'Range' 请求头告诉服务器我们要下载文件的哪个部分
            headers = {'Range': f'bytes={self.start_byte}-{self.end_byte}'}
            response = requests.get(self.url, headers=headers, stream=True, timeout=30)
            response.raise_for_status()
            # 以二进制写入模式打开临时文件
            with open(self.temp_file_name, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192): # 8KB的块
                    if chunk: # 过滤掉保持连接的新块
                        f.write(chunk)
                        self.downloaded_bytes += len(chunk)
            print(f"线程 {self.part_num} 下载完成!")
        except requests.exceptions.RequestException as e:
            print(f"线程 {self.part_num} 下载出错: {e}")

第 3 步:合并文件

所有线程下载完成后,我们需要一个函数来将这些临时文件片段合并成一个完整的文件。

def merge_files(file_name, num_threads):
    """
    合并所有下载好的文件片段
    :param file_name: 最终要保存的文件名
    :param num_threads: 线程数量
    """
    print("开始合并文件...")
    with open(file_name, 'wb') as final_file:
        for i in range(1, num_threads + 1):
            part_file_name = f"{file_name}.part{i}"
            if os.path.exists(part_file_name):
                with open(part_file_name, 'rb') as part_file:
                    final_file.write(part_file.read())
                # 删除临时文件
                os.remove(part_file_name)
                print(f"已合并并删除 {part_file_name}")
    print(f"文件 {file_name} 合并完成!")

第 4 步:主函数(协调整个流程)

我们写一个主函数来调用上述所有功能。

def multi_threaded_download(url, file_name=None, num_threads=4):
    """
    启动多线程下载
    :param url: 文件URL
    :param file_name: 保存的文件名,如果为None则从URL中提取
    :param num_threads: 下载线程数
    """
    if not file_name:
        # 从URL中提取文件名
        file_name = url.split('/')[-1]
        if not file_name:
            file_name = "downloaded_file"
    # 1. 获取文件大小
    file_size = get_file_size(url)
    if file_size == 0:
        print("无法获取文件大小,退出下载。")
        return
    print(f"文件总大小: {file_size / (1024 * 1024):.2f} MB")
    print(f"将使用 {num_threads} 个线程进行下载...")
    # 2. 计算每个线程下载的区间
    chunk_size = file_size // num_threads
    threads = []
    for i in range(1, num_threads + 1):
        start_byte = (i - 1) * chunk_size
        # 最后一个线程下载剩余的所有字节
        end_byte = file_size - 1 if i == num_threads else i * chunk_size - 1
        thread = DownloadThread(url, start_byte, end_byte, file_name, i)
        threads.append(thread)
        thread.start()
    # 3. 等待所有线程下载完成
    for thread in threads:
        thread.join()
    # 4. 合并文件
    merge_files(file_name, num_threads)
# --- 使用示例 ---
if __name__ == "__main__":
    # 请替换成一个有效的、支持 Range 请求的文件下载链接
    # GitHub 上的文件、一些大文件托管服务等通常都支持
    download_url = "https://raw.githubusercontent.com/psf/requests/main/README.rst"
    # 运行下载器
    multi_threaded_download(download_url, file_name="requests_readme.rst", num_threads=4)

代码解析与注意事项

  1. requests.head(): 这是获取文件大小的关键,它只获取响应头,不下载文件体,非常高效。
  2. Range 请求头: 这是实现断点续传和多线程下载的核心 HTTP 头。bytes=start-end 告诉服务器我们只需要从 startend 的字节范围,服务器在响应中会返回 206 Partial Content 状态码。
  3. stream=True: 在 requests.get() 中设置 stream=True 可以让我们以流的方式接收数据,避免一次性将整个文件加载到内存中,这对于大文件至关重要。
  4. iter_content(chunk_size=8192): 这是处理流数据的推荐方式,它会将数据分割成固定大小的块(这里是 8KB),逐块处理,内存占用非常低。
  5. 临时文件: 每个线程下载自己的部分到独立的临时文件(如 file.part1, file.part2),避免了多线程同时写入同一个文件时可能出现的混乱和竞争条件。
  6. thread.join(): 主线程调用 join() 会等待所有子线程执行完毕,这是确保所有下载任务完成后再进行合并操作的关键。
  7. 线程数不是越多越好:
    • 服务器限制: 很多服务器对来自同一 IP 的并发连接数有限制,如果线程数太多,可能会被服务器拒绝(返回 429 Too Many Requests)或导致连接变慢。
    • 网络带宽: 你的网络带宽是有限的,过多的线程会产生大量的网络开销(如 TCP 握手、HTTP 头等),反而可能降低整体下载速度。
    • 推荐: 4 到 8 个线程是一个比较合理的范围,可以根据实际情况调整。

如何改进和扩展?

  • 断点续传: 可以在程序启动时检查是否存在临时文件片段,如果存在,则读取已下载的大小,调整 Range 请求头,从断点处继续下载。
  • 下载进度条: 使用 tqdm 库可以轻松地为每个线程或总下载进度添加一个美观的进度条。
  • 更优雅的线程管理: 使用 concurrent.futures.ThreadPoolExecutor 可以更简洁地管理线程池,代码会更现代化。
  • 错误重试机制: 在 DownloadThreadrun 方法中加入循环,当下载失败时等待一段时间后进行重试。
  • GUI 界面: 可以使用 PyQtTkinter 等库为下载器创建图形用户界面,让用户可以输入 URL、选择保存路径和设置线程数。

希望这个详细的教程能帮助你理解并使用 Python threading 模块进行多线程下载!

分享:
扫描分享到社交APP
上一篇
下一篇