# nextcloud_docker_scripts/models.py
#
# (Code-viewer header from the original listing: 266 lines, 11 KiB, Python.)

import datetime
import os
import shutil
import stat
import subprocess
import tarfile
import traceback
from pathlib import Path

from colorama import Fore, Style

import utils
from utils import _print
class Container:
    """Back up and restore one Nextcloud deployment running in Docker.

    A deployment consists of an app container (Nextcloud itself, with its
    ``config/`` folder under ``/var/www/html``) and a database container
    (MySQL/MariaDB). A backup is a ``<name>_<timestamp>.tar.gz`` archive in
    ``backup_dir`` that holds a full SQL dump plus the ``config`` folder.

    Every step method returns ``True``/``False`` instead of raising; the
    traceback of any failure is stored in ``self.exceptions`` under a
    per-step key so the caller can report it.
    """

    def __init__(self, name, password, app_container, db_container, backup_dir, number_of_backups) -> None:
        """Store the deployment settings and derive all file names for this run.

        Args:
            name: Label of the deployment; used as the file name prefix.
            password: MySQL root password of the database container.
            app_container: Name of the Nextcloud app container.
            db_container: Name of the database container.
            backup_dir: Directory that receives the finished archives.
            number_of_backups: Stored as given; not used inside this class.
        """
        self.__datetime = datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
        self.name = name
        self.__password = password
        self.app_container = app_container
        self.db_container = db_container
        self.backup_dir = backup_dir
        self.tmp_dir = os.path.join(backup_dir, 'tmp')
        self.number_of_backups = number_of_backups
        self.__dump_file = self.name + '_' + self.__datetime + '.sql'
        self.__dump_file_path = os.path.join(self.tmp_dir, self.__dump_file)
        self.__tar_file = self.name + '_' + self.__datetime + '.tar'
        self.__tar_file_path = os.path.join(self.tmp_dir, self.__tar_file)
        self.tar_gz_file = self.__tar_file + '.gz'
        self.tar_gz_file_path = os.path.join(self.backup_dir, self.tar_gz_file)
        self.exceptions = {}
        self.SUCCESS = F"{Fore.GREEN}success{Style.RESET_ALL}"
        self.FAILED = F"{Fore.RED}failed{Style.RESET_ALL}"

    @staticmethod
    def _run(args, **kwargs) -> bool:
        """Run *args* without a shell and return whether it exited with status 0.

        Passing an argument list (instead of a shell string) keeps passwords
        and container names from being interpreted by the shell.
        """
        return subprocess.run(args, **kwargs).returncode == 0

    # Create backup dir if it does not yet exist
    def __create_backup_dir(self) -> bool:
        """Ensure ``backup_dir`` exists; return ``False`` if it cannot be created."""
        if os.path.isdir(self.backup_dir):
            return True
        try:
            os.makedirs(self.backup_dir, exist_ok=True)
            _print(F"{Fore.GREEN}Backup folder created under: {self.backup_dir}{Style.RESET_ALL}")
            return True
        except Exception:
            _print(F"{Fore.RED}Could not create backup folder under {self.backup_dir}{Style.RESET_ALL}")
            self.exceptions.update({'create_backup_dir': traceback.format_exc()})
            return False

    # Create tmp dir
    def __create_tmp_dir(self) -> bool:
        """Create a fresh, empty tmp dir, wiping any leftover from an earlier run."""
        # A stale tmp dir that cannot be removed must abort the run: returning
        # False here (rather than falling through) keeps the bool contract.
        if os.path.isdir(self.tmp_dir) and not self.__delete_tmp_dir():
            return False
        try:
            # Owner-only access: the directory will hold a plain-text SQL dump.
            os.mkdir(self.tmp_dir, 0o700)
            return os.path.isdir(self.tmp_dir)
        except Exception:
            _print(F"{Fore.RED}Could not create tmp folder{Style.RESET_ALL}")
            self.exceptions.update({'create_tmp_dir': traceback.format_exc()})
            return False

    # Delete tmp dir
    def __delete_tmp_dir(self) -> bool:
        """Remove the tmp dir and everything in it; ``False`` if that fails."""
        try:
            shutil.rmtree(self.tmp_dir)
            return not os.path.isdir(self.tmp_dir)
        except Exception:
            _print(F"{Fore.RED}Could not delete old tmp folder{Style.RESET_ALL}")
            self.exceptions.update({'delete_tmp_dir': traceback.format_exc()})
            return False

    # Dump database
    def __dump_db(self) -> bool:
        """Write ``mysqldump --all-databases`` of the db container into the tmp dir."""
        try:
            with open(self.__dump_file_path, 'wb') as dump_file:
                dumped = self._run(
                    ["docker", "exec", self.db_container, "mysqldump",
                     F"--password={self.__password}", "--all-databases"],
                    stdout=dump_file,
                )
            # The output file exists even when docker/mysqldump failed, so
            # success requires a zero exit status and a non-empty dump.
            status = dumped and os.path.getsize(self.__dump_file_path) > 0
            _print(F"Dump Nextcloud database: {self.SUCCESS if status else self.FAILED}")
            return status
        except Exception:
            _print(F"Dump Nextcloud database: {self.FAILED}")
            self.exceptions.update({'__dump_db': traceback.format_exc()})
            return False

    # Import database
    def __import_db(self) -> bool:
        """Feed the SQL dump from the tmp dir into ``mysql`` in the db container."""
        try:
            with open(self.__dump_file_path, 'rb') as dump_file:
                # "-i" keeps stdin attached; without it docker exec does not
                # forward the dump and mysql receives no input.
                status = self._run(
                    ["docker", "exec", "-i", self.db_container, "mysql", "-u", "root",
                     F"--password={self.__password}"],
                    stdin=dump_file,
                )
            _print(F"Import Nextcloud database: {self.SUCCESS if status else self.FAILED}")
            return status
        except Exception:
            _print(F"Import Nextcloud database: {self.FAILED}")
            self.exceptions.update({'__import_db': traceback.format_exc()})
            return False

    # Tar config folder within container and copy it into backup folder
    def __export_config(self) -> bool:
        """Copy the app container's ``config/`` folder into the tmp dir."""
        remote_tar = F"{self.app_container}:/var/www/html/config.tar"
        try:
            # Relative paths rely on the container's working directory being
            # the Nextcloud root, as the docker cp source path implies.
            archived = self._run(["docker", "exec", self.app_container, "tar", "-cf", "config.tar", "config/"])
            copied = archived and self._run(["docker", "cp", remote_tar, self.__tar_file_path])
            # Best-effort cleanup inside the container, whatever happened above.
            self._run(["docker", "exec", self.app_container, "rm", "-f", "config.tar"])
            if copied and os.path.isfile(self.__tar_file_path):
                with tarfile.open(self.__tar_file_path, 'r') as tarball:
                    tarball.extractall(self.tmp_dir)
            status = os.path.isdir(os.path.join(self.tmp_dir, "config"))
            _print(F"Export Nextcloud configuration: {self.SUCCESS if status else self.FAILED}")
            return status
        except Exception:
            _print(F"Export Nextcloud configuration: {self.FAILED}")
            self.exceptions.update({'__export_config': traceback.format_exc()})
            return False

    # Tar config folder from the tmp dir and unpack it inside the container
    def __import_config(self) -> bool:
        """Replace the app container's ``config/`` folder with the one in the tmp dir."""
        config_dir = os.path.join(self.tmp_dir, "config")
        remote_tar = F"{self.app_container}:/var/www/html/config.tar"
        try:
            status = False  # nothing to import counts as a failure
            if os.path.isdir(config_dir):
                with tarfile.open(self.__tar_file_path, 'w') as tarball:
                    tarball.add(config_dir, arcname="config")
                # Each step only runs when the previous one succeeded, so the
                # live config is never deleted without a replacement in place.
                status = (
                    self._run(["docker", "cp", self.__tar_file_path, remote_tar])
                    and self._run(["docker", "exec", self.app_container, "rm", "-rf", "config"])
                    and self._run(["docker", "exec", self.app_container, "tar", "-xf", "config.tar"])
                )
                # Do not leave a copy of the configuration in the web root.
                self._run(["docker", "exec", self.app_container, "rm", "-f", "config.tar"])
            _print(F"Import Nextcloud configuration: {self.SUCCESS if status else self.FAILED}")
            return status
        except Exception:
            _print(F"Import Nextcloud configuration: {self.FAILED}")
            self.exceptions.update({'__import_config': traceback.format_exc()})
            return False

    # Tar database with config and settings
    def __tar_backup(self) -> bool:
        """Pack the SQL dump and the ``config`` folder into the final ``.tar.gz``."""
        try:
            with tarfile.open(self.tar_gz_file_path, 'w:gz') as tarball:
                tarball.add(self.__dump_file_path, arcname=self.__dump_file)
                tarball.add(os.path.join(self.tmp_dir, "config"), arcname="config")
            # tarball.add raises when a source is missing, so getting here
            # means both members were written; confirm the archive exists.
            status = os.path.isfile(self.tar_gz_file_path)
            _print(F"Zip backup: {self.SUCCESS if status else self.FAILED}")
            return status
        except Exception:
            _print(F"Zip backup: {self.FAILED}")
            self.exceptions.update({'__tar_db': traceback.format_exc()})
            return False

    # Untar backup
    def __untar_backup(self, backup_file_path) -> bool:
        """Extract *backup_file_path* into the tmp dir and locate its SQL dump."""
        try:
            with tarfile.open(backup_file_path, 'r:gz') as tarball:
                tarball.extractall(self.tmp_dir)
            # The dump inside the archive carries the timestamp of the backup
            # run, not of this instance, so find it by extension and re-point
            # the dump path used by the import step.
            dump_files = sorted(Path(self.tmp_dir).glob("*.sql"))
            if dump_files:
                self.__dump_file = dump_files[0].name
                self.__dump_file_path = str(dump_files[0])
            status = os.path.isdir(os.path.join(self.tmp_dir, "config")) and bool(dump_files)
            _print(F"Unzip backup: {self.SUCCESS if status else self.FAILED}")
            return status
        except Exception:
            _print(F"Unzip backup: {self.FAILED}")
            self.exceptions.update({'__untar_db': traceback.format_exc()})
            return False

    # Set secure file permissions
    def __set_file_permissions(self) -> bool:
        """Make the finished archive readable by its owner only (mode 400)."""
        try:
            os.chmod(self.tar_gz_file_path, stat.S_IREAD)
            status = stat.S_IMODE(os.stat(self.tar_gz_file_path).st_mode) == stat.S_IREAD
            _print(F"Set secure file permissions: {self.SUCCESS if status else self.FAILED}")
            return status
        except Exception:
            _print(F"Set secure file permissions: {self.FAILED}")
            self.exceptions.update({'__set_file_permissions': traceback.format_exc()})
            return False

    # Create backup and return file size in MB or False if it failed
    def create_backup(self):
        """Run the full backup and return the archive size.

        Returns:
            The archive size in MB (bytes / 1,000,000, rounded to two
            decimals), or ``False`` if any step failed.
        """
        if not (self.__create_backup_dir() and self.__create_tmp_dir()):
            _print(F"{Fore.RED}Backup aborted.{Style.RESET_ALL}")
            return False

        steps = (
            self.__dump_db,
            self.__export_config,
            self.__tar_backup,
            self.__set_file_permissions,
        )
        try:
            # Stop at the first failing step so a broken dump is never archived.
            succeeded = all(step() for step in steps)
        except Exception:
            self.exceptions.update({'backup': traceback.format_exc()})
            succeeded = False

        # Always remove the tmp dir: it contains the plain-text SQL dump.
        cleaned = self.__delete_tmp_dir()
        if not (succeeded and cleaned):
            return False
        try:
            return round(Path(self.tar_gz_file_path).stat().st_size / 1000000, 2)
        except Exception:
            self.exceptions.update({'backup': traceback.format_exc()})
            return False

    def restore_backup(self, backup_file_path):
        """Extract the archive at *backup_file_path* into the tmp dir.

        NOTE: importing the configuration and the database, and removing the
        tmp dir afterwards, are not wired in yet; only extraction is done.

        Returns:
            The size of *backup_file_path* in MB (bytes / 1,000,000, rounded
            to two decimals), or ``False`` if a step failed.
        """
        if not self.__create_tmp_dir():
            return False
        try:
            if not self.__untar_backup(backup_file_path):
                return False
            # Report the size of the archive that was actually restored.
            return round(Path(backup_file_path).stat().st_size / 1000000, 2)
        except Exception:
            self.exceptions.update({'restore': traceback.format_exc()})
            return False

    @staticmethod
    def instantiate_containers(data: dict) -> dict:
        """Build one ``Container`` per entry of ``data['nextcloud_containers']``.

        Each entry maps a deployment name to a dict with the keys
        ``mysql_root_password``, ``app_container``, ``db_container``,
        ``backup_dir`` and ``number_of_backups``.

        Returns:
            A dict mapping each deployment name to its ``Container``.
        """
        return {
            name: Container(
                name,
                values['mysql_root_password'],
                values['app_container'],
                values['db_container'],
                values['backup_dir'],
                values['number_of_backups'],
            )
            for name, values in data['nextcloud_containers'].items()
        }
class Log:
    """Append timestamped entries to ``nextcloud_docker_scripts.log`` in *log_dir*.

    Failures never raise: the methods return ``False`` and store the
    traceback in ``self.exceptions``.
    """

    def __init__(self, log_dir):
        """Remember *log_dir* and derive the log file path inside it."""
        self.log_dir = log_dir
        self.__log_file = 'nextcloud_docker_scripts.log'
        self.__log_file_path = os.path.join(self.log_dir, self.__log_file)
        self.exceptions = {}

    # Create log entry
    def log(self, message) -> bool:
        """Append ``"<YYYY-mm-dd HH:MM:SS> ; <message>"`` as one line to the log file.

        Returns:
            ``True`` if the entry was written, ``False`` if the log directory
            is unavailable or the file could not be written.
        """
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Formatting (rather than "+") also accepts non-string messages.
        entry = F"{timestamp} ; {message}\n"
        if not self.__check_log_dir():
            return False
        try:
            # Explicit UTF-8 so entries with non-ASCII text are written the
            # same way regardless of the locale the script runs under.
            with open(self.__log_file_path, "a", encoding="utf-8") as log_file:
                log_file.write(entry)
            return True
        except Exception:
            _print(F"{Fore.RED}Could not write to log file: {self.__log_file}{Style.RESET_ALL}")
            self.exceptions.update({'log': traceback.format_exc()})
            return False

    # Try to create log directory if it does not yet exist
    def __check_log_dir(self) -> bool:
        """Ensure ``log_dir`` exists; return ``False`` if it cannot be created."""
        if os.path.isdir(self.log_dir):
            return True
        try:
            # exist_ok tolerates another process creating the directory first.
            os.makedirs(self.log_dir, exist_ok=True)
            return True
        except Exception:
            _print(F"{Fore.RED}Could not create log directory under: {self.log_dir}{Style.RESET_ALL}")
            self.exceptions.update({'get_log_file': traceback.format_exc()})
            return False
# This module only defines classes; running it directly intentionally does nothing.
if __name__ == '__main__':
    pass