import json
from collections import defaultdict
from .utils.deprecation_msgs import _deprecation_msgs
from .utils.miscellaneous import unroll_list, find_keys
class Connectors:
    """
    This class handles all APIs related to Carol connectors.

    Args:
        carol: class: pycarol.Carol
            Carol instance used to issue every API call (`carol.call_api`).
    """

    def __init__(self, carol):
        self.carol = carol

    def _resolve_connector_id(self, connector_id=None, connector_name=None):
        """
        Return `connector_id` when it is set, otherwise look it up by `connector_name`.

        Args:
            connector_id: `str`, default `None`
                Connector ID. Takes precedence over `connector_name`.
            connector_name: `str`, default `None`
                Connector name, resolved through `get_by_name`.

        Returns: `str`
            The connector ID.

        Raises:
            ValueError: if neither `connector_id` nor `connector_name` is set.
        """
        if connector_id:
            return connector_id
        if connector_name is None:
            raise ValueError('Either connector_id or connector_name must be set.')
        return self.get_by_name(connector_name)['mdmId']

    def create(self, name, label=None, group_name="Others", overwrite=False):
        """
        Create a connector.

        Args:
            name: `str`
                Connector name
            label: `str`, default `None`
                Connector label in UI. Defaults to `name`.
            group_name: `str` default "Others"
                Connector group name in UI
            overwrite: `bool` default `False`
                If the connector already exists, delete it and create it again.
                When `False`, the ID of the existing connector is returned.

        Returns: `str`
            The connector ID (`mdmId`).

        Raises:
            Exception: if the API response has no `mdmId` and is not an
                "already exists" error. The exception carries the raw response.
        """
        if label is None:
            label = name
        resp = self.carol.call_api('v1/connectors', data={
            'mdmName': name,
            'mdmGroupName': group_name,
            'mdmLabel': {"en-US": label}
        }, errors='ignore')
        if resp.get('mdmId') is not None:
            return resp.get('mdmId')
        # `errorMessage` may be missing or explicitly null; `'x' in None` would
        # raise TypeError instead of reporting the real API response.
        error_message = resp.get('errorMessage') or ''
        if 'already exists' in error_message:
            if overwrite:
                self.delete_by_name(name)
                return self.create(name, label, group_name, False)
            return self.get_by_name(name)['mdmId']
        raise Exception(resp)

    def get_by_name(self, name, errors='raise'):
        """
        Get connector information using the connector name.

        Args:
            name: `str`
                Connector Name
            errors: {'ignore', 'raise'}, default 'raise'
                If 'raise', then invalid request will raise an exception. If 'ignore',
                then invalid request will return the request response.

        Returns: `dict`
            Connector information.
        """
        resp = self.carol.call_api(f'v1/connectors/name/{name}', errors=errors)
        return resp

    def get_by_id(self, id, errors='raise'):
        """
        Get connector information using the connector ID.

        Args:
            id: `str`
                Connector ID
            errors: {'ignore', 'raise'}, default 'raise'
                If 'raise', then invalid request will raise an exception. If 'ignore',
                then invalid request will return the request response.

        Returns: `dict`
            Connector information.
        """
        resp = self.carol.call_api(f'v1/connectors/{id}', errors=errors)
        return resp

    def delete_by_name(self, name, force_deletion=True):
        """
        Delete connector by name.

        Args:
            name: `str`
                Connector name
            force_deletion: `bool` default `True`
                Force the deletion

        Returns: None
        """
        mdm_id = self.get_by_name(name)['mdmId']
        # Pass by keyword: the second positional parameter of `delete_by_id`
        # is the deprecated `mdm_id`, not `force_deletion`.
        self.delete_by_id(connector_id=mdm_id, force_deletion=force_deletion)

    def delete_by_id(self, connector_id=None, mdm_id=None, force_deletion=True):
        """
        Delete connector by ID.

        Args:
            connector_id: `str`
                Connector ID
            mdm_id: `str`
                Deprecated alias of `connector_id`.
            force_deletion: `bool` default `True`
                Force the deletion

        Returns: None

        Raises:
            ValueError: if no connector ID is given.
        """
        if connector_id is None and mdm_id is not None:
            _deprecation_msgs(
                "mdm_id is deprecated and will be removed, use connector_id")
        connector_id = connector_id if connector_id else mdm_id
        if connector_id is None:
            raise ValueError('Connector Id must be set')
        self.carol.call_api(
            f'v1/connectors/{connector_id}?forceDeletion={force_deletion}', method='DELETE')

    def get_all(self, offset=0, page_size=-1, sort_order='ASC', sort_by=None, include_connectors=False,
                include_mappings=False, include_consumption=False, print_status=True, save_results=False,
                filename='connectors.json'):
        """
        Get all connectors, following the API pagination.

        Args:
            offset: `int`, default `0`
                Offset for the response.
            page_size: `int`, default `-1`
                Number of records in each call. Sent to the API as a string;
                presumably `-1` requests everything at once (confirm with the Carol API).
            sort_order: `str`, default `ASC`
                Sort Order. Possible values "ASC" and "DESC"
            sort_by: `str` default `None`
                Field to sort by
            include_connectors: `bool` default `False`
                Include connector information
            include_mappings: `bool` default `False`
                Include mapping information
            include_consumption: `bool` default `False`
                Include consumption information
            print_status: `bool` default `True`
                Print status of the request.
            save_results: `bool` default `False`
                If True, write each page of results as one JSON line to `filename`.
            filename: `str` default `connectors.json`
                Filename to save

        Returns: `list` of `dict`
            Connector information.
        """
        params = {"offset": offset, "pageSize": str(page_size), "sortOrder": sort_order,
                  "includeMappings": include_mappings, "includeConsumption": include_consumption,
                  "includeConnectors": include_connectors}
        if sort_by is not None:
            params['sortBy'] = sort_by

        connectors = []
        count = offset
        total_hits = float("inf")
        first_page = True
        file = open(filename, 'w', encoding='utf8') if save_results else None
        try:
            while count < total_hits:
                conn = self.carol.call_api('v1/connectors', params=params)
                count += conn['count']
                if first_page:
                    # Only the first response defines how many records to expect.
                    total_hits = conn["totalHits"]
                    first_page = False
                hits = conn['hits']
                if not hits:
                    break
                connectors.extend(hits)
                params['offset'] = count
                if print_status:
                    print('{}/{}'.format(count, total_hits), end='\r')
                if file is not None:
                    file.write(json.dumps(hits, ensure_ascii=False))
                    file.write('\n')
                    file.flush()
        finally:
            # Close the results file even if an API call fails mid-pagination.
            if file is not None:
                file.close()
        return connectors

    def stats(self, connector_id=None, connector_name=None, all_connectors=False):
        """
        Get connector stats.

        Args:
            connector_id: `str`, default `None`
                Connector Id
            connector_name: `str`, default `None`
                Connector name. Takes precedence over `connector_id`.
            all_connectors: `bool` default `False`
                Get from all connectors.

        Returns: `dict`
            Maps each key of the response `aggs` to the list of staging names
            found in its `stagingEntityStats`. The raw `aggs` are kept in
            `self._conn_stats`.

        Raises:
            ValueError: if `all_connectors=False` and no connector is given.
        """
        if all_connectors:
            response = self.carol.call_api('v1/connectors/stats/all')
        else:
            if connector_name:
                connector_id = self.get_by_name(connector_name)['mdmId']
            elif not connector_id:
                # Explicit check instead of `assert`, which is stripped under `python -O`.
                raise ValueError(
                    'Either connector_id or connector_name must be set if `all_connectors=False`.')
            response = self.carol.call_api(
                'v1/connectors/{}/stats'.format(connector_id))
        self._conn_stats = response['aggs']
        return {key: list(value['stagingEntityStats'].keys()) for key, value in self._conn_stats.items()}

    def staging_to_connectors_map(self):
        """
        Build a mapping from staging name to the connectors that contain it.

        Issues one `stats` call per connector returned by `get_all`.

        Returns: `collections.defaultdict(list)`
            Staging name -> list of connector IDs.
        """
        d = defaultdict(list)
        connectors = self.get_all(print_status=False)
        for connector in connectors:
            current_connector = connector['mdmId']
            conn_stats = self.stats(current_connector)
            for i in conn_stats[current_connector]:
                d[i].append(current_connector)
        return d

    def find_by_staging(self, staging_name=None):
        """
        Find the connector(s) that contain a given staging table.

        Args:
            staging_name: `str` default `None`
                Staging table name. When `None`, the full staging -> connectors
                mapping is returned.

        Returns: `list` or `dict`
            List of connector IDs containing `staging_name`, or the full mapping
            when `staging_name` is not given.

        Raises:
            ValueError: if no connector has a staging named `staging_name`.
        """
        d = self.staging_to_connectors_map()
        if not staging_name:
            # Previously this path crashed with UnboundLocalError.
            return d
        conn = d.get(staging_name, None)
        if conn is None:
            raise ValueError(
                'There is no staging named {}'.format(staging_name))
        elif len(conn) > 1:
            print('More than one connector with the staging {}'.format(
                staging_name))
        return conn

    def get_dm_mappings(self, connector_id=None, connector_name=None, staging_name=None,
                        dm_id=None, dm_name=None, reverse_mapping=False, offset=0, page_size=1000, sort_by=None,
                        sort_order='ASC', all_connectors=False):
        """
        Get data models mappings information.

        Args:
            connector_id: `str`, default `None`
                Connector Id
            connector_name: `str`, default `None`
                Connector name
            staging_name: `str`, default `None`
                Staging name
            dm_id: `str`, default `None`
                Data model Id
            dm_name: `str`, default `None`
                Data model name. If set, its ID replaces `dm_id`.
            reverse_mapping: `bool`, default `False`
                If to return the reverse mapping.
            offset: `int`, default `0`
                Offset for the response.
            page_size: `int`, default `1000`
                Number of records in each call.
            sort_by: `str` default `None`
                Field to sort by
            sort_order: `str`, default `ASC`
                Sort Order. Possible values "ASC" and "DESC"
            all_connectors: `bool`, default `False`
                It will return all the mapping for all connectors/stagings

        Returns: `list` of `dict`
            Mapping json definitions. The list is also stored in `self.resp`
            and the first page's `totalHits` in `self.total_hits`.

        Raises:
            ValueError: if `all_connectors=False` and no connector is given.
        """
        if all_connectors:
            payload = {
                "offset": offset,
                "sortBy": sort_by,
                "pageSize": page_size,
                "sortOrder": sort_order
            }
            url = "v1/connectors/mappings/all"
        else:
            if connector_id is None and connector_name is None:
                raise ValueError(
                    'Either connector_id or connector_name must be set if `all_connectors=False`. ')
            connector_id = self._resolve_connector_id(connector_id, connector_name)
            if dm_name is not None:
                url_dm = f"v1/entities/templates/name/{dm_name}"
                dm_id = self.carol.call_api(url_dm, method='GET')['mdmId']
            payload = {
                "reverseMapping": reverse_mapping,
                "entityId": dm_id,
                "stagingType": staging_name,
                "offset": offset,
                "sortBy": sort_by,
                "pageSize": page_size,
                "sortOrder": sort_order
            }
            url = f"v1/connectors/{connector_id}/entityMappings"

        set_param = True
        to_get = float('inf')
        count = 0
        self.resp = []
        while count < to_get:
            resp = self.carol.call_api(url, method='GET', params=payload)
            if set_param:
                self.total_hits = resp["totalHits"]
                to_get = resp["totalHits"]
                set_param = False
            count += resp['count']
            query = resp['hits']
            if not query:
                break
            self.resp.extend(query)
            payload['offset'] = count
        return self.resp

    def get_entity_mappings(
            self, connector_name=None, connector_id=None,
            reverse_mapping=False, staging_name=None,
            offset=0, page_size=1000, sort_order='ASC',
            sort_by=None, print_status=False, errors='raise'
    ):
        """
        Get all Entity Mappings.

        Args:
            connector_name: `str`, default `None`
                Connector Name
            connector_id: `str`, default `None`
                Connector ID. Takes precedence over `connector_name`.
            reverse_mapping: `bool` default `False`
                When using with consumer. False if you don't know what consumer is.
            staging_name: `str` default None
                Name of the staging to find the mapping. Returns 404 if there is no mapping.
            offset: `int`, default 0
                Offset for pagination.
            page_size: `int`, default 1000
                Number of records downloaded in each pagination. The maximum value is 1000
            sort_order: `str`, default 'ASC'
                Sort ascending ('ASC') vs. descending ('DESC').
            sort_by: `str`, default `None`
                Name to sort by.
            print_status: `bool`, default `False`
                Print the number of records in each interaction.
            errors: {'ignore', 'raise'}, default 'raise'
                If 'raise', then invalid request will raise an exception. If 'ignore',
                then invalid request will return the request response.

        Returns: list of dict
            List of dict with mappings. If the first response has no `hits`,
            that raw response is returned instead.

        Raises:
            ValueError: if neither `connector_id` nor `connector_name` is set.
        """
        connector_id = self._resolve_connector_id(connector_id, connector_name)
        template_data = []
        count = offset
        query_params = {
            "offset": offset, "pageSize": str(page_size),
            "sortOrder": sort_order,
            "sortBy": sort_by,
            'stagingType': staging_name,
            'reverseMapping': reverse_mapping,
        }
        set_param = True
        total_hits = float("inf")
        while count < total_hits:
            query = self.carol.call_api(path=f'v1/connectors/{connector_id}/entityMappings', method="GET",
                                        params=query_params, errors=errors)
            if query.get('hits') is None:
                if len(template_data) == 0:
                    # when errors==ignore it will return the error msg.
                    return query
                else:
                    return template_data
            if query['count'] == 0:
                print('There are no more results.')
                print(f'Expecting {total_hits}, response = {count}')
                break
            count += query['count']
            if set_param:
                total_hits = query["totalHits"]
                set_param = False
            query = query['hits']
            template_data.extend(query)
            query_params['offset'] = count
            if print_status:
                print(f'{count}/{total_hits}', end='\r')
        return template_data

    def _play_pause_mapping(self, kind, entity_mapping_id=None, staging_name=None,
                            connector_name=None, connector_id=None,
                            reverse_mapping=False,
                            process_cds=True, ):
        """
        POST `kind` ('play' or 'pause') to each selected entity mapping.

        Mappings are selected by `entity_mapping_id` (str or list of str) or,
        when that is not given, by every mapping of `staging_name`.

        Returns: dict
            Mapping ID -> API response.

        Raises:
            ValueError: if no connector is given, if neither `staging_name` nor
                `entity_mapping_id` is set, or if `entity_mapping_id` has a bad type.
        """
        connector_id = self._resolve_connector_id(connector_id, connector_name)
        if entity_mapping_id is None:
            if staging_name is None:
                raise ValueError(
                    "Either staging_name or entity_mapping_id must be set.")
            entity_mappings = self.get_entity_mappings(
                connector_id=connector_id, staging_name=staging_name)
            entity_mappings = [i['mdmId'] for i in entity_mappings]
        elif isinstance(entity_mapping_id, str):
            entity_mappings = [entity_mapping_id]
        elif isinstance(entity_mapping_id, list):
            entity_mappings = entity_mapping_id
        else:
            raise ValueError(
                'entity_mapping_id must be string or list of string.')

        params = {
            'reverseMapping': reverse_mapping,
            'processCds': process_cds,
        }
        responses = {}
        for mapping_id in entity_mappings:
            resp = self.carol.call_api(path=f'v1/connectors/{connector_id}/entityMappings/{mapping_id}/{kind}',
                                       method="POST", params=params, )
            responses[mapping_id] = resp
        return responses

    def play_mapping(
            self, entity_mapping_id=None, staging_name=None,
            connector_name=None, connector_id=None,
            reverse_mapping=False,
            process_cds=True,
    ):
        """
        Start mapping.

        Args:
            entity_mapping_id: `str` or list of strings
                Mapping ids to be resumed.
            staging_name: `str`
                Staging name for starting the mapping
            connector_name: `str`, default `None`
                Connector Name
            connector_id: `str`, default `None`
                Connector ID
            reverse_mapping: `bool` default `False`
                When using with consumer. False if you don't know what consumer is.
            process_cds: `bool` default `True`
                Process pending records after play.

        Returns: dict
            Dictionary with the response of all mappings played.
        """
        responses = self._play_pause_mapping(
            kind='play', entity_mapping_id=entity_mapping_id, staging_name=staging_name,
            connector_name=connector_name, connector_id=connector_id,
            reverse_mapping=reverse_mapping,
            process_cds=process_cds,
        )
        return responses

    def pause_mapping(
            self, entity_mapping_id=None, staging_name=None,
            connector_name=None, connector_id=None,
            reverse_mapping=False,
    ):
        """
        Pause mapping.

        Args:
            entity_mapping_id: `str` or list of strings
                Mapping ids to be stopped.
            staging_name: `str`
                Staging name to stop the mappings
            connector_name: `str`, default `None`
                Connector Name
            connector_id: `str`, default `None`
                Connector ID
            reverse_mapping: `bool` default `False`
                When using with consumer. False if you don't know what consumer is.

        Returns: dict
            Dictionary with the response of all mappings paused.
        """
        responses = self._play_pause_mapping(
            kind='pause', entity_mapping_id=entity_mapping_id, staging_name=staging_name,
            connector_name=connector_name, connector_id=connector_id,
            reverse_mapping=reverse_mapping,
        )
        return responses

    def _play_pause_etl(self, kind, staging_name=None,
                        connector_name=None, connector_id=None, ):
        """
        POST `kind` ('play' or 'pause') to the ETLs of one staging table.

        Raises:
            ValueError: if `staging_name` or the connector is not given.
        """
        if staging_name is None:
            # Otherwise the request would target the literal staging "None".
            raise ValueError('staging_name must be set.')
        connector_id = self._resolve_connector_id(connector_id, connector_name)
        resp = self.carol.call_api(
            path=f'v1/etl/staging/{connector_id}/{staging_name}/{kind}', method='POST')
        return resp

    def play_etl(
            self, staging_name=None,
            connector_name=None, connector_id=None,
    ):
        """
        Start ETL processes.

        Args:
            staging_name: `str`
                Staging name for starting the ETLs
            connector_name: `str`, default `None`
                Connector Name
            connector_id: `str`, default `None`
                Connector ID

        Returns: dict
            Carol's response.
        """
        responses = self._play_pause_etl(
            kind='play', staging_name=staging_name,
            connector_name=connector_name, connector_id=connector_id,
        )
        return responses

    def pause_etl(
            self, staging_name=None,
            connector_name=None, connector_id=None,
    ):
        """
        Pause ETL.

        Args:
            staging_name: `str`
                Staging name to stop the ETLs
            connector_name: `str`, default `None`
                Connector Name
            connector_id: `str`, default `None`
                Connector ID

        Returns: dict
            Carol's response.
        """
        responses = self._play_pause_etl(
            kind='pause', staging_name=staging_name,
            connector_name=connector_name, connector_id=connector_id,
        )
        return responses

    def get_all_stagings(self, connector_name=None, connector_id=None):
        """
        Get all stagings from a connector.

        Args:
            connector_name: `str`, default `None`
                Connector Name. Takes precedence over `connector_id`.
            connector_id: `str`, default `None`
                Connector ID

        Returns: list
            Sorted list of staging tables.

        Raises:
            ValueError: if neither `connector_id` nor `connector_name` is set.
        """
        if connector_name is not None:
            connector_id = self.get_by_name(connector_name)['mdmId']
        elif connector_id is None:
            raise ValueError(
                'Either `connector_id` or `connector_name` should be set.')
        c = self.carol.call_api(path=f"v1/staging/connectors/{connector_id}/tables")
        return sorted(c)

    def pause_single_staging_etl(self, staging_name, output_list, connector_name=None, connector_id=None):
        """Pause the published ETLs of a staging whose parameters contain every output in `output_list`.

        Args:
            staging_name (str): Staging name
            output_list (list): List of output staging names
            connector_name (str, optional): Connector name. Defaults to None.
            connector_id (str, optional): Connector ID. Defaults to None.

        Returns:
            list: Carol's responses, one per paused ETL.

        Raises:
            ValueError: if no connector is given or a pause request is not successful.
        """
        connector_id = self._resolve_connector_id(connector_id, connector_name)
        url = f'v1/etl/connector/{connector_id}/sourceEntity/{staging_name}/published'
        all_etls = self.carol.call_api(url, )
        wanted_outputs = set(output_list)
        r = []
        for etl in all_etls:
            etl_outputs = set(unroll_list(list(find_keys(etl, 'mdmParameterValues'))))
            if wanted_outputs <= etl_outputs:
                mdm_id = etl['mdmId']
                print(f'pausing etl {mdm_id} for {staging_name}')
                url = f'v1/etl/{mdm_id}/PRODUCTION/pause'
                resp = self.carol.call_api(url, method='PUT')
                if not resp['success']:
                    raise ValueError(
                        f'Problem pausing ETL {connector_name or connector_id}/{staging_name}\n {resp}')
                r.append(resp)
        return r

    def drop_single_etl(self, staging_name, output_list, connector_name=None, connector_id=None):
        """
        Drop ETL based on the outputs of a given ETL.

        Args:
            staging_name: str
                staging to drop etls from
            output_list: list
                output list of the etl to drop. It will only drop the ETL if all the outputs are present.
            connector_name: str
                connector_name to drop etls from
            connector_id: str
                connector_id to drop etls from

        Returns: None

        Raises:
            ValueError: if neither `connector_id` nor `connector_name` is set.
        """
        connector_id = self._resolve_connector_id(connector_id, connector_name)
        url = f'v1/etl/connector/{connector_id}/sourceEntity/{staging_name}'
        all_etls = self.carol.call_api(url, )
        wanted_outputs = set(output_list)
        for etl in all_etls:
            etl_outputs = set(unroll_list(list(find_keys(etl, 'mdmParameterValues'))))
            if wanted_outputs <= etl_outputs:
                mdm_id = etl['mdmId']
                print(f'deleting etl {mdm_id} for {staging_name}')
                self.drop_etls([mdm_id])

    def drop_etls(self, etl_list):
        """
        Drop ETLs from ETL list.

        For each ETL, the WORKING (draft) version is deleted on a best-effort
        basis, then the PRODUCTION version is deleted.

        Args:
            etl_list: list
                list of ETLs mdm_ids to delete.

        Returns: None
        """
        for mdm_id in etl_list:
            try:
                # Delete drafts. Deliberately best-effort: presumably an ETL may
                # have no WORKING draft, so any failure here is ignored.
                self.carol.call_api(f'v2/etl/{mdm_id}', method='DELETE',
                                    params={'entitySpace': 'WORKING'})
            except Exception:
                pass
            self.carol.call_api(f'v2/etl/{mdm_id}', method='DELETE',
                                params={'entitySpace': 'PRODUCTION'})