pycarol.cds

Carol’s main storage is called CDS (Carol Data Storage). Any data received or created in Carol is sent to CDS. CDS holds three kinds of data: data coming from the Staging Area, data processed and mapped to a DataModel (Golden Record), and any other file the user sends. The pycarol.cds.CDSStaging and pycarol.cds.CDSGolden classes are used to manipulate the data in the first two cases.

class pycarol.cds.CDSGolden(carol)

Class to handle all CDS Golden interactions.

Parameters

carol (pycarol.Carol): Carol() instance.

consolidate(dm_name=None, dm_id=None, worker_type=None, max_number_workers=-1, number_shards=-1, force_dataflow=False, ignore_merge=False, file_pattern='*.parquet', auto_scaling=True, file_size_limit=-1)

Consolidate the CDS data of a data model.

Parameters
  • dm_name (str): Data Model name.

  • dm_id (str, default None): Data Model id.

  • worker_type (str, default None): Machine flavor to be used. If None, Carol decides which machine to use.

  • max_number_workers (int, default -1): Maximum number of workers used during the process. -1 means all available workers.

  • number_shards (int, default -1): Number of shards.

  • ignore_merge (bool, default False): Whether merge rules should be ignored when consolidating the records.

  • force_dataflow (bool, default False): Whether a Dataflow job should be spun up even for small datasets (by default, small datasets are processed directly inside Carol).

  • file_pattern (str, default *.parquet): Pattern of the CDS files to be consolidated. File names follow YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to select the data received on a given date.

  • auto_scaling (bool, default True): Use auto scaling. If False, Carol uses max_number_workers for the whole process.

  • file_size_limit (int, default -1): Ignore files larger than file_size_limit bytes during consolidation.

Returns

dict Task created in Carol.
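
Because CDS file names start with a YYYY-MM-DDTHH_mm_ss timestamp, restricting a consolidation to one day's files should only need a date prefix in file_pattern. The sketch below assumes glob-style matching (approximated locally with Python's fnmatch, which Carol is not known to use) and builds such a pattern. The helpers day_pattern and consolidate_day are hypothetical, not part of pycarol; the file names in the check are made up to follow the documented naming.

```python
import datetime as dt
from fnmatch import fnmatch


def day_pattern(day: dt.date) -> str:
    """Build a file_pattern selecting CDS files received on `day`.

    Assumes file names start with YYYY-MM-DDTHH_mm_ss, as documented above.
    """
    return f"{day.isoformat()}T*.parquet"


def consolidate_day(cds_golden, dm_name: str, day: dt.date, ignore_merge: bool = False) -> dict:
    """Consolidate only the files a data model received on `day`.

    `cds_golden` is expected to be a pycarol.cds.CDSGolden instance; the
    returned dict is the task Carol creates.
    """
    return cds_golden.consolidate(
        dm_name=dm_name,
        file_pattern=day_pattern(day),
        ignore_merge=ignore_merge,
    )


# Local sanity check of the pattern using fnmatch's glob rules
# (Carol's server-side matching is assumed, not known, to behave the same).
pattern = day_pattern(dt.date(2021, 3, 15))
print(fnmatch("2021-03-15T10_42_07.parquet", pattern))  # True
print(fnmatch("2021-03-16T00_00_01.parquet", pattern))  # False
```

With a configured pycarol.Carol instance `carol`, a call would look like consolidate_day(CDSGolden(carol), "my_data_model", dt.date.today()), where "my_data_model" is a placeholder.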

count(dm_name=None, dm_id=None)

Count the number of messages in CDS.

Parameters
  • dm_name (str): Data Model name.

  • dm_id (str, default None): Data Model id.

Returns

int Count
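
To get an overview of several data models at once, count can simply be called once per model. A minimal sketch; count_data_models and empty_data_models are hypothetical helpers, and each model costs one request to Carol.

```python
from typing import Dict, Iterable, List


def count_data_models(cds_golden, dm_names: Iterable[str]) -> Dict[str, int]:
    """Return the CDS message count of each data model name.

    `cds_golden` is expected to be a pycarol.cds.CDSGolden instance.
    """
    return {name: cds_golden.count(dm_name=name) for name in dm_names}


def empty_data_models(counts: Dict[str, int]) -> List[str]:
    """List, sorted by name, the data models with no data in CDS."""
    return sorted(name for name, n in counts.items() if n == 0)
```

For example, empty_data_models(count_data_models(CDSGolden(carol), names)) lists the models that still need data, where `names` is whatever list of data model names you want to inspect.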

delete(dm_name=None, dm_id=None)

Delete all CDS data of a data model.

Parameters
  • dm_name (str): Data Model name.

  • dm_id (str, default None): Data Model id.

Returns

None
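
Since delete removes all CDS data of the model and returns nothing, a guard that checks count first and demands explicit confirmation can prevent accidents. This is a hypothetical wrapper (delete_data_model_cds), not a pycarol API.

```python
def delete_data_model_cds(cds_golden, dm_name: str, confirm: bool = False) -> int:
    """Delete all CDS data of a data model, only when explicitly confirmed.

    Returns the number of messages counted before deletion (0 means nothing
    was deleted). `cds_golden` is expected to be a pycarol.cds.CDSGolden.
    """
    n = cds_golden.count(dm_name=dm_name)
    if n == 0:
        return 0  # nothing stored, so no delete request is sent
    if not confirm:
        raise RuntimeError(
            f"Refusing to delete {n} messages from {dm_name!r} without confirm=True"
        )
    cds_golden.delete(dm_name=dm_name)
    return n
```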

delete_rejected(dm_name=None, dm_id=None, connector_name=None, connector_id=None, staging_name=None)

Delete the rejected CDS data of a data model.

Parameters
  • dm_name (str): Data Model name.

  • dm_id (str, default None): Data Model id.

  • connector_name (str, default None): Connector name. Use it to delete only the records from a given connector/staging table.

  • connector_id (str, default None): Connector id. Use it to delete only the records from a given connector/staging table.

  • staging_name (str, default None): Staging name. Use it to delete only the records from a given connector/staging table.

Returns

dict Carol task created.
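
The connector and staging parameters narrow the deletion to records from one source. The sketch below assumes, without confirmation from this page, that a staging name only identifies a table together with its connector, so it requires one of the two connector parameters. delete_rejected_from_staging is a hypothetical helper.

```python
from typing import Optional


def delete_rejected_from_staging(
    cds_golden,
    dm_name: str,
    staging_name: str,
    connector_name: Optional[str] = None,
    connector_id: Optional[str] = None,
) -> dict:
    """Delete the rejected records of `dm_name` that came from one staging table.

    Assumption: staging_name is only meaningful together with its connector,
    so connector_name or connector_id is required here.
    """
    if connector_name is None and connector_id is None:
        raise ValueError("pass connector_name or connector_id together with staging_name")
    return cds_golden.delete_rejected(
        dm_name=dm_name,
        connector_name=connector_name,
        connector_id=connector_id,
        staging_name=staging_name,
    )
```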

process_bigquery(query, dm_name=None, dm_id=None, save_cds_data=True, delete_target_folder=False, send_subscriptions=True, use_dataflow=False, delete_realtime_records=False, send_realtime_records=False, save_big_query=False, clear_big_query=False, deduplicate_results=False, **extra_params)

Process CDS data using the BigQuery engine.

Parameters
  • query (str): BigQuery query.

  • dm_name (str): Data Model name.

  • dm_id (str, default None): Data Model id.

  • save_cds_data (bool, default True): Save the result in CDS.

  • delete_target_folder (bool, default False): Delete the target folder.

  • send_subscriptions (bool, default True): Send subscriptions.

  • use_dataflow (bool, default False): Use Dataflow.

  • delete_realtime_records (bool, default False): Delete realtime records.

  • send_realtime_records (bool, default False): Send realtime records.

  • save_big_query (bool, default False): Save the result to a BigQuery table.

  • clear_big_query (bool, default False): Clear BigQuery first.

  • deduplicate_results (bool, default False): Whether the results should be deduplicated (forced to True if send_realtime_records is True).

  • extra_params (keyword arguments): Additional parameters forwarded to Carol. If a new parameter is added in Carol, this makes it usable without updating pycarol.

Returns

dict Task created in Carol.
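
A typical combination is to save a query's result in CDS and also push it to the realtime layer. The hypothetical wrapper below sets deduplicate_results explicitly for readability, although the description above says it is forced to True whenever send_realtime_records is True, and forwards any extra keyword arguments untouched through extra_params. The query text itself is left to the caller.

```python
def run_bigquery_to_realtime(cds_golden, query: str, dm_name: str, **extra_params) -> dict:
    """Run `query` with the BigQuery engine, save the result in CDS and send
    it to the realtime layer.

    `cds_golden` is expected to be a pycarol.cds.CDSGolden instance.
    """
    return cds_golden.process_bigquery(
        query=query,
        dm_name=dm_name,
        save_cds_data=True,
        send_realtime_records=True,
        deduplicate_results=True,  # forced by Carol anyway with realtime records
        **extra_params,  # e.g. parameters added to Carol after this pycarol version
    )
```

Passing one of the explicitly set parameters again through **extra_params raises a TypeError in Python, which keeps the wrapper's choices from being silently overridden.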

sync_data(dm_name, dm_id=None, num_records=-1, file_pattern='*', filter_query=None, skip_consolidation=False, force_dataflow=False, records_percentage=100, worker_type=None, max_number_workers=-1, clear_golden_realtime=False)

Sync data to the realtime layer.

Parameters
  • dm_name (str): Data model name.

  • dm_id (str, default None): Data model id.

  • num_records (int, default -1): Number of records to be processed. -1 means all records.

  • file_pattern (str, default *): Pattern of the CDS files to be processed. File names follow YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to select the data received on a given date.

  • filter_query (dict, default None): Query used to filter the data to be processed.

  • skip_consolidation (bool, default False): Whether the consolidation process should be skipped.

  • force_dataflow (bool, default False): Whether a Dataflow job should be spun up even for small datasets (by default, small datasets are processed directly inside Carol).

  • records_percentage (int, default 100): Percentage of records (0-100) to import.

  • worker_type (str, default None): Machine flavor to be used. If None, Carol decides which machine to use.

  • max_number_workers (int, default -1): Maximum number of workers used during the process. -1 means all available workers.

  • clear_golden_realtime (bool, default False): Whether the records in the realtime layer should be deleted first.

Returns

None
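
records_percentage makes it possible to sync only a sample, for instance as a quick check before a full sync. A minimal sketch; sync_sample is a hypothetical helper, and its 0-100 range check is done client-side here based on the parameter description (whether Carol validates it as well is not stated on this page).

```python
def sync_sample(cds_golden, dm_name: str, percentage: int, clear_first: bool = False) -> None:
    """Sync `percentage` percent of a data model's CDS records to the realtime layer.

    With clear_first=True, the realtime records are deleted before syncing
    (clear_golden_realtime). `cds_golden` is expected to be a
    pycarol.cds.CDSGolden instance.
    """
    if not 0 <= percentage <= 100:
        raise ValueError(f"percentage must be in [0, 100], got {percentage}")
    cds_golden.sync_data(
        dm_name,
        records_percentage=percentage,
        clear_golden_realtime=clear_first,
    )
```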

class pycarol.cds.CDSStaging(carol)

Class to handle all CDS Staging interactions.

consolidate(staging_name, connector_id=None, connector_name=None, worker_type=None, max_number_workers=-1, number_shards=-1, force_dataflow=False, rehash_ids=False, file_pattern='*.parquet', compute_transformations=False, auto_scaling=True, file_size_limit=-1)

Consolidate staging CDS data.

Parameters
  • staging_name (str): Staging name.

  • connector_id (str, default None): Connector id.

  • connector_name (str, default None): Connector name.

  • worker_type (str, default None): Machine flavor to be used. If None, Carol decides which machine to use.

  • max_number_workers (int, default -1): Maximum number of workers used during the process. -1 means all available workers.

  • number_shards (int, default -1): Number of shards.

  • force_dataflow (bool, default False): Whether a Dataflow job should be spun up even for small datasets (by default, small datasets are processed directly inside Carol).

  • rehash_ids (bool, default False): Whether all ids should be regenerated from the crosswalk.

  • file_pattern (str, default *.parquet): Pattern of the CDS files to be consolidated. File names follow YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to select the data received on a given date.

  • compute_transformations (bool, default False): If staging transformations are defined, apply them during the consolidation.

  • auto_scaling (bool, default True): Use auto scaling. If False, Carol uses max_number_workers for the whole process.

  • file_size_limit (int, default -1): Ignore files larger than file_size_limit bytes during consolidation.

Returns

dict Task created in Carol.
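
Turning auto_scaling off makes Carol use max_number_workers for the whole run, which gives a predictable worker count. The sketch below wraps that combination; consolidate_staging_fixed_workers is a hypothetical helper, and the positive-worker check is this sketch's own choice.

```python
from typing import Optional


def consolidate_staging_fixed_workers(
    cds_staging,
    staging_name: str,
    connector_name: str,
    workers: int,
    worker_type: Optional[str] = None,
) -> dict:
    """Consolidate a staging table with a fixed number of workers.

    With auto_scaling=False, Carol uses max_number_workers for the whole
    process. `cds_staging` is expected to be a pycarol.cds.CDSStaging.
    """
    if workers < 1:
        raise ValueError("workers must be a positive integer")
    return cds_staging.consolidate(
        staging_name,
        connector_name=connector_name,
        max_number_workers=workers,
        auto_scaling=False,
        worker_type=worker_type,  # None lets Carol pick the machine flavor
    )
```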

count(staging_name, connector_id=None, connector_name=None)

Count the number of messages in CDS.

Parameters
  • staging_name (str): Staging name.

  • connector_id (str, default None): Connector id.

  • connector_name (str, default None): Connector name.

Returns

int Count
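
count can serve as a cheap pre-check so that no consolidation task is started for a staging table that has nothing in CDS. A minimal sketch; consolidate_if_not_empty is a hypothetical helper.

```python
from typing import Optional


def consolidate_if_not_empty(cds_staging, staging_name: str, connector_name: str) -> Optional[dict]:
    """Start a consolidation only when the staging table has messages in CDS.

    Returns the task dict created by Carol, or None when count() reports 0
    and no task is started. `cds_staging` is expected to be a
    pycarol.cds.CDSStaging instance.
    """
    if cds_staging.count(staging_name, connector_name=connector_name) == 0:
        return None  # nothing stored for this staging table; skip the task
    return cds_staging.consolidate(staging_name, connector_name=connector_name)
```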

delete(staging_name, connector_id=None, connector_name=None)

Delete all CDS staging data.

Parameters
  • staging_name (str): Staging name.

  • connector_id (str, default None): Connector id.

  • connector_name (str, default None): Connector name.

Returns

None
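
When cleaning up several staging tables of one connector, a dry-run mode helps review what would be removed before calling delete. delete_stagings below is a hypothetical helper built only on the count and delete methods documented above.

```python
from typing import Dict, Iterable


def delete_stagings(
    cds_staging,
    connector_name: str,
    staging_names: Iterable[str],
    dry_run: bool = True,
) -> Dict[str, int]:
    """Delete the CDS data of several staging tables of one connector.

    Returns each table's message count, taken just before that table would
    be deleted. With dry_run=True (the default) nothing is deleted.
    `cds_staging` is expected to be a pycarol.cds.CDSStaging instance.
    """
    report = {}
    for name in staging_names:
        report[name] = cds_staging.count(name, connector_name=connector_name)
        if not dry_run:
            cds_staging.delete(name, connector_name=connector_name)
    return report
```

Run it once with the default dry_run=True, inspect the returned counts, then repeat with dry_run=False.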

process_data(staging_name, connector_id=None, connector_name=None, worker_type=None, max_number_workers=-1, number_shards=-1, num_records=-1, delete_target_folder=False, enable_realtime=None, delete_realtime_records=False, send_realtime=None, file_pattern='*', filter_query=None, skip_consolidation=False, force_dataflow=False, recursive_processing=True, dm_name=None, auto_scaling=True, force_paused=False)

Process CDS staging data.

Parameters
  • staging_name (str): Staging name.

  • connector_id (str, default None): Connector id.

  • connector_name (str, default None): Connector name.

  • worker_type (str, default None): Machine flavor to be used. If None, Carol decides which machine to use.

  • max_number_workers (int, default -1): Maximum number of workers used during the process. -1 means all available workers.

  • number_shards (int, default -1): Number of shards.

  • num_records (int, default -1): Number of records to be processed. -1 means all records.

  • delete_target_folder (bool, default False): Whether to delete the previously processed records.

  • enable_realtime (bool, default None): DEPRECATED, removed from Carol. Enabled this staging table to send the processed data to the realtime layer.

  • delete_realtime_records (bool, default False): Delete previously processed data in the realtime layer.

  • send_realtime (bool, default None): Send the processed data to the realtime layer.

  • file_pattern (str, default *): Pattern of the CDS files to be processed. File names follow YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to select the data received on a given date.

  • filter_query (dict, default None): Query used to filter the data to be processed.

  • skip_consolidation (bool, default False): Whether the consolidation process should be skipped.

  • force_dataflow (bool, default False): Whether a Dataflow job should be spun up even for small datasets (by default, small datasets are processed directly inside Carol).

  • recursive_processing (bool, default True): Whether processing should be chained recursively into the target entities. For example, if a staging table has 3 ETLs and each ETL maps to a data model, processing this staging table triggers processing of the whole tree.

  • dm_name (str, default None): If not None, the rejected records from the selected staging table are reprocessed.

  • auto_scaling (bool, default True): Use auto scaling. If False, Carol uses max_number_workers for the whole process.

  • force_paused (bool, default False): Whether processing should be forced even for paused staging tables.

Returns

dict Task definition.
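
Passing dm_name makes process_data reprocess the rejected records of the selected staging table, and recursive_processing controls whether the run is chained into the downstream entities. The hypothetical wrapper below combines the two; disabling recursion by default is this sketch's choice, not a recommendation from the pycarol docs.

```python
def reprocess_rejected(
    cds_staging,
    staging_name: str,
    connector_name: str,
    dm_name: str,
    recursive: bool = False,
) -> dict:
    """Reprocess the records of `staging_name` rejected for data model `dm_name`.

    recursive=False keeps the run from being chained into target entities.
    `cds_staging` is expected to be a pycarol.cds.CDSStaging instance; the
    returned dict is the task definition.
    """
    return cds_staging.process_data(
        staging_name,
        connector_name=connector_name,
        dm_name=dm_name,
        recursive_processing=recursive,
    )
```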

sync_data(staging_name, connector_id=None, connector_name=None, num_records=-1, delete_realtime_records=False, enable_realtime=None, file_pattern='*', filter_query=None, force_dataflow=False, records_percentage=100)

Sync data to the realtime layer.

Parameters
  • staging_name (str): Staging name.

  • connector_id (str, default None): Connector id.

  • connector_name (str, default None): Connector name.

  • num_records (int, default -1): Number of records to be processed. -1 means all records.

  • enable_realtime (bool, default None): DEPRECATED, removed from Carol. Enabled this staging table to send the processed data to the realtime layer.

  • delete_realtime_records (bool, default False): Delete previously processed data in the realtime layer.

  • file_pattern (str, default *): Pattern of the CDS files to be processed. File names follow YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to select the data received on a given date.

  • filter_query (dict, default None): Query used to filter the data to be processed.

  • force_dataflow (bool, default False): Whether a Dataflow job should be spun up even for small datasets (by default, small datasets are processed directly inside Carol).

  • records_percentage (int, default 100): Percentage of records (0-100) to import.

Returns

None
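
Combining file_pattern with the timestamped file names, one can sync only the staging files received on a given day. The sketch below assumes glob-style matching on those names and optionally clears the previously processed realtime data first; sync_day is a hypothetical helper.

```python
import datetime as dt


def sync_day(
    cds_staging,
    staging_name: str,
    connector_name: str,
    day: dt.date,
    clear_realtime: bool = False,
) -> None:
    """Sync to the realtime layer only the staging files received on `day`.

    file_pattern relies on the documented YYYY-MM-DDTHH_mm_ss*.parquet naming.
    With clear_realtime=True, previously processed realtime data is deleted
    first (delete_realtime_records). `cds_staging` is expected to be a
    pycarol.cds.CDSStaging instance.
    """
    cds_staging.sync_data(
        staging_name,
        connector_name=connector_name,
        delete_realtime_records=clear_realtime,
        file_pattern=f"{day.isoformat()}T*.parquet",
    )
```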