diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..5ae00c6 --- /dev/null +++ b/src/main.py @@ -0,0 +1,319 @@ +import atexit +import base64 +import json +import os +import random +import ssl +import string +import sys +import textwrap +import time +from datetime import datetime +from getpass import getpass +from pathlib import Path + +from cryptography.fernet import Fernet, InvalidToken +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from easydict import EasyDict as edict +from loguru import logger +from pyVim.connect import SmartConnect, Disconnect +from pyVmomi import vim +from ruamel.yaml import YAML + +yaml = YAML() +workdir = Path.home() / ".vmker" +os.makedirs(workdir / "logs", exist_ok=True) +logger.remove() +logger.add(sys.stdout, level="INFO", backtrace=False, diagnose=False, + format="\r{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {message}") +logger.add(workdir / "logs" / f"debug-{datetime.now().strftime("%d.%m.%Y_%H-%M.%S")}.log", level=0) +logger.debug("Logger initialized") + + +class VMKer: + + def __init__(self): + self.si = None + self.content = None + self.config = None + + @staticmethod + def generate_key_from_password(password: str, salt: bytes) -> bytes: + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100000, + backend=default_backend() + ) + key = base64.urlsafe_b64encode(kdf.derive(password.encode())) + return key + + def _encrypt_pass(self, password, n=True, passphrase=None) -> str: + logger.debug("_encrypt_pass") + if n: + logger.info("Passphrase needed for crypt your password.") + if not passphrase: + passphrase = getpass("Passphrase: ") + salt = os.urandom(16) # Генерация случайной соли + key = self.generate_key_from_password(passphrase, salt) + fernet = Fernet(key) + encrypted_text = fernet.encrypt(password.encode()) + return base64.urlsafe_b64encode(salt + encrypted_text).decode() + + def _decrypt_pass(self, n=True) -> str: + try: + logger.debug("_decrypt_pass") + passphrase = getpass("Passphrase: ") + encrypted_data = base64.urlsafe_b64decode(self.config['password'].encode()) + salt = encrypted_data[:16] # Извлечение соли из зашифрованного текста + encrypted_text = encrypted_data[16:] + key = self.generate_key_from_password(passphrase, salt) + fernet = Fernet(key) + decrypted_text = fernet.decrypt(encrypted_text).decode() + if n: + logger.success("Password decrypted.") + return decrypted_text + else: + return passphrase + except InvalidToken: + logger.error("Bad passphrase.") + logger.info("Use 'vmkey init' for drop it.") + exit(1) + except Exception as e: + logger.exception(e) + exit(1) + + def _load_config(self): + cf = workdir / "config.json" + if not cf.exists(): + logger.info("Try 'vmkey init' before using program.") + exit(0) + with open(workdir / "config.json", "r") as f: + self.config = json.load(f) + + def _connect(self): + try: + cfg = edict(self.config) + + self.si = SmartConnect(host=cfg.server, user=cfg.user, pwd=self._decrypt_pass(), + sslContext=ssl._create_unverified_context()) + atexit.register(Disconnect, self.si) + self.content = self.si.RetrieveContent() + logger.success(f"Connected to {self.content.about.fullName}({cfg.server}) as {cfg.user}") + except vim.fault.VimFault as e: + logger.error(f"Error while connecting: {e.msg}") + except ssl.SSLCertVerificationError as e: + logger.error(f"SSL Error: {e}") + except TimeoutError as e: + logger.error(f"TimeoutError: {e}") + + def _parse_yml(self): + pass + + def configure(self, _first_init=False): + logger.info("Entered to configuration mode.") + + def _get_arg(name, default, required, _i=input): + while True: + i = _i(f"Enter {name}:{f' [{default}]' if default else ''} ") + if not i and not default and required: + print(f"Error: {name} required") + continue + if not i: + i = default + if i == "null": + i = None + return i + + def _save(): + logger.info(f"Saving: {(workdir / "config.json").resolve()};") + self.config = c + with open(workdir / "config.json", "w") as f: + json.dump(c, f) + + if _first_init: + c = { + "server": _get_arg("serverIP", None, True), "user": _get_arg("user", None, True), + "password": self._encrypt_pass(_get_arg("password", None, True, getpass)), "insecure": True, + "datacenter": _get_arg('datacenter', 'ha-datacente', False), + "datastore": _get_arg('datastore', 'datastore1', False), + "resource_pool": _get_arg('resource pool', 'null', False) + } + + sec = input("Is SSL trusted? [Y/n] ").lower() + if sec == "y": + c['insecure'] = False + else: + logger.warning("Insecure mode.") + _save() + + c = self.config + conf = lambda: textwrap.dedent(f"""\ + Server configuration: + Server: {c['server']!r} + User: {c['user']!r} + Password: encrypted. + SSL Secure: {not c['insecure']} + Datacenter: {c['datacenter']!r} + Datastore: {c['datastore']!r} + Resource Pool: {c['resource_pool']!r}""") + logger.info(f"\n{conf()}") + if _first_init: + self._connect() + if self.content: + logger.info("Exited from configuration mode.") + return + else: + logger.error("Bad data.") + logger.info("Type help for edit configuration.") + else: + logger.info("Type help for more information.") + + edited = False + while True: + i = input("> ").split(" ") + logger.debug(f"> {i}") + if len(i) == 0: + continue + match i[0]: + case "help": + print("help - Print that message.\n" + "work - Print working configuration.\n" + "edit - Edit configuration.\n" + "test - Test configuration (connect to the server).\n" + "exit - Exit.") + case "decrypt": + print(f"Your password: {self._decrypt_pass()}") + case "work": + print(conf()) + case "edit": + if len(i) == 3: + edited = True + data = i[2] + match i[1]: + case "server": + c['server'] = data + case "port": + c['port'] = data + case "user": + c['user'] = data + case "password": + c['password'] = self._encrypt_pass(data, False, self._decrypt_pass(False)) + case "ssl": + if data not in ['on', 'off']: + print("Available values: on, off") + else: + c['insecure'] = data == "off" + case "datacenter": + c['datacenter'] = data + case "datastore": + c['datastore'] = data + case "respool": + c['resource_pool'] = None if data.lower() == "null" else data + case _: + print("Usage: edit [server|port|password|ssl|datacenter|datastore|respool] ") + else: + print("Incorrect syntax.") + print("Usage: edit [server|port|password|ssl|datacenter|datastore|respool] ") + case "test": + try: + self._connect() + except Exception as e: + logger.exception(e) + case "exit": + if edited: + print(conf()) + _save() + break + case _: + print("Unknown command.") + logger.info("Exited from configuration mode.") + + def _create(self): + pass + + def main(self): + try: + if len(sys.argv) > 1: + match sys.argv[1]: + case "init": + self.configure(True) + case "edit": + self._load_config() + self.configure() + case "create": + self._load_config() + self._create() + logger.warning("WIP") + case "help": + print("HELP") + case _: + logger.warning("Unknown command.") + except KeyboardInterrupt: + logger.info("Exited by KeyboardInterrupt") + + +def clone_vm(si, template_name, vm_name, datacenter_name, datastore_name, resource_pool_name): + content = si.RetrieveContent() + template = None + + for datacenter in content.rootFolder.childEntity: + if datacenter.name == datacenter_name: + view_manager = content.viewManager + container_view = view_manager.CreateContainerView(datacenter.vmFolder, [vim.VirtualMachine], True) + for vm in container_view.view: + if vm.name == template_name: + template = vm + break + container_view.Destroy() + break + + if not template: + raise Exception("Template not found") + + datacenter = [dc for dc in content.rootFolder.childEntity if dc.name == datacenter_name][0] + destfolder = datacenter.vmFolder + if resource_pool_name: + resource_pool = \ + [rp for rp in datacenter.hostFolder.childEntity[0].resourcePool.resourcePool if + rp.name == resource_pool_name][ + 0] + else: + resource_pool = datacenter.hostFolder.childEntity[0].resourcePool + + relocate_spec = vim.vm.RelocateSpec() + relocate_spec.datastore = [ds for ds in datacenter.datastore if ds.name == datastore_name][0] + relocate_spec.pool = resource_pool + + clone_spec = vim.vm.CloneSpec() + clone_spec.location = relocate_spec + clone_spec.powerOn = True + + task = template.Clone(name=vm_name, folder=destfolder, spec=clone_spec) + while task.info.state == vim.TaskInfo.State.running: + time.sleep(1) + + if task.info.state != vim.TaskInfo.State.success: + raise task.info.error + return task.info.result + + +def generate_password(length=12): + chars = string.ascii_letters + string.digits + string.punctuation + return ''.join(random.choice(chars) for _ in range(length)) + + +def get_vm_ip(vm): + for nic in vm.guest.net: + if nic.ipConfig: + for ip in nic.ipConfig.ipAddress: + if ":" not in ip.ipAddress: # Skip IPv6 addresses + return ip.ipAddress + return None + + +if __name__ == "__main__": + VMKer().main() \ No newline at end of file