pycarol.staging
- class pycarol.staging.Staging(carol)[source]
Class to send data to Carol.
- create_schema(fields_dict=None, staging_name=None, connector_id=None, connector_name=None, mdm_flexible=False, crosswalk_name=None, crosswalk_list=None, overwrite=False, auto_send=True, export_data=False, data=None)[source]
- Parameters
fields_dict – dict, list of dicts, pandas.DataFrame, default None Data to create schema from. fields_dict will be removed in the future. Use parameter data
staging_name – str, Staging name to send the data.
connector_id – str, default None Connector name where the staging should be.
connector_name – str, default None Connector name where the staging should be.
mdm_flexible – bool, default False If flexible schema.
crosswalk_name – None, default staging_name Crosswalk name. Most of the time it should be the staging name.
crosswalk_list – list, default None Crosswalk list of fields.
overwrite – bool, default False if already exists, overwrite current schema
auto_send – bool, default True Send the schema after creating.
export_data – bool, default False Export data to CDS for this staging. This is a manual export.
data – json, list of dicts, pandas.DataFrame, default None Data to create schema from.
- drop_staging(staging_name=None, connector_id=None, connector_name=None, reject_on_no_schema=False, reject_on_etl_existence=False, reject_on_mapping_existence=False)[source]
- Parameters
staging_name – str, Staging name.
connector_name – str, default None Connector name.
connector_id – str, default None Connector Id.
reject_on_no_schema – bool default False Do not drop if no schema found.
reject_on_etl_existence – bool default False Raise an error if there exists a ETL for this staging table
reject_on_mapping_existence – bool default False Raise an error if there exists a mapping for this staging table
- Returns
Carol response
- fetch_parquet(staging_name, connector_id=None, connector_name=None, backend='pandas', merge_records=True, return_dask_graph=False, columns=None, max_hits=None, return_metadata=False, callback=None, cds=True, max_workers=None, file_pattern=None, return_callback_result=False)[source]
Fetch parquet from a staging table.
- Parameters
staging_name – str, Staging name to fetch parquet of
connector_id – str, default None Connector id to fetch parquet of
connector_name – str, default None Connector name to fetch parquet of
backend – [‘dask’,’pandas’], default dask if to use either dask or pandas to fetch the data
merge_records – bool, default True This will keep only the most recent record exported. Sometimes there are updates and/or deletions and one should keep only the last records.
return_dask_graph – bool, default false If to return the dask graph or the dataframe.
columns – list, default None List of columns to fetch.
max_hits – int, default None Number of records to get. This only should be user for tests.
return_metadata – bool, default False To return or not the fields like [‘mdmId’, ‘mdmCounterForEntity’, etc.]
callback – callable, default None Function to be called each downloaded file.
cds – bool, default True Get staging data from CDS.
max_workers – int default None Number of workers to use when downloading parquet files with pandas back-end.
file_pattern – str default None File pattern to filter data when fetching from CDS. e.g. file_pattern=’2019-11-25’ will fetch only CDS files that start with 2019-11-25.
False (return_callback_result bool default) – If a callback is used, it will return the result of the response of the callback. This will skip all the operation to merge records and return selected columns.
- Returns: pandas.DataFrame
DataFrame with the staging data.
- get_schema(staging_name, connector_name=None, connector_id=None)[source]
Return the staging table schema.
- Parameters
staging_name – str, Staging name
connector_name – str, default None Connector name. Either connector_name or connector_id need to be set.
connector_id – str, default None Connector Id. Either connector_name or connector_id need to be set.
- Returns: dict
Staging table schema
- send_data(staging_name, data=None, connector_name=None, connector_id=None, step_size=500, print_stats=True, gzip=True, auto_create_schema=False, crosswalk_auto_create=None, flexible_schema=False, force=False, max_workers=2, dm_to_delete=None, async_send=False, carol_data_storage=False, storage_only=True, carol_sync=False)[source]
Send data to a staging table in Carol.
- Parameters
staging_name – str, Staging name to send the data.
data – pandas data frame, json. default None Data to be send to Carol
connector_name – str, default None Connector name where the staging should be. Either connector_name or connector_id need to be set.
connector_id – str, default None Connector Id where the staging should be. Either connector_name or connector_id need to be set.
step_size – int, default 500 Number of records to be sent in each iteration. Max size for each batch is 10MB.
print_stats – bool, default True If print the number of records sent
gzip – bool, default True If send each batch as a gzip file.
auto_create_schema – bool, default False If to auto create the schema for the data being sent.
crosswalk_auto_create – list, default None If auto_create_schema=True, crosswalk list of fields.
flexible_schema – bool, default False If auto_create_schema=True, to use a flexible schema.
force – bool, default False pycarol will check for duplicated values given the crosswalk. If force=True it will not check. If False it will check for duplicates and raise an error.
max_workers – int, default 2 To be used with async_send=True. Number of threads to use when sending.
dm_to_delete – str, default None DEPRECATED. Name of the data model to be erased before send the data.
async_send –
bool, default False To use async to send the data. This is much faster than a sequential send. It can conflict with jupyter notebooks process. To run this inside a jupyter notebook, use
import nest_asyncio nest_asyncio.apply()
carol_data_storage – bool, default False Deprecated, Use storage_only
storage_only – bool, default False Send data only to CDS.
carol_sync – bool, default False Send and wait data to be processed in Carol
- send_schema(schema, staging_name=None, connector_id=None, connector_name=None, overwrite=False)[source]
Send schema to Carol
- Parameters
schema – dict Dictianary with the schema to be sent
staging_name – str, default None Staging name to send the data. If empty it will get from the schema
connector_id – str, default None Connector name. If empty it will get from the schema
connector_name – str, default None Connector name. If empty it will get from the schema
overwrite – bool, default False if already exists, overwrite current schema
Returns: None