Mengapa ini dibutuhkan?
Suatu ketika saya dihadapkan pada tugas menyalin sejumlah besar file dari server ftp untuk keperluan pencadangan. Kelihatannya tidak ada yang lebih mudah! Namun sayangnya, saya tidak menemukan alat siap pakai yang mampu bekerja cukup cepat untuk kondisi saya.
Situasi
Beberapa ratus file perlu diambil secara berkala dari server ftp yang berjalan di Windows: banyak file kecil dan beberapa file yang sangat besar, totalnya sekitar 500 GB. Server tersebut adalah vps yang terletak cukup jauh di luar negeri. Pada siang hari mesin terbebani penuh dan pada dini hari dilakukan perawatan rutin, sehingga waktu yang tersedia untuk mengunduh paling lama 5 jam.
Tak satu pun dari utilitas yang saya coba mampu menangani tugas ini secara efisien dan dalam waktu yang ditentukan. Nah, tidak ada pilihan lain: sistem pencadangan yang layak belum dibeli, yang berarti kami mempersenjatai diri dengan editor atau IDE Python dan mulai bekerja! Petualangan!
Config
Kami akan meletakkan semua parameter untuk skrip dalam file terpisah untuk kenyamanan.
Template konfigurasi:
# Connection settings for the FTP server.
host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
# Local folder under which the downloaded tree is stored.
basepath = '/path/to/backup/folder'
# Upper bound on the number of simultaneously active download threads.
max_threads = 20
# Raw strings are required for the Windows-style paths below: in a normal
# literal the "\t" in "\to" is read as a TAB character and corrupts the path.
# Directory that receives the log file.
log_path = r'\path\to\logfile'
# File that the final status message is written to.
statusfilepath = r'\path\to\statusfile'
Di dalam file .py skrip, nilai-nilai konfigurasi ini dibaca dari modul config, seperti berikut:
if __name__ == "__main__":
    # Copy the settings from the config module into module-level globals,
    # which the classes and main() read directly, then start the run.
    host, user, passwd = config.host, config.user, config.passwd
    basepath, max_threads = config.basepath, config.max_threads
    log_path, statusfilepath = config.log_path, config.statusfilepath
    main()
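Untuk bekerja dengan ftp digunakan pustaka standar ftplib. Agar parameter koneksi tidak perlu diberikan berulang kali, dibuat kelas pembungkus kecil yang mengambil host, nama pengguna, dan kata sandi dari konfigurasi.
Kelas pembungkus untuk ftp: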
class MyFtp(ftplib.FTP):
    """FTP client preconfigured from the module-level connection settings.

    ``host``, ``user`` and ``passwd`` are read from module globals when the
    object is created, so ``connect()`` and ``login()`` take no arguments.
    """

    def __init__(self):
        # FTP.__init__ assigns self.timeout itself, so it must run first;
        # otherwise the 1800 s value below is overwritten by its default and
        # connections are made without the intended timeout.
        super().__init__()
        self.host = host
        self.user = user
        self.passwd = passwd
        self.timeout = 1800

    def connect(self):
        """Connect to the configured host using the configured timeout.

        Returns the server's welcome message, as ``ftplib.FTP.connect`` does.
        """
        return super().connect(self.host, timeout=self.timeout)

    def login(self):
        """Log in with the configured user name and password."""
        return super().login(user=self.user, passwd=self.passwd)

    def quit(self):
        """Send QUIT and close the connection."""
        return super().quit()
Semua pekerjaan lainnya ditangani oleh ftplib; kelas ini hanya membungkusnya.
Kelas untuk mengambil daftar file:
class FileList:
    """Recursive listing of the files available on the FTP server.

    Every entry of ``file_list`` is a dict with the keys ``remote_path``
    (the directory the file was found in) and ``filename``.
    """

    def __init__(self):
        self.ftp = None
        self.file_list = []

    def connect_ftp(self):
        """Open and log in the FTP connection used for listing."""
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        # Set the encoding on the instance. A class-level assignment is
        # shadowed by the instance attribute that newer ftplib versions
        # create in FTP.__init__, and would then have no effect.
        self.ftp.encoding = sys.getfilesystemencoding()

    def get_list(self, name):
        """Collect all files below the remote directory *name*.

        Files are appended to ``file_list``; sub-directories are walked
        recursively.
        """
        # FTP paths use "/" regardless of the client's operating system.
        import posixpath
        for entry_name, entry_facts in self.ftp.mlsd(str(name), facts=["type"]):
            entry_type = entry_facts.get("type", "").lower()
            if entry_type == "file":
                self.file_list.append(
                    {"remote_path": name, "filename": entry_name}
                )
            elif entry_type == "dir":
                # Only real sub-directories are followed. Servers may also
                # list the current ("cdir") and parent ("pdir") directory,
                # and descending into those would never terminate.
                self.get_list(posixpath.join(name, entry_name))

    def get_next_file(self):
        """Remove and return the most recently listed entry."""
        return self.file_list.pop()

    def len(self):
        """Return the number of entries that have not been handed out yet."""
        return len(self.file_list)

    def __len__(self):
        return len(self.file_list)
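Daftar file disimpan di memori; elemen berikutnya diambil dengan get_next_file, dan len mengembalikan jumlah file yang tersisa.
Untuk pencatatan log digunakan modul standar logging, yang dibungkus dalam kelas kecil berikut.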
class MyLogger:
    """Small wrapper that sends messages to a file-backed ``logging`` logger."""

    _FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    _FALLBACK_PATH = "downloader.log"

    def __init__(self):
        self.logger = None

    def _install(self, logger_name, log_path, make_handler):
        """Bind the named logger at INFO level and attach a formatted handler.

        *make_handler* builds the handler for a given path. If the requested
        path raises FileNotFoundError, the fallback file name is used instead.
        """
        import logging
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            handler = make_handler(log_path)
        except FileNotFoundError:
            handler = make_handler(self._FALLBACK_PATH)
        handler.setFormatter(logging.Formatter(self._FORMAT))
        self.logger.addHandler(handler)

    def start_file_logging(self, logger_name, log_path):
        """Log to a single, ever-growing file at *log_path*."""
        import logging
        self._install(logger_name, log_path, logging.FileHandler)

    def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
        """Log to *log_path*, rotating at *max_bytes* and keeping
        *story_backup* old files."""
        from logging.handlers import RotatingFileHandler

        def build(path):
            return RotatingFileHandler(path, maxBytes=max_bytes, backupCount=story_backup)

        self._install(logger_name, log_path, build)

    def add(self, msg):
        """Record *msg* at INFO level."""
        self.logger.info(str(msg))

    def add_error(self, msg):
        """Record *msg* at ERROR level."""
        self.logger.error(str(msg))
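Log dapat ditulis ke satu file biasa atau ke file yang dirotasi.
Selanjutnya kelas unduhan file, yang dijalankan sebagai utas (thread) terpisah: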
class BaseFileDownload(threading.Thread):
    """Thread that downloads one file over its own FTP connection.

    ``count`` is the number of download objects that currently hold a slot.
    A slot is taken when the object is created and given back as soon as
    ``run()`` has finished, so the counter does not depend on when the
    object happens to be garbage-collected.
    """

    count = 0
    # Re-entrant so that a __del__ triggered while the lock is already held
    # by the same thread cannot deadlock.
    _count_lock = threading.RLock()

    def __init__(self, rpath, filename, log):
        super().__init__()
        self.remote_path = rpath
        self.filename = filename
        self.ftp = None
        self.command = None
        self.currentpath = None
        self.host_file = None
        self.log = log
        self._slot_held = False
        with BaseFileDownload._count_lock:
            BaseFileDownload.count += 1
            self._slot_held = True

    def __del__(self):
        # Safety net for objects that were created but never run.
        self._release_slot()

    def _release_slot(self):
        """Give the slot back exactly once, however often this is called."""
        if not getattr(self, "_slot_held", False):
            return
        with BaseFileDownload._count_lock:
            if self._slot_held:
                self._slot_held = False
                BaseFileDownload.count -= 1

    def connect(self):
        """Open and log in this thread's own FTP connection."""
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        # Set on the instance: a class-level assignment is shadowed by the
        # instance attribute that newer ftplib versions create in __init__.
        self.ftp.encoding = sys.getfilesystemencoding()

    def _disconnect(self):
        """Close the FTP session politely, or forcibly if QUIT fails."""
        try:
            self.ftp.quit()
        except ftplib.all_errors:
            self.ftp.close()

    def _download(self):
        """Fetch the file into the local tree; failures are logged, not raised."""
        import os
        try:
            self.connect()
        except ftplib.all_errors as err:
            self.log.add_error("Cannot connect to download " + self.filename + ": " + repr(err))
            if self.ftp is not None:
                self.ftp.close()
            return
        self.command = 'RETR '
        try:
            # The first three characters of the remote path are dropped --
            # presumably the "../" prefix of the listing root; verify this
            # if the listing root ever changes.
            self.currentpath = os.path.join(basepath, self.remote_path[3:])
            self.ftp.cwd(self.remote_path)
            os.makedirs(self.currentpath, exist_ok=True)
            self.host_file = os.path.join(self.currentpath, self.filename)
            with open(self.host_file, 'wb') as local_file:
                self.log.add("Start downloading " + self.filename)
                self.ftp.retrbinary(self.command + self.filename, local_file.write)
                self.log.add("Downloading " + self.filename + " complete")
        except ftplib.error_perm:
            self.log.add_error('Perm error')
        except ftplib.all_errors as err:
            # Temporary FTP errors, dropped connections and local I/O errors.
            self.log.add_error("Downloading " + self.filename + " failed: " + repr(err))
        finally:
            # Always release the server-side session, also after a failure.
            self._disconnect()

    def run(self):
        """Thread entry point: download the file, then free the slot."""
        try:
            self._download()
        finally:
            self._release_slot()
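Perhatikan atribut kelas count: atribut ini menghitung objek unduhan yang masih aktif, dan nilainya dipakai untuk membatasi jumlah unduhan paralel.
Metode run adalah metode standar kelas threading.Thread (jangan ubah namanya!); metode inilah yang dijalankan ketika utas dimulai.
Jika direktori tujuan belum ada, direktori tersebut dibuat dengan os.makedirs.
File status
Hasil pekerjaan skrip ditulis ke sebuah file status, yang misalnya dapat dipantau oleh zabbix.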
Kelas untuk bekerja dengan file ini terlihat seperti ini:
class StatusFile:
    """Writer for the status file located at ``statusfilepath``."""

    def __init__(self):
        self.msg = ''

    def setstatus(self, msg):
        """Overwrite the status file so that it contains only *msg*."""
        # statusfilepath is a module-level setting; reading it does not
        # require a ``global`` declaration.
        with open(statusfilepath, 'w') as target:
            target.write(msg)
Multithreading
Dan terakhir, fungsi utama skrip, yang mengelola utas-utas unduhan:
def main():
    """List the remote files, download them in parallel, report the result.

    The listing is rooted at ".." on the server. At most ``max_threads``
    downloads hold a slot at any time. The completion log line and the
    status file are written only after every download thread has finished.
    Individual download errors are logged by the threads and do not change
    the status text.
    """
    import datetime
    import ftplib
    import os
    import threading
    import time

    log = MyLogger()
    log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log"))

    # Each run stores its files in a sub-folder named after today's date.
    global basepath
    now = datetime.datetime.today().strftime("%Y%m%d")
    basepath = os.path.join(basepath, now)

    list_file = FileList()
    list_file.connect_ftp()
    try:
        list_file.get_list("..")
    finally:
        # The listing connection is not used by the downloads (each thread
        # opens its own), so release it instead of leaving it idle.
        try:
            list_file.ftp.quit()
        except ftplib.all_errors:
            list_file.ftp.close()

    while list_file.len():
        if BaseFileDownload.count < max_threads:
            curfile = list_file.get_next_file()
            # Deliberately not bound to a name: the running thread keeps
            # itself alive, and no stale reference delays its clean-up.
            BaseFileDownload(curfile["remote_path"], curfile["filename"], log).start()
        else:
            time.sleep(20)

    # Wait for the downloads that are still running; without this the
    # success message would be written while files are still in transfer.
    for worker in threading.enumerate():
        if isinstance(worker, BaseFileDownload):
            worker.join()

    log.add("Downloading files complete")
    statusfile = StatusFile()
    statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")
Di sini kita memulai pencatatan log, lalu mendapatkan daftar file (disimpan dalam memori).
Dalam perulangan while, kami memeriksa jumlah unduhan yang sedang berjalan secara bersamaan dan, jika masih ada slot kosong, memulai utas tambahan.
Seluruh kode sumber dapat ditemukan di sini .