This commit is contained in:
Maxim Khomutov 2024-07-17 18:42:01 +03:00
parent 97d1829fbe
commit 9ba18da3d6

319
src/main.py Normal file
View File

@ -0,0 +1,319 @@
import atexit
import base64
import json
import os
import random
import ssl
import string
import sys
import textwrap
import time
from datetime import datetime
from getpass import getpass
from pathlib import Path
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from easydict import EasyDict as edict
from loguru import logger
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim
from ruamel.yaml import YAML
yaml = YAML()
workdir = Path.home() / ".vmker"
os.makedirs(workdir / "logs", exist_ok=True)
logger.remove()
logger.add(sys.stdout, level="INFO", backtrace=False, diagnose=False,
format="\r<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | {message}")
logger.add(workdir / "logs" / f"debug-{datetime.now().strftime("%d.%m.%Y_%H-%M.%S")}.log", level=0)
logger.debug("Logger initialized")
class VMKer:
def __init__(self):
self.si = None
self.content = None
self.config = None
@staticmethod
def generate_key_from_password(password: str, salt: bytes) -> bytes:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend()
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key
def _encrypt_pass(self, password, n=True, passphrase=None) -> str:
logger.debug("_encrypt_pass")
if n:
logger.info("Passphrase needed for crypt your password.")
if not passphrase:
passphrase = getpass("Passphrase: ")
salt = os.urandom(16) # Генерация случайной соли
key = self.generate_key_from_password(passphrase, salt)
fernet = Fernet(key)
encrypted_text = fernet.encrypt(password.encode())
return base64.urlsafe_b64encode(salt + encrypted_text).decode()
def _decrypt_pass(self, n=True) -> str:
try:
logger.debug("_decrypt_pass")
passphrase = getpass("Passphrase: ")
encrypted_data = base64.urlsafe_b64decode(self.config['password'].encode())
salt = encrypted_data[:16] # Извлечение соли из зашифрованного текста
encrypted_text = encrypted_data[16:]
key = self.generate_key_from_password(passphrase, salt)
fernet = Fernet(key)
decrypted_text = fernet.decrypt(encrypted_text).decode()
if n:
logger.success("Password decrypted.")
return decrypted_text
else:
return passphrase
except InvalidToken:
logger.error("Bad passphrase.")
logger.info("Use 'vmkey init' for drop it.")
exit(1)
except Exception as e:
logger.exception(e)
exit(1)
def _load_config(self):
cf = workdir / "config.json"
if not cf.exists():
logger.info("Try 'vmkey init' before using program.")
exit(0)
with open(workdir / "config.json", "r") as f:
self.config = json.load(f)
def _connect(self):
try:
cfg = edict(self.config)
self.si = SmartConnect(host=cfg.server, user=cfg.user, pwd=self._decrypt_pass(),
sslContext=ssl._create_unverified_context())
atexit.register(Disconnect, self.si)
self.content = self.si.RetrieveContent()
logger.success(f"Connected to {self.content.about.fullName}({cfg.server}) as {cfg.user}")
except vim.fault.VimFault as e:
logger.error(f"Error while connecting: {e.msg}")
except ssl.SSLCertVerificationError as e:
logger.error(f"SSL Error: {e}")
except TimeoutError as e:
logger.error(f"TimeoutError: {e}")
def _parse_yml(self):
pass
def configure(self, _first_init=False):
logger.info("Entered to configuration mode.")
def _get_arg(name, default, required, _i=input):
while True:
i = _i(f"Enter {name}:{f' [{default}]' if default else ''} ")
if not i and not default and required:
print(f"Error: {name} required")
continue
if not i:
i = default
if i == "null":
i = None
return i
def _save():
logger.info(f"Saving: {(workdir / "config.json").resolve()};")
self.config = c
with open(workdir / "config.json", "w") as f:
json.dump(c, f)
if _first_init:
c = {
"server": _get_arg("serverIP", None, True), "user": _get_arg("user", None, True),
"password": self._encrypt_pass(_get_arg("password", None, True, getpass)), "insecure": True,
"datacenter": _get_arg('datacenter', 'ha-datacente', False),
"datastore": _get_arg('datastore', 'datastore1', False),
"resource_pool": _get_arg('resource pool', 'null', False)
}
sec = input("Is SSL trusted? [Y/n] ").lower()
if sec == "y":
c['insecure'] = False
else:
logger.warning("Insecure mode.")
_save()
c = self.config
conf = lambda: textwrap.dedent(f"""\
Server configuration:
Server: {c['server']!r}
User: {c['user']!r}
Password: encrypted.
SSL Secure: {not c['insecure']}
Datacenter: {c['datacenter']!r}
Datastore: {c['datastore']!r}
Resource Pool: {c['resource_pool']!r}""")
logger.info(f"\n{conf()}")
if _first_init:
self._connect()
if self.content:
logger.info("Exited from configuration mode.")
return
else:
logger.error("Bad data.")
logger.info("Type help for edit configuration.")
else:
logger.info("Type help for more information.")
edited = False
while True:
i = input("> ").split(" ")
logger.debug(f"> {i}")
if len(i) == 0:
continue
match i[0]:
case "help":
print("help - Print that message.\n"
"work - Print working configuration.\n"
"edit - Edit configuration.\n"
"test - Test configuration (connect to the server).\n"
"exit - Exit.")
case "decrypt":
print(f"Your password: {self._decrypt_pass()}")
case "work":
print(conf())
case "edit":
if len(i) == 3:
edited = True
data = i[2]
match i[1]:
case "server":
c['server'] = data
case "port":
c['port'] = data
case "user":
c['user'] = data
case "password":
c['password'] = self._encrypt_pass(data, False, self._decrypt_pass(False))
case "ssl":
if data not in ['on', 'off']:
print("Available values: on, off")
else:
c['insecure'] = data == "off"
case "datacenter":
c['datacenter'] = data
case "datastore":
c['datastore'] = data
case "respool":
c['resource_pool'] = None if data.lower() == "null" else data
case _:
print("Usage: edit [server|port|password|ssl|datacenter|datastore|respool] <new value>")
else:
print("Incorrect syntax.")
print("Usage: edit [server|port|password|ssl|datacenter|datastore|respool] <new value>")
case "test":
try:
self._connect()
except Exception as e:
logger.exception(e)
case "exit":
if edited:
print(conf())
_save()
break
case _:
print("Unknown command.")
logger.info("Exited from configuration mode.")
def _create(self):
pass
def main(self):
try:
if len(sys.argv) > 1:
match sys.argv[1]:
case "init":
self.configure(True)
case "edit":
self._load_config()
self.configure()
case "create":
self._load_config()
self._create()
logger.warning("WIP")
case "help":
print("HELP")
case _:
logger.warning("Unknown command.")
except KeyboardInterrupt:
logger.info("Exited by KeyboardInterrupt")
def clone_vm(si, template_name, vm_name, datacenter_name, datastore_name, resource_pool_name):
content = si.RetrieveContent()
template = None
for datacenter in content.rootFolder.childEntity:
if datacenter.name == datacenter_name:
view_manager = content.viewManager
container_view = view_manager.CreateContainerView(datacenter.vmFolder, [vim.VirtualMachine], True)
for vm in container_view.view:
if vm.name == template_name:
template = vm
break
container_view.Destroy()
break
if not template:
raise Exception("Template not found")
datacenter = [dc for dc in content.rootFolder.childEntity if dc.name == datacenter_name][0]
destfolder = datacenter.vmFolder
if resource_pool_name:
resource_pool = \
[rp for rp in datacenter.hostFolder.childEntity[0].resourcePool.resourcePool if
rp.name == resource_pool_name][
0]
else:
resource_pool = datacenter.hostFolder.childEntity[0].resourcePool
relocate_spec = vim.vm.RelocateSpec()
relocate_spec.datastore = [ds for ds in datacenter.datastore if ds.name == datastore_name][0]
relocate_spec.pool = resource_pool
clone_spec = vim.vm.CloneSpec()
clone_spec.location = relocate_spec
clone_spec.powerOn = True
task = template.Clone(name=vm_name, folder=destfolder, spec=clone_spec)
while task.info.state == vim.TaskInfo.State.running:
time.sleep(1)
if task.info.state != vim.TaskInfo.State.success:
raise task.info.error
return task.info.result
def generate_password(length=12):
chars = string.ascii_letters + string.digits + string.punctuation
return ''.join(random.choice(chars) for _ in range(length))
def get_vm_ip(vm):
for nic in vm.guest.net:
if nic.ipConfig:
for ip in nic.ipConfig.ipAddress:
if ":" not in ip.ipAddress: # Skip IPv6 addresses
return ip.ipAddress
return None
if __name__ == "__main__":
VMKer().main()