Source code for pycarol.carol

from urllib3.util.retry import Retry
import requests
from requests.adapters import HTTPAdapter
import json
import os, copy
import os.path
from .auth.ApiKeyAuth import ApiKeyAuth
from .auth.PwdAuth import PwdAuth
from .tenant import Tenant
from . import __CONNECTOR_PYCAROL__
from . import __version__
from . organization import Organization
from .exceptions import CarolApiResponseException, InvalidToken

[docs]class Carol: """ Handles all Carol`s API calls It will handle all API calls, for a given authentication method. :param domain: `str`. Args: domain: `str`. default `None`. Tenant name. e.x., domain.carol.ai app_name: `str`. default `None`. Carol app name. auth: `PwdAuth` or `ApiKeyAuth`. object Auth Carol object to handle authentication connector_id: `str` , default `__CONNECTOR_PYCAROL__`. Connector Id port: `int` , default 443. Port to be used (when running locally it could change) verbose: `bool` , default `False`. If True will print the header, method and URL of each API call. organization: `str` , default `None`. Organization domain. environment: `str`, default `carol.ai`, Which Carol's environment to use. There are three possible values today. 1. 'carol.ai' for the production environment 2. 'karol.ai' for the explore environment 3. 'qarol.ai' for the QA environment host: `str` default `None` This will overwrite the host used. Today the host is: 1. if organization is None, host={domain}.{environment} 2. else host={organization}.{environment} See Carol._set_host. user: `str` default `None` User password: `str` default `None` User passowrd api_key: `str` default `None` Carol's Api Key org_level: `bool` default `False` If True, will log-in at organization level. OBS: In case all parameters are `None`, pycarol will try yo find their values in the environment variables. The values are: 1. `CAROLTENANT` for domain 2. `CAROLAPPNAME` for app_name 3. `CAROLAPPOAUTH` for auth 4. `CAROLORGANIZATION` for organization 5. `CAROLCONNECTORID` for connector_id 6. `CAROL_DOMAIN` for environment 7. `CAROLUSER` for carol user email 8. `CAROLPWD` for user password. """ def __init__(self, domain=None, app_name=None, auth=None, connector_id=None, port=443, verbose=False, organization=None, environment=None, host=None, user=None, password=None, api_key=None, org_level=False): self.connector_id = connector_id if auth is None: auth = self.__get_auth(connector_id=connector_id, username=user, password=password, app_oauth=api_key) if (domain is None) and (not org_level): domain = os.getenv('CAROLTENANT') if domain is None: raise ValueError("`domain` must be set.") if org_level: domain = None if app_name is None: app_name = os.getenv('CAROLAPPNAME', ' ') if self.connector_id is None: if auth.connector_id is None: self.connector_id = __CONNECTOR_PYCAROL__ else: self.connector_id = auth.connector_id if (domain is None and not org_level) or app_name is None or auth is None: raise ValueError("domain, app_name and auth must be specified as parameters, either " + "in the environment variables CAROLTENANT, CAROLAPPNAME, CAROLAPPOAUTH" + " or CAROLUSER+CAROLPWD and CAROLCONNECTORID") # TODO Fixed to be compatible with the old `ENV_DOMAIN`. We could add a deprecated warning. self.environment = environment if environment is not None else os.getenv('CAROL_DOMAIN', os.getenv('ENV_DOMAIN', 'carol.ai')) self.organization = organization if organization is not None else os.getenv('CAROLORGANIZATION', os.getenv('API_SUBDOMAIN')) self.domain = domain self.app_name = app_name self.port = port self.verbose = verbose self._host_string = host self.host = self._set_host(domain=self.domain, organization=self.organization, environment=self.environment, host=host) self._tenant = None self._current_user = None self.auth = auth self.auth.set_connector_id(self.connector_id) self.session = None self._is_org_level = org_level self.auth.login(self) self.response = None self.org = None def __get_auth(self, connector_id=None, username=None, password=None, app_oauth=None): if username is None: username = os.getenv('CAROLUSER') if password is None: password = os.getenv('CAROLPWD') if username and password: return PwdAuth(user=username, password=password) if app_oauth is None: app_oauth = os.getenv('CAROLAPPOAUTH') if connector_id is None: connector_id = os.getenv('CAROLCONNECTORID', __CONNECTOR_PYCAROL__) self.connector_id = connector_id if app_oauth and connector_id: return ApiKeyAuth(app_oauth) raise ValueError("either `auth` or `username/password` or `api_key` or pycarol env variables must be set.") @property def current_user(self, force=False): """ Returns the current user. Args: force: `bool` default `False`. If True will force the request to fetch current user. """ if self._current_user is None or force: self._current_user = self.call_api('v2/users/current', ) return self._current_user @property def tenant(self): if self._tenant is None and not self._is_org_level: self._tenant = Tenant(self).get_tenant_by_domain(self.domain) return self._tenant @staticmethod def _set_host(domain, organization, environment, host): """ Set the host to be used. Args: domain: `str` Former tenant name. e.x., domain.carol.ai organization: `str` Organization domain. environment: `str` Which Carol's environment to use. There are three possible values today. 1. 'carol.ai' for the production environment 2. 'karol.ai' for the explore environment 3. 'qarol.ai' for the QA environment host: `str` Host to be used. It overwrite the default one. Returns: `str` host """ if host is not None: return host elif organization is not None: return f"{organization}.{environment}" else: return f"{domain}.{environment}" @staticmethod def _retry_session(retries=5, session=None, backoff_factor=0.5, status_forcelist=(500, 502, 503, 504, 524), method_whitelist=frozenset(['HEAD', 'TRACE', 'GET', 'PUT', 'OPTIONS', 'DELETE'])): """ Static method used to handle retries between calls. Args: retries: `int` , default `5` Number of retries for the API calls session: Session object dealt `None` It allows you to persist certain parameters across requests. backoff_factor: `float` , default `0.5` Backoff factor to apply between attempts. It will sleep for: {backoff factor} * (2 ^ ({retries} - 1)) seconds status_forcelist: `iterable` , default (500, 502, 503, 504, 524). A set of integer HTTP status codes that we should force a retry on. A retry is initiated if the request method is in method_whitelist and the response status code is in status_forcelist. method_whitelist: `iterable` , default frozenset(['HEAD', 'TRACE', 'GET', 'PUT', 'OPTIONS', 'DELETE'])) Set of uppercased HTTP method verbs that we should retry on. Returns: :class:`requests.Section` """ session = session or requests.Session() retry = Retry( total=retries, read=retries, connect=retries, backoff_factor=backoff_factor, status_forcelist=status_forcelist, method_whitelist=method_whitelist, ) adapter = HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter) return session
[docs] def call_api(self, path, method=None, data=None, auth=True, params=None, content_type='application/json', retries=8, session=None, backoff_factor=0.5, status_forcelist=(502, 503, 504, 524), downloadable=False, method_whitelist=frozenset(['HEAD', 'TRACE', 'GET', 'PUT', 'OPTIONS', 'DELETE', 'POST']), errors='raise', extra_headers=None, files=None, **kwds): """ This method handles all the API calls. Args: path: `str`. API URI path. e.x. v2/staging/schema method: 'str', default `None`. Set of uppercased HTTP method verbs that we should call on. data: 'dict`, default `None`. Dictionary, list of tuples, bytes, or file-like object to send in the body of the request. auth: :class: `pycarol.ApiKeyAuth` or `pycarol.PwdAuth` Auth type to be used within the API's calls. params: (optional) Dictionary, list of tuples or bytes to send in the query string for the :class:`requests.Request`. content_type: `str`, default 'application/json' Content type for the api call retries: `int` , default `5` Number of retries for the API calls session: :class `requests.Session` object dealt `None` It allows you to persist certain parameters across requests. backoff_factor: `float` , default `0.5` Backoff factor to apply between attempts. It will sleep for: {backoff factor} * (2 ^ ({retries} - 1)) seconds status_forcelist: `iterable` , default (500, 502, 503, 504, 524). A set of integer HTTP status codes that we should force a retry on. A retry is initiated if the request method is in method_whitelist and the response status code is in status_forcelist. downloadable: `bool` default `False`. If the request will return a file to download. method_whitelist: `iterable` , default frozenset(['HEAD', 'TRACE', 'GET', 'PUT', 'OPTIONS', 'DELETE'])) Set of uppercased HTTP method verbs that we should retry on. errors: {‘ignore’, ‘raise’}, default ‘raise’ If ‘raise’, then invalid request will raise an exception If ‘ignore’, then invalid request will return the request response extra_headers: `dict` default `None` extra headers to be sent. files: `dict` default `None` Used when uploading files to carol. This will be sent to :class: `requests.request` kwds: `dict` default `None` Extra parameters to be sent to :class: `requests.request` Rerturn: Dict with API response. """ if session is not None: self.session = session extra_headers = extra_headers or {} url = f'https://{self.host}:{self.port}/api/{path}' if method is None: if data is None: method = 'GET' else: method = 'POST' met_list = ['HEAD', 'TRACE', 'GET', 'PUT','POST', 'OPTIONS', 'PATCH', 'DELETE', 'CONNECT' ] assert method in met_list, f"API method must be {met_list}" headers = {'accept': 'application/json'} if auth: self.auth.authenticate_request(headers) data_json = None if method == 'GET': pass elif (method == 'POST') or (method == 'DELETE') or (method == 'PUT'): if content_type is not None: headers['content-type'] = content_type if content_type == 'application/json': data_json = data data = None headers.update(extra_headers) headers.update({'User-Agent': f'pyCarol/{__version__}'}) __count = 0 while True: self.session = self._retry_session( retries=retries, session=self.session, backoff_factor=backoff_factor, status_forcelist=status_forcelist, method_whitelist=method_whitelist ) response = self.session.request(method=method, url=url, data=data, json=data_json, headers=headers, params=params, files=files, **kwds) if self.verbose: if data_json is not None: print("Calling {} {}. Payload: {}. Params: {}".format(method, url, data_json, params)) else: print("Calling {} {}. Payload: {}. Params: {}".format(method, url, data, params)) print(" Headers: {}".format(headers)) if response.ok or errors == 'ignore': if downloadable: #Used when downloading carol app file. return response response.encoding = 'utf-8' self.response = response if response.text == '': return {} return json.loads(response.text) elif (response.reason == 'Unauthorized') and isinstance(self.auth, PwdAuth): if response.json().get('possibleResponsibleField') in ['password', 'userLogin']: raise InvalidToken(response.text) self.auth.get_access_token() #It will refresh token if Unauthorized __count += 1 if __count < 5: #To avoid infinity loops continue else: raise Exception('Too many retries to refresh token.\n', response.text, response.status_code) elif response.status_code == 404: raise CarolApiResponseException(response.text, response.status_code) raise Exception(response.text, response.status_code)
[docs] def issue_api_key(self, connector_id=None): """ Create an API key for a given connector. Args: connector_id: `str` default `None`. Connector ID to be used when creating the APIkey Returns: `dict` Dictionary with the API key. """ if connector_id is None: connector_id = self.connector_id resp = self.call_api('v2/apiKey/issue', data={ 'connectorId': connector_id }, content_type='application/x-www-form-urlencoded') return resp
[docs] def api_key_details(self, api_key, connector_id): """ Display information about the API key. Args: api_key: `str` Carol's api key connector_id: `str` Connector Id which API key was created. Returns: `dict` Dictionary with API key information. """ resp = self.call_api('v2/apiKey/details', params = {"apiKey": api_key, "connectorId": connector_id}) return resp
[docs] def api_key_revoke(self, connector_id): """ Revoke API key for ta given connector_id Args: connector_id: `str` Connector Id which API key was created. Returns: `dict` Dictionary with API request response. """ resp = self.call_api('v2/apiKey/revoke', method='DELETE', content_type='application/x-www-form-urlencoded', params = {"connectorId": connector_id}) return resp
[docs] def switch_org_level(self): """ Switch organization level. """ org = self._current_org() self.auth.switch_org_context(org['mdmId']) self._is_org_level = True
[docs] def switch_environment(self, env_name=None, env_id=None, app_name=None, org_name=None, org_id=None): """ Switch org/environments. If the user has access to this environment, it will be "logged in" in this new org/environment. Args: env_name: `str` default `None` Environment (tenant) name to switch the context to. env_id: `str` default `None` Environment (tenant) id to switch the context to. app_name: `str` default `None` App name in the target environment to switch the context to. Only needed with using CDS. org_name: `str` default `None` The organization name to switch context to. If the same keep it `None` org_id: `str` default `None` The organization id to switch context to. If the same keep it `None` Returns: self .. code:: python from pycarol import Carol, Staging carol = Carol('B', 'teste', auth=PwdAuth('email@totvs.com.br', 'pass'), ) carol.switch_environment('A') Staging(carol_tenant_A).fetch_parquet(...) # fetch parquet from tenant A # To switch back carol.switch_environment('B') #back to tenant B """ if self.org is None: self.org = Organization(self).get_organization_info(self.organization) current = self.get_current() if (org_id is not None and org_id != current['org_id']) or (org_name is not None and org_name != current['org_name'] ): # Switch to org context. if not current['org_level']: self.switch_org_level() if org_id is None: org_id = Organization(self).get_organization_info(org_name)['mdmId'] # Switch org. self.auth.switch_org_context(org_id) if env_name: env_id = Tenant(self).get_tenant_by_domain(env_name)['mdmId'] if env_id is not None: self.auth.switch_context(env_id=env_id) self._tenant = self._current_env() self._is_org_level = False else: self._tenant = None self._is_org_level = True self.domain = env_name self.app_name = app_name # TODO: Today we cannot use CDS without a valid app name. self.organization = self._current_org()['mdmName'] self.host = self._set_host(domain=self.domain, organization=self.organization, environment=self.environment, host=self._host_string) return self
[docs] def switch_context(self, env_name=None, env_id=None, app_name=None, org_name=None, org_id=None): """ Context manager to temporary have access to a second environment Args: env_name: `str` default `None` Environment (tenant) name to switch the context to. env_id: `str` default `None` Environment (tenant) id to switch the context to. app_name: `str` default `None` App name in the target environment to switch the context to. Only needed with using CDS. Returns: None Usage: .. code:: python from pycarol import Carol, Staging carol = Carol('B', 'teste', auth=PwdAuth('email@totvs.com.br', 'pass'), ) with carol.switch_context('A') as carol_tenant_A: Staging(carol_tenant_A).fetch_parquet(...) # fetch parquet from tenant A #back to tenant B """ if self.org is None: self.org = Organization(self).get_organization_info(self.organization) if env_name: env_id = Tenant(self).get_tenant_by_domain(env_name)['mdmId'] class SwitchContext(object): def __init__(self, parent_context, env_name=None, env_id=None, org_id=None, org_name=None, app_name=None): self.parent_context = parent_context self.env_name = env_name self.env_id = env_id self.app_name = app_name self.org_id = org_id self.org_name = org_name def __enter__(self): self.parent_context.switch_environment(env_name=self.env_name, env_id=self.env_id, app_name=self.app_name, org_name=self.org_name, org_id=self.org_id) return self.parent_context def __exit__(self, exc_type, exc_val, exc_tb): del self.parent_context return SwitchContext(parent_context=copy.deepcopy(self), env_name=env_name, env_id=env_id, app_name=app_name, org_name=org_name, org_id=org_id,)
def _current_env(self): return self.call_api('v1/tenants/current', errors='ignore') def _current_org(self): return self.call_api('v1/organizations/current')
[docs] def get_current(self, level='all'): """ Get current org/env information. Args: level: `str`: Possible Values: "all": To get organization and environment information. "org": To get organization information. "env": To get environment information. Returns: `dict` Dictionary with keys org_Id, org_name, env_id, env_name """ env = {} org = {} if level.lower() not in ['org', 'env', 'all']: raise ValueError(f"level should be 'all', 'org', 'env', {level} was passed.") if level.lower() == 'env' or level.lower() == 'all': env = self._current_env() if level.lower() == 'org' or level.lower() == 'all': org = self._current_org() if env.get('mdmName') is None: self._is_org_level = True else: self._is_org_level = False return { "env_name": env.get('mdmName'), "env_id": env.get('mdmId'), "org_name": org.get('mdmName'), "org_id": org.get('mdmId'), "org_level": self._is_org_level }
[docs] def get_tenants_for_user(self): """ Get all tenants for the current user. Returns: `list` List of tenants. """ return self.call_api("v1/users/assignedTenantsForCurrentUser", method='GET')