vmker/src/main.py
2024-07-24 11:16:02 +03:00

535 lines
20 KiB
Python

import atexit
import base64
import json
import os
import random
import ssl
import string
import sys
import textwrap
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from getpass import getpass
from pathlib import Path
from typing import Callable
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from loguru import logger
from netmiko import ConnectHandler, NetmikoAuthenticationException
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim
from ruamel.yaml import YAML
timeout = 120
chars = string.ascii_letters + string.digits
yaml = YAML()
workdir = Path.home() / ".vmker"
logdir = workdir / "logs"
os.makedirs(logdir / "by-new_name", exist_ok=True)
logger.remove()
logger.add(logdir / f"debug-{datetime.now().strftime('%d.%m.%Y_%H-%M.%S')}.log", level=0)
_argv = sys.argv[1:]
args = {}
i = 0
while i < len(_argv):
arg = _argv[i]
if arg.startswith('-'):
key = arg[1:]
if i + 1 < len(_argv) and not _argv[i + 1].startswith('-'):
if not args.get(key):
args[key] = []
args[key].append(_argv[i + 1])
i += 1
else:
logger.warning(f"Ignored invalid parameter: {key}")
i += 1
if args.get("debug"):
logger.add(sys.stdout, level=0, backtrace=False, diagnose=False,
format="\r<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | {message}")
else:
logger.add(sys.stdout, level="INFO", backtrace=False, diagnose=False,
format="\r<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | {message}")
logger.debug("Logger initialized")
class VMKer:
def __init__(self):
self.si = None
self.content = None
self.config = None
self.vm_list = set()
self.template = None
self.cmds = {}
@staticmethod
def generate_key_from_password(password: str, salt: bytes) -> bytes:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend()
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key
def _encrypt_pass(self, password, n=True, passphrase=None) -> str:
logger.debug("_encrypt_pass")
if n:
logger.info("Passphrase needed for crypt your password.")
if not passphrase:
passphrase = getpass("Passphrase: ")
salt = os.urandom(16) # Генерация случайной соли
key = self.generate_key_from_password(passphrase, salt)
fernet = Fernet(key)
encrypted_text = fernet.encrypt(password.encode())
return base64.urlsafe_b64encode(salt + encrypted_text).decode()
def _decrypt_pass(self, n=True) -> str:
try:
logger.debug("_decrypt_pass")
passphrase = getpass("Passphrase: ")
encrypted_data = base64.urlsafe_b64decode(self.config['password'].encode())
salt = encrypted_data[:16] # Извлечение соли из зашифрованного текста
encrypted_text = encrypted_data[16:]
key = self.generate_key_from_password(passphrase, salt)
fernet = Fernet(key)
decrypted_text = fernet.decrypt(encrypted_text).decode()
if n:
logger.debug("Password decrypted.")
return decrypted_text
else:
return passphrase
except InvalidToken:
logger.error("Bad passphrase.")
logger.info(f"Use for drop it: {sys.argv[0]} init")
exit(1)
except Exception as e:
logger.exception(e)
exit(1)
def _load_config(self):
cf = workdir / "config.json"
if args.get('c'):
cf = Path().resolve() / args['c'][0]
logger.debug(f"using config file: {cf}")
if not cf.exists():
logger.info(f"Init before using program: {sys.argv[0]} init")
exit(0)
with open(cf, "r") as f:
self.config = json.load(f)
def _connect(self):
try:
cfg = self.config
self.si = SmartConnect(host=cfg['server'], user=cfg['user'], pwd=self._decrypt_pass(),
disableSslCertValidation=cfg['insecure'])
atexit.register(Disconnect, self.si)
self.content = self.si.RetrieveContent()
logger.success(f"Connected to {self.content.about.fullName}({cfg['server']}) as {cfg['user']}")
return True
except vim.fault.VimFault as e:
# noinspection PyUnresolvedReferences
logger.error(f"ESXI Error: {e.msg}")
except ssl.SSLCertVerificationError as e:
logger.error(f"SSL Error: {e}")
except TimeoutError as e:
logger.error(f"TimeoutError: {e}")
return False
def _console(self, _first_init=False):
self._load_config()
logger.info("Entered to configuration mode.")
def _get_arg(name, default, required, _i: Callable = input):
while True:
i = _i(f"Enter {name}:{f' [{default}]' if default else ''} ")
if not i and not default and required:
print(f"Error: {name} required")
continue
if not i:
i = default
if i == "null":
i = None
return i
def _save():
logger.info(f"Saving: {(workdir / 'config.json').resolve()};")
self.config = c
with open(workdir / "config.json", "w") as f:
json.dump(c, f)
if _first_init:
c = {
"server": _get_arg("serverIP", None, True),
"user": _get_arg("user", None, True),
"password": self._encrypt_pass(_get_arg("password", None, True, getpass)),
"insecure": True,
"datacenter": _get_arg('datacenter', 'ha-datacente', False),
"datastore": _get_arg('datastore', 'datastore1', False),
"resource_pool": _get_arg('resource pool', 'null', False)
}
sec = input("Is SSL trusted? [Y/n] ").lower()
if sec == "y":
c['insecure'] = False
else:
logger.warning("Insecure mode.")
_save()
c = self.config
conf = lambda: textwrap.dedent(f"""\
Server configuration:
Server: {c['server']!r}
User: {c['user']!r}
Password: encrypted.
SSL Secure: {not c['insecure']}
Datacenter: {c['datacenter']!r}
Datastore: {c['datastore']!r}
Resource Pool: {c['resource_pool']!r}""")
logger.info(f"\n{conf()}")
if _first_init:
self._connect()
if self.content:
logger.info("Exited from configuration mode.")
return
else:
logger.error("Bad data.")
logger.info("Type help for more information.")
edited = False
while True:
i = input("> ").split(" ")
logger.debug(f"> {i}")
if len(i) == 0:
continue
match i[0]:
case "help":
print("help - Print that message.\n"
"work - Print working configuration.\n"
"edit - Edit configuration.\n"
"test - Test configuration (connect to the server).\n"
"exit - Exit.")
case "decrypt":
print(f"Your password: {self._decrypt_pass()}")
case "work":
print(conf())
case "edit":
if len(i) == 3:
edited = True
data = i[2]
match i[1]:
case "server":
c['server'] = data
case "port":
c['port'] = data
case "user":
c['user'] = data
case "password":
c['password'] = self._encrypt_pass(data, False, self._decrypt_pass(False))
case "ssl":
if data not in ['on', 'off']:
print("Available values: on, off")
else:
c['insecure'] = data == "off"
case "datacenter":
c['datacenter'] = data
case "datastore":
c['datastore'] = data
case "respool":
c['resource_pool'] = None if data.lower() == "null" else data
case _:
print("Usage: edit [server|port|password|ssl|datacenter|datastore|respool] <new value>")
else:
print("Incorrect syntax.")
print("Usage: edit [server|port|password|ssl|datacenter|datastore|respool] <new value>")
case "test":
self._load_yml(True)
try:
self._connect()
except Exception as e:
logger.exception(e)
case "exit":
if edited:
print(conf())
_save()
break
case _:
print("Unknown command.")
logger.info("Exited from configuration mode.")
@staticmethod
def _get_vm_ip(vm):
ipv4 = []
ipv6 = []
for nic in vm.guest.net:
if nic.ipConfig:
for ip in nic.ipConfig.ipAddress:
if ":" not in ip.ipAddress: # Skip IPv6 addresses
ipv4.append(ip.ipAddress)
else:
ipv6.append(ip.ipAddress)
return ipv4, ipv6
def _load_yml(self, test=False):
_f = args.get("f")
if not _f:
return
for f in _f:
try:
f = Path(f)
if not f.exists():
return logger.error(f"[YML] File not exist: {f}")
if f.is_dir():
return logger.error(f"[YML] Not a file: {f}")
if f.suffix != ".yml":
return logger.error(f"[YML] File not YAML: {f}")
y = yaml.load(f)
self.cmds[f] = y
if test:
s = f"{f}:"
br = y.get("before-reboot")
if br:
s += "\n Before-Reboot:"
for n, c in br.items():
s += f"\n {n}:"
for _c in c:
s += f"\n {_c}"
ar = y.get("after-reboot")
if ar:
s += "\n After-Reboot:"
for n, c in ar.items():
s += f"\n {n}:"
for _c in c:
s += f"\n {_c}"
logger.debug(s)
print(s)
except Exception as e:
logger.exception(e)
def _execute_yml(self, name, ssh, _pre=True):
for f, y in self.cmds.items():
try:
if _pre:
y = y.get('before-reboot')
logger.info(f"[{name}] [YML] Executing {f.name}..")
if not y:
return logger.debug(f"[{name}] [YML] Block 'before-reboot' not found. Skipping..")
for n, c in y.items():
logger.info(f"[{name}] [YML] {n}. Steps: {len(c)}.")
for i, cmd in enumerate(c, 1):
logger.debug(f"executing: {cmd}")
logger.debug(ssh.send_command(cmd))
logger.info(f"[{name}] [YML] {n}: {i}/{len(c)}")
else:
y = y.get('after-reboot')
if not y:
return logger.debug(f"[{name}] [YML] Block 'after-reboot' not found. Skipping..")
for n, c in y.items():
logger.info(f"[{name}] [YML] {n}. Steps: {len(c)}.")
for i, cmd in enumerate(c, 1):
logger.debug(f"executing: {cmd}")
logger.info(f"[{name}] < {cmd}")
_ssh_output = ssh.send_command(cmd)
logger.debug(_ssh_output)
logger.info(f"[{name}] > {_ssh_output}")
# logger.info(f"[{name}] [YML] {n}: {i}/{len(c)}")
except Exception as e:
logger.exception(e)
def _clone(self, new_name, destfolder, clone_spec):
logger.info(f"[{new_name}] Cloning from {self.template.name!r}")
task = self.template.Clone(name=new_name, folder=destfolder, spec=clone_spec)
while task.info.state in [vim.TaskInfo.State.running, vim.TaskInfo.State.queued]:
time.sleep(1)
if task.info.state != vim.TaskInfo.State.success:
logger.error(f"Error while cloning")
logger.exception(task.info.error)
logger.success(f"[{new_name}] Cloned")
return task.info.result
def _configure(self, vm):
_name = vm.name
# TODO: Default user
# TODO: key file
user = 'root'
key_file = 'rsa'
logger.info(f"[{_name}] Waiting VirtualMachine..")
t = 0
while True:
ipv4, ipv6 = self._get_vm_ip(vm)
if len(ipv4) > 0:
break
time.sleep(0.99)
t += 1
if t > timeout:
return logger.error(f"[{_name}] No IPv4 address found.")
logger.info(f"[{_name}] IPv4: {', '.join(ipv4)}")
logger.info(f"[{_name}] IPv6: {', '.join(ipv6)}")
_ssh_connect = {
'device_type': 'linux',
'host': ipv4[0],
'username': user,
'use_keys': True,
'key_file': key_file
}
try:
ssh = ConnectHandler(**_ssh_connect)
logger.success(f"[{_name}] [SSH] Connected to {ipv4[0]} as {user} ({key_file=})")
_pwd = ''.join(random.choice(chars) for _ in range(12))
ssh.send_command(f"echo '{user}:{_pwd}' | chpasswd")
logger.info(f"[{_name}] Password for root: {_pwd}")
ssh.send_command(f'echo {_name} > /etc/hostname')
logger.info(f"[{_name}] [YML] Before-Reboot.")
self._execute_yml(_name, ssh)
ssh.disconnect()
vm.RebootGuest()
logger.info(f"[{_name}] Rebooting")
time.sleep(5)
end_time = time.time() + timeout - 5
while time.time() < end_time:
try:
ssh = ConnectHandler(**_ssh_connect)
break
except Exception:
time.sleep(1)
if not ssh.is_alive():
return logger.error(f"[{_name}] Host startup TimeOut.")
logger.success(f"[{_name}] [SSH] Connected to {ipv4[0]} as {user}")
logger.debug(f'[{_name}] uname {ssh.send_command("uname -a")}')
logger.info(f"[{_name}] [YML] After-Reboot.")
self._execute_yml(_name, ssh, False)
except NetmikoAuthenticationException as e:
return logger.error(f"[{_name}] [SSH] Error: Authentication. ({e})")
except Exception as e:
logger.error(f"[{_name}] [SSH] Unhandled error ")
return logger.exception(e)
def _cache_vms(self, template_name, datacenter):
logger.debug("Checking VMs.")
self.vm_list = set()
self.template = None
container_view = self.content.viewManager.CreateContainerView(datacenter.vmFolder, [vim.VirtualMachine], True)
for _vm in container_view.view:
if _vm.name == template_name:
self.template = _vm
self.vm_list.add(_vm.name)
container_view.Destroy()
logger.debug(f"{self.vm_list=}")
logger.debug(f"{self.template=}")
def _create(self):
if len(sys.argv) > 3:
new_name = sys.argv[3]
template_name = f"{sys.argv[2]}-template"
_logfile = logdir / "by-new_name" / f"{new_name}.log"
i = 0
while True:
i += 1
if not _logfile.exists():
break
else:
_logfile = logdir / "by-new_name" / f"{new_name} ({i}).log"
logger.add(_logfile, level=0)
self._load_config()
if not self._connect():
return
logger.info("Loading...")
self._load_yml()
datacenter = [dc for dc in self.content.rootFolder.childEntity if dc.name == self.config['datacenter']]
if not datacenter:
return logger.error(f"Datacenter not found: {self.config['datacenter']}")
datacenter = datacenter[0]
logger.debug("datacenter found")
self._cache_vms(template_name, datacenter)
if args.get("n"):
new_names = [f'{new_name}-{i + 1}' for i in range(int(args['n'][0]))]
else:
new_names = [new_name]
for n in new_names:
if n in self.vm_list:
return logger.error(f"Error: {n!r} already exist.")
if not self.template:
return logger.error(f"Template: {template_name} - not found")
logger.debug("template found")
if self.config['resource_pool']:
resource_pool = \
[rp for rp in datacenter.hostFolder.childEntity[0].resourcePool.resourcePool if
rp.name == self.config['resource_pool']]
if not resource_pool:
return logger.error(f"Resource pool not found: {self.config['resource_pool']}")
resource_pool = resource_pool[0]
logger.debug("resource_pool found")
else:
resource_pool = datacenter.hostFolder.childEntity[0].resourcePool
logger.debug("Use default resource_pool")
datastore = [ds for ds in datacenter.datastore if ds.name == self.config['datastore']]
if not datastore:
return logger.error(f"Datastore not found: {self.config['datastore']}")
datastore = datastore[0]
logger.debug("datastore found")
destfolder = datacenter.vmFolder
relocate_spec = vim.vm.RelocateSpec()
relocate_spec.datastore = datastore
relocate_spec.pool = resource_pool
clone_spec = vim.vm.CloneSpec()
clone_spec.location = relocate_spec
clone_spec.powerOn = True
def _caf(nn):
vm = self._clone(nn, destfolder, clone_spec)
self._configure(vm)
with ThreadPoolExecutor(max_workers=2) as executor:
for new_name in new_names:
executor.submit(_caf, new_name)
else:
logger.info(
f"Usage: {sys.argv[0]} create template_name new_name [-n VMs count] [[-f <file with commands>] ...]")
logger.info("Mark: template search on target host with '-template' suffix (%template_name%-template)")
def main(self):
logger.debug(f"{sys.argv=}")
logger.debug(f"{args=}")
try:
if len(sys.argv) > 1:
match sys.argv[1]:
case "init":
self._console(True)
case "console":
self._console()
case "create":
self._create()
case _:
print(f"Usage: {sys.argv[0]} [init|edit|create]")
else:
print(f"Usage: {sys.argv[0]} [init|edit|create]")
except KeyboardInterrupt:
logger.info("Exited by KeyboardInterrupt")
if __name__ == "__main__":
VMKer().main()