diff --git a/README.md b/README.md new file mode 100644 index 0000000..5f3e3de --- /dev/null +++ b/README.md @@ -0,0 +1,51 @@ +# pywinmutex + +A simple Python library to create and manage Windows mutexes. + +## Installation + +You can install the library using pip: + +```bash +pip install pywinmutex +``` + +## Usage + +### Basic Example +```python +from pywinmutex import WindowsMutex + +mutex = WindowsMutex("anidev/pywinmutex/simple", True) # Name may be any string +mutex.timeout = 2500 # Set a timeout of 2.5 seconds + +with mutex: + print(f"[I] Mutex({mutex}) acquired.") + input("Enter to release the mutex and exit> ") + +print(f"[I] Mutex({mutex}) released. Exiting...") +``` + +### Legacy +```python +from pywinmutex import WindowsMutex + +mutex = WindowsMutex("anidev/pywinmutex/acquire", True) # Name may be any string + +if not mutex.acquire(5000): # Acquire the mutex with a timeout of 5 seconds; None for no timeout + print(f"[W] Mutex({mutex}) already exists or acquire timeout exceeded.") + exit(1) + +# Do some work while holding the mutex + +print(f"[I] Mutex({mutex}) acquired.") +input("Enter to release the mutex and exit> ") + +# Release the mutex +mutex.release() +print(f"[I] Mutex({mutex}) released. Exiting...") +``` + +## License + +This project is licensed under the `MIT License`. See the [LICENSE](LICENSE) file for details. diff --git a/pywinmutex/__init__.py b/pywinmutex/__init__.py new file mode 100644 index 0000000..61623ea --- /dev/null +++ b/pywinmutex/__init__.py @@ -0,0 +1,135 @@ +import threading + +import win32event +import win32api +import winerror +import atexit + +class WindowsMutex: + def __init__(self, name, multiuser=False, timeout=None): + """ + Initialize the WindowsMutex object. + :param name: Name of the mutex. + :param multiuser: If True, create a local mutex; if False, create a global mutex (Shared across all users). + """ + self.__multiuser = multiuser + self._friendly_name = name + __global_prefix = "Global\\" if multiuser else "" + self._mutex_name = f"{__global_prefix}{name}" + self._handler = None + self._exist = False + self._th_lock = threading.Lock() + self.timeout = timeout + atexit.register(self.release) + + def __try_to_lock(self): + """ + Try to acquire the mutex. + :return: True if the mutex was acquired, False otherwise. + """ + with self._th_lock: # Thread-safe access + handler = win32event.CreateMutex(None, False, self._mutex_name) + if handler is None: + raise RuntimeError("Failed to create mutex.") + exist = win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS + if not exist: + win32api.CloseHandle(handler) + return exist + + def reinint(self, name, multiuser=None): + """ + Reinitialize the mutex with a new name. + :param name: New name for the mutex. + :param local: If True, create a local mutex; if False, create a global mutex. + """ + self.release() + if multiuser is None: + multiuser = self.__multiuser + self.__init__(name, multiuser) + + @property + def settings(self): + """ + Get the settings of the mutex. + :return: A dictionary containing the mutex name and existence status. + """ + return { + "name": self._mutex_name, + "exist": self.exist + } + + @property + def exist(self): + """ + Check if the mutex already exists. + :return: True if the mutex exists, False otherwise. + """ + if self._handler is None: + return self.__try_to_lock() + return self._exist + + def _lock(self): + """ + Try to lock the mutex. + :return: True if acquired, False otherwise. + """ + with self._th_lock: # Thread-safe access + if self._handler is None: # Создаём мутекс только один раз + self._handler = win32event.CreateMutex(None, False, self._mutex_name) + self._exist = win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS + return not self._exist # True, если удалось захватить мутекс + return False # Если мутекс уже создан, значит, его кто-то держит + + def acquire(self, timeout=None, __raw_result=False): + """ + Wait until the mutex is acquired. + :param timeout: Timeout in milliseconds. If None, waits indefinitely. + :param __raw_result: If True, return the raw result of WaitForSingleObject. + :return: True if acquired, False if timeout. + """ + if self._handler is None: + self._lock() + + wait_time = timeout if timeout is not None else win32event.INFINITE + result = win32event.WaitForSingleObject(self._handler, wait_time) + + if result != win32event.WAIT_OBJECT_0: + self._handler = None + self._exist = False + + if __raw_result: + return result + + return result == win32event.WAIT_OBJECT_0 + + def release(self): + """ + Release the mutex. + :return: None + """ + with self._th_lock: # Thread-safe access + if self._handler: + win32event.ReleaseMutex(self._handler) + win32api.CloseHandle(self._handler) # Закрываем _handler! + self._handler = None + self._exist = False + + def __enter__(self): + reason = self.acquire(self.timeout, True) + if reason == win32event.WAIT_OBJECT_0: + return self + elif reason == win32event.WAIT_TIMEOUT: + raise TimeoutError("Mutex acquisition timed out.") + elif reason == win32event.WAIT_ABANDONED: + raise RuntimeError("Mutex was abandoned.") + else: + raise RuntimeError("Failed to acquire mutex.") + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release() + + def __str__(self): + return f"{self!r}" + + def __repr__(self): + return f""