Download multithread de arquivos com script python ftp

Por que isso é necessário?

Uma vez tive a tarefa de copiar um grande número de arquivos de um servidor ftp. Foi necessário fazer um backup. Parece que poderia ser mais fácil! Mas, infelizmente, nada pronto para funcionar tão rapidamente para minhas condições não foi encontrado.





Situação

Era necessário buscar periodicamente algumas centenas de arquivos do servidor ftp no Windows. Muitas pequenas coisas e alguns arquivos muito grandes. Um total de cerca de 500 GB. O servidor é um vps localizado bem longe no exterior. Durante o dia o carro fica muito carregado, no início da noite é feita a manutenção de rotina, no total são 5 horas para download no máximo.





Nenhum dos utilitários que analisei foi capaz de lidar com a eficiência e no tempo previsto. Bem, não há para onde ir, um sistema de backup normal ainda não foi comprado, o que significa que nos armamos com um editor ou IDE Python e pronto! Aventura!





Config

Colocaremos todos os parâmetros do script em um arquivo separado por conveniência.





Modelo de configuração:





host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
basepath = '/path/to/backup/folder'  # ,        
max_threads = 20 #     
log_path = '\path\to\logfile'
statusfilepath = '\path\to\statusfile'
      
      



.py . , :





if __name__ == "__main__":
    host = config.host
    user = config.user
    passwd = config.passwd
    basepath = config.basepath  # ,        
    max_threads = config.max_threads
    log_path = config.log_path
    statusfilepath = config.statusfilepath
    main()
      
      



ftp , , , ftp- . , , - .





. , , , -.





- ftp:





class MyFtp (ftplib.FTP):
    """  ,        """
    def __init__(self):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.timeout = 1800
        super(MyFtp, self).__init__()

    def connect(self):
        super(MyFtp, self).connect(self.host, timeout=self.timeout)

    def login(self):
        super(MyFtp, self).login(user=self.user, passwd=self.passwd)

    def quit(self):
        super(MyFtp,self).quit()
      
      



. ftplib, .





:





class FileList:
    """      """
    def __init__(self):
        self.ftp = None
        self.file_list = []

    def connect_ftp(self):
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        self.ftp.__class__.encoding = sys.getfilesystemencoding()

    def get_list(self, name):
        """       ftp-."""
        import os
        for dirname in self.ftp.mlsd(str(name), facts=["type"]):
            if dirname[1]["type"] == "file":
                entry_file_list = {}
                entry_file_list['remote_path'] = name  #  
                entry_file_list['filename'] = dirname[0]  # 
                self.file_list.append(entry_file_list)
            else:
                path = os.path.join(name, dirname[0])
                self.get_list(path)

    def get_next_file(self):
        return self.file_list.pop()

    def len(self):
        return len(self.file_list)
      
      



, , , , , .





logging. , .





class MyLogger:
    """   """
    def __init__(self):
        self.logger = None

    def start_file_logging(self, logger_name, log_path):
        """   """
        import logging
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            fh = logging.FileHandler(log_path)
        except FileNotFoundError:
            log_path = "downloader.log"
            fh = logging.FileHandler(log_path)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
        """     """
        import logging
        from logging.handlers import RotatingFileHandler
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
        except FileNotFoundError:
            log_path = "downloader.log"
            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def add(self, msg):
        self.logger.info(str(msg))

    def add_error(self, msg):
        self.logger.error(str(msg))

      
      



, .





. , :





class BaseFileDownload(threading.Thread):
    """     """
    count = 0

    def __init__(self, rpath, filename, log):
        threading.Thread.__init__(self)
        self.remote_path = rpath
        self.filename = filename
        self.ftp = None
        self.command = None
        self.currentpath = None
        self.log = log
        self.__class__.count += 1 #     

    def __del__(self):
        self.__class__.count -= 1

    def connect(self):
        """    ftp"""
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        self.ftp.__class__.encoding = sys.getfilesystemencoding()


    def run(self):
        """  """
        import os
        self.connect()
        self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')
        self.currentpath = os.path.join(basepath, self.remote_path[3:])
        self.ftp.cwd(self.remote_path)
        if not os.path.exists(self.currentpath):
            os.makedirs(self.currentpath, exist_ok=True)
        self.host_file = os.path.join(self.currentpath, self.filename)
        try:
            with open(self.host_file, 'wb') as local_file:
                self.log.add("Start downloading " + self.filename)
                self.ftp.retrbinary(self.command + self.filename, local_file.write)
                self.log.add("Downloading " + self.filename + " complete")
        except ftplib.error_perm:
            self.log.add_error('Perm error')
        self.ftp.quit()
      
      



count. : , , , .





run - threading ( !), .





, os.makedirs.





-

. zabbix, , - , .





A classe para trabalhar com esses arquivos se parece com esta:





class StatusFile:
    """          ."""
    def __init__(self):
        self.msg = ''

    def setstatus(self, msg):
        global statusfilepath
        with open(statusfilepath, 'w') as status_file:
            status_file.write(msg)

      
      



Multithreading

E, finalmente, a própria função de script principal, que funciona com fluxos de download:





def main():
    import os
    import datetime
    import time

    log = MyLogger()
    log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) #  
    now = datetime.datetime.today().strftime("%Y%m%d")
    global basepath
    basepath = os.path.join(basepath, now)  #  ,   
    list_file = FileList()
    list_file.connect_ftp()
    list_file.get_list("..")
    for i in range(list_file.len()):
        flag = True
        while flag:   #         
            if BaseFileDownload.count < max_threads:
                curfile = list_file.get_next_file()
                threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)
                threadid.start()
                flag = False
            else:
                time.sleep(20)
    log.add("Downloading files complete")
    statusfile = StatusFile()
    statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")
      
      



Aqui começamos o registro, obtemos uma lista de arquivos (ela é armazenada na memória).





Em um loop while eterno, verificamos o número de downloads em execução simultânea e, se necessário, iniciamos threads adicionais.





O código-fonte completo pode ser encontrado aqui .












All Articles