pycarol.cds
Carol’s main storage is called CDS (Carol Data Storage). Any data received or created in Carol is sent to CDS. CDS holds three kinds of data: data coming from the Staging Area, data that has been processed and mapped to a DataModel (Golden Record), and any other files the user sends. The pycarol.cds.CDSStaging and pycarol.cds.CDSGolden classes are used to manipulate the data in the first two cases.
-
class pycarol.cds.CDSGolden(carol) Class to handle all CDS Golden Record interactions.
Args:
- carol: `pycarol.Carol`
- Carol() instance.
-
consolidate(dm_name=None, dm_id=None, worker_type=None, max_number_workers=-1, number_shards=-1) Consolidate the CDS Golden Record data of a data model.
Args:
- dm_name: str, default None
- Data Model name.
- dm_id: str, default None
- Data Model id.
- worker_type: str, default None
- Machine flavor to be used. If None, Carol decides which machine to use.
- max_number_workers: int, default -1
- Maximum number of workers used during the process. ‘-1’ means all available workers.
- number_shards: int, default -1
- Number of shards.
Returns: None
-
count(dm_name=None, dm_id=None) Count the number of messages in CDS.
Args:
- dm_name: str, default None
- Data Model name.
- dm_id: str, default None
- Data Model id.
Returns: int, the number of messages.
-
delete(dm_name=None, dm_id=None) Delete all CDS data of a data model.
Args:
- dm_name: str, default None
- Data Model name.
- dm_id: str, default None
- Data Model id.
Returns: None
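Because delete removes all CDS data of a data model, a caller may want to record how many messages are about to be removed and require an explicit confirmation first. A minimal sketch, assuming `golden` is a pycarol.cds.CDSGolden instance (for example `CDSGolden(Carol())`); the helper `delete_data_model` and its `confirm` argument are hypothetical, not part of pycarol.

```python
def delete_data_model(golden, dm_name, confirm=None):
    """Delete all CDS data of `dm_name` and return how many messages it held.

    Hypothetical helper: `golden` is assumed to be a pycarol.cds.CDSGolden
    instance. The caller must pass confirm=dm_name to proceed, a simple
    guard against deleting the wrong data model by accident.
    """
    if confirm != dm_name:
        raise ValueError(f"pass confirm={dm_name!r} to delete its CDS data")
    # Count first: once delete() has run there is nothing left to count.
    n_messages = golden.count(dm_name=dm_name)
    golden.delete(dm_name=dm_name)
    return n_messages
```

Only the documented dm_name keyword is used; dm_id could be passed the same way.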
-
sync_data(dm_name, dm_id=None, num_records=-1, file_pattern='*', filter_query=None) Sync data to the realtime layer.
Args:
- dm_name: str
- Data model name.
- dm_id: str, default None
- Data model id.
- num_records: int, default -1
- Number of records to be processed. ‘-1’ means all records.
- file_pattern: str, default '*'
- Pattern selecting which CDS files are processed. CDS files are named following the pattern YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to select data received on a given date.
- filter_query: dict, default None
- Query to be used to filter the data to be processed.
Returns: None
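The file_pattern argument selects CDS files by name. Assuming it behaves like a shell-style glob (the '*' default suggests this, but the exact matching rules are Carol's and are not confirmed here), Python's fnmatch can preview which file names a pattern would select. The helpers `day_pattern` and `preview`, the sample file names, and the data model name "customer" are hypothetical.

```python
from datetime import date
from fnmatch import fnmatchcase
from typing import Iterable, List


def day_pattern(day: date) -> str:
    """Build a file_pattern matching files received on `day`.

    Assumes CDS file names follow YYYY-MM-DDTHH_mm_ss*.parquet, as the
    file_pattern docs describe, and that the pattern is a shell-style glob.
    """
    return f"{day.isoformat()}T*.parquet"


def preview(pattern: str, file_names: Iterable[str]) -> List[str]:
    """Return the file names the glob pattern selects (case-sensitive)."""
    return [name for name in file_names if fnmatchcase(name, pattern)]


# Hypothetical file names, for illustration only.
files = [
    "2023-05-09T23_59_10_a1.parquet",
    "2023-05-10T08_15_00_b2.parquet",
    "2023-05-10T17_42_31_c3.parquet",
]
pattern = day_pattern(date(2023, 5, 10))
print(pattern)                  # 2023-05-10T*.parquet
print(preview(pattern, files))  # the two files from 2023-05-10
# With a CDSGolden instance `golden`, the pattern could then be passed as:
# golden.sync_data("customer", file_pattern=pattern)
```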
-
class pycarol.cds.CDSStaging(carol) Class to handle all CDS Staging interactions.
Args:
- carol: `pycarol.Carol`
- Carol() instance.
-
consolidate(staging_name, connector_id=None, connector_name=None, worker_type=None, max_number_workers=-1, number_shards=-1) Consolidate CDS staging data.
Args:
- staging_name: str
- Staging name.
- connector_id: str, default None
- Connector id.
- connector_name: str, default None
- Connector name.
- worker_type: str, default None
- Machine flavor to be used. If None, Carol decides which machine to use.
- max_number_workers: int, default -1
- Maximum number of workers used during the process. ‘-1’ means all available workers.
- number_shards: int, default -1
- Number of shards.
Returns: None
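When a connector has several staging tables, consolidate can be called once per table. A minimal sketch, assuming `staging` is a pycarol.cds.CDSStaging instance; the helper `consolidate_tables` and the connector and table names are hypothetical. Errors are collected per table so one failing table does not stop the rest.

```python
def consolidate_tables(staging, connector_name, staging_names, max_number_workers=-1):
    """Request consolidation of each staging table of one connector.

    Hypothetical helper: `staging` is assumed to be a pycarol.cds.CDSStaging
    instance. Returns a dict mapping each failed table name to its exception.
    """
    failed = {}
    for name in staging_names:
        try:
            staging.consolidate(
                name,
                connector_name=connector_name,
                max_number_workers=max_number_workers,  # -1: all available
            )
        except Exception as exc:  # record the failure, continue with the next table
            failed[name] = exc
    return failed
```

For example, `consolidate_tables(staging, "my_connector", ["orders", "items"])` returns an empty dict when every request succeeds.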
-
count(staging_name, connector_id=None, connector_name=None) Count the number of messages in CDS.
Args:
- staging_name: str
- Staging name.
- connector_id: str, default None
- Connector id.
- connector_name: str, default None
- Connector name.
Returns: int, the number of messages.
-
delete(staging_name, connector_id=None, connector_name=None) Delete all CDS staging data.
Args:
- staging_name: str
- Staging name.
- connector_id: str, default None
- Connector id.
- connector_name: str, default None
- Connector name.
Returns: None
-
process_data(staging_name, connector_id=None, connector_name=None, worker_type=None, max_number_workers=-1, number_shards=-1, num_records=-1, delete_target_folder=False, enable_realtime=False, delete_realtime_records=False, send_realtime=False, file_pattern='*', filter_query=None) Process CDS staging data.
Args:
- staging_name: str
- Staging name.
- connector_id: str, default None
- Connector id.
- connector_name: str, default None
- Connector name.
- worker_type: str, default None
- Machine flavor to be used. If None, Carol decides which machine to use.
- max_number_workers: int, default -1
- Maximum number of workers used during the process. ‘-1’ means all available workers.
- number_shards: int, default -1
- Number of shards.
- num_records: int, default -1
- Number of records to be processed. ‘-1’ means all records.
- delete_target_folder: bool, default False
- Whether to delete the previously processed records.
- enable_realtime: bool, default False
- Enable this staging table to send processed data to the realtime layer.
- delete_realtime_records: bool, default False
- Delete previously processed data in the realtime layer.
- send_realtime: bool, default False
- Send the processed data to the realtime layer.
- file_pattern: str, default '*'
- Pattern selecting which CDS files are processed. CDS files are named following the pattern YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to select data received on a given date.
- filter_query: dict, default None
- Query to be used to filter the data to be processed.
Returns: None
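A minimal sketch of reprocessing one staging table from scratch: previously processed records are dropped (delete_target_folder, delete_realtime_records) and the new output is sent to the realtime layer (send_realtime). It assumes `staging` is a pycarol.cds.CDSStaging instance; the helper `reprocess_staging` and the names used are hypothetical, and whether these flags should be combined this way depends on how your tenant uses the realtime layer.

```python
def reprocess_staging(staging, staging_name, connector_name, to_realtime=True):
    """Reprocess all CDS records of one staging table from scratch.

    Hypothetical helper around CDSStaging.process_data; only parameters
    documented for process_data are passed.
    """
    staging.process_data(
        staging_name,
        connector_name=connector_name,
        num_records=-1,                       # all records
        delete_target_folder=True,            # drop previously processed records
        delete_realtime_records=to_realtime,  # clear old realtime data as well
        send_realtime=to_realtime,            # push the new output to realtime
        file_pattern="*",                     # every CDS file of this table
    )
```

Passing `to_realtime=False` reprocesses the table while leaving the realtime layer untouched.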
-
sync_data(staging_name, connector_id=None, connector_name=None, num_records=-1, delete_realtime_records=False, enable_realtime=False, file_pattern='*', filter_query=None) Sync data to the realtime layer.
Args:
- staging_name: str
- Staging name.
- connector_id: str, default None
- Connector id.
- connector_name: str, default None
- Connector name.
- num_records: int, default -1
- Number of records to be processed. ‘-1’ means all records.
- enable_realtime: bool, default False
- Enable this staging table to send processed data to the realtime layer.
- delete_realtime_records: bool, default False
- Delete previously processed data in the realtime layer.
- file_pattern: str, default '*'
- Pattern selecting which CDS files are processed. CDS files are named following the pattern YYYY-MM-DDTHH_mm_ss*.parquet, so this can be used to select data received on a given date.
- filter_query: dict, default None
- Query to be used to filter the data to be processed.
Returns: None
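Assuming file_pattern is a shell-style glob, a single pattern cannot express an arbitrary date range such as 2023-02-27 to 2023-03-01, so one option is to call sync_data once per day. A minimal sketch, assuming CDS file names start with YYYY-MM-DDTHH_mm_ss as documented above and that `staging` is a pycarol.cds.CDSStaging instance; the helpers `daily_patterns` and `sync_date_range` are hypothetical.

```python
from datetime import date, timedelta
from typing import List


def daily_patterns(start: date, end: date) -> List[str]:
    """One file_pattern per day from start to end, inclusive (empty if end < start).

    Assumes CDS file names follow YYYY-MM-DDTHH_mm_ss*.parquet.
    """
    n_days = (end - start).days
    return [f"{(start + timedelta(days=d)).isoformat()}T*.parquet" for d in range(n_days + 1)]


def sync_date_range(staging, staging_name, connector_name, start, end):
    """Hypothetical helper: sync a staging table's files day by day."""
    for pattern in daily_patterns(start, end):
        staging.sync_data(staging_name, connector_name=connector_name, file_pattern=pattern)
```

Using date arithmetic rather than string manipulation keeps month boundaries and leap days correct.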
-