Source code for pycarol.auth.PwdAuth

"""
Standard User/Password Auth

"""

import types
import time
from .PwdAuth_cloner import PwdAuthCloner


class PwdAuth:
    """Standard user/password authentication.

    Args:

        user: `str`
            Username
        password: `str`
            Password

    """

    def __init__(self, user, password):
        self.user = user
        self.password = password
        # Filled in by _set_token() once a token response has been received.
        self._token = None
        # Carol instance bound by login(); every later token call goes
        # through it.
        self.carol = None
        self.connector_id = None

    def _set_token(self, data):
        """Store the token fields of a token response.

        Args:

            data: `dict`
                Response containing the keys `access_token`,
                `refresh_token`, `timeIssuedInMillis` and `expires_in`.

        Returns:
            None

        """
        self._token = types.SimpleNamespace()
        self._token.access_token = data['access_token']
        self._token.refresh_token = data['refresh_token']
        # `expires_in` is scaled by 1000 so it can be added to the
        # millisecond issue timestamp.
        self._token.expiration = data['timeIssuedInMillis'] + (
            data['expires_in'] * 1000)

    def set_connector_id(self, connector_id):
        """Record the connector id on this auth object."""
        self.connector_id = connector_id

    def cloner(self):
        """Return a `PwdAuthCloner` built from this instance."""
        return PwdAuthCloner(self)

    def login(self, carol):
        """Request a token with the stored username and password.

        Args:

            carol: pycarol.Carol
                Carol() instance.

        Returns:
            None

        """
        self.carol = carol
        data = {
            'username': self.user,
            'password': self.password,
            'grant_type': 'password',
            'subdomain': carol.domain,
            'connectorId': carol.connector_id,
            'orgSubdomain': carol.organization,
        }
        resp = self.carol.call_api(
            'v2/oauth2/token', auth=False, data=data,
            content_type='application/x-www-form-urlencoded')
        self._set_token(resp)
        if self.carol.verbose:
            print(f"Token: {self._token.access_token}")

    def authenticate_request(self, headers):
        """Set `headers['Authorization']` to the current access token.

        Args:

            headers: `dict`
                Request headers; modified in place.

        Returns:
            None

        """
        headers['Authorization'] = self.get_access_token()

    def _is_token_expired(self):
        """Return True when there is no token or it is about to expire."""
        if self._token is None:
            return True
        # An expiration of 0 is never reported as expired.
        if self._token.expiration == 0:
            return False
        now = time.time() * 1000
        # Adds 1 min buffer
        expiry = self._token.expiration - 60000
        return now > expiry

    def get_access_token(self):
        """Return the access token, renewing it first when it is expired.

        Returns:
            The access token stored by the last token response.

        Raises:
            RuntimeError: no token exists and `login()` was never called.

        """
        if self._is_token_expired():
            self._refresh_token()
        return self._token.access_token

    def _refresh_token(self):
        """Obtain a new token.

        Uses the stored refresh token when there is one. When no token has
        been stored yet there is nothing to refresh, so a password login is
        performed against the bound Carol instance instead.

        Raises:
            RuntimeError: no token exists and no Carol instance is bound.

        """
        if self._token is None:
            # Reached when a token is requested before login(), or after a
            # login() whose token request failed (self.carol is then set
            # but no token was stored).
            if self.carol is None:
                raise RuntimeError(
                    "PwdAuth has no token: call login(carol) before "
                    "requesting an access token.")
            self.login(self.carol)
            return
        resp = self.carol.call_api(
            'v2/oauth2/token', auth=False,
            data={
                'grant_type': 'refresh_token',
                'refresh_token': self._token.refresh_token,
            },
            content_type='application/x-www-form-urlencoded')
        self._set_token(resp)

    def switch_context(self, env_id):
        """Switch context to an environment within the same organization.

        Args:

            env_id: `str`
                environment id to switch context to.

        Returns:
            None

        """
        path = f'v2/oauth2/switchTenantContext/{env_id}'
        resp = self.carol.call_api(method='POST', path=path)
        self._set_token(resp)

    def switch_org_context(self, org_id):
        """Go to the organization context or switch organization.

        Args:

            org_id: `str`
                organization id.

        Returns:
            None

        """
        path = f'v2/oauth2/switchOrgContext/{org_id}'
        resp = self.carol.call_api(method='POST', path=path)
        self._set_token(resp)