from pycarol import Carol, Staging, Connectors, DataModel
from collections import defaultdict
import itertools
[docs]def find_keys(node, kv):
"""Find recursively all the values from a given key.
Args:
node (dict of dict): Nested dictionary.
kv (str): dictionary key to find.
Yields:
dict: dictionary
"""
if isinstance(node, list):
for i in node:
yield from find_keys(i, kv)
elif isinstance(node, dict):
if kv in node:
yield node[kv]
for j in node.values():
yield from find_keys(j, kv)
[docs]def unroll_list(l):
"""Unroll a list of lists to a flat list
Args:
l (list of lists): List of list to unroll
Yields:
list: unrolled list.
"""
if isinstance(l, list):
for i in l:
yield from unroll_list(i)
else:
yield l
[docs]def get_staging_prefix(carol, staging_prefix, connector_name, connector_id):
"""Create the prefix for a given set of staging_prefix, connector_name, connector_id
Args:
carol: `pycarol.Carol`
Carol() object.
connector_name: `str`
Connector Name
prefix: 'str` default `DM_`
prefix to add to the data model name. e.g., if dm_name='mydatamoldel', the result will be "DM_mydatamoldel`.
This is only applied for DataModel.
staging_prefix: 'str` default `None`
Prefix for the staging name. e.g., if staging_prefix='connector_name` the output will be:
{ "connector_name_stag3" : {"connector_name_stag1", "connector_name_stag2"}}
Possible values are: 'connector_name', 'connector_id', None
"""
conn = Connectors(carol)
if staging_prefix is None:
staging_prefix = ''
elif staging_prefix == 'connector_name':
if connector_name is None:
connector_name = conn.get_by_id(connector_id)['mdmName']
staging_prefix = connector_name + '_'
elif staging_prefix == 'connector_id':
if connector_id is None:
connector_id = conn.get_by_name(connector_name)['mdmId']
staging_prefix = connector_id + '_'
else:
raise ValueError(
f'`staging_prefix` should be either `connector_name`, `connector_id` or `None` {staging_prefix} was used'
)
return staging_prefix, connector_name, connector_id
[docs]def get_dm_relationship_constraints(carol, connector_name=None, connector_id=None, dm_prefix='DM_', staging_prefix=None):
"""
Create relationship between data models based on their relationship constraints
Args:
carol: `pycarol.Carol`
Carol() object.
connector_name: `str`
Connector Name
dm_prefix: 'str` default `DM_`
data model prefix to add to the data model name. e.g., if dm_name='mydatamoldel', the result will be "DM_mydatamoldel`
Returns: `defaultdict(set)`
dictionary { "dm1" : {"dm2", "dm3"}} where "dm1" depends on "dm2"" and "dm3"
"""
staging_prefix, connector_name, connector_id = get_staging_prefix(
carol, staging_prefix=staging_prefix,
connector_name=connector_name, connector_id=connector_id
)
conn = Connectors(carol)
conn_to_create = conn.get_all(
include_mappings=True, include_connectors=True, include_consumption=True)
conn_to_create = [
i for i in conn_to_create if i['mdmName'] == connector_name]
dm_mapping = defaultdict(set)
for staging_name, mapping in conn_to_create[0]['mdmEntityMappings'].items():
dm_mapping[dm_prefix + mapping['mdmMasterEntityName']].update([staging_name])
# find Relationship Constraints
dm = DataModel(carol)
dms = dm.get_all().template_dict.keys()
relationship_constraints = defaultdict(set)
for i in dms:
snap = dm.get_by_name(i)['mdmRelationshipConstraints']
relationship_constraints[dm_prefix + i].update([dm_prefix + j['mdmTargetEntityName'] for j in snap])
# if there are more than one staging mapping to a DM I need to restrict them too, sinse process staging mean
# create golden records.
for parent_staging in dm_mapping[dm_prefix + i]:
relationship_constraints[staging_prefix+parent_staging].update(
[dm_prefix + j['mdmTargetEntityName'] for j in snap])
return relationship_constraints
[docs]def get_mapping_constraints(carol, connector_name=None, connector_id=None, dm_prefix='DM_', staging_prefix=None):
"""
Create relationship between data models and stagings in mappings.
Args:
carol: `pycarol.Carol`
Carol() object.
connector_name: `str`
Connector Name
connector_id: `str`
Connector ID
dm_prefix: 'str` default `DM_`
data model prefix to add to the data model name. e.g., if dm_name='mydatamoldel', the result will be "DM_mydatamoldel`.
This is only applied for DataModel.
staging_prefix: 'str` default `None`
Prefix for the staging name. e.g., if staging_prefix='connector_name` the output will be:
{ "connector_name_stag3" : {"connector_name_stag1", "connector_name_stag2"}}
Possible values are: 'connector_name', 'connector_id', None
Returns: `defaultdict(set)`
dictionary { "dm1" : {"stag1", "stag2"}} where "dm1" depends on "stag1"" and "stag2"
"""
lookup_types = ['LOOKUP_DATA_MODEL', 'LOOKUP_STAGING_TABLE', 'LOOKUP_MULTIPLE_DATA_MODELS',
'LOOKUP_MULTIPLE_STAGING_TABLES']
staging_prefix, connector_name, connector_id = get_staging_prefix(
carol, staging_prefix=staging_prefix,
connector_name=connector_name, connector_id=connector_id
)
conn = Connectors(carol)
conn_to_create = conn.get_all(
include_mappings=True, include_connectors=True, include_consumption=True)
conn_to_create = [
i for i in conn_to_create if i['mdmName'] == connector_name]
stag = Staging(carol)
dm_to_mapping = defaultdict(set)
for staging_source in conn_to_create[0]['mdmEntityMappings'].keys():
# staging_source = 'currency'
curr_map = conn_to_create[0]['mdmEntityMappings'][staging_source]
assert staging_source == curr_map['mdmStagingType']
staging_source = curr_map['mdmStagingType']
dm_target = curr_map['mdmMasterEntityName']
mapping_id = curr_map['mdmId']
entity_space = curr_map['mdmEntitySpace']
connector_id = curr_map['mdmConnectorId']
mappings_to_get = stag.get_mapping_snapshot(connector_id=connector_id, mapping_id=mapping_id,
entity_space=entity_space)
dm_to_mapping[dm_prefix +
dm_target].update([staging_prefix + staging_source])
# Get mapping lookups
for i in mappings_to_get[None]['fieldMappings']:
field_cleanse_rules = i['fieldCleanseRules']
for cleanse in field_cleanse_rules:
# it can be inside actions and falseActions if inside an If.
unest_func = list(itertools.chain(*(list(find_keys(cleanse, 'actions'))
+ list(find_keys(cleanse, 'falseActions'))
+ [[cleanse, ]])))
for cleanse_func in unest_func:
if cleanse_func['fieldFunction']['mdmName'] in lookup_types:
if "DATA_MODEL" in cleanse_func['fieldFunction']['mdmName']:
_prefix = dm_prefix
# index 0 sinde it is the only parameter for DM lookups.
idx = 0
elif "STAGING_TABLE" in cleanse_func['fieldFunction']['mdmName']:
# index 1 sinde the first param is the connector name.
idx = 1
_prefix = staging_prefix
dm_to_mapping[staging_prefix + staging_source].update(
[_prefix + cleanse_func['parameterValues'][idx]])
return dm_to_mapping
[docs]def get_etl_constraints(carol, connector_name=None, connector_id=None, staging_prefix=None):
"""
Create relationship between stagings in ETLs.
Args:
carol: `pycarol.Carol`
Carol() object.
connector_name: `str`
connector name to get the relationships.
connector_id: `str`
connector ID to get the relationships.
staging_prefix: 'str` default `None`
Prefix for the staging name. e.g., if staging_prefix='connector_name` the output will be:
{ "connector_name_stag3" : {"connector_name_stag1", "connector_name_stag2"}}
Possible values are: 'connector_name', 'connector_id', None
Returns: `defaultdict(set)`
dictionary { "stag3" : {"stag1", "stag2"}} where "stag3" depends on "stag1"" and "stag2"
"""
if connector_id is None and connector_name is None:
raise ValueError(
'Either connector_id or connector_name must be set.')
conn = Connectors(carol)
staging_prefix, connector_name, connector_id = get_staging_prefix(
carol, staging_prefix=staging_prefix,
connector_name=connector_name, connector_id=connector_id
)
st = conn.get_all_stagings(
connector_name=connector_name, connector_id=connector_id)
etls = conn.get_etl_information(
connector_name=connector_name, connector_id=connector_id)
rel_etls = defaultdict(set)
for etl in etls:
if etl.get("mdmETLType", "") == 'JOIN':
source = staging_prefix + etl['mdmSourceEntityName']
other_side = staging_prefix + etl['mdmOtherSourceEntityName']
etl_target = list(find_keys(etl, 'mdmParameterValues'))
etl_target = list(unroll_list(etl_target))
# get the target table.
assert len(etl_target) == 1
etl_target = etl_target[0]
# get source.
etl_source = set(itertools.chain(
*[i.keys() for i in find_keys(etl, 'mdmStagingTypeToFieldForMatching')]))
etl_source = [staging_prefix + i for i in etl_source]
rel_etls[staging_prefix + etl_target].update(etl_source)
# Need to add the sourve->other side to the list. We need to guarantee that the other side is there to do the join.
rel_etls[source].update({other_side})
elif etl.get("mdmETLType", "") in ['SPLIT', 'DUPLICATE']:
# get all actions
source = staging_prefix + etl['mdmSourceEntityName']
split_actions = list(find_keys(etl, 'mdmParameterValues'))
split_actions = set(unroll_list(split_actions))
for target in split_actions:
if target in st:
rel_etls[staging_prefix+target].update({source})
else:
raise ValueError(
f'mdmETLType are "SPLIT", "DUPLICATE" or "JOIN", {etl.get("mdmETLType", "")} were received')
return rel_etls
[docs]def generate_dependency_graph(carol, connector_list=None, dm_prefix='DM_', staging_prefix='connector_name', only_mapping=False):
"""Generates dependency graph for a list of connectors.
Args:
carol (pycarol.Carol): INstance of Carol
connector_list (list, optional): List of connectors to create the dependency graph. Defaults to None.
dm_prefix: 'str` default `DM_`
data model prefix to add to the data model name. e.g., if dm_name='mydatamoldel', the result will be "DM_mydatamoldel`.
This is only applied for DataModel.
staging_prefix: 'str` default `None`
Prefix for the staging name. e.g., if staging_prefix='connector_name` the output will be:
{ "connector_name_stag3" : {"connector_name_stag1", "connector_name_stag2"}}
Possible values are: 'connector_name', 'connector_id', None
only_mapping (bool, optional): Only generates mappings and constraints dependency graph. Defaults to False.
Returns:
default: defaultdict(set). Dependency graph.
"""
if connector_list is None:
connector_list = Connectors(carol).get_all()
connector_list = [i['mdmName'] for i in connector_list]
all_rel = defaultdict(set)
for connector_name in connector_list:
dm_constraints = get_dm_relationship_constraints(carol, dm_prefix=dm_prefix, connector_name=connector_name, )
mapping_constraint = get_mapping_constraints(carol, connector_name=connector_name, dm_prefix=dm_prefix, staging_prefix=staging_prefix)
if not only_mapping:
etl_constraints = get_etl_constraints(carol, connector_name=connector_name, staging_prefix=staging_prefix )
else:
etl_constraints = defaultdict(set)
for d in [dm_constraints, etl_constraints, mapping_constraint]:
for key, value in d.items():
all_rel[key].update(all_rel[key] | value)
return all_rel