import json
import itertools
import time
import copy
import warnings
import pandas as pd
import asyncio
from .data_models_fields import DataModelFields
from .data_model_types import DataModelTypeIds
from ..utils.importers import _import_dask, _import_pandas
from ..verticals import Verticals
from ..storage import Storage
from ..query import Query, delete_golden
from ..connectors import Connectors
from ..filter import RANGE_FILTER as RF
from ..filter import TYPE_FILTER, Filter, MAXIMUM, MINIMUM
from ..utils.miscellaneous import ranges
from ..utils import async_helpers
from ..utils.miscellaneous import stream_data
from .. import _CAROL_METADATA_GOLDEN
from ..utils.miscellaneous import drop_duplicated_parquet, _deprecation_msgs
_DATA_MODEL_TYPES_MAPPING = {
"boolean": bool,
"date": str, # TODO should it be pd.datetime?
"long": int,
"double": float,
"nested": str,
"string": str,
"binary": str,
"enum": str,
"object": str,
"geopoint": str
}
class DataModel:
def __init__(self, carol):
self.carol = carol
self.fields_dict = {}
self.entity_template_ = {}
def _build_query_params(self):
if self.sort_by is None:
self.query_params = {"offset": self.offset, "pageSize": str(self.page_size), "sortOrder": self.sort_order}
else:
self.query_params = {"offset": self.offset, "pageSize": str(self.page_size), "sortOrder": self.sort_order,
"sortBy": self.sort_by}
def _get_name_type_data_models(self, fields):
f = {}
for field in fields:
if field.get('mdmMappingDataType', None) not in ['NESTED', 'OBJECT']:
f.update({field['mdmName']: field['mdmMappingDataType']})
else:
f[field['mdmName']] = self._get_name_type_data_models(field['mdmFields'])
return f
def _get(self, id, by='id'):
if by == 'name':
url = f"v1/entities/templates/name/{id}"
elif by == 'id':
url = f"v1/entities/templates/{id}/working"
else:
raise print('Type incorrect, it should be "id" or "name"')
# TODO: Add 'Not Found' and `is in Deleted state`
resp = self.carol.call_api(url, method='GET')
self.entity_template_ = {resp['mdmName']: resp}
self.fields_dict.update({resp['mdmName']: self._get_name_type_data_models(resp['mdmFields'])})
return resp
def fetch_parquet(self, dm_name, merge_records=True, backend='pandas',
return_dask_graph=False,
columns=None, return_metadata=False, callback=None,
max_hits=None, cds=False, max_workers=None, file_pattern=None,
return_callback_result=False):
"""
Fetch parquet from Golden.
Args:
dm_name: `str`
Data model name to be imported
merge_records: `bool`, default `True`
This will keep only the most recent record exported. Sometimes there are updates and/or deletions and
one should keep only the last records.
backend: ['dask','pandas'], default `dask`
if to use either dask or pandas to fetch the data
return_dask_graph: `bool`, default `false`
If to return the dask graph or the dataframe.
columns: `list`, default `None`
List of columns to fetch.
return_metadata: `bool`, default `False`
To return or not the fields ['mdmId', 'mdmCounterForEntity']
callback: `callable`, default `None`
Function to be called each downloaded file.
max_hits: `int`, default `None`
Number of records to get.
cds: `bool`, default `False`
Get records from CDS.
max_workers: `int` default `None`
Number of workers to use when downloading parquet files with pandas back-end.
file_pattern: `str` default `None`
File pattern to filter data when fetching from CDS. e.g.
file_pattern='2019-11-25' will fetch only CDS files that start with `2019-11-25`.
return_callback_result `bool` default `False`
If a callback is used, it will return the result of the response of the callback. This will skip all the
operation to merge records and return selected columns.
:return:
"""
if callback:
assert callable(callback), \
f'"{callback}" is a {type(callback)} and is not callable.'
if not columns: # if an empty list was sent.
columns = None
if isinstance(columns, str):
columns = [columns]
assert backend == 'dask' or backend == 'pandas'
if return_dask_graph:
assert backend == 'dask'
# validate export
if not cds:
_deprecation_msgs("`cds` option will be removed from pycarol 3.33. Consider use `cds=True`"
" to avoid problems. ")
dms = self._get_dm_export_stats()
import_type = 'golden'
else:
import_type = 'golden_cds'
if columns:
columns.extend(_CAROL_METADATA_GOLDEN)
storage = Storage(self.carol)
token_carolina = storage.backend.carolina.token
storage_space = storage.backend.carolina.get_bucket_name(import_type)
if backend == 'dask':
d = _import_dask(storage=storage, dm_name=dm_name,
import_type=import_type,
merge_records=merge_records,
return_dask_graph=return_dask_graph,
columns=columns)
elif backend == 'pandas':
d = _import_pandas(storage=storage, dm_name=dm_name,
import_type=import_type, columns=columns,
callback=callback, max_hits=max_hits,
max_workers=max_workers,
token_carolina=token_carolina,
storage_space=storage_space, file_pattern=file_pattern)
if d is None:
warnings.warn("No data to fetch!", UserWarning)
_field_types = self._get_name_type_DMs(self.get_by_name(dm_name)['mdmFields'])
cols_keys = list(_field_types)
if return_metadata:
cols_keys.extend(_CAROL_METADATA_GOLDEN)
elif columns:
columns = [i for i in columns if i not in _CAROL_METADATA_GOLDEN]
d = pd.DataFrame(columns=cols_keys)
for key, value in _field_types.items():
if isinstance(value, dict):
value = "STRING" # If nested we receive as a `STR`
d.loc[:, key] = d.loc[:, key].astype(_DATA_MODEL_TYPES_MAPPING.get(value.lower(), str), copy=False)
if columns:
columns = list(set(columns))
d = d[list(set(columns))]
return d
else:
raise ValueError(f'backend should be either "dask" or "pandas" you entered {backend}')
if (return_callback_result) and (callback is not None):
return d
if merge_records:
if not return_dask_graph:
d = drop_duplicated_parquet(d)
else:
d = d.set_index('mdmCounterForEntity', sorted=True) \
.drop_duplicates(subset='mdmId', keep='last') \
.reset_index(drop=True)
if not return_metadata:
to_drop = set(_CAROL_METADATA_GOLDEN).intersection(set(d.columns))
d = d.drop(labels=to_drop, axis=1)
return d
def get_all(self, offset=0, page_size=-1, sort_order='ASC',
sort_by=None, print_status=False,
save_file=None):
self.offset = offset
self.page_size = page_size
self.sort_order = sort_order
self.sort_by = sort_by
self._build_query_params()
self.template_dict = {}
self.template_data = []
count = self.offset
set_param = True
self.total_hits = float("inf")
if save_file:
assert isinstance(save_file, str)
file = open(save_file, 'w', encoding='utf8')
while count < self.total_hits:
url_filter = "v1/entities/templates"
query = self.carol.call_api(url_filter, params=self.query_params, method='GET')
if query['count'] == 0:
print('There are no more results.')
print('Expecting {}, reponse = {}'.format(self.total_hits, count))
break
count += query['count']
if set_param:
self.total_hits = query["totalHits"]
set_param = False
query = query['hits']
self.template_data.extend(query)
self.fields_dict.update({i['mdmName']: self._get_name_type_data_models(i['mdmFields'])
for i in query})
self.template_dict.update({i['mdmName']: {'mdmId': i['mdmId'],
'mdmEntitySpace': i['mdmEntitySpace'],
'mdmPublishedExists': i['mdmPublishedExists']}
for i in query})
self.query_params['offset'] = count
if print_status:
print('{}/{}'.format(count, self.total_hits), end='\r')
if save_file:
file.write(json.dumps(query, ensure_ascii=False))
file.write('\n')
file.flush()
if save_file:
file.close()
return self
def get_by_name(self, name):
return self._get(name, by='name')
def get_by_id(self, id):
return self._get(id, by='id')
def get_snapshot(self, dm_id, entity_space):
url_snapshot = f'v1/entities/templates/{dm_id}/snapshot?entitySpace={entity_space}'
resp = self.carol.call_api(url_snapshot, method='GET')
self.snapshot_ = {resp['entityTemplateName']: resp}
return resp
def export(self, dm_name=None, dm_id=None, sync_dm=True, full_export=False, delete_previous=False):
"""
Export datamodel to s3
This method will trigger or pause the export of the data in the datamodel to
CDS
:param dm_name: `str`, default `None`
Data model Name
:param dm_id: `str`, default `None`
Data model id
:param sync_dm: `bool`, default `True`
Sync the data model
:param full_export: `bool`, default `True`
Do a resync of the data model
:param delete_previous: `bool`, default `False`
Delete previous exported files.
:return: None
"""
if sync_dm:
status = 'RUNNING'
else:
status = 'PAUSED'
if dm_name:
dm_id = self.get_by_name(dm_name)['mdmId']
else:
assert dm_id
query_params = {"status": status, "fullExport": full_export,
"deletePrevious": delete_previous}
url = f'v1/entities/templates/{dm_id}/exporter'
return self.carol.call_api(url, method='POST', params=query_params)
def export_all(self, sync_dm=True, full_export=False, delete_previous=False):
"""
Export all datamodel to s3
This method will trigger or pause the export of the data in the datamodel to
CDS
:param sync_dm: `bool`, default `True`
Sync the data model
:param full_export: `bool`, default `True`
Do a resync of the data model
:param delete_previous: `bool`, default `False`
Delete previous exported files.
:return: None
"""
self.get_all()
for _name, i in self.template_dict.items():
dm_id = i['mdmId']
if i['mdmPublishedExists']:
self.export(dm_id=dm_id, sync_dm=sync_dm, full_export=full_export,
delete_previous=delete_previous)
else:
print(
f'Data Model `{_name}` is only in draft, and cannot be exported. Publish the Data Model to export it.')
def delete(self, dm_id=None, dm_name=None, entity_space='WORKING'):
# TODO: Check Possible entity_spaces
if dm_id is None:
assert dm_name is not None
resp = self.get_by_name(dm_name)
dm_id = resp['mdmId']
url = f"v1/entities/templates/{dm_id}"
querystring = {"entitySpace": entity_space}
return self.carol.call_api(url, method='DELETE', params=querystring)
def _get_name_type_DMs(self, fields):
f = {}
for field in fields:
if field.get('mdmMappingDataType', None) not in ['NESTED', 'OBJECT']:
f.update({field['mdmName']: field['mdmMappingDataType']})
else:
f[field['mdmName']] = self._get_name_type_DMs(field['mdmFields'])
return f
def _get_dm_export_stats(self):
"""
Get export status for data models
:return: `dict`
dict with the information of which data model is exporting its data.
"""
json_q = Filter.Builder(key_prefix="") \
.must(TYPE_FILTER(value="mdmEntityTemplateExport")).build().to_json()
query = Query(self.carol, index_type='CONFIG', page_size=1000, only_hits=False)
query.query(json_q, ).go()
dm_results = query.results
dm_results = [elem.get('hits', elem) for elem in dm_results
if elem.get('hits', None)]
dm_results = list(itertools.chain(*dm_results))
dm = self.get_all().template_data
dm = {i['mdmId']: i['mdmName'] for i in dm}
if dm_results is not None:
return {dm.get(i['mdmEntityTemplateId'], i['mdmEntityTemplateId'] + '_NOT_FOUND'): i for i in dm_results}
return dm_results
def _get_min_max(self, query_filter=None):
if query_filter is not None:
j = query_filter.type(self.datamodel_name) \
.aggregation_list([MINIMUM(name='MINIMUM', params=self.mdm_key), MAXIMUM(name='MAXIMUM',
params=self.mdm_key)]) \
.build().to_json()
else:
j = Filter.Builder() \
.type(self.datamodel_name) \
.aggregation_list([MINIMUM(name='MINIMUM', params=self.mdm_key), MAXIMUM(name='MAXIMUM',
params=self.mdm_key)]) \
.build().to_json()
query = Query(self.carol, index_type=self.index_type, only_hits=False, get_aggs=True, save_results=False,
print_status=True, page_size=0).query(j).go()
if query.results[0].get('aggs') is None:
return None, None
min_v = query.results[0]['aggs']['MINIMUM']['value']
max_v = query.results[0]['aggs']['MAXIMUM']['value']
print(f"Total Hits to reprocess: {query.total_hits}")
return min_v, max_v
def reprocess(self, datamodel_name, n_of_tasks=10, copy_or_move='move', record_type='ALL', query_filter=None):
"""
Reprocess records from a data model
:param datamodel_name: `str`
Data model name
:param n_of_tasks: `int`, default `10`
Number of process to split the reprocess.
:param copy_or_move: `str`, default `move`
Either `move` or `copy` to staging.
:param record_type: `str`, default `ALL`
Type of records to reprocess. `All`, `Golden` or `Rejected`
:param query_filter: `Filter.Builder Object`, default `None`
The Filter instance to reprocess the data on.
:return: None
"""
assert copy_or_move == 'copy' or copy_or_move == 'move', 'copy_or_move shoulb be "copy" or "move"'
if query_filter:
assert isinstance(query_filter, Filter.Builder)
if copy_or_move == 'copy':
copy_or_move = False
else:
copy_or_move = True
self.mdm_id = self.get_by_name(datamodel_name)['mdmId']
self.index_type = 'MASTER'
self.datamodel_name = datamodel_name + 'Golden'
self.mdm_key = 'mdmCounterForEntity'
min_v, max_v = self._get_min_max(query_filter=query_filter)
chunks = ranges(min_v, max_v, n_of_tasks)
print(f"Number of chunks: {len(chunks)}")
url_filter = f"v1/entities/templates/{self.mdm_id}/reprocess"
for c, i in enumerate(chunks):
if query_filter is not None:
query_filter_to_use = copy.deepcopy(query_filter)
json_query = query_filter_to_use.must(RF(key=self.mdm_key, value=i)).build().to_json()
else:
json_query = Filter.Builder() \
.must(RF(key=self.mdm_key, value=i)) \
.build().to_json()
query_params = {"recordType": record_type, "fuzzy": "false",
"deleteRecords": copy_or_move}
result = self.carol.call_api(url_filter, data=json_query, params=query_params)
print(f"To go: {c + 1}/{len(chunks)}")
def send_data(self, data, dm_name=None, dm_id=None, step_size=500, gzip=False, delete_old_records=False,
print_stats=True, max_workers=2, async_send=False):
"""
:param data: pandas data frame, json.
Data to be send to Carol
:param dm_name: `str`, default `None`
Data model name
:param dm_id: `str`, default `None`
Data model id
:param step_size: `int`, default `500`
Number of records to be sent in each iteration. Max size for each batch is 10MB
:param print_stats: `bool`, default `True`
If print the status
:param gzip: `bool`, default `True`
If send each batch as a gzip file.
:param delete_old_records: `bool`, default `False`
Delete previous records in the data model.
:param max_workers: `int`, default `2`
To be used with `async_send=True`. Number of threads to use when sending.
:param async_send: `bool`, default `False`
To use async to send the data. This is much faster than a sequential send.
:return: None
"""
self.gzip = gzip
extra_headers = {}
content_type = 'application/json'
if self.gzip:
content_type = None
extra_headers["Content-Encoding"] = "gzip"
extra_headers['content-type'] = 'application/json'
if dm_name:
dm_id = self.get_by_name(dm_name)['mdmId']
else:
assert dm_id
dm_name = self._get(dm_id, by='id')['mdmName']
if delete_old_records:
delete_golden(self.carol, dm_name)
is_df = False
if isinstance(data, pd.DataFrame):
is_df = True
data_size = data.shape[0]
_sample_json = data.iloc[0].to_json(date_format='iso')
elif isinstance(data, str):
data = json.loads(data)
data_size = len(data)
_sample_json = data[0]
else:
data_size = len(data)
_sample_json = data[0]
if (not isinstance(data, list)) and (not is_df):
data = [data]
data_size = len(data)
url = f"v1/entities/templates/{dm_id}/goldenRecords?async=true"
if async_send:
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(async_helpers.send_data_asynchronous(carol=self.carol,
data=data,
step_size=step_size,
url=url,
extra_headers=extra_headers,
content_type=content_type,
max_workers=max_workers,
compress_gzip=self.gzip))
loop.run_until_complete(future)
else:
for data_json, cont in stream_data(data=data,
step_size=step_size,
compress_gzip=self.gzip):
self.carol.call_api(url, data=data_json, extra_headers=extra_headers,
content_type=content_type, status_forcelist=[502, 429, 502],
method_whitelist=frozenset(['POST']))
if print_stats:
print('{}/{} sent'.format(cont, data_size), end='\r')
def create_mapping(self, staging_name, connector_id=None, connector_name=None, dm_name=None, dm_id=None,
publish=False):
"""
This method will create the link between the staging and a data model. If Publish=True it will publish how it is.
:param staging_name: `str`
Name of the staging to be mapped.
:param connector_id: `str`, `str`, default `None`
Staging's connector ID
:param connector_name: `str`, default `None`
Staging's connector name
:param dm_name: `int`, default `100`
Data Model name
:param dm_id: `bool`, default `True`
Data model ID
:param publish: `bool`, default `True`
If publish after create mapping.
:return: None
"""
if connector_name:
_con = Connectors(self.carol)
connector_id = _con.get_by_name(connector_name)['mdmId']
else:
assert connector_id
_con = Connectors(self.carol)
connector_name = _con.get_by_id(connector_id)['mdmName']
if dm_name:
dm_id = self.get_by_name(dm_name)['mdmId']
else:
assert dm_id
dm_name = self.get_by_id(dm_id)['mdmName']
url = f"v1/connectors/{connector_id}/entityMappings"
payload = {"mdmMasterEntityId": dm_id,
"mdmMasterEntityName": dm_name,
"mdmConnectorId": connector_id,
"mdmStagingType": staging_name
}
resp = self.carol.call_api(url, data=payload)
if publish:
url = f"v1/connectors/{connector_id}/entityMappings/{resp['mdmId']}/publish"
self.carol.call_api(url, method='POST')
return resp
class entIntType(object):
ent_type = 'long'
class entDoubleType(object):
ent_type = 'double'
class entStringType(object):
ent_type = "string"
class entNullType(object):
ent_type = "string"
class entBooleanType(object):
ent_type = "boolean"
class entArrayType(object):
ent_type = "nested"
class entObjectType(object):
ent_type = "object"
class entType(object):
@classmethod
def get_ent_type_for(cls, t):
"""docstring for get_schema_type_for"""
SCHEMA_TYPES = {
type(None): entNullType,
str: entStringType,
int: entIntType,
float: entDoubleType,
bool: entBooleanType,
list: entArrayType,
dict: entObjectType,
}
schema_type = SCHEMA_TYPES.get(t)
if not schema_type:
raise JsonEntTypeNotFound("There is no schema type for %s.\n Try:\n %s" % (
str(t), ",\n".join(["\t%s" % str(k) for k in SCHEMA_TYPES.keys()])))
return schema_type
[docs]class JsonEntTypeNotFound(Exception):
pass
class CreateDataModel(object):
def __init__(self, carol):
self.carol = carol
self.template_dict = {}
self.fields = DataModelFields(self.carol)
self.fields.possible_types()
self.all_possible_types = self.fields._possible_types
self.all_possible_fields = self.fields.fields_dict
def from_snapshot(self, snapshot, publish=False, overwrite=False):
_count = 0
while True:
url = 'v1/entities/templates/snapshot'
resp = self.carol.call_api(path=url, method='POST', data=snapshot, errors='ignore')
if ('already exists' in resp.get('errorMessage', 'asdf')) and (overwrite):
del_DM = DataModel(self.carol)
del_DM.get_by_name(snapshot['entityTemplateName'])
dm_id = del_DM.entity_template_.get(snapshot['entityTemplateName']).get('mdmId', None)
if dm_id is None: # if None
continue
entity_space = del_DM.entity_template_.get(snapshot['entityTemplateName'])['mdmEntitySpace']
del_DM.delete(dm_id=dm_id, entity_space=entity_space)
time.sleep(0.5) # waint for deletion
_count += 1
if _count > 5:
print(f"Something wrong coping {snapshot['entityTemplateName']}")
print(f"Data model was not copied: {resp}")
return
continue
break
print('Data Model {} created'.format(snapshot['entityTemplateName']))
self.template_dict.update({resp['mdmName']: resp})
if publish:
self.publish_template(resp['mdmId'])
def publish_template(self, dm_id):
url = f'v1/entities/templates/{dm_id}/publish'
resp = self.carol.call_api(path=url, method='POST')
return resp
def _check_verticals(self):
self.verticals_dict = Verticals(self.carol).all()
if self.vertical_ids is not None:
for key, value in self.verticals_dict.items():
if value == self.vertical_ids:
self.vertical_ids = value
self.vertical_names = key
return
else:
for key, value in self.verticals_dict.items():
if key == self.vertical_names:
self.vertical_ids = value
self.vertical_names = key
return
raise Exception('{}/{} are not valid values for mdmVerticalNames/mdmVerticalIds./n'
' Possible values are: {}'.format(self.vertical_names, self.vertical_ids,
self.verticals_dict))
def _check_entity_template_types(self):
self.template_type_dict = DataModelTypeIds(self.carol).all()
if self.entity_template_type_ids is not None:
for key, value in self.template_type_dict.items():
if value == self.entity_template_type_ids:
self.entity_template_type_ids = value
self.entity_template_type_names = key
return
else:
for key, value in self.template_type_dict.items():
if key == self.entity_template_type_names:
self.entity_template_type_ids = value
self.entity_template_type_names = key
return
raise Exception('{}/{} are not valid values for mdmEntityTemplateTypeNames/mdmEntityTemplateTypeIds./n'
' Possible values are: {}'.format(self.vertical_names, self.vertical_ids,
self.template_type_dict))
def _check_dm_name(self):
est_ = DataModel(self.carol)
est_.get_by_name(self.dm_name)
if est_.entity_template_ is not None:
raise Exception('mdm name {} already exist'.format(self.dm_name))
def create(self, dm_name, overwrite=False, vertical_ids=None, vertical_names=None, entity_template_type_ids=None,
entity_template_type_names=None, label=None, group_name='Others',
transaction_data_model=False):
self.dm_name = dm_name
self.group_name = group_name
if not label:
self.label = self.dm_name
else:
self.label = label
self.transaction_data_model = transaction_data_model
assert ((vertical_names is not None) or (vertical_ids is not None))
assert ((entity_template_type_ids is not None) or (entity_template_type_names is not None))
self.vertical_names = vertical_names
self.vertical_ids = vertical_ids
self.entity_template_type_ids = entity_template_type_ids
self.entity_template_type_names = entity_template_type_names
self._check_verticals()
self._check_entity_template_types()
if not overwrite:
self._check_dm_name()
payload = {"mdmName": self.dm_name, "mdmGroupName": self.group_name, "mdmLabel": {"en-US": self.label},
"mdmVerticalIds": [self.vertical_ids],
"mdmEntityTemplateTypeIds": [self.entity_template_type_ids],
"mdmTransactionDataModel": self.transaction_data_model, "mdmProfileTitleFields": []}
while True:
url_filter = "v1/entities/templates"
resp = self.carol.call_api(url_filter, data=payload, method='POST', errors='ignore')
# error handler for token
if ('already exists' in resp.get('errorMessage', [])) and (overwrite):
del_DM = DataModel(self.carol)
del_DM.get_by_name(self.dm_name)
dm_id = del_DM.entity_template_.get(self.dm_name).get('mdmId', None)
if dm_id is None: # if None
continue
entity_space = del_DM.entity_template_.get(self.dm_name)['mdmEntitySpace']
del_DM.delete(dm_id=dm_id, entity_space=entity_space)
time.sleep(0.5) # waint for deletion
continue
break
self.template_dict.update({resp['mdmName']: resp})
def _profile_title(self, profile_title, dm_id):
if isinstance(profile_title, str):
profile_title = [profile_title]
profile_title = [i.lower() for i in profile_title]
url = f"v1/entities/templates/{dm_id}/profileTitle"
resp = self.carol.call_api(path=url, method='POST', data=profile_title)
return resp
def add_field(self, field_name, dm_id=None, parent_field_id=""):
if dm_id is None:
assert self.dm_id
else:
est_ = DataModel(self.carol)
est_.get_by_id(dm_id)
if est_.entity_template_ == {}:
print('Template does not exisit')
return
self.dm_id = dm_id
_, template_ = est_.entity_template_.popitem()
self.current_fields = [i for i in template_['mdmFieldsFull'].keys()]
if field_name.lower() in self.current_fields:
print("'{}' already in the template".format(field_name))
return
field_to_send = self.all_possible_fields.get(field_name)
if field_to_send is None:
print('Field does not exist')
return
querystring = {"parentFieldId": parent_field_id}
url = f"v1/entities/templates/{self.dm_id}/onboardField/{field_to_send['mdmId']}"
resp = self.carol.call_api(path=url, method='POST', params=querystring)
def _labels_and_desc(self, prop):
if self.label_map is None:
label = prop
else:
label = self.label_map.get(prop)
if label is None:
label = prop
else:
label = {"en-US": label}
if self.description_map is None:
description = prop
else:
description = self.description_map.get(prop)
if description is None:
description = prop
else:
description = {"en-US": description}
return label, description
def from_json(self, json_sample, profile_title=None, publish=False, dm_id=None,
label_map=None, description_map=None, ignore_field_type=False):
if publish:
assert profile_title is not None, "To publish the data model, `profile_title` has to be set."
if isinstance(profile_title, str):
profile_title = [profile_title]
assert all([i in json_sample for i in profile_title]), "all profile title values should be in `json_sample`"
self.label_map = label_map
self.description_map = description_map
if dm_id is None:
assert self.template_dict is not None
template_name, template_json = self.template_dict.copy().popitem()
self.dm_id = template_json['mdmId']
else:
self.dm_id = dm_id
self.json_sample = json_sample
n_fields = len(list(self.json_sample))
count = 0
for prop, value in self.json_sample.items():
count += 1
print('Creating {}/{}'.format(count, n_fields))
prop = prop.lower()
entity_type = entType.get_ent_type_for(type(value))
if prop in self.all_possible_fields.keys():
if not entity_type.ent_type == 'nested':
ent_ = self.all_possible_fields.get(prop, []).copy()
ent_.pop('mdmCreated')
ent_.pop('mdmLastUpdated')
ent_.pop('mdmTenantId')
if (ent_['mdmMappingDataType'].lower() == entity_type.ent_type) or (ignore_field_type):
self.add_field(prop, parent_field_id="")
else:
print('problem, {} not created, field name matches with an already'
'created field but different type'.format(prop))
else:
print('Nested fields are not supported')
else:
if not entity_type.ent_type == 'nested':
current_label, current_description = self._labels_and_desc(prop)
self.fields.create(mdm_name=prop, mdm_mpping_data_type=entity_type.ent_type,
mdm_field_type='PRIMITIVE', admin=True,
mdm_label=current_label, mdm_description=current_description)
self.all_possible_fields = self.fields.fields_dict
self.add_field(prop, parent_field_id="")
else:
print('Nested fields are not supported')
if publish:
self._profile_title(profile_title, self.dm_id)
self.publish_template(self.dm_id)
# not done
def _nested(self, mdmName, value, parentId=''):
raise ValueError('not implemented')
payload = {"mdmName": mdmName, "mdmMappingDataType": entity_type.ent_type,
"mdmLabel": {"en-US": mdmName}, "mdmDescription": {"en-US": mdmName}}
entity_type = entType.get_ent_type_for(type(value))
if entity_type == entObjectType and len(value) > 0:
for key, val in value.items():
payload['mdmFieldType'] = 'NESTED'
print('criando NESTED')
parentId = 1234
create_field(key, val, parentId=parentId)
elif entity_type == entArrayType and len(value) > 0:
pass
if entity_type.ent_type == 'nested':
payload['mdmFieldType'] = 'NESTED'
print('criando NESTED')
parentId = 1234
_, parentId = create_field()
else:
payload['mdmFieldType'] = 'PRIMITIVE'
print('criando PRIMITIVE')
return payload, parentId