import logging
from collections import defaultdict
import random
import time
from itertools import chain
from pycarol import (
Carol, ApiKeyAuth, PwdAuth, Tasks, Staging, Connectors, CDSStaging, Subscription, DataModel, Apps, CDSGolden
)
from pycarol.query import delete_golden
def track_tasks(carol, task_list, retry_count=3, logger=None, callback=None, polling_delay=5):
    """Track a list of tasks from Carol, waiting for errors/completeness.

    On every polling round the status of each task is fetched. Tasks found
    ``FAILED`` or ``CANCELED`` are reprocessed, up to ``retry_count`` times
    each. The function returns once every task is either ``COMPLETED`` or has
    run out of retries.

    Args:
        carol (pycarol.Carol): pycarol.Carol instance
        task_list (list): List of task ids in Carol. Duplicated ids are
            tracked only once.
        retry_count (int, optional): Number of times to restart a failed task. Defaults to 3.
        logger (logging.Logger, optional): logger to log information. When None,
            ``logging.getLogger(carol.domain)`` is used. Defaults to None.
        callback (callable, optional): This function will be called every time task
            status are fetched from Carol, including the last fetch before returning.
            A dictionary mapping each status to the list of tasks in that status
            will be passed to the function. Defaults to None.
        polling_delay (int, optional): Time in seconds between two status polls.
            A random jitter of up to one second is added. Defaults to 5.

    Usage:

    .. code:: python

        from pycarol import Carol
        from pycarol.functions import track_tasks

        carol = Carol()

        def callback(task_list):
            print(task_list)

        track_tasks(carol=carol, task_list=['task_id_1', 'task_id_2'], callback=callback)

    Returns:
        [dict, bool]: dict with the tasks grouped by status and a boolean that is
        True if any task failed more than retry_count times.
    """
    if logger is None:
        logger = logging.getLogger(carol.domain)

    # Track each id once (order preserved). `max_retries` is a set, so a
    # duplicated id that keeps failing could never make the counts below add
    # up to `n_task`, and the loop would never end.
    task_list = list(dict.fromkeys(task_list))
    n_task = len(task_list)

    retry_tasks = defaultdict(int)  # task id -> number of failures seen
    max_retries = set()  # task ids that will not be restarted anymore
    carol_task = Tasks(carol)

    while True:
        task_status = defaultdict(list)
        for task in task_list:
            status = carol_task.get_task(task).task_status
            task_status[status].append(task)

        for task in task_status['FAILED'] + task_status['CANCELED']:
            if task in max_retries:
                # Already given up on this task: do not count or log it again
                # on every poll while the remaining tasks are still running.
                continue
            logger.warning(f'Something went wrong while processing: {task}')
            retry_tasks[task] += 1
            if retry_tasks[task] > retry_count:
                max_retries.add(task)
                logger.error(
                    f'Task: {task} failed {retry_count} times. will not restart')
                continue
            logger.info(f'Retry task: {task}')
            carol_task.reprocess(task)

        n_completed = len(task_status['COMPLETED'])

        # Report every fetched snapshot, including the final one, as documented.
        if callable(callback):
            callback(task_status)

        if n_completed == n_task:
            logger.debug('All task finished')
            return task_status, False
        if len(max_retries) + n_completed == n_task:
            logger.warning(f'There are {len(max_retries)} failed tasks.')
            return task_status, True

        # Small random jitter on top of the configured polling delay.
        time.sleep(round(polling_delay + random.random(), 2))
        logger.debug('Waiting for tasks')
def delele_all_golden_data(carol, dm_name):
    """Delete the rejected and golden data of a data model.

    The data model id is resolved from its name, then ``CDSGolden.delete_rejected``,
    ``CDSGolden.delete`` and ``delete_golden`` are called, in that order.

    Args:
        carol (pycarol.Carol): Carol instance
        dm_name (str): Data Model name

    Returns:
        list: two entries, the value returned by ``delete_rejected`` followed by
        the ``taskId`` of the ``delete`` response.
    """
    golden_cds = CDSGolden(carol)
    dm_id = DataModel(carol).get_by_name(dm_name)['mdmId']

    # NOTE(review): the first entry of the result is the raw return value of
    # `delete_rejected`, while the second one is only the 'taskId' of the
    # `delete` response. Confirm whether the first one should also be a
    # task id before feeding this list to a task tracker.
    rejected_task = golden_cds.delete_rejected(dm_id=dm_id)
    golden_task = golden_cds.delete(dm_id=dm_id)
    delete_golden(carol, dm_name)

    return [rejected_task, golden_task['taskId']]
def par_delete_golden(carol, dm_list, n_jobs=5):
    """Run ``delele_all_golden_data`` for several data models in parallel.

    Args:
        carol (pycarol.Carol): Carol instance
        dm_list (list): Names of the data models to process
        n_jobs (int, optional): Number of parallel joblib jobs. Defaults to 5.

    Returns:
        list: the per-data-model results flattened into a single list
    """
    # Imported here, as in the rest of this module, so joblib is only needed
    # when a parallel helper is actually called.
    from joblib import Parallel, delayed

    delete_one = delayed(delele_all_golden_data)
    runner = Parallel(n_jobs=n_jobs)
    per_dm_tasks = runner(delete_one(carol, dm_name) for dm_name in dm_list)

    return list(chain.from_iterable(per_dm_tasks))
def delete_staging_data(carol, staging_name, connector_name):
    """Request the deletion of a staging table through ``CDSStaging.delete``.

    Args:
        carol (pycarol.Carol): Login instance
        staging_name (str): Staging name
        connector_name (str): Connector name

    Returns:
        list: single-element list with the ``taskId`` of the delete response
    """
    response = CDSStaging(carol).delete(
        staging_name=staging_name,
        connector_name=connector_name,
    )
    return [response['taskId']]
def par_delete_staging(carol, staging_list, connector_name, n_jobs=5):
    """Run ``delete_staging_data`` for several stagings in parallel.

    Args:
        carol (pycarol.Carol): Login instance
        staging_list (list): Names of the stagings to delete
        connector_name (str): Connector name, shared by every staging in the list
        n_jobs (int, optional): Number of parallel joblib jobs. Defaults to 5.

    Returns:
        list: the per-staging results flattened into a single list
    """
    # Imported here, as in the rest of this module, so joblib is only needed
    # when a parallel helper is actually called.
    from joblib import Parallel, delayed

    delete_one = delayed(delete_staging_data)
    runner = Parallel(n_jobs=n_jobs)
    per_staging_tasks = runner(
        delete_one(carol, staging_name, connector_name)
        for staging_name in staging_list
    )

    return list(chain.from_iterable(per_staging_tasks))