Source code for pycarol.functions.misc

import logging
from collections import defaultdict
import random
import time
from itertools import chain

from pycarol import (
    Carol, ApiKeyAuth, PwdAuth, Tasks, Staging, Connectors, CDSStaging, Subscription, DataModel, Apps, CDSGolden
)
from pycarol.query import delete_golden

def track_tasks(carol, task_list, retry_count=3, logger=None, callback=None, polling_delay=5):
    """Track a list of tasks from Carol, waiting for errors/completeness.

    The status of every task is polled in a loop. Tasks found as ``FAILED`` or
    ``CANCELED`` are reprocessed; once a task has been found in that state more
    than ``retry_count`` times it is given up on and no longer restarted.

    Args:
        carol (pycarol.Carol): pycarol.Carol instance
        task_list (list): List of task ids in Carol. Duplicated ids are tracked once.
        retry_count (int, optional): Number of times to restart a failed task. Defaults to 3.
        logger (logging.logger, optional): logger to log information. When None, the
            logger named after ``carol.domain`` is used. Defaults to None.
        callback (callable, optional): This function will be called every time task status
            are fetched from carol, including the last fetch before returning.
            A dictionary mapping each status to the list of tasks in that status will be
            passed to the function. Defaults to None.
        polling_delay (int, optional): Time in seconds between two status fetches. A random
            jitter below one second is added to it. Defaults to 5.

    Usage:

    .. code:: python

        from pycarol import Carol
        from pycarol.functions import track_tasks
        carol = Carol()
        def callback(task_list):
            print(task_list)
        track_tasks(carol=carol, task_list=['task_id_1', 'task_id_2'], callback=callback)

    Returns:
        [dict, bool]: dict with the tasks grouped by status, and a boolean that is True
        if any task failed more than retry_count times.
    """
    if logger is None:
        logger = logging.getLogger(carol.domain)

    # Track every task once. `max_retries` is a set, so a duplicated id that fails
    # permanently would otherwise never satisfy the exit condition below.
    task_list = list(dict.fromkeys(task_list))
    n_task = len(task_list)

    retry_tasks = defaultdict(int)
    max_retries = set()
    carol_task = Tasks(carol)

    while True:
        task_status = defaultdict(list)
        for task in task_list:
            status = carol_task.get_task(task).task_status
            task_status[status].append(task)

        for task in task_status['FAILED'] + task_status['CANCELED']:
            if task in max_retries:
                # Already reported and given up on; do not log it again on every poll.
                continue
            logger.warning(f'Something went wrong while processing: {task}')
            retry_tasks[task] += 1
            if retry_tasks[task] > retry_count:
                max_retries.add(task)
                logger.error(f'Task: {task} failed {retry_count} times. will not restart')
                continue
            logger.info(f'Retry task: {task}')
            carol_task.reprocess(task)

        n_completed = len(task_status['COMPLETED'])
        if n_completed == n_task:
            logger.debug('All task finished')
            finished, any_failed = True, False
        elif len(max_retries) + n_completed == n_task:
            logger.warning(f'There are {len(max_retries)} failed tasks.')
            finished, any_failed = True, True
        else:
            finished, any_failed = False, False
            # The random jitter spreads the polls of concurrent trackers.
            time.sleep(round(polling_delay + random.random(), 2))
            logger.debug('Waiting for tasks')

        # Report every fetched status, the final one included.
        if callable(callback):
            callback(task_status)

        if finished:
            return task_status, any_failed
def pause_dm_mappings(carol, dm_list, connector_name=None, connector_id=None,
                      do_not_pause_staging_list=None):
    """Pause mappings from a connector based on a list of Datamodels.

    Only mappings whose ``mdmRunningState`` is ``RUNNING``, whose
    ``mdmMasterEntityName`` is in ``dm_list`` and whose ``mdmStagingType`` is not in
    ``do_not_pause_staging_list`` are paused.

    Args:
        carol (pycarol.Carol): Carol instance
        dm_list (list): list of Datamodels to pause
        connector_name (str, optional): connector name. Defaults to None.
        connector_id (str, optional): connector id. Defaults to None.
        do_not_pause_staging_list (list, optional): List of stagings to do not pause.
            Defaults to None.

    Usage:

    .. code:: python

        from pycarol import Carol
        from pycarol.functions import pause_dm_mappings
        carol = Carol()
        pause_dm_mappings(carol, connector_name='connector_name', dm_list=['dm1','dm2'])
    """
    do_not_pause_staging_list = do_not_pause_staging_list if do_not_pause_staging_list else ['']

    conn = Connectors(carol)
    mappings = conn.get_dm_mappings(connector_name=connector_name, connector_id=connector_id)

    # Ids of the running mappings that feed one of the requested data models.
    running_mapping_ids = [
        mapping['mdmId']
        for mapping in mappings
        if mapping['mdmRunningState'] == 'RUNNING'
        and mapping['mdmMasterEntityName'] in dm_list
        and mapping['mdmStagingType'] not in do_not_pause_staging_list
    ]

    conn.pause_mapping(
        connector_name=connector_name,
        entity_mapping_id=running_mapping_ids,
        connector_id=connector_id,
    )
def check_mapping(login, staging_name, connector_name=None, connector_id=None):
    """Check if a staging is mapped to a datamodel and return the mapping.

    Args:
        login (pycarol.Carol): Carol instance
        staging_name (str): staging name
        connector_name (str, optional): connector name. Defaults to None.
        connector_id (str, optional): connector id. When not given it is looked up
            from ``connector_name``. Defaults to None.

    Usage:

    .. code:: python

        from pycarol import Carol
        from pycarol.functions import check_mapping
        carol = Carol()
        check_mapping(carol, staging_name='staging_name', connector_name='connector_name')

    Raises:
        ValueError: if neither ``connector_id`` nor ``connector_name`` is set, or if
            Carol answers with an error other than "Entity mapping not found".

    Returns:
        list: list of mappings or None
    """
    conn = Connectors(login)

    if connector_id is None and connector_name is None:
        raise ValueError('Either connector_id or connector_name must be set.')

    connector_id = connector_id if connector_id else conn.get_by_name(connector_name)['mdmId']

    # NOTE(review): with errors='ignore' the call presumably hands back the error
    # payload as a dict instead of raising - confirm against Connectors.
    resp = conn.get_entity_mappings(
        connector_name=connector_name,
        staging_name=staging_name,
        connector_id=connector_id,
        errors='ignore',
    )

    if isinstance(resp, dict):
        # Use .get so that an unexpected payload ends in the ValueError below
        # rather than in a KeyError/TypeError on a missing or null field.
        error_message = resp.get('errorMessage') or ''
        if resp.get('errorCode') == 404 and 'Entity mapping not found' in error_message:
            return None
        raise ValueError(f'Error checking mapping {resp}')

    return resp
def resume_process(carol, staging_name, connector_name=None, connector_id=None, logger=None, delay=1):
    """Resume the processing (ETLs and mappings) of a staging.

    The ETLs of the staging are played first; afterwards every mapping found for
    the staging is played with ``process_cds=False``.

    Args:
        carol (pycarol.Carol): Carol instance
        staging_name (str): staging name
        connector_name (str, optional): connector name. Defaults to None.
        connector_id (str, optional): connector id. When not given it is looked up
            from ``connector_name``. Defaults to None.
        logger (logging.logger, optional): logger. When None, the logger named after
            ``carol.domain`` is used. Defaults to None.
        delay (int, optional): time in seconds to wait after resuming. Defaults to 1.

    Usage:

    .. code:: python

        from pycarol import Carol
        from pycarol.functions import resume_process
        carol = Carol()
        resume_process(carol, staging_name='staging_name', connector_name='connector_name')

    Raises:
        ValueError: if neither connector is identified, or if the ETL could not be started.

    Returns:
        list: the mappings that were resumed, or None when the staging has no mapping.
    """
    if logger is None:
        logger = logging.getLogger(carol.domain)

    connectors = Connectors(carol)

    if connector_name is None and connector_id is None:
        raise ValueError('Either connector_id or connector_name must be set.')
    if not connector_id:
        connector_id = connectors.get_by_name(connector_name)['mdmId']

    # TODO Review this once we have mapping and ETLs in the same staging.
    # ETLs go first.
    etl_response = connectors.play_etl(
        connector_id=connector_id,
        staging_name=staging_name,
        connector_name=connector_name,
    )
    if not etl_response['success']:
        failure = f'Problem starting ETL {connector_name}/{staging_name}\n {etl_response}'
        logger.error(failure)
        raise ValueError(failure)

    # Then the mappings, when the staging has any.
    mappings_list = check_mapping(
        carol,
        connector_name=connector_name,
        staging_name=staging_name,
        connector_id=connector_id,
    )
    to_resume = [] if mappings_list is None else mappings_list
    for mapping in to_resume:
        connectors.play_mapping(
            connector_name=connector_name,
            connector_id=connector_id,
            entity_mapping_id=mapping['mdmId'],
            process_cds=False,
        )

    # Give Carol some time for the resume to take effect.
    time.sleep(delay)
    return mappings_list
def pause_etls(carol, etl_list, connector_name=None, connector_id=None, logger=None):
    """Pause ETLs from a connector based on a list of ETLs.

    A pause is requested for every staging in ``etl_list`` before the responses
    are checked, so one failure does not prevent the remaining ETLs from being paused.

    Args:
        carol (pycarol.Carol): Carol instance
        etl_list (list): Names of the stagings whose ETLs will be paused
        connector_name (str, optional): Connector Name. Defaults to None.
        connector_id (str, optional): Connector ID. When not given it is looked up
            from ``connector_name``. Defaults to None.
        logger (logging.logger, optional): Logger. When None, the logger named after
            ``carol.domain`` is used. Defaults to None.

    Usage:

    .. code:: python

        from pycarol import Carol
        from pycarol.functions import pause_etls
        carol = Carol()
        etl_list = ['staging1', 'staging2', 'staging3']
        pause_etls(carol, connector_name='rui', etl_list=etl_list,)

    Raises:
        ValueError: if neither ``connector_id`` nor ``connector_name`` is set, or if
            any of the pause requests did not succeed.

    Returns:
        dict: response of the pause request for each staging name.
    """
    if logger is None:
        logger = logging.getLogger(carol.domain)

    conn = Connectors(carol)

    if connector_id is None and connector_name is None:
        raise ValueError('Either connector_id or connector_name must be set.')

    connector_id = connector_id if connector_id else conn.get_by_name(connector_name)['mdmId']

    r = {}
    for staging_name in etl_list:
        logger.debug(f'Pausing {staging_name} ETLs')
        # Forward the resolved connector_id as well: without it a caller that only
        # gave connector_id would pause with connector_name=None.
        r[staging_name] = conn.pause_etl(
            connector_name=connector_name,
            connector_id=connector_id,
            staging_name=staging_name,
        )

    if not all(response['success'] for response in r.values()):
        logger.error(f'Some ETLs were not paused. {r}')
        raise ValueError(f'Some ETLs were not paused. {r}')

    return r
def delele_all_golden_data(carol, dm_name):
    """Delete golden files from a datamodel in all storages.

    Rejected records and golden records are deleted through ``CDSGolden``, and
    ``delete_golden`` is called for the same data model afterwards.

    Args:
        carol (pycarol.Carol): Carol instance
        dm_name (str): Data Model name

    Returns:
        list: list of tasks created, one entry for the rejected-records deletion
        followed by the task id of the golden-records deletion.
    """
    cds_golden = CDSGolden(carol)
    dm_id = DataModel(carol).get_by_name(dm_name)['mdmId']

    rejected_task = cds_golden.delete_rejected(dm_id=dm_id)
    golden_task = cds_golden.delete(dm_id=dm_id)
    delete_golden(carol, dm_name)

    # Keep the returned list homogeneous (task ids) so it can be handed to a task
    # tracker. NOTE(review): delete_rejected is presumed to answer with a task dict
    # like delete does; when it carries no 'taskId' the raw response is kept as before.
    if isinstance(rejected_task, dict) and 'taskId' in rejected_task:
        rejected_task = rejected_task['taskId']

    return [rejected_task, golden_task['taskId']]
def par_delete_golden(carol, dm_list, n_jobs=5):
    """Delete the golden files of several datamodels in parallel.

    Args:
        carol (pycarol.Carol): Carol instance
        dm_list (list): Names of the datamodels to clean
        n_jobs (int, optional): Number of parallel jobs. Defaults to 5.

    Returns:
        list: tasks created for all datamodels, flattened into a single list
    """
    # joblib is only needed by the parallel helpers, hence the local import.
    from joblib import Parallel, delayed

    delete_one = delayed(delele_all_golden_data)
    jobs = (delete_one(carol, dm_name) for dm_name in dm_list)
    tasks_per_dm = Parallel(n_jobs=n_jobs)(jobs)

    return list(chain.from_iterable(tasks_per_dm))
def delete_staging_data(carol, staging_name, connector_name):
    """Delete the data of a staging.

    Args:
        carol (pycarol.Carol): Login instance
        staging_name (str): Staging name
        connector_name (str): Connector name

    Returns:
        list: single-element list with the id of the deletion task
    """
    deletion = CDSStaging(carol).delete(
        staging_name=staging_name,
        connector_name=connector_name,
    )
    return [deletion['taskId']]
def par_delete_staging(carol, staging_list, connector_name, n_jobs=5):
    """Delete the data of several stagings of a connector in parallel.

    Args:
        carol (pycarol.Carol): Login instance
        staging_list (list): Names of the stagings to clean
        connector_name (str): Connector name
        n_jobs (int, optional): Number of parallel jobs. Defaults to 5.

    Returns:
        list: tasks created for all stagings, flattened into a single list
    """
    # joblib is only needed by the parallel helpers, hence the local import.
    from joblib import Parallel, delayed

    delete_one = delayed(delete_staging_data)
    jobs = (delete_one(carol, staging_name, connector_name) for staging_name in staging_list)
    tasks_per_staging = Parallel(n_jobs=n_jobs)(jobs)

    return list(chain.from_iterable(tasks_per_staging))