Source code for pycarol.carolina

import copy
from string import Formatter
from datetime import datetime, timedelta
import threading

[docs]class Carolina: """ This class is used to handle any iteration with Carol data storage (CDS). Args: carol: class: pycarol.Carol """ token = None backend_lock = threading.Lock() def __init__(self, carol): self.carol = carol self.engine = None self.client = None self.token = None self.expires_at = None self.cds_app_storage_path = None self.cds_golden_path = None self.cds_staging_path = None self.cds_staging_master_path = None self.cds_staging_rejected_path = None self.cds_view_path = None self.cds_golden_intake_path = None self.cds_staging_intake_path = None self.cds_golden_rejected_intake_path = None self.cds_view_intake_path = None self.cds_pycarol_storage_path = None
[docs] def init_if_needed(self): """ Checks if backend credentials / object is still valid. This method is thread safe by using lock. Returns: True if backend has been reinitialized, false if it is still valid. """ expired = False # locks all instance variables to prevent inconsistencies with Carolina.backend_lock: if self.client: # Check if the token is not expired for at least another minute.. we do a little margin to avoid time difference issues if self.expires_at is None or datetime.utcnow() + timedelta(minutes=1) < self.expires_at: return else: expired = True if Carolina.token is None or expired or Carolina.token.get('tenant_name', '') != self.carol.tenant['mdmName'] \ or Carolina.token.get('app_name', '') != self.carol.app_name \ or (datetime.utcnow() + timedelta(minutes=1) >= datetime.fromtimestamp(Carolina.token.get('expirationTimestamp', 1)/1000.0)): token = self.carol.call_api( 'v1/storage/storage/token', params={'carolAppName': self.carol.app_name}, status_forcelist=(500, 502, 503, 504, 524), retries=5 ) token['tenant_name'] = self.carol.tenant['mdmName'] token['app_name'] = self.carol.app_name Carolina.token = token token = Carolina.token self.engine = token['engine'] if token.get('expirationTimestamp', None) is not None: self.expires_at = datetime.fromtimestamp(token['expirationTimestamp'] / 1000.0) self.cds_app_storage_path = token['cdsAppStoragePath'] self.cds_golden_path = token['cdsGoldenPath'] self.cds_staging_path = token['cdsStagingPath'] self.cds_staging_master_path = token['cdsStagingMasterPath'] self.cds_staging_rejected_path = token['cdsStagingRejectedPath'] self.cds_view_path = token['cdsViewPath'] self.cds_golden_intake_path = token['cdsIntakeGoldenPath'] self.cds_staging_intake_path = token['cdsIntakeStagingPath'] self.cds_golden_rejected_intake_path = token['cdsRejectedPath'] self.cds_pycarol_storage_path = copy.deepcopy(token['cdsAppStoragePath']) self.cds_pycarol_storage_path['path'] = 'carolapps/._pycarol' self.cds_view_intake_path = token['cdsIntakeViewPath'] if self.engine == 'GCP-CS': self._init_gcp(token)
def _init_gcp(self, token): """ Initialize GCP back-end. Warning: this method is not thread safe and should not be called directly, use init_if_needed instead. Args: token: `dict` TODO: add here the fields for GCP GCP token { "aiAccessKeyId": aws_access_key_id, "aiSecretKey": aws_secret_access_key, "aiAccessToken": aws_session_token, } Returns: None """ from google.oauth2 import service_account from google.cloud import storage self.token = token['token'] gcp_credentials = service_account.Credentials.from_service_account_info(token['token']) self.client = storage.Client(credentials=gcp_credentials, project=token['token']['project_id']) def get_client(self): self.init_if_needed() return self.client
[docs] def get_bucket_name(self, space): """ Format the bucket path for each possible space. Args: space: `str`, Which bucket to get. Possible values: 1. "golden": Data Model golden records. 2. "staging": Staging records path 3. "staging_master": Staging records from Master 4. "staging_rejected": Staging records from Rejected 5. "view": Data Model View records 6. "app": App bucket 7. "golden_cds": CDS golden records 8. "staging_cds": Staging Intake. Returns: formatted bucket path """ # TODO: we can use a dictionary or instead of ifs. if space == 'golden': template = self.cds_golden_path['bucket'] elif space == 'staging': template = self.cds_staging_path['bucket'] elif space == 'staging_master': template = self.cds_staging_master_path['bucket'] elif space == 'staging_rejected': template = self.cds_staging_rejected_path['bucket'] elif space == 'view': template = self.cds_view_path['bucket'] elif space == 'app': template = self.cds_app_storage_path['bucket'] elif space == 'staging_cds': template = self.cds_staging_intake_path['bucket'] elif space == 'golden_cds': template = self.cds_golden_intake_path['bucket'] elif space == 'view_cds': template = self.cds_view_intake_path['bucket'] elif space == 'golden_rejected': template = self.cds_staging_rejected_path['bucket'] elif space == 'pycarol': template = self.cds_pycarol_storage_path['bucket'] else: raise ValueError name = Formatter().vformat(template, None, {'tenant_id': self.carol.tenant['mdmId']}) return name
[docs] def get_path(self, space, vars): """ Format the bucket path for each possible space. Args: space: `str`, Which bucket to get. Possible values: 1. "golden": Data Model golden records. 2. "staging": Staging records path 3. "staging_master": Staging records from Master 4. "staging_rejected": Staging records from Rejected 5. "view": Data Model Relationship View records 6. "app": App bucket 7. "golden_cds": CDS golden records 8. "staging_cds": Staging Intake. vars: `dict` Parameters needed to format the storage path. Possible keys: 1. "tenant_id": Tenant ID. 2. "connector_id": Connector ID 3. "staging_type": Staging Name 4. "dm_name": Data model Name 5. "relationship_view_name": Relationship view name 6. "app_name": App name Returns: formatted path """ vars['tenant_id'] = self.carol.tenant['mdmId'] # TODO: we can use a dictionary or instead of ifs. if space == 'golden': template = self.cds_golden_path['path'] + '/' elif space == 'staging': template = self.cds_staging_path['path'] + '/' elif space == 'staging_master': template = self.cds_staging_master_path['path'] + '/' elif space == 'staging_rejected': template = self.cds_staging_rejected_path['path'] + '/' elif space == 'view': template = self.cds_view_path['path'] + '/' elif space == 'app': template = self.cds_app_storage_path['path'] + '/' elif space == 'staging_cds': template = self.cds_staging_intake_path['path'] + '/' elif space == 'golden_cds': template = self.cds_golden_intake_path['path'] + '/' elif space == 'view_cds': template = self.cds_view_intake_path['path'] + '/' elif space == 'golden_rejected': template = self.cds_golden_rejected_intake_path['path'] + '/' elif space == 'pycarol': template = self.cds_pycarol_storage_path['path'] + '/' else: raise ValueError name = Formatter().vformat(template, None, vars) return name