[~] Renaming

[+] _create
This commit is contained in:
Maxim Khomutov 2024-07-18 02:53:20 +03:00
parent 03610dc117
commit 83c18c3f2c

View File

@ -11,6 +11,7 @@ import time
from datetime import datetime from datetime import datetime
from getpass import getpass from getpass import getpass
from pathlib import Path from pathlib import Path
from typing import Callable
from cryptography.fernet import Fernet, InvalidToken from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
@ -18,10 +19,17 @@ from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from easydict import EasyDict as edict from easydict import EasyDict as edict
from loguru import logger from loguru import logger
from netmiko import ConnectHandler
from pyVim.connect import SmartConnect, Disconnect from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim from pyVmomi import vim
from ruamel.yaml import YAML from ruamel.yaml import YAML
try:
from pyVmomi.vim import ServiceInstanceContent, ServiceInstance
except ImportError:
pass
chars = string.ascii_letters + string.digits + string.punctuation
yaml = YAML() yaml = YAML()
workdir = Path.home() / ".vmker" workdir = Path.home() / ".vmker"
os.makedirs(workdir / "logs", exist_ok=True) os.makedirs(workdir / "logs", exist_ok=True)
@ -35,8 +43,8 @@ logger.debug("Logger initialized")
class VMKer: class VMKer:
def __init__(self): def __init__(self):
self.si = None self.si: ServiceInstance = None
self.content = None self.content: ServiceInstanceContent = None
self.config = None self.config = None
@staticmethod @staticmethod
@ -74,13 +82,13 @@ class VMKer:
fernet = Fernet(key) fernet = Fernet(key)
decrypted_text = fernet.decrypt(encrypted_text).decode() decrypted_text = fernet.decrypt(encrypted_text).decode()
if n: if n:
logger.success("Password decrypted.") logger.debug("Password decrypted.")
return decrypted_text return decrypted_text
else: else:
return passphrase return passphrase
except InvalidToken: except InvalidToken:
logger.error("Bad passphrase.") logger.error("Bad passphrase.")
logger.info("Use 'vmkey init' for drop it.") logger.info(f"Use for drop it: {sys.argv[0]} init")
exit(1) exit(1)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
@ -89,7 +97,7 @@ class VMKer:
def _load_config(self): def _load_config(self):
cf = workdir / "config.json" cf = workdir / "config.json"
if not cf.exists(): if not cf.exists():
logger.info("Try 'vmkey init' before using program.") logger.info(f"Init before using program: {sys.argv[0]} init")
exit(0) exit(0)
with open(workdir / "config.json", "r") as f: with open(workdir / "config.json", "r") as f:
self.config = json.load(f) self.config = json.load(f)
@ -97,26 +105,26 @@ class VMKer:
def _connect(self): def _connect(self):
try: try:
cfg = edict(self.config) cfg = edict(self.config)
self.si = SmartConnect(host=cfg.server, user=cfg.user, pwd=self._decrypt_pass(), self.si = SmartConnect(host=cfg.server, user=cfg.user, pwd=self._decrypt_pass(),
sslContext=ssl._create_unverified_context()) disableSslCertValidation=cfg.insecure)
atexit.register(Disconnect, self.si) atexit.register(Disconnect, self.si)
self.content = self.si.RetrieveContent() self.content = self.si.RetrieveContent()
logger.success(f"Connected to {self.content.about.fullName}({cfg.server}) as {cfg.user}") logger.success(f"Connected to {self.content.about.fullName}({cfg.server}) as {cfg.user}")
return True
except vim.fault.VimFault as e: except vim.fault.VimFault as e:
logger.error(f"Error while connecting: {e.msg}") # noinspection PyUnresolvedReferences
logger.error(f"ESXI Error: {e.msg}")
except ssl.SSLCertVerificationError as e: except ssl.SSLCertVerificationError as e:
logger.error(f"SSL Error: {e}") logger.error(f"SSL Error: {e}")
except TimeoutError as e: except TimeoutError as e:
logger.error(f"TimeoutError: {e}") logger.error(f"TimeoutError: {e}")
return False
def _parse_yml(self): def _edit(self, _first_init=False):
pass self._load_config()
def configure(self, _first_init=False):
logger.info("Entered to configuration mode.") logger.info("Entered to configuration mode.")
def _get_arg(name, default, required, _i=input): def _get_arg(name, default, required, _i: Callable = input):
while True: while True:
i = _i(f"Enter {name}:{f' [{default}]' if default else ''} ") i = _i(f"Enter {name}:{f' [{default}]' if default else ''} ")
if not i and not default and required: if not i and not default and required:
@ -136,8 +144,10 @@ class VMKer:
if _first_init: if _first_init:
c = { c = {
"server": _get_arg("serverIP", None, True), "user": _get_arg("user", None, True), "server": _get_arg("serverIP", None, True),
"password": self._encrypt_pass(_get_arg("password", None, True, getpass)), "insecure": True, "user": _get_arg("user", None, True),
"password": self._encrypt_pass(_get_arg("password", None, True, getpass)),
"insecure": True,
"datacenter": _get_arg('datacenter', 'ha-datacente', False), "datacenter": _get_arg('datacenter', 'ha-datacente', False),
"datastore": _get_arg('datastore', 'datastore1', False), "datastore": _get_arg('datastore', 'datastore1', False),
"resource_pool": _get_arg('resource pool', 'null', False) "resource_pool": _get_arg('resource pool', 'null', False)
@ -168,10 +178,8 @@ class VMKer:
return return
else: else:
logger.error("Bad data.") logger.error("Bad data.")
logger.info("Type help for edit configuration.")
else:
logger.info("Type help for more information.")
logger.info("Type help for more information.")
edited = False edited = False
while True: while True:
i = input("> ").split(" ") i = input("> ").split(" ")
@ -232,22 +240,129 @@ class VMKer:
print("Unknown command.") print("Unknown command.")
logger.info("Exited from configuration mode.") logger.info("Exited from configuration mode.")
def _create(self): def _parse_yml(self):
pass pass
def _search_vm(self, _template_name, _new_name, datacenter):
logger.debug("Checking VMs.")
vm = None
new = None
container_view = self.content.viewManager.CreateContainerView(datacenter.vmFolder, [vim.VirtualMachine], True)
for _vm in container_view.view:
if _vm.name == _template_name:
vm = _vm
if _vm.name == _new_name:
new = _vm
container_view.Destroy()
return vm, new
def _create(self):
if len(sys.argv) > 3:
self._load_config()
if not self._connect():
return
logger.info("Loading...")
datacenter = [dc for dc in self.content.rootFolder.childEntity if dc.name == self.config['datacenter']]
if not datacenter:
return logger.error(f"Datacenter not found: {self.config['datacenter']}")
datacenter = datacenter[0]
logger.debug("datacenter found")
new_name = sys.argv[3]
template, _new = self._search_vm(f"{sys.argv[2]}-template", new_name, datacenter)
if _new:
return logger.error(f"Error: {new_name!r} already exist.")
if not template:
return logger.error(f"Template not found: {sys.argv[2]}-template")
logger.debug("template found")
if self.config['resource_pool']:
resource_pool = \
[rp for rp in datacenter.hostFolder.childEntity[0].resourcePool.resourcePool if
rp.name == self.config['resource_pool']]
if not resource_pool:
return logger.error(f"Resource pool not found: {self.config['resource_pool']}")
resource_pool = resource_pool[0]
logger.debug("resource_pool found")
else:
resource_pool = datacenter.hostFolder.childEntity[0].resourcePool
logger.debug("Use default resource_pool")
datastore = [ds for ds in datacenter.datastore if ds.name == self.config['datastore']]
if not datastore:
return logger.error(f"Datastore not found: {self.config['datastore']}")
datastore = datastore[0]
logger.debug("datastore found")
logger.info(f"Cloning: {sys.argv[2]!r} > {new_name}")
destfolder = datacenter.vmFolder
relocate_spec = vim.vm.RelocateSpec()
relocate_spec.datastore = datastore
relocate_spec.pool = resource_pool
clone_spec = vim.vm.CloneSpec()
clone_spec.location = relocate_spec
clone_spec.powerOn = True
task = template.Clone(name=new_name, folder=destfolder, spec=clone_spec)
while task.info.state == vim.TaskInfo.State.running:
time.sleep(1)
if task.info.state != vim.TaskInfo.State.success:
return logger.error(f"Error while cloning: {task.info.error}")
logger.success("Cloned")
vm = task.info.result
ipv4 = []
ipv6 = []
for nic in vm.guest.net:
if nic.ipConfig:
for ip in nic.ipConfig.ipAddress:
if ":" not in ip.ipAddress: # Skip IPv6 addresses
ipv4.append(ip.ipAddress)
else:
ipv6.append(ip.ipAddress)
logger.info(f"IPv4: {ipv4}")
logger.info(f"IPv6: {ipv6}")
if len(ipv4) == 0:
return logger.error("No IPv4 address found.")
# TODO: length
_pwd1 = ''.join(random.choice(chars) for _ in range(12))
_pwd2 = ''.join(random.choice(chars) for _ in range(12))
# TODO: Default user
ssh = ConnectHandler(
device_type='linux',
host=ipv4[0],
username="root",
password="toor",
)
ssh.send_command(f'echo -e "{_pwd1}\n{_pwd1}" | passwd root')
ssh.send_command(f'echo -e "{_pwd2}\n{_pwd2}" | passwd user')
logger.info(f"Password for root: {_pwd1}")
logger.info(f"Password for user: {_pwd2}")
# TODO: yml
logger.info("Executing %.yml")
ssh.disconnect()
else:
logger.info(
f"Usage: {sys.argv[0]} create <template_name> <new_name> [commands.yml | commands1.yml commands2.yml | ...]")
logger.info("Mark: template search on target host with '-template' suffix (%template_name%-template)")
def main(self): def main(self):
logger.debug(f"{sys.argv=}")
try: try:
if len(sys.argv) > 1: if len(sys.argv) > 1:
match sys.argv[1]: match sys.argv[1]:
case "init": case "init":
self.configure(True) self._edit(True)
case "edit": case "edit":
self._load_config() self._edit()
self.configure()
case "create": case "create":
self._load_config()
self._create() self._create()
logger.warning("WIP")
case "help": case "help":
print("HELP") print("HELP")
case _: case _:
@ -256,51 +371,6 @@ class VMKer:
logger.info("Exited by KeyboardInterrupt") logger.info("Exited by KeyboardInterrupt")
def clone_vm(si, template_name, vm_name, datacenter_name, datastore_name, resource_pool_name):
content = si.RetrieveContent()
template = None
for datacenter in content.rootFolder.childEntity:
if datacenter.name == datacenter_name:
view_manager = content.viewManager
container_view = view_manager.CreateContainerView(datacenter.vmFolder, [vim.VirtualMachine], True)
for vm in container_view.view:
if vm.name == template_name:
template = vm
break
container_view.Destroy()
break
if not template:
raise Exception("Template not found")
datacenter = [dc for dc in content.rootFolder.childEntity if dc.name == datacenter_name][0]
destfolder = datacenter.vmFolder
if resource_pool_name:
resource_pool = \
[rp for rp in datacenter.hostFolder.childEntity[0].resourcePool.resourcePool if
rp.name == resource_pool_name][
0]
else:
resource_pool = datacenter.hostFolder.childEntity[0].resourcePool
relocate_spec = vim.vm.RelocateSpec()
relocate_spec.datastore = [ds for ds in datacenter.datastore if ds.name == datastore_name][0]
relocate_spec.pool = resource_pool
clone_spec = vim.vm.CloneSpec()
clone_spec.location = relocate_spec
clone_spec.powerOn = True
task = template.Clone(name=vm_name, folder=destfolder, spec=clone_spec)
while task.info.state == vim.TaskInfo.State.running:
time.sleep(1)
if task.info.state != vim.TaskInfo.State.success:
raise task.info.error
return task.info.result
def generate_password(length=12): def generate_password(length=12):
chars = string.ascii_letters + string.digits + string.punctuation chars = string.ascii_letters + string.digits + string.punctuation
return ''.join(random.choice(chars) for _ in range(length)) return ''.join(random.choice(chars) for _ in range(length))