pycarol.bigquery

Back-end for BigQuery-related code.

class pycarol.bigquery.BQ(carol: Carol, service_account: Optional[Dict[str, Any]] = None, cache_cds: bool = True)

Handles BigQuery queries.

Parameters
  • carol – Carol object.

  • service_account – Google Cloud service account (SA) with access to BigQuery. If not provided, one is generated when needed.

  • cache_cds – Whether the service account should be cached for subsequent uses.

query(query: str, dataset_id: Optional[str] = None, return_dataframe: bool = True) → Union[pandas.DataFrame, List[Dict[str, Any]]]

Run a query. A service account is generated if necessary.

Parameters
  • query – BigQuery SQL query.

  • dataset_id – BigQuery dataset ID. If not provided, the default dataset is used.

  • return_dataframe – If True, return a pandas DataFrame; otherwise, return a list of dicts, one per row.

Returns

Query result.

Usage:

from pycarol import BQ, Carol

bq = BQ(Carol())
query = 'select * from invoice limit 10'
df = bq.query(query, return_dataframe=True)  # pandas DataFrame with up to 10 rows
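
When return_dataframe=False, the documented return type is a list of dicts, one per row. The sketch below consumes that shape through any object with the same query() signature, so a hypothetical FakeBQ stand-in can replace BQ in offline tests. It assumes each row dict is keyed by column name; FakeBQ, count_invoices_by_status, and the status column are illustrative only and are not part of pycarol.

```python
from collections import Counter
from typing import Any, Dict, List, Optional, Protocol


class QueryRunner(Protocol):
    """Structural type matching the documented BQ.query signature."""

    def query(self, query: str, dataset_id: Optional[str] = None,
              return_dataframe: bool = True) -> Any:
        ...


class FakeBQ:
    """Hypothetical offline stand-in for BQ that returns canned rows."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self.rows = rows
        self.last_query: Optional[str] = None

    def query(self, query: str, dataset_id: Optional[str] = None,
              return_dataframe: bool = True) -> List[Dict[str, Any]]:
        # Record the SQL so tests can inspect it; dataset_id and
        # return_dataframe are ignored and plain rows are always returned.
        self.last_query = query
        return self.rows


def count_invoices_by_status(bq: QueryRunner) -> Dict[str, int]:
    # Ask for plain rows instead of a DataFrame; each row is assumed to be
    # a dict keyed by column name.
    rows = bq.query("select status from invoice", return_dataframe=False)
    return dict(Counter(row["status"] for row in rows))


fake = FakeBQ([{"status": "paid"}, {"status": "open"}, {"status": "paid"}])
print(count_invoices_by_status(fake))  # {'paid': 2, 'open': 1}
```

With a real BQ(Carol()) object, the same function should work unchanged, provided the rows come back keyed by column name.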

class pycarol.bigquery.BQStorage(carol: Carol, service_account: Optional[Dict[str, Any]] = None, cache_cds: bool = True)

Handles table reads through the BigQuery Storage API.

Parameters
  • carol – Carol object.

  • service_account – Google Cloud service account (SA) with access to BigQuery. If not provided, one is generated when needed.

  • cache_cds – Whether the service account should be cached for subsequent uses.

query(table_name: str, columns_names: Optional[List[str]] = None, return_dataframe: bool = True) → Union[pandas.DataFrame, List[ReadRowsPage]]

Read from BigQuery Storage API.

Parameters
  • table_name – Name of the table to read. Views are not supported.

  • columns_names – Names of the columns to return.

  • return_dataframe – If True, return a pandas DataFrame; otherwise, return a list of ReadRowsPage objects.

Returns

Query result.

Usage:

from pycarol import BQStorage, Carol

bq = BQStorage(Carol())
table_name = "ingestion_stg_model_deep_audit"
col_names = ["request_id", "version"]
df = bq.query(table_name, col_names, return_dataframe=True)  # only the listed columns
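
Because BQStorage.query takes only a table name and an optional column list, a read through it corresponds roughly to a SELECT of those columns over every row of the table. The hypothetical helper below builds that equivalent SQL, which can be handy for cross-checking a Storage API read against BQ.query. It assumes that columns_names=None means all columns and that the names are plain identifiers safe to wrap in backticks; equivalent_sql is not part of pycarol.

```python
from typing import List, Optional


def equivalent_sql(table_name: str, columns_names: Optional[List[str]] = None) -> str:
    """Hypothetical helper: the SELECT a BQStorage read roughly corresponds to."""
    # Assumption: None (or an empty list) means "read every column".
    if not columns_names:
        cols = "*"
    else:
        # Backticks quote each identifier in BigQuery standard SQL.
        cols = ", ".join(f"`{name}`" for name in columns_names)
    # No WHERE clause: the documented signature exposes no row filter.
    return f"select {cols} from `{table_name}`"


print(equivalent_sql("ingestion_stg_model_deep_audit", ["request_id", "version"]))
# select `request_id`, `version` from `ingestion_stg_model_deep_audit`
```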

class pycarol.bigquery.Token(service_account: Dict[str, Any], env: Dict[str, str])

Stores a service account token and its metadata.

Parameters
  • service_account – Service account provided by Carol.

  • env – Dict returned by Carol.get_current().

expiration_time

Datetime at which the service account expires.

env

dict from Carol.get_current().

service_account

Service account provided by Carol.

expired() → bool

Check whether the token has expired.

Returns True if it has expired.

to_dict() → Dict[str, Any]

Convert the Token to a dictionary.

Returns: dict representation of the Token.
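
To make expired() and to_dict() concrete, here is a minimal sketch of a token-like object. SimpleToken is hypothetical and is not pycarol's Token: it assumes expiration_time is a timezone-aware datetime, that "expired" means the current time has reached it, and that to_dict() may render the datetime as an ISO 8601 string (pycarol may keep a datetime instead). The optional now argument exists only so the check can be tested deterministically, and the service-account and env values are placeholders.

```python
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


@dataclass
class SimpleToken:
    """Hypothetical stand-in for pycarol's Token; not the library's implementation."""

    service_account: Dict[str, Any]
    env: Dict[str, str]
    expiration_time: datetime  # assumed timezone-aware (UTC)

    def expired(self, now: Optional[datetime] = None) -> bool:
        # Expired once the current time reaches expiration_time.
        if now is None:
            now = datetime.now(timezone.utc)
        return now >= self.expiration_time

    def to_dict(self) -> Dict[str, Any]:
        # Copy all fields; render the datetime as ISO 8601 so the dict is JSON-friendly.
        data = asdict(self)
        data["expiration_time"] = self.expiration_time.isoformat()
        return data


issued = datetime(2024, 1, 1, tzinfo=timezone.utc)
token = SimpleToken(
    service_account={"client_email": "placeholder@example.com"},  # placeholder
    env={"env_name": "placeholder"},                                # placeholder
    expiration_time=issued + timedelta(hours=24),
)
print(token.expired(now=issued + timedelta(hours=1)))   # False
print(token.expired(now=issued + timedelta(hours=25)))  # True
print(token.to_dict()["expiration_time"])               # 2024-01-02T00:00:00+00:00
```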

class pycarol.bigquery.TokenManager(carol: Carol, provided_sa: Optional[Dict[str, Any]] = None, cache_cds: bool = True, expiration_window: int = 24)

Manages Google service accounts to allow BigQuery operations.

Parameters
  • carol – Carol object.

  • provided_sa – Service account to use. If not provided, one is generated automatically.

  • cache_cds – Whether to cache the service account in Google Cloud so that it can be shared and reused. This helps reduce the number of generated service accounts.

  • expiration_window – Number of hours until the credentials expire (maximum 24).

expiration_window

Number of hours until the credentials expire (maximum 24).

cache_cds

Whether to cache the service account in Google Cloud so that it can be shared and reused. This helps reduce the number of generated service accounts.

token

Token object containing service account and metadata.

get_forced_token() → Token

Get a new service account for BigQuery and cache it.

Returns

Token containing the new service account.

Usage:

from pycarol import Carol
from pycarol.bigquery import TokenManager

tm = TokenManager(Carol())
service_account = tm.get_forced_token().service_account

get_token() → Token

Get a service account for BigQuery and cache it.

Returns

Token containing the service account.

Usage:

from pycarol import Carol
from pycarol.bigquery import TokenManager

tm = TokenManager(Carol())
service_account = tm.get_token().service_account
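
The difference between get_token() and get_forced_token() can be illustrated with a small caching sketch. SimpleTokenManager is hypothetical and is not pycarol's implementation. It assumes that get_token() reuses the cached account until expiration_window hours have passed, that get_forced_token() always generates a fresh one, and that a window above 24 hours is rejected (pycarol may handle that case differently). The generate_sa callable stands in for whatever actually creates the service account, and the sketch returns plain dicts rather than Token objects.

```python
import itertools
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, Optional

MAX_WINDOW_HOURS = 24  # documented maximum for expiration_window


def utc_now() -> datetime:
    return datetime.now(timezone.utc)


class SimpleTokenManager:
    """Hypothetical sketch of cached vs forced token retrieval; not pycarol's code."""

    def __init__(self, generate_sa: Callable[[], Dict[str, Any]],
                 expiration_window: int = 24,
                 clock: Callable[[], datetime] = utc_now) -> None:
        # Assumption: out-of-range windows are rejected rather than clamped.
        if not 0 < expiration_window <= MAX_WINDOW_HOURS:
            raise ValueError("expiration_window must be between 1 and 24 hours")
        self.generate_sa = generate_sa
        self.expiration_window = expiration_window
        self.clock = clock
        self._sa: Optional[Dict[str, Any]] = None
        self._expires_at: Optional[datetime] = None

    def get_forced_token(self) -> Dict[str, Any]:
        # Always create a fresh service account and replace the cached one.
        self._sa = self.generate_sa()
        self._expires_at = self.clock() + timedelta(hours=self.expiration_window)
        return self._sa

    def get_token(self) -> Dict[str, Any]:
        # Reuse the cached account while it is still valid; otherwise refresh it.
        if self._sa is None or self._expires_at is None or self.clock() >= self._expires_at:
            return self.get_forced_token()
        return self._sa


# Usage with a fake generator and a controllable clock.
ids = itertools.count(1)
current = [datetime(2024, 1, 1, tzinfo=timezone.utc)]
tm = SimpleTokenManager(lambda: {"id": next(ids)}, expiration_window=2,
                        clock=lambda: current[0])
print(tm.get_token()["id"])         # 1 (generated)
print(tm.get_token()["id"])         # 1 (cached)
current[0] += timedelta(hours=2)
print(tm.get_token()["id"])         # 2 (window elapsed, regenerated)
print(tm.get_forced_token()["id"])  # 3 (always new)
```

Injecting the clock and the generator keeps the caching rule testable without network access or real credentials.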