import copy
from pathlib import Path
import json
import os
import typing as T
import warnings
from dotenv import load_dotenv
import requests
from urllib3.util.retry import Retry
from . import __CONNECTOR_PYCAROL__, __version__
from . import exceptions
from .auth.ApiKeyAuth import ApiKeyAuth
from .auth.PwdAuth import PwdAuth
from .organization import Organization
from .tenant import Tenant
ResponseType = T.Union[requests.Response, T.Dict[str, T.Any], T.List]
class SwitchContext:
def __init__(
self,
parent_context,
env_name=None,
env_id=None,
org_id=None,
org_name=None,
app_name=None,
):
self.parent_context = parent_context
self.env_name = env_name
self.env_id = env_id
self.app_name = app_name
self.org_id = org_id
self.org_name = org_name
def __enter__(self):
self.parent_context.switch_environment(
env_name=self.env_name,
env_id=self.env_id,
app_name=self.app_name,
org_name=self.org_name,
org_id=self.org_id,
)
return self.parent_context
def __exit__(self, exc_type, exc_val, exc_tb):
del self.parent_context
[docs]class Carol:
"""Handles all Carol`s API calls and log user in Carol.
In case all parameters are `None`, pycarol will try yo find their values in the
environment variables.
The values are:
1. `CAROLTENANT` for domain
2. `CAROLAPPNAME` for app_name
3. `CAROLAPPOAUTH` for auth
4. `CAROLORGANIZATION` for organization
5. `CAROLCONNECTORID` for connector_id
6. `CAROL_DOMAIN` for environment
7. `CAROLUSER` for carol user email
8. `CAROLPWD` for user password.
Args:
domain: Tenant name. e.x., domain.carol.ai.
app_name: Carol app name.
auth: Auth Carol object to handle authentication
connector_id: Connector Id
port: Port to be used (when running locally it could change)
verbose: If True will print the header, method and URL of each API call.
organization: Organization domain.
environment: Which Carol's environment to use.
There are three possible values today.
1. 'carol.ai' for the production environment
2. 'karol.ai' for the explore environment
3. 'qarol.ai' for the QA environment
host: This will overwrite the host used. Today the host is:
1. if organization is None, host={domain}.{environment}
2. else host={organization}.{environment}
See Carol._set_host.
user: User
password:User passowrd
api_key: Carol's Api Key
org_level: If True, will log-in at organization level.
dotenv_path: Path to dotenv file, if loading is required.
Raises:
MissingInfoCarolException if there is any mandatory parameter missing
"""
def __init__(
self,
domain: T.Optional[str] = None,
app_name: T.Optional[str] = None,
auth: T.Optional[T.Union[ApiKeyAuth, PwdAuth]] = None,
connector_id: T.Optional[str] = None,
port: int = 443,
verbose: bool = False,
organization: T.Optional[str] = None,
environment: T.Optional[str] = None,
host: T.Optional[str] = None,
user: T.Optional[str] = None,
password: T.Optional[str] = None,
api_key: T.Optional[str] = None,
org_level: bool = False,
dotenv_path: T.Optional[T.Union[str, Path]] = None,
):
if dotenv_path is not None:
dotenv_path = Path(dotenv_path)
if not dotenv_path.exists():
msg = f"Invalid path to dotenv: {dotenv_path}"
raise exceptions.MissingInfoCarolException(msg)
load_dotenv(str(dotenv_path))
auth = _prepare_auth(api_key, auth, connector_id, password, user)
domain = domain or os.getenv("CAROLTENANT") or None
if (domain is None) and (not org_level):
raise exceptions.MissingInfoCarolException("`domain` must be set.")
if org_level is True:
domain = None
app_name = app_name or os.getenv("CAROLAPPNAME", " ")
environment = environment or os.getenv("CAROL_DOMAIN") or "carol.ai"
organization = organization or os.getenv("CAROLORGANIZATION") or None
if (domain is None and not org_level) or app_name is None or auth is None:
raise exceptions.MissingInfoCarolException(
"domain, app_name and auth must be specified as parameters, either "
"in the environment variables CAROLTENANT, CAROLAPPNAME,"
" CAROLAPPOAUTH or CAROLUSER+CAROLPWD and CAROLCONNECTORID"
)
if environment is None and os.getenv("ENV_DOMAIN") is not None:
raise exceptions.DeprecatedEnvVarException("ENV_DOMAIN", "CAROL_DOMAIN")
if organization is None and os.getenv("API_SUBDOMAIN") is not None:
raise exceptions.DeprecatedEnvVarException(
"API_SUBDOMAIN", "CAROLORGANIZATION"
)
self._current_user: T.Optional[ResponseType] = None
self._host_string = host
self._is_org_level = org_level
self._tenant: T.Optional[ResponseType] = None
self._user_agent = f"pyCarol/{__version__}"
self.app_name: T.Optional[str] = app_name
self.auth = auth
self.connector_id = auth.connector_id
self.domain = domain
self.environment = environment
self.host = _set_host(self.environment, domain, host, organization)
self.org = None
self.organization = organization
self.port = port
self.response: T.Optional[ResponseType] = None
self.session: T.Optional[requests.Session] = None
self.verbose = verbose
self.auth.login(self)
@property
def current_user(self) -> ResponseType:
"""Return the current user.
Returns:
Carol's API response payload.
"""
endpoint = (
"v2/orgUsers/current" if self._is_org_level is True else "v2/users/current"
)
if self._current_user is None:
self._current_user = self.call_api(endpoint)
return self._current_user
@property
def tenant(self) -> T.Optional[ResponseType]:
"""Return the current tenant.
Returns:
Carol's API response payload.
"""
if self._tenant is None and not self._is_org_level:
self._tenant = Tenant(self).get_tenant_by_domain(self.domain)
return self._tenant
[docs] def call_api(
self,
path: str,
method: T.Optional[str] = None,
data=None,
auth: bool = True,
params=None,
content_type: T.Optional[str] = "application/json",
retries: int = 8,
session: T.Optional[requests.Session] = None,
backoff_factor: float = 0.5,
status_forcelist: T.Tuple[int, ...] = (502, 503, 504, 524),
downloadable: bool = False,
method_whitelist: T.FrozenSet[str] = frozenset(
["HEAD", "TRACE", "GET", "PUT", "OPTIONS", "DELETE", "POST"]
),
errors: str = "raise",
extra_headers: T.Optional[T.Dict] = None,
files: T.Optional[T.Dict] = None,
prefix_path: str = "/api/",
**kwds,
) -> ResponseType:
"""Handle all the API calls.
Args:
path: API URI path. e.x. v2/staging/schema.
method: Set of uppercased HTTP method verbs that we should call on.
data: Same type as :class: `Session.request.data`
Object to send in the body of the request.
auth: If API call should be authenticated
params: in the query string for the :class:`requests.Request`.
content_type: Content type for the api call
retries: Number of retries for the API calls
session: It allows you to persist certain parameters across requests.
backoff_factor: Backoff factor to apply between attempts. It will sleep
for: {backoff factor} * (2 ^ ({retries} - 1)) seconds
status_forcelist: A set of integer HTTP status codes that we should force a
retry on. A retry is initiated if the request method is in
method_whitelist and the response status code is in status_forcelist.
downloadable: If the request will return a file to download.
method_whitelist: Set of uppercased HTTP method verbs that we should retry
on.
errors: {‘ignore’, ‘raise’}, default ‘raise’
If ‘raise’, then invalid request will raise an exception If ‘ignore’,
then invalid request will return the request response
extra_headers: extra headers to be sent.
files: Used when uploading files to carol. This will be sent to
:class: `requests.request`
prefix_path: Prefix path to be used to create the final url
'https://{self.host}:{self.port}{prefix_path}{path}.
kwds: `dict` default `None`
Extra parameters to be sent to :class: `requests.request`
Returns:
Dict with API response.
"""
if session is not None:
self.session = session
extra_headers = extra_headers or {}
url = f"https://{self.host}:{self.port}{prefix_path}{path}"
if method is None:
method = "GET" if data is None else "POST"
met_list = [
"HEAD",
"TRACE",
"GET",
"PUT",
"POST",
"OPTIONS",
"PATCH",
"DELETE",
"CONNECT",
]
assert method in met_list, f"API method must be {met_list}"
headers = {"accept": "application/json"}
if auth:
self.auth.authenticate_request(headers)
data_json = None
if method == "GET":
pass
elif method in ("POST", "DELETE", "PUT"):
if content_type is not None:
headers["content-type"] = content_type
if content_type == "application/json":
data_json = data
data = None
headers.update(extra_headers)
headers.update({"User-Agent": self._user_agent})
__count = 0
while True:
self.session = _retry_session(
retries=retries,
session=self.session,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
method_whitelist=method_whitelist,
)
response = self.session.request(
method=method,
url=url,
data=data,
json=data_json,
headers=headers,
params=params,
files=files,
**kwds,
)
if self.verbose:
data_ = data_json if data_json is not None else data
print(f"Calling {method} {url}. Payload: {data_}. Params: {params}")
print(f" Headers: {headers}")
if response.ok or errors == "ignore":
if downloadable: # Used when downloading carol app file.
return response
response.encoding = "utf-8"
self.response = response
if response.text == "":
return {}
return json.loads(response.text)
if (response.reason == "Unauthorized") and isinstance(self.auth, PwdAuth):
if response.json().get("possibleResponsibleField") in [
"password",
"userLogin",
]:
raise exceptions.InvalidToken(response.text)
self.auth.get_access_token() # It will refresh token if Unauthorized
__count += 1
if __count < 5: # To avoid infinity loops
continue
raise Exception(
"Too many retries to refresh token.\n",
response.text,
response.status_code,
)
if response.status_code == 404:
raise exceptions.CarolApiResponseException(
response.text, response.status_code
)
raise Exception(response.text, response.status_code)
[docs] def issue_api_key(self, connector_id: T.Optional[str] = None) -> ResponseType:
"""Create an API key for a given connector.
Args:
connector_id: Connector ID to be used when creating the APIkey
Returns:
Dictionary with the API key.
"""
if connector_id is None:
connector_id = self.connector_id
resp = self.call_api(
"v2/apiKey/issue",
data={"connectorId": connector_id},
content_type="application/x-www-form-urlencoded",
)
return resp
[docs] def api_key_details(self, api_key: str, connector_id: str) -> ResponseType:
"""Display information about the API key.
Args:
api_key: Carol's api key
connector_id: Connector Id which API key was created.
Returns:
Dictionary with API key information.
"""
resp = self.call_api(
"v2/apiKey/details", params={"apiKey": api_key, "connectorId": connector_id}
)
return resp
[docs] def api_key_revoke(self, connector_id: str) -> ResponseType:
"""Revoke API key for the given connector_id.
Args:
connector_id: Connector Id which API key was created.
Returns:
Dictionary with API request response.
"""
resp = self.call_api(
"v2/apiKey/revoke",
method="DELETE",
content_type="application/x-www-form-urlencoded",
params={"connectorId": connector_id},
)
return resp
[docs] def switch_org_level(self):
"""Switch organization level."""
if self._is_org_level:
warnings.warn("already in org level.", UserWarning, stacklevel=3)
return
org = self._current_org()
self.auth.switch_org_context(org["mdmId"])
self._is_org_level = True
[docs] def switch_environment(
self,
env_name: T.Optional[str] = None,
env_id: T.Optional[str] = None,
app_name: T.Optional[str] = None,
org_name: T.Optional[str] = None,
org_id: T.Optional[str] = None,
) -> "Carol":
"""Switch org/environments.
If the user has access to this environment, it will be "logged in" in this new
org/environment.
Args:
env_name: Environment (tenant) name to switch the context to.
env_id: Environment (tenant) id to switch the context to.
app_name: App name in the target environment to switch the context to.
Only needed with using CDS.
org_name: The organization name to switch context to. If the same keep it
`None`.
org_id: The organization id to switch context to. If the same keep it
`None`.
Returns:
Carol
.. code:: python
from pycarol import Carol, Staging
carol = Carol('B', 'teste', auth=PwdAuth('email@totvs.com.br', 'pass'), )
carol.switch_environment('A')
Staging(carol_tenant_A).fetch_parquet(...) # fetch parquet from tenant A
# To switch back
carol.switch_environment('B')
#back to tenant B
"""
if self.org is None:
self.org = Organization(self).get_organization_info(self.organization)
current = self.get_current()
if (org_id is not None and org_id != current["org_id"]) or (
org_name is not None and org_name != current["org_name"]
):
# Switch to org context.
if not current["org_level"]:
self.switch_org_level()
if org_id is None:
org_id = Organization(self).get_organization_info(org_name)["mdmId"]
# Switch org.
self.auth.switch_org_context(org_id)
if env_name:
env_id = Tenant(self).get_tenant_by_domain(env_name)["mdmId"]
if env_id is not None:
self.auth.switch_context(env_id=env_id)
self._tenant = self._current_env()
self._is_org_level = False
else:
self._tenant = None
self._is_org_level = True
self.domain = env_name
# TODO: Today we cannot use CDS without a valid app name.
self.app_name = app_name
self.organization = self._current_org()["mdmName"]
self.host = _set_host(
domain=self.domain,
organization=self.organization,
environment=self.environment,
host=self._host_string,
)
return self
[docs] def switch_context(
self,
env_name: T.Optional[str] = None,
env_id: T.Optional[str] = None,
app_name: T.Optional[str] = None,
org_name: T.Optional[str] = None,
org_id: T.Optional[str] = None,
) -> SwitchContext:
"""Context manager to temporary have access to a second environment.
Args:
env_name: Environment (tenant) name to switch the context to.
env_id: Environment (tenant) id to switch the context to.
app_name: App name in the target environment to switch the context to.
Only needed with using CDS.
Returns:
SwitchContext
Examples:
.. code:: python
from pycarol import Carol, Staging
carol = Carol('B', 'teste', auth=PwdAuth('email@totvs.com.br', 'pwd'), )
with carol.switch_context('A') as carol_tenant_A:
# fetch parquet from tenant A
Staging(carol_tenant_A).fetch_parquet(...)
#back to tenant B
"""
if self.org is None:
self.org = Organization(self).get_organization_info(self.organization)
if env_name:
env_id = Tenant(self).get_tenant_by_domain(env_name)["mdmId"]
return SwitchContext(
parent_context=copy.deepcopy(self),
env_name=env_name,
env_id=env_id,
app_name=app_name,
org_name=org_name,
org_id=org_id,
)
def _current_env(self):
return self.call_api("v1/tenants/current", errors="ignore")
def _current_org(self):
return self.call_api("v1/organizations/current")
[docs] def get_current(self, level: str = "all") -> T.Dict[str, T.Any]:
"""Get current org/env information.
Args:
level: Possible Values:
"all": To get organization and environment information.
"org": To get organization information.
"env": To get environment information.
Returns:
Dictionary with keys org_Id, org_name, env_id, env_name
Raises:
ValueError when level not in 'org', 'env' or 'all'
"""
env = {}
org = {}
if level.lower() not in ["org", "env", "all"]:
raise ValueError(
f"level should be 'all', 'org', 'env', {level} was passed."
)
if level.lower() in ("env", "all"):
env = self._current_env()
if level.lower() in ("org", "all"):
org = self._current_org()
self._is_org_level = env.get("mdmName") is None
return {
"env_name": env.get("mdmName"),
"env_id": env.get("mdmId"),
"org_name": org.get("mdmName"),
"org_id": org.get("mdmId"),
"org_level": self._is_org_level,
}
[docs] def get_tenants_for_user(self) -> ResponseType:
"""Get all tenants for the current user.
Returns:
Dict
"""
return self.call_api("v1/users/assignedTenantsForCurrentUser", method="GET")
def _prepare_auth(
app_oauth: T.Optional[str] = None,
auth: T.Optional[T.Union[ApiKeyAuth, PwdAuth]] = None,
connector_id: T.Optional[str] = None,
password: T.Optional[str] = None,
username: T.Optional[str] = None,
) -> T.Union[ApiKeyAuth, PwdAuth]:
app_oauth = app_oauth or os.getenv("CAROLAPPOAUTH") or None
connector_id = (
connector_id or os.getenv("CAROLCONNECTORID") or __CONNECTOR_PYCAROL__
)
password = password or os.getenv("CAROLPWD") or None
username = username or os.getenv("CAROLUSER") or None
if auth is not None:
if auth.connector_id is None:
auth.set_connector_id(connector_id)
return auth
if username is not None and password is not None:
auth = PwdAuth(user=username, password=password)
auth.set_connector_id(connector_id)
return auth
if app_oauth is not None and connector_id is not None:
auth = ApiKeyAuth(app_oauth)
auth.set_connector_id(connector_id)
return auth
raise exceptions.MissingInfoCarolException(
"either `auth` or `username/password` or `api_key` or "
"pycarol env variables must be set."
)
def _set_host(
environment: str,
domain: T.Optional[str],
host: T.Optional[str] = None,
organization: T.Optional[str] = None,
) -> str:
"""Set the host to be used.
Args:
domain: Tenant name.
e.x., tenant.carol.ai
environment: There are three possible values today:
1. 'carol.ai' for the production environment
2. 'karol.ai' for the explore environment
3. 'qarol.ai' for the QA environment
host: Host to be used.
organization: Organization domain.
Returns:
host
"""
if host is not None:
return host
if organization is not None:
return f"{organization}.{environment}"
return f"{domain}.{environment}"
def _retry_session(
retries: int = 5,
session: T.Optional[requests.Session] = None,
backoff_factor: float = 0.5,
status_forcelist: T.Tuple[int, ...] = (500, 502, 503, 504, 524),
method_whitelist: T.FrozenSet[str] = frozenset(
["HEAD", "TRACE", "GET", "PUT", "OPTIONS", "DELETE"]
),
) -> requests.Session:
"""Handle retries between calls.
Args:
retries: Number of retries for the API calls
session: Session object dealt `None`
It allows you to persist certain parameters across requests.
backoff_factor: Backoff factor to apply between attempts. It will sleep
for: {backoff factor} * (2 ^ ({retries} - 1)) seconds
status_forcelist: A set of integer HTTP status codes that we should force a
retry on. A retry is initiated if the request method is in
method_whitelist and the response status code is in status_forcelist.
method_whitelist: `iterable` , default frozenset(['HEAD', 'TRACE', 'GET',
'PUT', 'OPTIONS', 'DELETE'])). Set of uppercased HTTP method verbs that
we should retry on.
Returns:
session
"""
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
method_whitelist=method_whitelist,
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session