Por que isso é necessário?
Uma vez tive a tarefa de copiar um grande número de arquivos de um servidor ftp. Foi necessário fazer um backup. Parece que poderia ser mais fácil! Mas, infelizmente, nada pronto para funcionar tão rapidamente para minhas condições não foi encontrado.
Situação
Era necessário buscar periodicamente algumas centenas de arquivos do servidor ftp no Windows. Muitas pequenas coisas e alguns arquivos muito grandes. Um total de cerca de 500 GB. O servidor é um vps localizado bem longe no exterior. Durante o dia o carro fica muito carregado, no início da noite é feita a manutenção de rotina, no total são 5 horas para download no máximo.
Nenhum dos utilitários que analisei foi capaz de lidar com a eficiência e no tempo previsto. Bem, não há para onde ir, um sistema de backup normal ainda não foi comprado, o que significa que nos armamos com um editor ou IDE Python e pronto! Aventura!
Config
Colocaremos todos os parâmetros do script em um arquivo separado por conveniência.
Modelo de configuração:
host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
basepath = '/path/to/backup/folder' # ,
max_threads = 20 #
log_path = '\path\to\logfile'
statusfilepath = '\path\to\statusfile'
.py . , :
if __name__ == "__main__":
host = config.host
user = config.user
passwd = config.passwd
basepath = config.basepath # ,
max_threads = config.max_threads
log_path = config.log_path
statusfilepath = config.statusfilepath
main()
ftp , , , ftp- . , , - .
. , , , -.
- ftp:
class MyFtp (ftplib.FTP):
""" , """
def __init__(self):
self.host = host
self.user = user
self.passwd = passwd
self.timeout = 1800
super(MyFtp, self).__init__()
def connect(self):
super(MyFtp, self).connect(self.host, timeout=self.timeout)
def login(self):
super(MyFtp, self).login(user=self.user, passwd=self.passwd)
def quit(self):
super(MyFtp,self).quit()
. ftplib, .
:
class FileList:
""" """
def __init__(self):
self.ftp = None
self.file_list = []
def connect_ftp(self):
import sys
self.ftp = MyFtp()
self.ftp.connect()
self.ftp.login()
self.ftp.__class__.encoding = sys.getfilesystemencoding()
def get_list(self, name):
""" ftp-."""
import os
for dirname in self.ftp.mlsd(str(name), facts=["type"]):
if dirname[1]["type"] == "file":
entry_file_list = {}
entry_file_list['remote_path'] = name #
entry_file_list['filename'] = dirname[0] #
self.file_list.append(entry_file_list)
else:
path = os.path.join(name, dirname[0])
self.get_list(path)
def get_next_file(self):
return self.file_list.pop()
def len(self):
return len(self.file_list)
, , , , , .
logging. , .
class MyLogger:
""" """
def __init__(self):
self.logger = None
def start_file_logging(self, logger_name, log_path):
""" """
import logging
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
try:
fh = logging.FileHandler(log_path)
except FileNotFoundError:
log_path = "downloader.log"
fh = logging.FileHandler(log_path)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
""" """
import logging
from logging.handlers import RotatingFileHandler
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
try:
fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
except FileNotFoundError:
log_path = "downloader.log"
fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
def add(self, msg):
self.logger.info(str(msg))
def add_error(self, msg):
self.logger.error(str(msg))
, .
. , :
class BaseFileDownload(threading.Thread):
""" """
count = 0
def __init__(self, rpath, filename, log):
threading.Thread.__init__(self)
self.remote_path = rpath
self.filename = filename
self.ftp = None
self.command = None
self.currentpath = None
self.log = log
self.__class__.count += 1 #
def __del__(self):
self.__class__.count -= 1
def connect(self):
""" ftp"""
import sys
self.ftp = MyFtp()
self.ftp.connect()
self.ftp.login()
self.ftp.__class__.encoding = sys.getfilesystemencoding()
def run(self):
""" """
import os
self.connect()
self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')
self.currentpath = os.path.join(basepath, self.remote_path[3:])
self.ftp.cwd(self.remote_path)
if not os.path.exists(self.currentpath):
os.makedirs(self.currentpath, exist_ok=True)
self.host_file = os.path.join(self.currentpath, self.filename)
try:
with open(self.host_file, 'wb') as local_file:
self.log.add("Start downloading " + self.filename)
self.ftp.retrbinary(self.command + self.filename, local_file.write)
self.log.add("Downloading " + self.filename + " complete")
except ftplib.error_perm:
self.log.add_error('Perm error')
self.ftp.quit()
count. : , , , .
run - threading ( !), .
, os.makedirs.
-
. zabbix, , - , .
A classe para trabalhar com esses arquivos se parece com esta:
class StatusFile:
""" ."""
def __init__(self):
self.msg = ''
def setstatus(self, msg):
global statusfilepath
with open(statusfilepath, 'w') as status_file:
status_file.write(msg)
Multithreading
E, finalmente, a própria função de script principal, que funciona com fluxos de download:
def main():
import os
import datetime
import time
log = MyLogger()
log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) #
now = datetime.datetime.today().strftime("%Y%m%d")
global basepath
basepath = os.path.join(basepath, now) # ,
list_file = FileList()
list_file.connect_ftp()
list_file.get_list("..")
for i in range(list_file.len()):
flag = True
while flag: #
if BaseFileDownload.count < max_threads:
curfile = list_file.get_next_file()
threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)
threadid.start()
flag = False
else:
time.sleep(20)
log.add("Downloading files complete")
statusfile = StatusFile()
statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")
Aqui começamos o registro, obtemos uma lista de arquivos (ela é armazenada na memória).
Em um loop while eterno, verificamos o número de downloads em execução simultânea e, se necessário, iniciamos threads adicionais.
O código-fonte completo pode ser encontrado aqui .