python多线程下载的实现代码

发布时间:2020-09-09编辑:脚本学堂
本文分享一个使用 Python 多线程下载文件的代码示例，并借此学习 urllib2、socket、os 等模块的用法，供有需要的朋友参考。

本节主要内容:
掌握python多线程下载的方法。

代码:
 

代码示例:

#!/bin/python
#
#site: www.jb200.com
import os
import urllib2
import time
import multiprocessing.dummy as multiprocessing
import string
from random import choice
import socket
from ctypes import c_int
import tempfile

import dummy
from logger import log

"Smart Downloading Module. Written by Itay Brandes."

# Shared counter of the bytes downloaded so far. DownloadFile_Parall resets it
# to 0 and hands it to every pool worker through _initProcess; DownloadFile
# then updates it via dummy.shared_bytes_var.
# NOTE: `multiprocessing` here is `multiprocessing.dummy` (see the imports), so
# the pool workers are threads of this process.
shared_bytes_var = multiprocessing.Value(c_int, 0)

def DownloadFile(url, path, startByte=0, endByte=None, ShowProgress=True):
    '''
    Function downloads file.
    @param url: File url address.
    @param path: Destination file path.
    @param startByte: Start byte.
    @param endByte: End byte. Will work only if server supports HTTPRange headers.
    @param ShowProgress: If true, shows textual progress bar.

    @return path: Destination file path.
    '''
    url = url.replace(' ', '%20')
    headers = {}
    if endByte is not None:
        headers['Range'] = 'bytes=%d-%d' % (startByte,endByte)
    req = urllib2.Request(url, headers=headers)

    try:
        urlObj = urllib2.urlopen(req, timeout=4)
    except urllib2.HTTPError, e:
        if "HTTP Error 416" in str(e):
            # HTTP 416 Error: Requested Range Not Satisfiable. Happens when we ask
            # for a range that is not available on the server. It will happen when
            # the server will try to send us a .html page that means something like
            # "you opened too many connections to our server". If this happens, we
            # will wait for the other threads to finish their connections and try again.

            log.warning("Thread didn't got the file it was expecting. Retrying...")
            time.sleep(5)
            return DownloadFile(url, path, startByte, endByte, ShowProgress)
        else:
            raise e

    f = open(path, 'wb')
    meta = urlObj.info()
    try:
        filesize = int(meta.getheaders("Content-Length")[0])
    except IndexError:
        log.warning("Server did not send Content-Length.")
        ShowProgress=False

    filesize_dl = 0
    block_sz = 8192
    while True:
        try:
            buff = urlObj.read(block_sz)
        except (socket.timeout, socket.error, urllib2.HTTPError), e:
            dummy.shared_bytes_var.value -= filesize_dl
            raise e

        if not buff:
            break

        filesize_dl += len(buff)
        try:
            dummy.shared_bytes_var.value += len(buff)
        except AttributeError:
            pass
        f.write(buff)

        if ShowProgress:
            status = r"%.2f MB / %.2f MB %s [%3.2f%%]" % (filesize_dl / 1024.0 / 1024.0,
                    filesize / 1024.0 / 1024.0, progress_bar(1.0*filesize_dl/filesize),
                    filesize_dl * 100.0 / filesize)
            status += chr(8)*(len(status)+1)
            print status,
    if ShowProgress:
        print "n"

    f.close()
    return path

def DownloadFile_Parall(url, path=None, processes=6,
                            minChunkFile=1024**2, nonBlocking=False):
    '''
    Function downloads file parally.
    @param url: File url address.
    @param path: Destination file path.
    @param processes: Number of processes to use in the pool.
    @param minChunkFile: Minimum chunk file in bytes.
    @param nonBlocking: If true, returns (mapObj, pool). A list of file parts will be returned
                        from the mapObj results, and the developer must join them himself.
                        Developer also must close and join the pool.

    @return mapObj: Only if nonBlocking is True. A multiprocessing.pool.AsyncResult object.
    @return pool: Only if nonBlocking is True. A multiprocessing.pool object.
    '''
    from HTTPQuery import Is_ServerSupportHTTPRange
    global shared_bytes_var
    shared_bytes_var.value = 0
    url = url.replace(' ', '%20')

    if not path:
        path = get_rand_filename(os.environ['temp'])
        if not os.path.exists(os.path.dirname(path)):
            os.makedirs(os.path.dirname(path))
    log.debug("Downloading to %s..." % path)

    urlObj = urllib2.urlopen(url)
    meta = urlObj.info()
    filesize = int(meta.getheaders("Content-Length")[0])

    if filesize/processes > minChunkFile and Is_ServerSupportHTTPRange(url):
        args = []
        pos = 0
        chunk = filesize/processes
        for i in range(processes):
            startByte = pos
            endByte = pos + chunk
            if endByte > filesize-1:
                endByte = filesize-1
            args.append([url, path+".%.3d" % i, startByte, endByte, False])
            pos += chunk+1
    else:
        args = [[url, path+".000", None, None, False]]

    log.debug("Running %d processes..." % processes)
    pool = multiprocessing.Pool(processes, initializer=_initProcess,initargs=(shared_bytes_var,))
    mapObj = pool.map_async(lambda x: DownloadFile(*x) , args)
    if nonBlocking:
        return mapObj, pool
    while not mapObj.ready():
        status = r"%.2f MB / %.2f MB %s [%3.2f%%]" % (shared_bytes_var.value / 1024.0 / 1024.0,
                filesize / 1024.0 / 1024.0, progress_bar(1.0*shared_bytes_var.value/filesize),
                shared_bytes_var.value * 100.0 / filesize)
        status = status + chr(8)*(len(status)+1)
        print status,
        time.sleep(0.1)

    file_parts = mapObj.get()
    pool.terminate()
    pool.join()

    combine_files(file_parts, path)

def combine_files(parts, path):
    '''
    Function combines file parts.
    Each part is appended to the destination in list order and deleted once
    it has been copied.
    @param parts: List of file paths.
    @param path: Destination path. Opened in 'wb' mode, so it is overwritten.
    '''
    # Function-scope import keeps this block self-contained.
    import shutil

    with open(path,'wb') as output:
        for part in parts:
            with open(part,'rb') as f:
                # Stream in fixed-size blocks: readlines() would load the whole
                # part into memory and split binary data on newline bytes.
                shutil.copyfileobj(f, output)
            os.remove(part)

def progress_bar(progress, length=20):
    '''
    Function creates a textual progress bar.
    @param progress: Float number between 0 and 1 describes the progress.
                     Values outside that range are clamped to it, so the bar
                     always keeps its width.
    @param length: The length of the progress bar in chars. Default is 20.

    @return: The bar, e.g. "[#########---------]" for progress 0.5.
    '''
    # The brackets are 2 chars long; never let the inner width go negative.
    inner = max(length - 2, 0)
    # Clamp so an overshooting byte counter cannot stretch the bar.
    filled = int(min(max(progress, 0.0), 1.0) * inner)
    return "[" + "#" * filled + "-" * (inner - filled) + "]"

def get_rand_filename(dir_=os.getcwd()):
    '''
    Function creates an empty file with a random, unused name and returns its path.
    @param dir_: Directory to create the file in. It must already exist.
                 NOTE: the default is evaluated once, when the module is
                 imported, so it is the working directory at import time.

    @return: Path of the newly created '.tmp' file.
    '''
    fd, filename = tempfile.mkstemp('.tmp', '', dir_)
    # mkstemp hands back an open OS-level descriptor; only the name is needed,
    # so close it instead of leaking one descriptor per call.
    os.close(fd)
    return filename

def _initProcess(x):
    '''
    Pool initializer: stores the given object on the `dummy` module as
    `shared_bytes_var`, which is where DownloadFile looks for the counter.
    @param x: Object to store (the pool passes the shared byte counter).
    '''
    setattr(dummy, 'shared_bytes_var', x)