pycarol.staging
-
class pycarol.staging.Staging(carol)
Class to send data to Carol.
-
create_schema(fields_dict=None, staging_name=None, connector_id=None, connector_name=None, mdm_flexible=False, crosswalk_name=None, crosswalk_list=None, overwrite=False, auto_send=True, export_data=False, data=None)
- Args:
- fields_dict: dict, list of dicts, pandas.DataFrame, default None
- Data to create the schema from. Deprecated: fields_dict will be removed in a future release; use the data parameter instead.
- staging_name: str, default None
- Staging name to send the data to.
- connector_id: str, default None
- Connector Id where the staging should be.
- connector_name: str, default None
- Connector name where the staging should be.
- mdm_flexible: bool, default False
- Whether to use a flexible schema.
- crosswalk_name: str, default staging_name
- Crosswalk name. Most of the time it should be the staging name.
- crosswalk_list: list, default None
- Crosswalk list of fields.
- overwrite: bool, default False
- If the schema already exists, overwrite it.
- auto_send: bool, default True
- Send the schema after creating.
- export_data: bool, default False
- Export data to CDS for this staging. This is a manual export.
- data: json, list of dicts, pandas.DataFrame, default None
- Data to create schema from.
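As an illustration of the arguments above, here is a minimal sketch of a helper that creates a schema from sample records. The helper name `create_schema_from_records` is hypothetical and not part of pycarol; `staging` is assumed to be an already-authenticated `Staging` instance, and the return value is passed through unchanged because it is not documented here.

```python
def create_schema_from_records(staging, records, staging_name,
                               connector_name, key_fields):
    """Create a staging schema from sample records (hypothetical helper).

    `staging` is assumed to be a pycarol.staging.Staging instance and
    `records` a list of dicts, one of the input types listed for `data`.
    """
    return staging.create_schema(
        data=records,                 # preferred over the deprecated fields_dict
        staging_name=staging_name,
        connector_name=connector_name,
        crosswalk_list=key_fields,    # fields that make up the crosswalk
        overwrite=False,              # leave an existing schema untouched
        auto_send=True,               # send the schema right after creating it
    )
```

A call might look like `create_schema_from_records(stag, [{"id": 1, "name": "a"}], "orders", "my_connector", ["id"])`, where all values are placeholders.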
-
drop_staging(staging_name=None, connector_id=None, connector_name=None, reject_on_no_schema=False, reject_on_etl_existence=False, reject_on_mapping_existence=False)
Args:
- staging_name: str, default None
- Staging name.
- connector_name: str, default None
- Connector name.
- connector_id: str, default None
- Connector Id.
- reject_on_no_schema: bool, default False
- Do not drop if no schema is found.
- reject_on_etl_existence: bool, default False
- Raise an error if an ETL exists for this staging table.
- reject_on_mapping_existence: bool, default False
- Raise an error if a mapping exists for this staging table.
Returns:
Carol response
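The three `reject_on_*` flags can be combined to make a drop conservative. The following is a minimal sketch under that reading; `drop_staging_safely` is a hypothetical wrapper, and `staging` is assumed to be a `Staging` instance.

```python
def drop_staging_safely(staging, staging_name, connector_name):
    """Drop a staging table only when nothing depends on it (hypothetical helper)."""
    return staging.drop_staging(
        staging_name=staging_name,
        connector_name=connector_name,
        reject_on_no_schema=True,           # do not drop if no schema is found
        reject_on_etl_existence=True,       # error out if an ETL uses the table
        reject_on_mapping_existence=True,   # error out if a mapping uses the table
    )
```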
-
export(staging_name, connector_id=None, connector_name=None, sync_staging=True, full_export=False, delete_previous=False)
Deprecated. This function was removed in pycarol 3.34.
Export Staging from RT to CDS
This method will trigger or pause the export of the data in the staging to CDS.
- Args:
- staging_name: str
- Name of the staging to export.
- connector_id: str, default None
- Connector Id. If empty, it is taken from the schema.
- connector_name: str, default None
- Connector name. If empty, it is taken from the schema.
- sync_staging: bool, default True
- Start the export for this staging. Set to False to pause it.
- full_export: bool, default False
- Do a full export (resync) for this staging.
- delete_previous: bool, default False
- Delete previously exported data.
Returns: None
Usage: To trigger the export the first time:
    from pycarol.staging import Staging
    from pycarol.auth.PwdAuth import PwdAuth
    from pycarol.carol import Carol

    login = Carol()
    stag = Staging(login)
    stag.export(staging, connector_name=connector_name, sync_staging=True)

    # To do a resync, that is, start the sync from the beginning without deleting old data:
    stag.export(staging, connector_name=connector_name, sync_staging=True, full_export=True)

    # To delete the old data:
    stag.export(staging, connector_name=connector_name, sync_staging=True, full_export=True, delete_previous=True)

    # To pause a sync:
    stag.export(staging, connector_name=connector_name, sync_staging=False)
-
export_all(connector_id=None, connector_name=None, sync_staging=True, full_export=False, delete_previous=False)
Deprecated. This function was removed in pycarol 3.34.
Export all Stagings from a connector to s3
This method will trigger or pause the export of all stagings to s3.
Parameters:
- sync_staging – bool, default True. Sync the data model.
- connector_name – str. Connector name.
- connector_id – str. Connector id.
- full_export – bool, default False. Do a resync of the data model.
- delete_previous – bool, default False. Delete previously exported files.
Returns: None
Usage: See Staging.export()
-
fetch_parquet(staging_name, connector_id=None, connector_name=None, backend='pandas', merge_records=True, return_dask_graph=False, columns=None, max_hits=None, return_metadata=False, callback=None, cds=True, max_workers=None, file_pattern=None, return_callback_result=False)
Fetch parquet from a staging table.
- Args:
- staging_name: str
- Staging name to fetch parquet of
- connector_id: str, default None
- Connector id to fetch parquet of
- connector_name: str, default None
- Connector name to fetch parquet of
- backend: ['dask', 'pandas'], default 'pandas'
- Whether to use dask or pandas to fetch the data.
- merge_records: bool, default True
- Keep only the most recent version of each exported record. Updates and deletions can leave several versions of a record in the export, and only the last one should be kept.
- return_dask_graph: bool, default False
- Whether to return the dask graph instead of the DataFrame.
- columns: list, default None
- List of columns to fetch.
- max_hits: int, default None
- Number of records to get. This should only be used for tests.
- return_metadata: bool, default False
- Whether to return the metadata fields, such as ['mdmId', 'mdmCounterForEntity', etc.]
- callback: callable, default None
- Function to be called for each downloaded file.
- cds: bool, default True
- Get staging data from CDS.
- max_workers: int, default None
- Number of workers to use when downloading parquet files with the pandas back-end.
- file_pattern: str, default None
- File pattern to filter data when fetching from CDS, e.g. file_pattern='2019-11-25' will fetch only CDS files that start with 2019-11-25.
- return_callback_result: bool, default False
- If a callback is used, return the result of the callback. This skips the record merging and the column selection.
- Returns: pandas.DataFrame
- DataFrame with the staging data.
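A typical use of the arguments above is pulling a small sample of a staging table for inspection. The following is a minimal sketch; `fetch_sample` is a hypothetical helper, `staging` is assumed to be a `Staging` instance, and the default sample size is arbitrary.

```python
def fetch_sample(staging, staging_name, connector_name, columns=None, n=1000):
    """Fetch up to `n` merged records from a staging table (hypothetical helper)."""
    return staging.fetch_parquet(
        staging_name=staging_name,
        connector_name=connector_name,
        backend="pandas",        # load the files with pandas rather than dask
        columns=columns,         # None fetches every column
        max_hits=n,              # documented as intended for tests only
        merge_records=True,      # keep only the latest version of each record
        return_metadata=False,   # leave out fields such as mdmId
        cds=True,                # read the data from CDS
    )
```

For a date-restricted read, `file_pattern` could be added to the call in the same way, as described in the argument list.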
-
generate_SQL_tables(connector_id=None, connector_name=None)
Generates SQL tables from a connector.
The tables created will be named: stg_{connector_id}_{staging_name}.
- Args:
- connector_id (str, optional): Connector ID. Defaults to None.
- connector_name (str, optional): Connector name. Defaults to None.
- Returns:
- dict: Carol’s response
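The naming rule stated above, stg_{connector_id}_{staging_name}, can be reproduced locally to work out which table names to expect. This is a small sketch with a hypothetical helper name and placeholder values; it does not call Carol.

```python
def expected_sql_table_names(connector_id, staging_names):
    """Map each staging name to its SQL table name, following the
    stg_{connector_id}_{staging_name} pattern (hypothetical helper)."""
    return {name: f"stg_{connector_id}_{name}" for name in staging_names}


# Placeholder connector id and staging names, for illustration only.
tables = expected_sql_table_names("abc123", ["orders", "customers"])
```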
-
get_schema(staging_name, connector_name=None, connector_id=None)
Return the staging table schema.
- Args:
- staging_name: str
- Staging name
- connector_name: str, default None
- Connector name. Either connector_name or connector_id needs to be set.
- connector_id: str, default None
- Connector Id. Either connector_name or connector_id needs to be set.
- Returns: dict
- Staging table schema
-
remove_SQL_tables(connector_id=None, connector_name=None)
Remove all SQL tables from a connector.
- Args:
- connector_id (str, optional): Connector ID. Defaults to None.
- connector_name (str, optional): Connector name. Defaults to None.
- Returns:
- dict: Carol’s response
-
send_data(staging_name, data=None, connector_name=None, connector_id=None, step_size=500, print_stats=True, gzip=True, auto_create_schema=False, crosswalk_auto_create=None, flexible_schema=False, force=False, max_workers=2, dm_to_delete=None, async_send=False, carol_data_storage=False, storage_only=True, carol_sync=False)
Send data to a staging table in Carol.
- Args:
- staging_name: str
- Staging name to send the data.
- data: pandas.DataFrame or json, default None
- Data to be sent to Carol.
- connector_name: str, default None
- Connector name where the staging should be. Either connector_name or connector_id needs to be set.
- connector_id: str, default None
- Connector Id where the staging should be. Either connector_name or connector_id needs to be set.
- step_size: int, default 500
- Number of records to be sent in each iteration. Max size for each batch is 10MB.
- print_stats: bool, default True
- Whether to print the number of records sent.
- gzip: bool, default True
- Whether to send each batch as a gzip file.
- auto_create_schema: bool, default False
- Whether to auto-create the schema for the data being sent.
- crosswalk_auto_create: list, default None
- If auto_create_schema=True, crosswalk list of fields.
- flexible_schema: bool, default False
- If auto_create_schema=True, to use a flexible schema.
- force: bool, default False
- By default (force=False), pycarol checks the data for duplicated values given the crosswalk and raises an error if any are found. Set force=True to skip this check.
- max_workers: int, default 2
- To be used with async_send=True. Number of threads to use when sending.
- dm_to_delete: str, default None
- Deprecated. Name of the data model to be erased before sending the data.
- async_send: bool, default False
- Use async to send the data. This is much faster than a sequential send, but it can conflict with the Jupyter notebook process. To run this inside a Jupyter notebook, use
    import nest_asyncio
    nest_asyncio.apply()
- carol_data_storage: bool, default False
- Deprecated. Use storage_only instead.
- storage_only: bool, default True
- Send data only to CDS.
- carol_sync: bool, default False
- Send the data and wait for it to be processed in Carol.
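To make the interplay between the crosswalk and the duplicate check concrete, here is a minimal sketch that checks a batch for duplicated crosswalk keys locally before handing it to `send_data`. Both helper names are hypothetical, the local check is only an approximation of what pycarol does with force=False, and `records` is assumed to be a list of dicts in a form `send_data` accepts.

```python
from collections import Counter


def duplicate_crosswalk_keys(records, crosswalk_fields):
    """Return the crosswalk key tuples that occur more than once in `records`."""
    counts = Counter(
        tuple(rec.get(field) for field in crosswalk_fields) for rec in records
    )
    return [key for key, n in counts.items() if n > 1]


def send_records(staging, records, staging_name, connector_name, crosswalk_fields):
    """Send records after a local duplicate check (hypothetical helper).

    `staging` is assumed to be a pycarol.staging.Staging instance.
    """
    dupes = duplicate_crosswalk_keys(records, crosswalk_fields)
    if dupes:
        raise ValueError(f"duplicated crosswalk keys: {dupes}")
    return staging.send_data(
        staging_name=staging_name,
        data=records,
        connector_name=connector_name,
        step_size=500,                           # records per batch
        auto_create_schema=True,                 # create the schema from the data
        crosswalk_auto_create=crosswalk_fields,  # used with auto_create_schema=True
        async_send=False,                        # sequential send, safe in notebooks
    )
```

Running the check up front lets the caller see which keys collide before any batch is sent.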
-
send_schema(schema, staging_name=None, connector_id=None, connector_name=None, overwrite=False)
Send schema to Carol.
- Args:
- schema: dict
- Dictionary with the schema to be sent.
- staging_name: str, default None
- Staging name to send the schema to. If empty, it is taken from the schema.
- connector_id: str, default None
- Connector Id. If empty, it is taken from the schema.
- connector_name: str, default None
- Connector name. If empty, it is taken from the schema.
- overwrite: bool, default False
- If the schema already exists, overwrite it.
Returns: None
-