Source code for pycarol.carol

from urllib3.util.retry import Retry
import requests
from requests.adapters import HTTPAdapter
import json
import os, copy
import os.path
from .auth.ApiKeyAuth import ApiKeyAuth
from .auth.PwdAuth import PwdAuth
from .tenant import Tenant
from . import __CONNECTOR_PYCAROL__
from . import __version__
from . organization import Organization


[docs]class Carol: """ This class handle all Carol`s API calls It will handle all API calls, for a given authentication method. :param domain: `str`. Args: domain: `str`. default `None`. Tenant name. e.x., domain.carol.ai app_name: `str`. default `None`. Carol app name. auth: `PwdAuth` or `ApiKeyAuth`. object Auth Carol object to handle authentication connector_id: `str` , default `__CONNECTOR_PYCAROL__`. Connector Id port: `int` , default 443. Port to be used (when running locally it could change) verbose: `bool` , default `False`. If True will print the header, method and URL of each API call. organization: `str` , default `None`. Organization domain. environment: `str`, default `carol.ai`, Which Carol's environment to use. There are three possible values today. 1. 'carol.ai' for the production environment 2. 'karol.ai' for the explore environment 3. 'qarol.ai' for the QA environment host: `str` default `None` This will overwrite the host used. Today the host is: 1. if organization is None, host={domain}.{environment} 2. else host={organization}.{environment} See Carol._set_host. OBS: In case all parameters are `None`, pycarol will try yo find their values in the environment variables. The values are: 1. `CAROLTENANT` for domain 2. `CAROLAPPNAME` for app_name 3. `CAROLAPPOAUTH` for auth 4. `CAROLORGANIZATION` for organization 5. `CAROLCONNECTORID` for connector_id 6. `CAROL_DOMAIN` for environment 7. `CAROLUSER` for carol user email 8. `CAROLPWD` for user password. """ def __init__(self, domain=None, app_name=None, auth=None, connector_id=None, port=443, verbose=False, organization=None, environment=None, host=None): settings = dict() if auth is None and domain is None: domain = os.getenv('CAROLTENANT') app_name = os.getenv('CAROLAPPNAME') assert domain and app_name, \ f"One of the following env variables are missing:\n CAROLTENANT: {domain}\n CAROLAPPNAME: {app_name}" carol_user = os.getenv('CAROLUSER') carol_pw = os.getenv('CAROLPWD') if carol_user and carol_pw: auth = PwdAuth(user=carol_user, password=carol_pw) else: auth_token = os.getenv('CAROLAPPOAUTH') connector_id = os.getenv('CAROLCONNECTORID') assert domain and app_name and auth_token and connector_id,\ "One of the following env variables are missing:\n " \ f"CAROLTENANT: {domain}\nCAROLAPPNAME: {app_name}" \ f"\nCAROLAPPOAUTH: {auth}\nCAROLCONNECTORID: {connector_id}\n" auth = ApiKeyAuth(auth_token) if connector_id is None: if auth.connector_id is None: connector_id = __CONNECTOR_PYCAROL__ else: connector_id = auth.connector_id if domain is None or app_name is None or auth is None: raise ValueError("domain, app_name and auth must be specified as parameters, either " + "in the environment variables CAROLTENANT, CAROLAPPNAME, CAROLAPPOAUTH" + " or CAROLUSER+CAROLPWD and CAROLCONNECTORID") # TODO Fixed to be compatible with the old `ENV_DOMAIN`. We could add a deprecated warning. self.environment = environment if environment is not None else os.getenv('CAROL_DOMAIN', os.getenv('ENV_DOMAIN', 'carol.ai')) self.organization = organization if organization is not None else os.getenv('CAROLORGANIZATION', os.getenv('API_SUBDOMAIN')) self.domain = domain self.app_name = app_name self.port = port self.verbose = verbose self.host = self._set_host(domain=self.domain, organization=self.organization, environment=self.environment, host=host) self.tenant = Tenant(self).get_tenant_by_domain(domain) self.connector_id = connector_id self.auth = auth self.auth.set_connector_id(self.connector_id) self.auth.login(self) self.response = None self.org = None @staticmethod def _set_host(domain, organization, environment, host): """ Set the host to be used. Args: domain: `str` Former tenant name. e.x., domain.carol.ai organization: `str` Organization domain. environment: `str` Which Carol's environment to use. There are three possible values today. 1. 'carol.ai' for the production environment 2. 'karol.ai' for the explore environment 3. 'qarol.ai' for the QA environment host: `str` Host to be used. It overwrite the default one. Returns: `str` host """ if host is not None: return host elif organization is not None: return f"{organization}.{environment}" else: return f"{domain}.{environment}" pass @staticmethod def _retry_session(retries=5, session=None, backoff_factor=0.5, status_forcelist=(500, 502, 503, 504, 524), method_whitelist=frozenset(['HEAD', 'TRACE', 'GET', 'PUT', 'OPTIONS', 'DELETE'])): """ Static method used to handle retries between calls. Args: retries: `int` , default `5` Number of retries for the API calls session: Session object dealt `None` It allows you to persist certain parameters across requests. backoff_factor: `float` , default `0.5` Backoff factor to apply between attempts. It will sleep for: {backoff factor} * (2 ^ ({retries} - 1)) seconds status_forcelist: `iterable` , default (500, 502, 503, 504, 524). A set of integer HTTP status codes that we should force a retry on. A retry is initiated if the request method is in method_whitelist and the response status code is in status_forcelist. method_whitelist: `iterable` , default frozenset(['HEAD', 'TRACE', 'GET', 'PUT', 'OPTIONS', 'DELETE'])) Set of uppercased HTTP method verbs that we should retry on. Returns: :class:`requests.Section` """ session = session or requests.Session() retry = Retry( total=retries, read=retries, connect=retries, backoff_factor=backoff_factor, status_forcelist=status_forcelist, method_whitelist=method_whitelist, ) adapter = HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter) return session
[docs] def call_api(self, path, method=None, data=None, auth=True, params=None, content_type='application/json', retries=8, session=None, backoff_factor=0.5, status_forcelist=(502, 503, 504, 524), downloadable=False, method_whitelist=frozenset(['HEAD', 'TRACE', 'GET', 'PUT', 'OPTIONS', 'DELETE']), errors='raise', extra_headers=None, **kwds): """ This method handles all the API calls. Args: path: `str`. API URI path. e.x. v2/staging/schema method: 'str', default `None`. Set of uppercased HTTP method verbs that we should call on. data: 'dict`, default `None`. Dictionary, list of tuples, bytes, or file-like object to send in the body of the request. auth: :class: `pycarol.ApiKeyAuth` or `pycarol.PwdAuth` Auth type to be used within the API's calls. params: (optional) Dictionary, list of tuples or bytes to send in the query string for the :class:`requests.Request`. content_type: `str`, default 'application/json' Content type for the api call retries: `int` , default `5` Number of retries for the API calls session: :class `requests.Session` object dealt `None` It allows you to persist certain parameters across requests. backoff_factor: `float` , default `0.5` Backoff factor to apply between attempts. It will sleep for: {backoff factor} * (2 ^ ({retries} - 1)) seconds status_forcelist: `iterable` , default (500, 502, 503, 504, 524). A set of integer HTTP status codes that we should force a retry on. A retry is initiated if the request method is in method_whitelist and the response status code is in status_forcelist. downloadable: `bool` default `False`. If the request will return a file to download. method_whitelist: `iterable` , default frozenset(['HEAD', 'TRACE', 'GET', 'PUT', 'OPTIONS', 'DELETE'])) Set of uppercased HTTP method verbs that we should retry on. errors: {‘ignore’, ‘raise’}, default ‘raise’ If ‘raise’, then invalid request will raise an exception If ‘ignore’, then invalid request will return the request response extra_headers: `dict` default `None` extra headers to be sent. kwds: `dict` default `None` Extra parameters to be sent to :class: `requests.request` Rerturn: Dict with API response. """ extra_headers = extra_headers or {} url = f'https://{self.host}:{self.port}/api/{path}' if method is None: if data is None: method = 'GET' else: method = 'POST' met_list = ['HEAD', 'TRACE', 'GET', 'PUT','POST', 'OPTIONS', 'PATCH', 'DELETE', 'CONNECT' ] assert method in met_list, f"API method must be {met_list}" headers = {'accept': 'application/json'} if auth: self.auth.authenticate_request(headers) data_json = None if method == 'GET': pass elif (method == 'POST') or (method == 'DELETE') or (method == 'PUT'): headers['content-type'] = content_type if content_type == 'application/json': data_json = data data = None headers.update(extra_headers) headers.update({'User-Agent': f'pyCarol/{__version__}'}) __count = 0 while True: if session is None: session = self._retry_session(retries=retries, session=session, backoff_factor=backoff_factor, status_forcelist=status_forcelist, method_whitelist=method_whitelist) response = session.request(method=method, url=url, data=data, json=data_json, headers=headers, params=params, **kwds) if self.verbose: if data_json is not None: print("Calling {} {}. Payload: {}. Params: {}".format(method, url, data_json, params)) else: print("Calling {} {}. Payload: {}. Params: {}".format(method, url, data, params)) print(" Headers: {}".format(headers)) if response.ok or errors == 'ignore': if downloadable: #Used when downloading carol app file. return response response.encoding = 'utf-8' self.response = response if response.text == '': return {} return json.loads(response.text) elif (response.reason == 'Unauthorized') and isinstance(self.auth,PwdAuth): if response.json().get('possibleResponsibleField') in ['password', 'userLogin']: raise Exception(response.text) self.auth.get_access_token() #It will refresh token if Unauthorized __count+=1 if __count<5: #To avoid infinity loops continue else: raise Exception('Too many retries to refresh token.\n', response.text, response.status_code) raise Exception(response.text, response.status_code)
[docs] def issue_api_key(self): """ Create an API key for a given connector. Returns: `dict` Dictionary with the API key. """ resp = self.call_api('v2/apiKey/issue', data={ 'connectorId': self.connector_id }, content_type='application/x-www-form-urlencoded') return resp
[docs] def api_key_details(self, api_key, connector_id): """ Display information about the API key. Args: api_key: `str` Carol's api key connector_id: `str` Connector Id which API key was created. Returns: `dict` Dictionary with API key information. """ resp = self.call_api('v2/apiKey/details', params = {"apiKey": api_key, "connectorId": connector_id}) return resp
[docs] def api_key_revoke(self, connector_id): """ Revoke API key for ta given connector_id Args: connector_id: `str` Connector Id which API key was created. Returns: `dict` Dictionary with API request response. """ resp = self.call_api('v2/apiKey/revoke', method='DELETE', content_type='application/x-www-form-urlencoded', params = {"connectorId": connector_id}) return resp
[docs] def copy_token(self): """ Copy token to clipboard Returns: None """ import pyperclip if isinstance(self.auth, PwdAuth): token = self.auth._token.access_token pyperclip.copy(token) print("Copied auth token to clipboard: " + token) elif isinstance(self.auth, ApiKeyAuth): token = self.auth.api_key pyperclip.copy(token) print("Copied API Key to clipboard: " + token) else: raise Exception("Auth object not set. Can't fetch token.")
[docs] def switch_environment(self, env_name=None, env_id=None, app_name=None): """ Switch environments. If the user has access to this environment, it will be "logged in" in this new environment. Args: env_name: `str` default `None` Environment (tenant) name to switch the context to. env_id: `str` default `None` Environment (tenant) id to switch the context to. app_name: `str` default `None` App name in the target environment to switch the context to. Only needed with using CDS. Returns: self .. code:: python from pycarol import Carol, Staging carol = Carol('B', 'teste', auth=PwdAuth('email@totvs.com.br', 'pass'), ) carol.switch_environment('A') Staging(carol_tenant_A).fetch_parquet(...) # fetch parquet from tenant A # To switch back carol.switch_environment('B') #back to tenant B """ if self.org is None: self.org = Organization(self).get_organization_info(self.organization) if env_name: env_id = Tenant(self).get_tenant_by_domain(env_name)['mdmId'] elif env_id is None: raise ValueError('Either `env_name` or `env_id` must be set.') self.auth.switch_context(env_id=env_id) self.domain = env_name self.app_name = app_name #TODO: Today we cannot use CDS without a valid app name. self.tenant = Tenant(self).get_tenant_by_domain(env_name) return self
[docs] def switch_context(self, env_name=None, env_id=None, app_name=None): """ Context manager to temporary have access to a second environment Args: env_name: `str` default `None` Environment (tenant) name to switch the context to. env_id: `str` default `None` Environment (tenant) id to switch the context to. app_name: `str` default `None` App name in the target environment to switch the context to. Only needed with using CDS. Returns: None Usage: .. code:: python from pycarol import Carol, Staging carol = Carol('B', 'teste', auth=PwdAuth('email@totvs.com.br', 'pass'), ) with carol.switch_context('A') as carol_tenant_A: Staging(carol_tenant_A).fetch_parquet(...) # fetch parquet from tenant A #back to tenant B """ if self.org is None: self.org = Organization(self).get_organization_info(self.organization) if env_name: env_id = Tenant(self).get_tenant_by_domain(env_name)['mdmId'] elif env_id is None: raise ValueError('Either `env_name` or `env_id` must be set.') class SwitchContext(object): def __init__(self, parent_context, env_name=None, env_id=None, app_name=None): self.parent_context = parent_context self.env_name = env_name self.env_id = env_id self.app_name = app_name def __enter__(self): self.parent_context.switch_environment(env_name= self.env_name, env_id=self.env_id, app_name=self.app_name) return self.parent_context def __exit__(self, exc_type, exc_val, exc_tb): del self.parent_context return SwitchContext(parent_context=copy.deepcopy(self), env_name=env_name, env_id=env_id, app_name=app_name)