pycarol.cds

Carol’s main storage is called CDS (Carol Data Storage). Any data received or created in Carol is sent to CDS. CDS holds three kinds of data: data coming from the Staging Area, data processed and mapped to a DataModel (Golden Record), and any other file the user sends. The pycarol.cds.CDSStaging and pycarol.cds.CDSGolden classes are used to manipulate the first two kinds.
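Both classes are built from the same Carol() session. The following is a minimal sketch, assuming pycarol is installed and a Carol() instance is already available. make_cds_clients is a hypothetical helper, not part of pycarol.

```python
from typing import Any, Tuple


def make_cds_clients(carol: Any) -> Tuple[Any, Any]:
    """Hypothetical helper: return (staging_client, golden_client) for one Carol session."""
    # Imported lazily so this module can be loaded where pycarol is not installed.
    from pycarol.cds import CDSGolden, CDSStaging

    # Both classes take the same Carol() instance as their only argument.
    return CDSStaging(carol), CDSGolden(carol)
```

For example, staging, golden = make_cds_clients(Carol()), with Carol imported from pycarol.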

class pycarol.cds.CDSGolden(carol)

Class to handle all CDS Golden Record (data model) interactions.

Args:

carol: pycarol.Carol
Carol() instance.
consolidate(dm_name=None, dm_id=None, worker_type=None, max_number_workers=-1, number_shards=-1, force_dataflow=False, ignore_merge=False, file_pattern='*.parquet', auto_scaling=True, file_size_limit=-1)

Consolidate the data model's CDS data.

Args:

dm_name: str, default None
Data Model name.
dm_id: str, default None
Data Model id.
worker_type: str, default None
Machine flavor to be used. If None Carol will decide the machine to use.
max_number_workers: int, default -1
Max number of workers to be used during the process. ‘-1’ means all the available.
number_shards: int, default -1
Number of shards.
ignore_merge: bool, default False
Whether merge rules should be ignored when consolidating the records.
force_dataflow: bool, default False
Whether a Dataflow job should be spun up even for small datasets (by default, small datasets are processed directly inside Carol).
file_pattern: str, default *.parquet
File pattern of the files in CDS to be consolidated. File names follow the pattern YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to filter the data received in CDS on a given date.
auto_scaling: bool, default True
Use auto scaling. If False, Carol will use max_number_workers for the whole process.
file_size_limit: int, default -1
Ignore files larger than file_size_limit bytes during consolidation.
Returns: dict. Task created in Carol.
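Because CDS file names start with a timestamp, file_pattern can restrict consolidation to one day of data. This is a minimal sketch, assuming file_pattern is matched with shell-style wildcards. consolidate_day is a hypothetical helper, and cds_golden stands for a CDSGolden instance (any object with the same consolidate method works, which keeps the sketch testable without a tenant).

```python
from datetime import date
from typing import Any, Dict


def consolidate_day(cds_golden: Any, dm_name: str, day: date) -> Dict:
    """Hypothetical helper: consolidate only the CDS files received on `day`."""
    # CDS file names start with YYYY-MM-DDTHH_mm_ss, so fixing the date and
    # wildcarding the time part selects every file from that day.
    pattern = f"{day:%Y-%m-%d}T*.parquet"
    return cds_golden.consolidate(
        dm_name=dm_name,
        file_pattern=pattern,
        ignore_merge=False,  # keep the data model's merge rules
        auto_scaling=True,   # let Carol scale the number of workers
    )
```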
count(dm_name=None, dm_id=None)

Count the number of messages in CDS.

Args:

dm_name: str, default None
Data Model name.
dm_id: str, default None
Data Model id.
Returns: int. Number of messages.
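A common use of count is to check whether there is anything in CDS before starting a heavier job such as consolidate. The following is a minimal sketch. has_cds_data is a hypothetical helper, and cds_golden stands for a CDSGolden instance.

```python
from typing import Any, Optional


def has_cds_data(cds_golden: Any, dm_name: Optional[str] = None,
                 dm_id: Optional[str] = None) -> bool:
    """Hypothetical helper: True if CDS holds at least one message for the data model."""
    # count() identifies the data model by name or by id and returns an int.
    return cds_golden.count(dm_name=dm_name, dm_id=dm_id) > 0
```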
delete(dm_name=None, dm_id=None)

Delete all CDS data of the data model.

Args:

dm_name: str, default None
Data Model name.
dm_id: str, default None
Data Model id.
Returns: None
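Since delete removes all CDS data of the data model, scripts that call it may want a guard. The following is a minimal sketch of one such design choice, requiring the caller to repeat the data model name. delete_golden_cds and its confirm parameter are hypothetical, not part of pycarol.

```python
from typing import Any


def delete_golden_cds(cds_golden: Any, dm_name: str, confirm: str) -> None:
    """Hypothetical helper: delete a data model's CDS data only after explicit confirmation."""
    # delete() removes all CDS data of the data model, so require the caller
    # to repeat the exact name as a guard against deleting the wrong model.
    if confirm != dm_name:
        raise ValueError(f"confirmation {confirm!r} does not match {dm_name!r}")
    cds_golden.delete(dm_name=dm_name)
```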
delete_rejected(dm_name=None, dm_id=None, connector_name=None, connector_id=None, staging_name=None)

Delete the rejected records of a data model in CDS.

Args:
dm_name: str, default None
Data Model name.
dm_id: str, default None
Data Model id.
connector_name: str, default None
Connector name. Use it to delete only the records from a given connector/staging table.
connector_id: str, default None
Connector id. Use it to delete only the records from a given connector/staging table.
staging_name: str, default None
Staging name. Use it to delete only the records from a given connector/staging table.
Returns: dict. Task created in Carol.
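The connector and staging arguments allow a targeted cleanup when only some sources produced bad records. The following minimal sketch calls delete_rejected once per (connector, staging) pair and collects the returned tasks. delete_rejected_for_sources is a hypothetical helper, and cds_golden stands for a CDSGolden instance.

```python
from typing import Any, Dict, Iterable, List, Tuple


def delete_rejected_for_sources(cds_golden: Any, dm_name: str,
                                sources: Iterable[Tuple[str, str]]) -> List[Dict]:
    """Hypothetical helper: delete rejected records coming from specific staging tables."""
    tasks = []
    for connector_name, staging_name in sources:
        # Passing connector_name and staging_name limits the deletion to records
        # from that connector/staging table instead of all rejected records.
        tasks.append(cds_golden.delete_rejected(
            dm_name=dm_name,
            connector_name=connector_name,
            staging_name=staging_name,
        ))
    return tasks
```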
sync_data(dm_name, dm_id=None, num_records=-1, file_pattern='*', filter_query=None, skip_consolidation=False, force_dataflow=False, records_percentage=100, worker_type=None, max_number_workers=-1, clear_golden_realtime=False)

Sync data to the realtime layer.

Args:

dm_name: str,
Data model name.
dm_id: str, default None
Data model id.
num_records: int, default -1
Number of records to be processed. ‘-1’ means all the records.
file_pattern: str, default *
File pattern of the files in CDS to be processed. File names follow the pattern YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to filter the data received in CDS on a given date.
filter_query: dict, default None
Query to be used to filter the data to be processed.
skip_consolidation: bool, default False
Whether the consolidation process should be skipped.
force_dataflow: bool, default False
Whether a Dataflow job should be spun up even for small datasets (by default, small datasets are processed directly inside Carol).
records_percentage: int, default 100
The percentage of records (0-100) to import.
worker_type: str, default None
Machine flavor to be used. If None Carol will decide the machine to use.
max_number_workers: int, default -1
Max number of workers to be used during the process. ‘-1’ means all the available.
clear_golden_realtime: bool, default False
Whether the records in the realtime layer should be deleted first.
Returns: None
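records_percentage is useful for pushing only a sample of a data model to the realtime layer, for example while testing. The following is a minimal sketch that validates the documented 0-100 range before calling sync_data. sync_sample is a hypothetical helper, and cds_golden stands for a CDSGolden instance.

```python
from typing import Any


def sync_sample(cds_golden: Any, dm_name: str, percentage: int,
                clear_first: bool = False) -> None:
    """Hypothetical helper: push a percentage of a data model's records to realtime."""
    # records_percentage is documented as a value from 0 to 100.
    if not 0 <= percentage <= 100:
        raise ValueError("percentage must be between 0 and 100")
    cds_golden.sync_data(
        dm_name,
        records_percentage=percentage,
        clear_golden_realtime=clear_first,  # delete realtime records first if True
    )
```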
class pycarol.cds.CDSStaging(carol)

Class to handle all CDS Staging interactions.

Args:

carol: pycarol.Carol
Carol() instance.

consolidate(staging_name, connector_id=None, connector_name=None, worker_type=None, max_number_workers=-1, number_shards=-1, force_dataflow=False, rehash_ids=False, file_pattern='*.parquet', compute_transformations=False, auto_scaling=True, file_size_limit=-1)

Process staging CDS data.

Args:

staging_name: str,
Staging name.
connector_id: str, default None
Connector id.
connector_name: str, default None
Connector name.
worker_type: str, default None
Machine flavor to be used. If None Carol will decide the machine to use.
max_number_workers: int, default -1
Max number of workers to be used during the process. ‘-1’ means all the available.
number_shards: int, default -1
Number of shards.
force_dataflow: bool, default False
Whether a Dataflow job should be spun up even for small datasets (by default, small datasets are processed directly inside Carol).
rehash_ids: bool, default False
Whether all ids should be regenerated from the crosswalk.
file_pattern: str, default *.parquet
File pattern of the files in CDS to be consolidated. File names follow the pattern YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to filter the data received in CDS on a given date.
compute_transformations: bool, default False
If staging transformations are defined, apply them during consolidation.
auto_scaling: bool, default True
Use auto scaling. If False, Carol will use max_number_workers for the whole process.
file_size_limit: int, default -1
Ignore files larger than file_size_limit bytes during consolidation.
Returns: dict. Task created in Carol.
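With auto_scaling=False, Carol uses max_number_workers for the whole run, which gives a predictable worker pool. The following is a minimal sketch of that configuration. consolidate_staging_fixed is a hypothetical helper, cds_staging stands for a CDSStaging instance, and the positivity check on workers is this sketch's own choice.

```python
from typing import Any, Dict, Optional


def consolidate_staging_fixed(cds_staging: Any, staging_name: str, connector_name: str,
                              workers: int, worker_type: Optional[str] = None,
                              apply_transformations: bool = False) -> Dict:
    """Hypothetical helper: consolidate a staging table with a fixed worker pool."""
    if workers < 1:
        raise ValueError("workers must be a positive integer")
    return cds_staging.consolidate(
        staging_name,
        connector_name=connector_name,
        worker_type=worker_type,       # None lets Carol pick the machine flavor
        max_number_workers=workers,
        auto_scaling=False,            # use max_number_workers for the whole run
        compute_transformations=apply_transformations,
    )
```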
count(staging_name, connector_id=None, connector_name=None)

Count the number of messages in CDS.

Args:

staging_name: str,
Staging name.
connector_id: str, default None
Connector id.
connector_name: str, default None
Connector name.
Returns: int. Number of messages.
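To get an overview of a connector, count can be called once per staging table. The following is a minimal sketch that returns a dict from staging name to message count. count_stagings is a hypothetical helper, and cds_staging stands for a CDSStaging instance.

```python
from typing import Any, Dict, Iterable


def count_stagings(cds_staging: Any, connector_name: str,
                   staging_names: Iterable[str]) -> Dict[str, int]:
    """Hypothetical helper: CDS message count per staging table of one connector."""
    # One count() call per staging table; staging_name is the positional argument.
    return {
        name: cds_staging.count(name, connector_name=connector_name)
        for name in staging_names
    }
```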
delete(staging_name, connector_id=None, connector_name=None)

Delete all CDS staging data.

Args:

staging_name: str,
Staging name.
connector_id: str, default None
Connector id.
connector_name: str, default None
Connector name.
Returns: None
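In delete, both connector arguments default to None. The following minimal sketch is a wrapper that insists on exactly one of them, so each call states unambiguously which connector's staging table is wiped. Whether pycarol itself requires a connector identifier is not stated here. delete_staging_cds is a hypothetical helper, and cds_staging stands for a CDSStaging instance.

```python
from typing import Any, Optional


def delete_staging_cds(cds_staging: Any, staging_name: str,
                       connector_id: Optional[str] = None,
                       connector_name: Optional[str] = None) -> None:
    """Hypothetical helper: delete staging data, requiring exactly one connector identifier."""
    # Reject calls with neither or both identifiers before touching any data.
    if (connector_id is None) == (connector_name is None):
        raise ValueError("pass exactly one of connector_id or connector_name")
    cds_staging.delete(staging_name, connector_id=connector_id,
                       connector_name=connector_name)
```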
process_data(staging_name, connector_id=None, connector_name=None, worker_type=None, max_number_workers=-1, number_shards=-1, num_records=-1, delete_target_folder=False, enable_realtime=None, delete_realtime_records=False, send_realtime=None, file_pattern='*', filter_query=None, skip_consolidation=False, force_dataflow=False, recursive_processing=True, dm_name=None, auto_scaling=True, force_paused=False)

Process CDS staging data.

Args:

staging_name: str,
Staging name.
connector_id: str, default None
Connector id.
connector_name: str, default None
Connector name.
worker_type: str, default None
Machine flavor to be used. If None Carol will decide the machine to use.
max_number_workers: int, default -1
Max number of workers to be used during the process. ‘-1’ means all the available.
number_shards: int, default -1
Number of shards.
num_records: int, default -1
Number of records to be processed. ‘-1’ means all the records.
delete_target_folder: bool, default False
Whether to delete the previously processed records.
enable_realtime: bool, default None
DEPRECATED. Removed from Carol. Enable this staging table to send the processed data to the realtime layer.
delete_realtime_records: bool, default False
Delete previously processed data in the realtime layer.
send_realtime: bool, default None
Send the processed data to the realtime layer.
file_pattern: str, default *
File pattern of the files in CDS to be processed. File names follow the pattern YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to filter the data received in CDS on a given date.
filter_query: dict, default None
Query to be used to filter the data to be processed.
skip_consolidation: bool, default False
Whether the consolidation process should be skipped.
force_dataflow: bool, default False
Whether a Dataflow job should be spun up even for small datasets (by default, small datasets are processed directly inside Carol).
recursive_processing: bool, default True
Whether processing should be chained recursively into the target entities. For example, if a staging table has three ETLs and each ETL maps to a data model, processing this staging table triggers processing of the whole tree.
dm_name: str, default None
If not None, reprocess the rejected records from the selected staging table.
auto_scaling: bool, default True
Use auto scaling. If False, Carol will use max_number_workers for the whole process.
force_paused: bool, default False
Whether processing should be forced even for paused staging tables.
Returns: dict. Task definition.
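The timestamp in CDS file names also supports finer windows than a day. The following minimal sketch processes one hour of staging files without chaining into target entities. It assumes file_pattern is matched with shell-style wildcards. process_hour is a hypothetical helper, and cds_staging stands for a CDSStaging instance.

```python
from datetime import datetime
from typing import Any, Dict


def process_hour(cds_staging: Any, staging_name: str, connector_name: str,
                 hour: datetime) -> Dict:
    """Hypothetical helper: process only the staging files received in one hour."""
    # Files are named YYYY-MM-DDTHH_mm_ss..., so fixing the date and hour
    # and wildcarding minutes and seconds selects a one-hour window.
    pattern = f"{hour:%Y-%m-%dT%H}_*"
    return cds_staging.process_data(
        staging_name,
        connector_name=connector_name,
        file_pattern=pattern,
        recursive_processing=False,  # do not chain processing into target entities
        send_realtime=True,          # also send the processed data to realtime
    )
```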
sync_data(staging_name, connector_id=None, connector_name=None, num_records=-1, delete_realtime_records=False, enable_realtime=None, file_pattern='*', filter_query=None, force_dataflow=False, records_percentage=100)

Sync data to realtime layer.

Args:

staging_name: str,
Staging name.
connector_id: str, default None
Connector id.
connector_name: str, default None
Connector name.
num_records: int, default -1
Number of records to be processed. ‘-1’ means all the records.
enable_realtime: bool, default None
DEPRECATED. Removed from Carol. Enable this staging table to send the processed data to the realtime layer.
delete_realtime_records: bool, default False
Delete previously processed data in the realtime layer.
file_pattern: str, default *
File pattern of the files in CDS to be processed. File names follow the pattern YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to filter the data received in CDS on a given date.
filter_query: dict, default None
Query to be used to filter the data to be processed.
force_dataflow: bool, default False
Whether a Dataflow job should be spun up even for small datasets (by default, small datasets are processed directly inside Carol).
records_percentage: int, default 100
The percentage of records (0-100) to import.
Returns: None
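The delete_realtime_records and file_pattern arguments together distinguish a full refresh from an incremental sync. The following minimal sketch uses a full refresh when no day is given and otherwise syncs only that day's files. It assumes file_pattern is matched with shell-style wildcards. refresh_staging_realtime is a hypothetical helper, and cds_staging stands for a CDSStaging instance.

```python
from datetime import date
from typing import Any, Optional


def refresh_staging_realtime(cds_staging: Any, staging_name: str, connector_name: str,
                             day: Optional[date] = None) -> None:
    """Hypothetical helper: full refresh when day is None, else sync one day's files."""
    if day is None:
        # Full refresh: drop what is already in realtime, then sync every file.
        cds_staging.sync_data(staging_name, connector_name=connector_name,
                              delete_realtime_records=True, file_pattern="*")
    else:
        # Incremental: keep existing realtime records, sync only files from `day`.
        cds_staging.sync_data(staging_name, connector_name=connector_name,
                              delete_realtime_records=False,
                              file_pattern=f"{day:%Y-%m-%d}T*")
```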