A bit rework / refactoring.

This commit is contained in:
Kirill Zhosul
2022-03-16 14:12:44 +04:00
parent 0fa096f9ba
commit 1dc980eef5
5 changed files with 139 additions and 56 deletions

View File

@@ -1,17 +1,21 @@
"""
Gitflic authentication wrapper.
"""
from enum import Enum
import logging
from typing import Union
import webbrowser
import urllib.parse
import logging
import requests
from .exceptions import AuthError, GitflicExceptions
oauth_url = "https://oauth.gitflic.ru/oauth/authorize?scope={}&clientId={}&redirectUrl={}&state={}"
OAUTH_URL = "https://oauth.gitflic.ru/oauth/authorize?scope={}&clientId={}&redirectUrl={}&state={}"
class GitflicAuthScopes(Enum):
""" Authentication scopes from Gitflic. """
USER_READ = "USER_READ"
USER_WRITE = "USER_WRITE"
PROJECT_READ = "PROJECT_READ"
@@ -22,7 +26,10 @@ class GitflicAuthScopes(Enum):
class GitflicAuth:
"""
Gitflic authentication wrapper.
"""
# noinspection PyTypeChecker
def __init__(self,
access_token: str = None,
@@ -31,67 +38,97 @@ class GitflicAuth:
redirect_url: str = None,
state: str = None):
"""
:param access_token:
:param scope:
:param client_id:
:param redirect_url:
:param access_token: Raw token for raw AUTH.
:param scope: OAUTH field.
:param client_id: OAUTH field.
:param redirect_url: OAUTH field.
:param state: OAUTH field.
"""
# Logging.
self.log: logging.Logger = logging.getLogger(__name__)
# Requests.
self.session: requests.Session = requests.Session()
# Auth.
# Token fields.
self.access_token: str = access_token
# OAUTH fields.
self.scope: str = scope if not isinstance(scope, GitflicAuthScopes) else scope.value
self.client_id: str = client_id
self.redirect_url: str = redirect_url
self.state = state
self.state: str = state
self.refresh_token: str = None
if any((access_token, scope, client_id, redirect_url, state)):
if access_token:
pass
elif not (scope and client_id and redirect_url, state):
raise GitflicExceptions(
"Cannot auth without 'scope', 'client_id', 'redirect_url', 'state' parameters. "
"See doc: https://gitflic.ru/help/api/access-token."
)
self._try_login()
def _try_login(self):
"""
Tries to login user with token or OAUTH.
"""
if self.access_token:
# Raw authorization.
self._token_login()
else:
raise GitflicExceptions(
"Cannot auth without 'token' or 'scope' and 'client_id' and 'redirect_url' and 'state' parameters. "
"See doc: https://gitflic.ru/help/api/access-token."
)
if self.scope and self.client_id and self.redirect_url and self.state:
# OAUTH authorization.
self._oauth_login()
else:
if any(self.scope, self.client_id, self.redirect_url, self.state):
raise GitflicExceptions(
"Not found one of params for OAUTH, you are required to send ALL params from ('scope', 'client_id', 'redirect_url', 'state')!"
"See docs: https://gitflic.ru/help/api/access-token."
)
raise GitflicExceptions(
"Please pass 'token' param for raw auth or ('scope', 'client_id', 'redirect_url', 'state') params for OAUTH "
"See docs: https://gitflic.ru/help/api/access-token."
)
def _oauth_login(self):
"""
Tries to login user with OAUTH.
"""
self.log.debug("Trying to login with OAUTH...")
self._login()
# OAUTH authorization.
raise GitflicExceptions("OAUTH not implemented yet! Use raw access_token authorization.")
def _login(self):
self.log.debug("Trying to login.")
# redirect_url = urllib.parse.quote_plus(self.redirect_url)
# webbrowser.open(OAUTH_URL.format(self.scope, self.client_id, redirect_url, self.state))
# url = input("Paste redirect url: ")
# r = self.session.get("").json()
# print(r)
# self.session.headers.update({"Authorization": "token " + "null"})
# self.check_token()
def _token_login(self):
"""
Tries to login user with given access token.
"""
self.log.debug("Trying to login with token...")
if self.access_token:
self.session.headers.update({"Authorization": "token " + self.access_token})
else:
raise GitflicExceptions("Oauth not ready. Use access_token.")
# redirect_url = urllib.parse.quote_plus(self.redirect_url)
# webbrowser.open(oauth_url.format(self.scope, self.client_id, redirect_url, self.state))
# url = input("Paste redirect url: ")
# r = self.session.get("").json()
# print(r)
# self.session.headers.update({"Authorization": "token " + "null"})
self.session.headers.update({
"Authorization": "token " + self.access_token
})
self.check_token()
def check_token(self):
self.log.debug("Check token.")
"""
Checks that current auth session token is valid or not (by making request to get current user).
"""
self.log.debug("Checking token....")
r = self.session.get("https://api.gitflic.ru/user/me")
if r.status_code == 403:
raise AuthError("Authentication failed.")
raise AuthError("Authentication failed. Invalid token?")
else:
r = r.json()
self.log.debug(f"Logged as {r.get('username')} {r.get('id')}")
self.log.debug(f"Successfully logged as {r.get('username')} {r.get('id')}")