import base64
import hmac
import json
import random
from urllib.parse import urlencode, urljoin
class MagentaRequest:
    """A signed single-sign-on request exchanged with a Magenta server.

    The request carries a client id, a nonce, a list of scopes and a
    callback URL.  It is serialized to JSON, URL-safe base64 encoded, and
    authenticated with an HMAC-SHA256 signature keyed by the client secret.
    """

    def __init__(self, client_id, client_secret, nonce, scopes, callback_url):
        """Build a request.

        Args:
            client_id: Identifier of the client; stored as given.
            client_secret: HMAC key.  ``bytes`` are used as-is; any other
                value is upper-cased and base32-decoded to obtain the key.
            nonce: Value stored as the request nonce.  When ``None``, a
                random 8-digit integer is generated.
            scopes: A list of scopes, or a single scope which is wrapped
                in a one-element list.
            callback_url: Callback URL; stored as given.
        """
        self.client_id = client_id
        self.client_secret = self._decode_secret(client_secret)
        if nonce is None:
            # The nonce is part of a signed authentication request, so draw
            # it from the OS CSPRNG rather than the predictable default
            # Mersenne Twister generator.
            nonce = random.SystemRandom().randint(10000000, 99999999)
        self.nonce = nonce
        self.scopes = self._normalize_scopes(scopes)
        self.callback_url = callback_url

    @staticmethod
    def _decode_secret(client_secret):
        """Return the secret as raw bytes, base32-decoding non-bytes input."""
        if isinstance(client_secret, bytes):
            return client_secret
        return base64.b32decode(client_secret.upper())

    @staticmethod
    def _normalize_scopes(scopes):
        """Return ``scopes`` unchanged if it is a list, else wrap it in one."""
        if isinstance(scopes, list):
            return scopes
        return [scopes]

    def sign(self):
        """Serialize and sign the request.

        Returns:
            A ``(payload, signature)`` tuple of ``bytes``.  ``payload`` is
            the URL-safe base64 encoding of the UTF-8 JSON document;
            ``signature`` is the URL-safe base64 encoding of the
            HMAC-SHA256 digest computed over the *encoded* payload.
        """
        data = {
            "client_id": self.client_id,
            "nonce": self.nonce,
            "scopes": self.scopes,
            "callback_url": self.callback_url,
        }
        payload = base64.urlsafe_b64encode(json.dumps(data).encode("utf-8"))
        digest = hmac.digest(self.client_secret, payload, "sha256")
        return (payload, base64.urlsafe_b64encode(digest))

    def begin_url(self, magenta_base):
        """Return the ``/sso/begin`` URL on ``magenta_base`` for this request.

        The query string carries the client id plus the payload and
        signature produced by :meth:`sign`.
        """
        payload, signature = self.sign()
        params = urlencode(
            {
                "client": self.client_id,
                "payload": payload,
                "signature": signature,
            }
        )
        return urljoin(magenta_base, f"/sso/begin?{params}")

    @classmethod
    def verify(cls, payload, signature, client_secret):
        """Check a signature and rebuild the request from its payload.

        Args:
            payload: URL-safe base64 encoded JSON payload (``str`` or
                ``bytes``), as produced by :meth:`sign`.
            signature: URL-safe base64 encoded HMAC-SHA256 digest of
                ``payload`` (``str`` or ``bytes``).
            client_secret: HMAC key; ``bytes`` or a base32 string.

        Returns:
            A new instance populated from the decoded payload.

        Raises:
            ValueError: If the signature does not match the payload.  A
                signature that is not valid base64 raises
                ``binascii.Error``, which is a ``ValueError`` subclass.
            KeyError: If the decoded payload lacks an expected field.
        """
        client_secret = cls._decode_secret(client_secret)
        if not isinstance(signature, bytes):
            signature = signature.encode("utf-8")
        if not isinstance(payload, bytes):
            payload = payload.encode("utf-8")

        expected = hmac.digest(client_secret, payload, "sha256")
        provided = base64.urlsafe_b64decode(signature)
        # Constant-time comparison: a plain ``!=`` returns as soon as the
        # first differing byte is found, leaking timing information that
        # helps an attacker forge a signature.
        if not hmac.compare_digest(provided, expected):
            raise ValueError("mismatching signatures")

        # Only parse the payload once it has been authenticated.
        data = json.loads(base64.urlsafe_b64decode(payload).decode("utf-8"))
        return cls(
            data["client_id"],
            client_secret,
            data["nonce"],
            cls._normalize_scopes(data["scopes"]),
            data["callback_url"],
        )

    def __repr__(self):
        """Return a debug representation that omits the client secret."""
        return "MagentaRequest(client_id=%r nonce=%r scopes=%r callback_url=%r)" % (
            self.client_id,
            self.nonce,
            self.scopes,
            self.callback_url,
        )