pycarol.staging

class pycarol.staging.Staging(carol)

Class to send data to Carol.

create_schema(fields_dict=None, staging_name=None, connector_id=None, connector_name=None, mdm_flexible=False, crosswalk_name=None, crosswalk_list=None, overwrite=False, auto_send=True, export_data=False, data=None)

Create the schema for a staging table.

Args:
fields_dict: dict, list of dicts, pandas.DataFrame, default None
Data to create the schema from. Deprecated: fields_dict will be removed in a future release; use the data parameter instead.
staging_name: str, default None
Name of the staging the schema is created for.
connector_id: str, default None
Connector ID where the staging should be.
connector_name: str, default None
Connector name where the staging should be.
mdm_flexible: bool, default False
If flexible schema.
crosswalk_name: str, default staging_name
Crosswalk name. Most of the time it should be the staging name.
crosswalk_list: list, default None
Crosswalk list of fields.
overwrite: bool, default False
If the schema already exists, overwrite it.
auto_send: bool, default True
Send the schema after creating.
export_data: bool, default False
Export data to CDS for this staging. This is a manual export.
data: json, list of dicts, pandas.DataFrame, default None
Data to create schema from.
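
The arguments above can be combined as in the following minimal sketch, which creates a schema from sample records passed through `data`. The staging name `customers`, the connector name `my_connector`, the `id` crosswalk field, and the helper name `create_customer_schema` are hypothetical. `Carol()` is assumed to read credentials from the environment, as in the export usage example on this page, and the return value of `create_schema` is passed through unchanged because this page does not document it.

```python
def create_customer_schema(staging, records, staging_name="customers",
                           connector_name="my_connector"):
    """Create (and send) a staging schema inferred from sample records.

    `staging` is expected to be a pycarol.staging.Staging instance.
    All names used here are placeholders, not part of pycarol.
    """
    if not records:
        raise ValueError("at least one sample record is needed to infer a schema")
    return staging.create_schema(
        data=records,                 # preferred over the deprecated fields_dict
        staging_name=staging_name,
        connector_name=connector_name,
        crosswalk_name=staging_name,  # usually the same as the staging name
        crosswalk_list=["id"],        # hypothetical crosswalk field
        overwrite=False,              # do not replace an existing schema
        auto_send=True,               # send the schema right after creating it
    )


def main():
    # Imported here so the helper above can be read and tested without pycarol.
    from pycarol.carol import Carol
    from pycarol.staging import Staging

    sample = [{"id": "1", "name": "Ada", "age": 36}]
    create_customer_schema(Staging(Carol()), sample)
```

Keeping the pycarol imports inside `main()` is only a convenience for trying the helper with a stand-in object; in application code they would normally sit at the top of the module.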

drop_staging(staging_name=None, connector_id=None, connector_name=None, reject_on_no_schema=False, reject_on_etl_existence=False, reject_on_mapping_existence=False)

Drop a staging table.

Args:

staging_name: str,
Staging name.
connector_name: str, default None
Connector name.
connector_id: str, default None
Connector Id.
reject_on_no_schema: bool, default False
Do not drop if no schema is found.
reject_on_etl_existence: bool, default False
Raise an error if an ETL exists for this staging table.
reject_on_mapping_existence: bool, default False
Raise an error if a mapping exists for this staging table.

Returns:

Carol response
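
As a hedged illustration of the `reject_on_*` flags, the sketch below wraps `drop_staging` so that the drop is refused whenever an ETL or a mapping still depends on the staging table. The helper name `drop_staging_safely` is hypothetical, and `staging` is assumed to be a `Staging` instance.

```python
def drop_staging_safely(staging, staging_name, connector_name):
    """Drop a staging table, refusing when an ETL or mapping still uses it.

    Returns whatever drop_staging returns (documented above as the Carol response).
    """
    return staging.drop_staging(
        staging_name=staging_name,
        connector_name=connector_name,
        reject_on_etl_existence=True,      # raise an error if an ETL exists
        reject_on_mapping_existence=True,  # raise an error if a mapping exists
    )
```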

export(staging_name, connector_id=None, connector_name=None, sync_staging=True, full_export=False, delete_previous=False)

Export Staging from RT to CDS

This method will trigger or pause the export of the data in the staging to CDS.

Args:
staging_name: str
Name of the staging to export.
connector_id: str, default None
Connector ID. If empty, it is taken from the schema.
connector_name: str, default None
Connector name. If empty, it is taken from the schema.
sync_staging: bool, default True
Start the export for this staging. Set to False to pause it.
full_export: bool, default False
Do a full export (resync) for this staging.
delete_previous: bool, default False
Delete previously exported data.

Returns: None

Usage:

from pycarol.carol import Carol
from pycarol.staging import Staging

# Placeholder names; replace them with your own staging and connector.
staging = 'my_staging'
connector_name = 'my_connector'

login = Carol()
stag = Staging(login)

# To trigger the export the first time:
stag.export(staging, connector_name=connector_name, sync_staging=True)

# To do a resync, that is, restart the sync from the beginning without deleting old data:
stag.export(staging, connector_name=connector_name, sync_staging=True, full_export=True)

# To delete the old data:
stag.export(staging, connector_name=connector_name, sync_staging=True, full_export=True, delete_previous=True)

# To pause a sync:
stag.export(staging, connector_name=connector_name, sync_staging=False)

export_all(connector_id=None, connector_name=None, sync_staging=True, full_export=False, delete_previous=False)

Export all Stagings from a connector to s3

This method will trigger or pause the export of all stagings to s3.

Parameters:
  • sync_staging: bool, default True. Sync the data model.
  • connector_name: str. Connector name.
  • connector_id: str. Connector ID.
  • full_export: bool, default False. Do a resync of the data model.
  • delete_previous: bool, default False. Delete previously exported files.

Returns:

None

Usage: See Staging.export()

fetch_parquet(staging_name, connector_id=None, connector_name=None, backend='pandas', merge_records=True, return_dask_graph=False, columns=None, max_hits=None, return_metadata=False, callback=None, cds=False, max_workers=None, file_pattern=None, return_callback_result=False)

Fetch parquet from a staging table.

Args:
staging_name: str,
Staging name to fetch parquet of
connector_id: str, default None
Connector id to fetch parquet of
connector_name: str, default None
Connector name to fetch parquet of
backend: ['dask', 'pandas'], default 'pandas'
Whether to use dask or pandas to fetch the data.
merge_records: bool, default True
Keep only the most recent version of each exported record. Records may have been updated or deleted, in which case only the latest version should be kept.
return_dask_graph: bool, default False
Whether to return the dask graph instead of the DataFrame.
columns: list, default None
List of columns to fetch.
max_hits: int, default None
Number of records to get. This should only be used for tests.
return_metadata: bool, default False
Whether to return the fields ['mdmId', 'mdmCounterForEntity'].
callback: callable, default None
Function to be called for each downloaded file.
cds: bool, default False
Get staging data from CDS.
max_workers: int, default None
Number of workers to use when downloading parquet files with the pandas backend.
file_pattern: str, default None
File pattern to filter data when fetching from CDS, e.g. file_pattern='2019-11-25' fetches only CDS files whose names start with 2019-11-25.
return_callback_result: bool, default False
If a callback is used, return the results of the callback instead of a DataFrame. This skips merging records and selecting columns.
Returns: pandas.DataFrame
DataFrame with the staging data.
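
The following sketch shows one way `cds`, `file_pattern`, and `columns` might be combined to fetch a single day of data from CDS. The helper name `fetch_day` and the idea of building the pattern from a `datetime.date` are assumptions made for illustration; the only documented behaviour relied on is that `file_pattern='2019-11-25'` selects CDS files whose names start with that string.

```python
import datetime


def fetch_day(staging, staging_name, connector_name, day, columns=None):
    """Fetch staging records from CDS files whose names start with `day`.

    `staging` is expected to be a pycarol.staging.Staging instance and
    `day` a datetime.date; both the helper and its arguments are illustrative.
    """
    if not isinstance(day, datetime.date):
        raise TypeError("day must be a datetime.date")
    return staging.fetch_parquet(
        staging_name,
        connector_name=connector_name,
        backend="pandas",
        cds=True,                      # file_pattern filters files fetched from CDS
        file_pattern=day.isoformat(),  # e.g. "2019-11-25"
        columns=columns,               # None fetches every column
        merge_records=True,            # keep only the most recent version of each record
    )
```

A call would look like `fetch_day(stag, 'my_staging', 'my_connector', datetime.date(2019, 11, 25), columns=['id'])`, where `stag` is an existing `Staging` object and the names are placeholders.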

get_schema(staging_name, connector_name=None, connector_id=None)

Return the staging table schema.

Args:
staging_name: str,
Staging name
connector_name: str, default None
Connector name. Either connector_name or connector_id need to be set.
connector_id: str, default None
Connector Id. Either connector_name or connector_id need to be set.
Returns: dict
Staging table schema
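
Since either `connector_name` or `connector_id` needs to be set, a thin wrapper can surface that requirement before any request is made. This is a sketch only; the helper name `get_staging_schema` is hypothetical, and how pycarol itself reacts when both are missing is not documented on this page.

```python
def get_staging_schema(staging, staging_name, connector_name=None, connector_id=None):
    """Return the schema dict of a staging table, checking the arguments first."""
    if connector_name is None and connector_id is None:
        raise ValueError("set either connector_name or connector_id")
    return staging.get_schema(
        staging_name,
        connector_name=connector_name,
        connector_id=connector_id,
    )
```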

send_data(staging_name, data=None, connector_name=None, connector_id=None, step_size=500, print_stats=True, gzip=True, auto_create_schema=False, crosswalk_auto_create=None, flexible_schema=False, force=False, max_workers=2, dm_to_delete=None, async_send=False, carol_data_storage=False, storage_only=False)

Send data to a staging table in Carol.

Args:
staging_name: str,
Staging name to send the data.
data: pandas.DataFrame or json, default None
Data to be sent to Carol.
connector_name: str, default None
Connector name where the staging should be. Either connector_name or connector_id need to be set.
connector_id: str, default None
Connector Id where the staging should be. Either connector_name or connector_id need to be set.
step_size: int, default 500
Number of records to be sent in each iteration. Max size for each batch is 10MB.
print_stats: bool, default True
Whether to print the number of records sent.
gzip: bool, default True
Whether to send each batch as a gzip file.
auto_create_schema: bool, default False
Whether to automatically create the schema for the data being sent.
crosswalk_auto_create: list, default None
If auto_create_schema=True, crosswalk list of fields.
flexible_schema: bool, default False
If auto_create_schema=True, to use a flexible schema.
force: bool, default False
By default, pycarol checks the data for duplicated crosswalk values and raises an error if any are found. If force=True, this check is skipped.
max_workers: int, default 2
To be used with async_send=True. Number of threads to use when sending.
dm_to_delete: str, default None
Deprecated. Name of the data model to be erased before sending the data.
async_send: bool, default False
Send the data asynchronously. This is much faster than a sequential send, but it can conflict with the Jupyter notebook process. To run this inside a Jupyter notebook, first call:

import nest_asyncio
nest_asyncio.apply()

carol_data_storage: bool, default False
Deprecated. Use storage_only.
storage_only: bool, default False
Send data only to CDS.
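
To make the crosswalk duplicate check concrete, the sketch below performs a comparable check locally before calling `send_data`. It illustrates the idea and is not pycarol's implementation. The helper names are hypothetical, records are assumed to be a list of dicts (taken here as the `json` form of `data`), and crosswalk values are assumed to be hashable scalars.

```python
from collections import Counter


def find_crosswalk_duplicates(records, crosswalk):
    """Return the crosswalk key tuples that occur more than once in `records`."""
    keys = Counter(tuple(rec.get(field) for field in crosswalk) for rec in records)
    return [key for key, count in keys.items() if count > 1]


def send_records(staging, records, staging_name, connector_name, crosswalk):
    """Send records to a staging table after a local duplicate check.

    `staging` is expected to be a pycarol.staging.Staging instance.
    """
    duplicates = find_crosswalk_duplicates(records, crosswalk)
    if duplicates:
        raise ValueError(f"duplicated crosswalk values: {duplicates}")
    return staging.send_data(
        staging_name=staging_name,
        data=records,
        connector_name=connector_name,
        step_size=500,                    # records per batch
        auto_create_schema=True,          # create the schema from the data
        crosswalk_auto_create=crosswalk,  # crosswalk fields for the new schema
    )
```

Running the check locally reports every offending key before any batch is sent; `force` is left at its default so pycarol's own check still applies.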

send_schema(schema, staging_name=None, connector_id=None, connector_name=None, overwrite=False)

Send schema to Carol

Args:
schema: dict
Dictionary with the schema to be sent.
staging_name: str, default None
Staging name to send the schema to. If empty, it is taken from the schema.
connector_id: str, default None
Connector ID. If empty, it is taken from the schema.
connector_name: str, default None
Connector name. If empty, it is taken from the schema.
overwrite: bool, default False
If the schema already exists, overwrite it.

Returns: None