Source code for pycarol.query

"""Contain all the classes to query data from RT layer in Carol."""
import copy
from datetime import datetime
import json
import itertools
import typing as T

from retry import retry

from .connectors import Connectors
from .exceptions import NoScrollIdException, RepeatedMDMIdsException
from .filter import Filter, MAXIMUM, MINIMUM, TYPE_FILTER, TERM_FILTER
from .filter import RANGE_FILTER as RF
from .named_query import NamedQuery
from .utils.miscellaneous import ranges


def delete_golden(carol, dm_name, now=None):
    """Delete Golden records.

    It will delete all golden records of a given data model based on
    ``mdmLastUpdated``. Two delete requests are issued: one for the
    ``<dm_name>Golden`` / ``<dm_name>Master`` records (default index type of
    `Query`) and one for the ``<dm_name>Rejected`` records in the ``STAGING``
    index.

    Args:
        carol: `pycarol.carol.Carol`
            Carol instance.
        dm_name: `str`
            Data model name.
        now: `str`, default `None`
            Delete records where last update is less than `now`. Any date time
            ISO format is accepted. When omitted, the current UTC time
            (seconds resolution) is used.

    Usage:

    .. code:: python

        from pycarol.query import delete_golden
        from pycarol.auth.PwdAuth import PwdAuth
        from pycarol.carol import Carol

        login = Carol()
        delete_golden(login, dm_name=my_dm)

        # To delete based on a date.
        delete_golden(login, dm_name=my_dm, now='2018-11-16')

    Attention:
        This API will delete all data in the DataModel, and if there is a
        DataModel View related to this DataModel one needs to reprocess it.
    """
    if now is None:
        now = datetime.utcnow().isoformat(timespec="seconds")

    golden_query = (
        Filter.Builder()
        .should(TYPE_FILTER(value=dm_name + "Golden"))
        .should(TYPE_FILTER(value=dm_name + "Master"))
        .must(RF("mdmLastUpdated", [None, now]))
        .build()
        .to_json()
    )
    try:
        Query(carol).delete(golden_query)
    except Exception:
        # Deliberate best effort: if there are too many records, one would
        # have a timeout but the records will be deleted anyway.
        # `Exception` (not a bare `except`) so Ctrl-C / SystemExit still
        # propagate.
        pass

    rejected_query = (
        Filter.Builder()
        .type(dm_name + "Rejected")
        .must(RF("mdmLastUpdated", [None, now]))
        .build()
        .to_json()
    )
    try:
        Query(carol, index_type="STAGING").delete(rejected_query)
    except Exception:
        # Same best-effort policy as above.
        pass
class Query:
    """Class to query data from Carol.

    This class can be used to query data from data models and stagings tables,
    query using named queries and delete records.

    Args:
        carol: Carol object
            Carol object.
        max_hits: `int`, default float('inf')
            Number of records that will be downloaded.
        offset: `int`, default 0
            Offset for pagination. Only used when `scrollable=False`
        page_size: `int`, default 100
            Number of records downloaded in each pagination. The maximum value
            is 1000
        sort_order: `str`, default 'ASC'
            Sort ascending ('ASC') vs. descending ('DESC').
        sort_by: `str`, default `None`
            Name to sort by.
        scrollable: `bool`, default True
            Use scroll for pagination. This should be the main way of doing,
            unless you are querying few data.
        index_type: `str`, default 'MASTER'
            Query data from 'MASTER', 'STAGING'
        only_hits: `bool`, default 'True'
            Return only results in the response path
            $hits.mdmGoldenFieldAndValues
        fields: `list`, default `None`
            Fields to return in response. e.g.,
            ["mdmGoldenFieldAndValues.mdmtaxid", "mdmGoldenFieldAndValues.date"]
        get_aggs: `bool`, default `False`
            To be used if the query/named query has aggregations
        save_results: `bool`, default `False`
            If save the result of the query in the file specified in `filename`
        filename: `str`, default `query_result.json`
            File path to save the response.
        print_status: `bool`, default `True`
            Print the number of records in each interaction.
        safe_check: `bool`, default `False`
            To be used if there are repeated records (same mdmId)
        get_errors: `bool`, default `False`
            To get the errors in the goldenRecords, if any.
        flush_result: `bool`, default `False`
            To be used with save_results, it will not copy the result to
            memory, only to the file.
        use_stream: `bool`, default `False`
            Use the stream of data.
        get_times: `bool`, default `False`
            It will create a list of times that each pagination took.
        kwargs: `dict`
            Extra parameters to be passed to Carol.call_api
    """

    def __init__(
        self,
        carol,
        max_hits=float("inf"),
        offset=0,
        page_size=100,
        sort_order="ASC",
        sort_by=None,
        scrollable=True,
        index_type="MASTER",
        only_hits=True,
        fields=None,
        get_aggs=False,
        save_results=False,
        filename="query_result.json",
        print_status=True,
        safe_check=False,
        get_errors=False,
        flush_result=False,
        use_stream=False,
        get_times=False,
        **kwargs,
    ):
        self.carol = carol
        self.max_hits = max_hits
        self.offset = offset
        self.page_size = page_size
        self.sort_order = sort_order
        self.sort_by = sort_by
        self.scrollable = scrollable
        self.index_type = index_type
        self.only_hits = only_hits
        self.fields = fields
        self.get_aggs = get_aggs
        self.use_stream = use_stream
        self.save_results = save_results
        self.filename = filename
        self.print_status = print_status
        self.safe_check = safe_check
        self.get_errors = get_errors
        self.flush_result = flush_result
        self.get_times = get_times

        self.named_query = None
        self.callback = None
        self.query_errors: T.Dict[str, T.Any] = {}

        # Created to send to the Rest API
        self.query_params = None
        self.drop_list = None
        self.json_query = None
        self.total_hits = None
        self.query_times = []
        self.kwargs = kwargs

        self.results = []
        # Populated by the scroll handler / delete(); defined here so the
        # attributes always exist on an instance.
        self.mdmId_list = []
        self.querystring = None

        self.get_all = self.max_hits == float("inf")

    def _build_query_params(self):
        """Assemble the request parameters sent alongside the query body."""
        self.query_params = {
            "offset": self.offset,
            "pageSize": self.page_size,
            "sortOrder": self.sort_order,
            "indexType": self.index_type,
        }
        # Optional parameters are only sent when they carry a value.
        if self.sort_by is not None:
            self.query_params["sortBy"] = self.sort_by
        if self.scrollable:
            self.query_params["scrollable"] = self.scrollable
        if self.fields:
            self.query_params["fields"] = self.fields

    def _build_return_fields(self):
        """Normalise `self.fields` into a single comma-separated string."""
        if isinstance(self.fields, str):
            self.fields = [self.fields]
        if self.fields is not None:
            self.fields = ",".join(self.fields)

    def page(self, offset=0):
        """Get only one page of the result using offset.

        Scroll pagination is switched off (`self.scrollable = False`) and
        `self.results` is reset before the request is made.

        Args:
            offset: `int`, default 0
                Offset to get. To properly paginate manually, offset should be
                `offset + page_size`.

        Returns:
            Query json response. When `only_hits` is True, a list with the
            non-empty `mdmGoldenFieldAndValues` of each hit.

        Raises:
            ValueError: if no query was set through `query()` or `named()`.
        """
        self.offset = offset
        self.scrollable = False
        self.results = []

        if self.json_query is None:
            raise ValueError("You must call query() or named() before calling page()")

        self._build_return_fields()
        self._build_query_params()

        if self.named_query is None:
            url_filter = "v2/queries/filter"
        else:
            url_filter = f"v2/queries/named/{self.named_query}"

        result = self.carol.call_api(
            url_filter,
            data=self.json_query,
            params=self.query_params,
            timeout=240,
            method_whitelist=frozenset(["POST"]),
            **self.kwargs,
        )

        if self.only_hits is True:
            result = result["hits"]
            return [
                elem.get("mdmGoldenFieldAndValues", elem)
                for elem in result
                if elem.get("mdmGoldenFieldAndValues", None)
            ]
        else:
            return result

    def go(self, callback: T.Optional[T.Callable] = None) -> "Query":
        """Run the query.

        Args:
            callback:
                This function will receive the current batch of records from
                the filter made.

        Returns:
            Query
                self. Downloaded records are available in `self.results`
                (unless `flush_result` is set).

        Raises:
            ValueError: if no query was set, or if `offset` is not 0.
        """
        self.results = []
        if self.json_query is None:
            raise ValueError(
                "You must call all() or query() or named() before calling go()"
            )

        self._build_return_fields()
        self._build_query_params()

        if not self.offset == 0:
            raise ValueError(
                "It is not possible to use offset when using scroll for pagination"
            )
        self._scrollable_query_handler(callback)
        return self

    def _scrollable_query_handler(
        self, callback: T.Optional[T.Callable] = None
    ) -> None:
        """Request page after page until `min(max_hits, totalHits)` is reached.

        Each page is optionally checked for repeated ids (`safe_check`),
        scanned for `mdmErrors` (`get_errors`), stored in `self.results`
        (unless `flush_result`), handed to `callback`, and appended to
        `self.filename` (`save_results`).

        Raises:
            RepeatedMDMIdsException: with `safe_check`, when an `mdmId` shows
                up more than once across the downloaded pages.
        """
        if self.named_query is None:
            url_filter = "v2/queries/filter"
        else:
            url_filter = f"v2/queries/named/{self.named_query}"

        count = self.offset
        downloaded = 0
        set_param = True
        to_get = float("inf")
        self.query_errors = {}
        self.mdmId_list = []
        # Distinct ids seen so far; kept next to `mdmId_list` so the duplicate
        # check costs O(page) per page instead of rebuilding a set over every
        # id downloaded so far.
        seen_mdm_ids = set()

        while count < to_get:
            result = self._query_request(url_filter)

            # The first response tells how many records there are to fetch.
            if set_param is True:
                self.total_hits = result["totalHits"]
                to_get = min(self.max_hits, self.total_hits)
                set_param = False

            count += result["count"]
            downloaded += result["count"]

            # Following pages are requested through the scroll id.
            scroll_id = result.get("scrollId", None)
            url_filter = f"v2/queries/filter/{scroll_id}"

            if self.get_times is True:
                self.query_times.append(result.pop("took"))

            if self.safe_check is True:
                page_ids = [hit["mdmId"] for hit in result["hits"]]
                self.mdmId_list.extend(page_ids)
                seen_mdm_ids.update(page_ids)
                if len(self.mdmId_list) > len(seen_mdm_ids):
                    raise RepeatedMDMIdsException

            if self.get_errors is True:
                errors = {
                    elem.get("mdmId", elem): elem.get("mdmErrors", elem)
                    for elem in result["hits"]
                    if "mdmErrors" in elem and elem["mdmErrors"]
                }
                self.query_errors.update(errors)

            if self.only_hits is True:
                result = result["hits"]
                # get mdmGoldenFieldAndValues if not empty and if it exists
                result = [
                    elem.get("mdmGoldenFieldAndValues", elem)
                    for elem in result
                    if elem.get("mdmGoldenFieldAndValues", None)
                ]
                if self.flush_result is False:
                    self.results.extend(result)
            else:
                result.pop("count")
                result.pop("totalHits")
                if self.flush_result is False:
                    self.results.append(result)

            if callback is not None:
                callback(result)

            if self.print_status is True:
                print(f"{downloaded}/{to_get}", end="\r")
            if self.save_results is True:
                _write_results(self.filename, result)
            # With aggregations and the raw response a single request is made.
            if self.get_aggs is True and self.only_hits is False:
                break

    @retry(exceptions=NoScrollIdException, tries=5)
    def _query_request(self, url: str) -> T.Dict:
        """POST the current query to `url` and return the response.

        Raises:
            NoScrollIdException: if `url` was built from a missing scroll id.
        """
        if url == "v2/queries/filter/None":
            raise NoScrollIdException
        result = self.carol.call_api(
            url,
            data=self.json_query,
            params=self.query_params,
            timeout=240,
            method_whitelist=frozenset(["POST"]),
            **self.kwargs,
        )
        return result

    def check_total_hits(self, json_query, index_type="MASTER"):
        """Check the total hits for a given query.

        Note that `json_query` is stored in `self.json_query` and the response
        in `self.total_hits`.

        :param json_query: Json object with the query to use
        :param index_type: Index type to query.
        :return: number of records for this query
        """
        querystring = {"indexType": index_type}
        self.json_query = json_query
        url_filter = "v2/queries/count"
        result = self.carol.call_api(
            url_filter,
            data=self.json_query,
            method="POST",
            params=querystring,
            method_whitelist=frozenset(
                ["HEAD", "TRACE", "GET", "PUT", "OPTIONS", "DELETE", "POST"]
            ),
        )
        self.total_hits = result
        return result

    def named(self, named_query, params=None, json_query=None):
        """Select a named query to be run by `go()` / `page()`.

        Args:
            named_query: name of the named query.
            params: parameters used as the request body. Defaults to `{}`
                when neither `params` nor `json_query` is given.
            json_query: request body to send instead of `params`.

        Returns:
            Query
                self.

        Raises:
            Exception: if both `params` and `json_query` are given.
        """
        if json_query is not None and params is not None:
            raise Exception(
                "You can only specify either params or json_query, but not both!"
            )

        self.results = []
        self.named_query = named_query
        if json_query is not None:
            self.json_query = json_query
        else:
            if params is None:
                params = {}
            self.json_query = params
        return self

    def named_query_params(self, named_query):
        """Return `param_dict` of `NamedQuery` after looking up `named_query`."""
        named = NamedQuery(self.carol)
        named.by_name(named_query=named_query)
        return named.param_dict

    def query(self, json_query):
        """Set the json query to be run and return self."""
        self.json_query = json_query
        return self

    def all(self, dm_name):
        """Select every record of type `<dm_name>Golden` in the MASTER index.

        The "Golden" suffix is appended when `dm_name` does not already end
        with it. Returns self.
        """
        if not dm_name.endswith("Golden"):
            dm_name = dm_name + "Golden"
        self.json_query = Filter.Builder().type(dm_name).build().to_json()
        self.index_type = "MASTER"
        return self

    def delete(self, json_query):
        """Delete the records matching `json_query` in `self.index_type`.

        The response is printed; nothing is returned.
        """
        # TODO: we should check the number of records to be deleted. If too many,
        #  it can be a problem.
        self.json_query = json_query
        self.querystring = {"indexType": self.index_type}
        url_filter = "v2/queries/filter"
        result = self.carol.call_api(
            url_filter,
            data=self.json_query,
            params=self.querystring,
            method="DELETE",
            method_whitelist=frozenset(["HEAD", "TRACE", "GET", "PUT", "OPTIONS"]),
        )
        print("Deleted: ", result)
class ParQuery:
    """Download records in parallel by slicing a counter field into ranges.

    The minimum and maximum of a counter field are fetched with an aggregation
    query, the interval is split with `ranges`, and one query per chunk is
    dispatched to the selected backend.
    """

    def __init__(self, carol, backend="dask", return_df=True, verbose=50, n_jobs=4):
        """
        :param carol: Carol object used for every request.
        :param backend: "dask" or "joblib".
        :param return_df: if True results are returned as a pandas DataFrame,
            otherwise as a flat list of records.
        :param verbose: forwarded to the joblib backend.
        :param n_jobs: forwarded to the joblib backend.
        """
        self._stag_mdm_key_range = None
        self._multiplier = None
        self.carol = carol
        self.return_df = return_df
        if return_df:
            # Fail fast when DataFrames were requested but pandas is missing.
            # The name is local to this method; users import it themselves.
            import pandas as pd  # noqa: F401
        self.backend = backend
        self.verbose = verbose
        self.n_jobs = n_jobs
        assert self.backend == "dask" or self.backend == "joblib"

    def _get_min_max(
        self, datamodel_name, mdm_key, index_type, custom_filter=None, multiplier=None
    ):
        """Return `(min, max, sample_hit)` of `mdm_key`, or three `None`.

        `custom_filter`, when given, is sent as is; otherwise a MINIMUM/MAXIMUM
        aggregation over `datamodel_name` is built. With `multiplier` the
        bounds are scaled, truncated to int and widened by 10 on each side.
        """
        if custom_filter is not None:
            agg_query = custom_filter
        else:
            agg_query = (
                Filter.Builder()
                .type(datamodel_name)
                .aggregation_list(
                    [
                        MINIMUM(name="MINIMUM", params=mdm_key),
                        MAXIMUM(name="MAXIMUM", params=mdm_key),
                    ]
                )
                .build()
                .to_json()
            )

        query = (
            Query(
                self.carol,
                index_type=index_type,
                only_hits=False,
                get_aggs=True,
                save_results=False,
                print_status=True,
                page_size=1,
            )
            .query(agg_query)
            .go()
        )

        first_page = query.results[0]
        if first_page.get("aggs") is None:
            return None, None, None

        hits = first_page.get("hits")
        if not hits:
            # No record to sample from: report "nothing to download" instead
            # of failing on hits[0].
            return None, None, None

        sample = hits[0]
        min_v = first_page["aggs"]["MINIMUM"]["value"]
        max_v = first_page["aggs"]["MAXIMUM"]["value"]
        print(f"Total Hits to download: {query.total_hits}")
        if multiplier is not None:
            min_v = int(min_v * multiplier) - 10
            max_v = int(max_v * multiplier) + 10
        return min_v, max_v, sample

    def _run_backend(self, chunks, index_type, only_hits, mdm_key):
        """Dispatch one `_par_query` per chunk to the configured backend."""
        common = dict(
            carol=self.carol,
            chunks=chunks,
            datamodel_name=self.datamodel_name,
            page_size=self.page_size,
            index_type=index_type,
            fields=self.fields,
            only_hits=only_hits,
            mdm_key=mdm_key,
            return_df=self.return_df,
            fields_to_get=self.fields_to_get,
            custom_filter=self.custom_filter,
        )
        if self.backend == "dask":
            return _dask_backend(**common)
        if self.backend == "joblib":
            return _joblib_backend(**common, n_jobs=self.n_jobs, verbose=self.verbose)
        raise KeyError(self.backend)

    def _combine(self, list_to_compute):
        """Merge per-chunk results into one DataFrame or one flat list."""
        if self.return_df:
            import pandas as pd

            return pd.concat(list_to_compute, ignore_index=True, sort=True)
        return list(itertools.chain(*list_to_compute))

    def _get_staging_from_golden_rejected(
        self, datamodel_name, connector_id, staging_name, fields
    ):
        """Fetch staging records stored in `<datamodel_name>Rejected`.

        Returns the raw per-chunk results of the backend (not concatenated),
        or `[]` when there is nothing to download.
        """
        index_type = "STAGING"
        self.datamodel_name = f"{datamodel_name}Rejected"
        self.fields = "mdmStagingRecord"
        self.filter_stag = f"{connector_id}_{staging_name}"
        only_hits = False
        mdm_key = "mdmStagingRecord.mdmCounterForEntity"

        agg_query = (
            Filter.Builder()
            .type(self.datamodel_name)
            .must(TERM_FILTER(key="mdmStagingEntityName.raw", value=self.filter_stag))
            .aggregation_list(
                [
                    MINIMUM(name="MINIMUM", params=mdm_key),
                    MAXIMUM(name="MAXIMUM", params=mdm_key),
                ]
            )
            .build()
            .to_json()
        )

        min_v, max_v, sample = self._get_min_max(
            datamodel_name=self.datamodel_name,
            mdm_key=mdm_key,
            index_type=index_type,
            custom_filter=agg_query,
        )
        if (min_v is None) and (max_v is None):
            return []

        chunks = ranges(min_v, max_v, self.slices)
        # rejected
        print(f"Number of chunks for rejected: {len(chunks)}")

        # Keep the keys of the sampled staging record that contain
        # "<field>_" for any requested field.
        self.fields_to_get = [
            self.fields + "." + key
            for key in sample.get(self.fields).keys()
            for wanted in fields
            if wanted + "_" in key
        ]

        # Unbuilt builder: each worker copies it and adds its own range.
        self.custom_filter = (
            Filter.Builder()
            .type(self.datamodel_name)
            .must(TERM_FILTER(key="mdmStagingEntityName.raw", value=self.filter_stag))
        )

        return self._run_backend(chunks, index_type, only_hits, mdm_key)

    def _get_golden(self, datamodel_name=None, fields=None):
        """Fetch the `<datamodel_name>Golden` records from the MASTER index."""
        index_type = "MASTER"
        self.datamodel_name = f"{datamodel_name}Golden"
        self.fields = "mdmGoldenFieldAndValues"
        # Prefix every requested field unless it already mentions the prefix.
        self.fields_to_get = [
            name if self.fields in name else self.fields + "." + name
            for name in fields
        ]
        only_hits = True
        mdm_key = "mdmCounterForEntity"

        min_v, max_v, sample = self._get_min_max(
            datamodel_name=self.datamodel_name, mdm_key=mdm_key, index_type=index_type
        )
        if (min_v is None) and (max_v is None):
            return []

        chunks = ranges(min_v, max_v, self.slices)
        print(f"Number of chunks: {len(chunks)}")

        list_to_compute = self._run_backend(chunks, index_type, only_hits, mdm_key)
        return self._combine(list_to_compute)

    def _get_staging(
        self, connector_id=None, connector_name=None, staging_name=None, fields=None
    ):
        """Fetch the records of staging table `<connector_id>_<staging_name>`.

        `connector_id` is looked up from `connector_name` when not given.
        """
        if not connector_id:
            connector_id = Connectors(self.carol).get_by_name(connector_name)["mdmId"]

        index_type = "STAGING"
        self.datamodel_name = f"{connector_id}_{staging_name}"
        self.fields_to_get = fields
        self.fields = None
        only_hits = False
        mdm_key = "mdmCounterForEntity"
        if self._stag_mdm_key_range is not None:
            mdm_key = self._stag_mdm_key_range

        min_v, max_v, sample = self._get_min_max(
            datamodel_name=self.datamodel_name,
            mdm_key=mdm_key,
            index_type=index_type,
            multiplier=self._multiplier,
        )
        if (min_v is None) and (max_v is None):
            return []

        chunks = ranges(min_v, max_v, self.slices)
        print(f"Number of chunks: {len(chunks)}")

        list_to_compute = self._run_backend(chunks, index_type, only_hits, mdm_key)
        return self._combine(list_to_compute)

    def go(
        self,
        datamodel_name=None,
        slices=1000,
        page_size=1000,
        staging_name=None,
        connector_id=None,
        connector_name=None,
        fields=None,
    ):
        """Download a data model (golden) or a staging table in parallel.

        When `datamodel_name` is given the golden records are fetched;
        otherwise `staging_name` plus `connector_id` or `connector_name` are
        required and the staging table is fetched.

        Args:
            datamodel_name: data model name (without the "Golden" suffix).
            slices: passed to `ranges` to split the counter interval; must be
                lower than 9999.
            page_size: page size of each chunk query.
            staging_name: staging table name.
            connector_id: connector id of the staging table.
            connector_name: connector name, used when `connector_id` is
                missing.
            fields: fields to return; defaults to `[]`.

        Returns:
            A pandas DataFrame when `return_df` is True, else a list of
            records; `[]` when there is nothing to download.
        """
        assert slices < 9999, "10k is the largest slice possible"
        self.slices = slices
        self.page_size = page_size
        if fields is None:
            fields = []
        self.custom_filter = None

        if datamodel_name is None:
            assert connector_id or connector_name
            assert staging_name
            return self._get_staging(
                connector_id=connector_id,
                connector_name=connector_name,
                staging_name=staging_name,
                fields=fields,
            )
        else:
            return self._get_golden(datamodel_name=datamodel_name, fields=fields)


def _dask_backend(
    carol,
    chunks,
    datamodel_name,
    page_size,
    index_type,
    fields,
    only_hits,
    mdm_key,
    return_df,
    fields_to_get,
    custom_filter,
):
    """Run `_par_query` once per chunk through dask and return the results."""
    # Imported here, like joblib in the joblib backend, so that dask is only
    # needed when this backend is actually used.
    import dask

    list_to_compute = [
        dask.delayed(_par_query)(
            datamodel_name=datamodel_name,
            RANGE_FILTER=chunk,
            page_size=page_size,
            login=carol,
            index_type=index_type,
            fields=fields,
            only_hits=only_hits,
            mdm_key=mdm_key,
            return_df=return_df,
            fields_to_get=fields_to_get,
            custom_filter=custom_filter,
        )
        for chunk in chunks
    ]
    return dask.compute(*list_to_compute)
def _joblib_backend(
    carol,
    chunks,
    datamodel_name,
    page_size,
    index_type,
    fields,
    only_hits,
    mdm_key,
    return_df,
    fields_to_get,
    custom_filter,
    n_jobs,
    verbose,
):
    """Run `_par_query` once per chunk through joblib and return the results."""
    from joblib import Parallel, delayed

    jobs = (
        delayed(_par_query)(
            datamodel_name=datamodel_name,
            RANGE_FILTER=chunk,
            page_size=page_size,
            login=carol,
            index_type=index_type,
            fields=fields,
            only_hits=only_hits,
            mdm_key=mdm_key,
            return_df=return_df,
            fields_to_get=fields_to_get,
            custom_filter=custom_filter,
        )
        for chunk in chunks
    )
    return Parallel(n_jobs=n_jobs, verbose=verbose)(jobs)


def _par_query(
    datamodel_name,
    RANGE_FILTER,
    page_size=1000,
    login=None,
    index_type="MASTER",
    fields=None,
    mdm_key=None,
    only_hits=True,
    return_df=True,
    fields_to_get=None,
    custom_filter=None,
):
    """Download the records whose `mdm_key` falls inside `RANGE_FILTER`.

    Args:
        datamodel_name: type to filter on when no `custom_filter` is given.
        RANGE_FILTER: value handed to the range filter on `mdm_key`.
        page_size: page size of the query.
        login: Carol object used for the request.
        index_type: index to query.
        fields: key to unwrap from each hit (only applied when `only_hits`
            is False).
        mdm_key: field the range filter is applied to.
        only_hits: forwarded to `Query`.
        return_df: return a pandas DataFrame instead of a list.
        fields_to_get: forwarded to `Query` as the fields to return.
        custom_filter: unbuilt filter builder to extend with the range
            filter; it is deep-copied, the caller's object is left untouched.

    Returns:
        A pandas DataFrame when `return_df` is True, else a list of records.
    """
    if custom_filter is not None:
        builder = copy.deepcopy(custom_filter)
        json_query = (
            builder.must(RF(key=mdm_key, value=RANGE_FILTER)).build().to_json()
        )
    else:
        json_query = (
            Filter.Builder()
            .type(datamodel_name)
            .must(RF(key=mdm_key, value=RANGE_FILTER))
            .build()
            .to_json()
        )

    query = (
        Query(
            login,
            page_size=page_size,
            save_results=False,
            print_status=False,
            index_type=index_type,
            only_hits=only_hits,
            fields=fields_to_get,
        )
        .query(json_query)
        .go()
    )

    records = query.results
    if not only_hits:
        # Raw responses: one entry per page, each holding its own "hits".
        records = [page["hits"] for page in records]
        records = list(itertools.chain(*records))
        if fields:
            records = [
                elem.get(fields, elem) for elem in records if elem.get(fields, None)
            ]

    if return_df:
        # pandas is not imported at module level; import it where it is used
        # so this function does not fail with a NameError.
        import pandas as pd

        return pd.DataFrame(records)
    return records


def _write_results(filepath: str, results) -> None:
    """Append `results` to `filepath` as one JSON document per line (UTF-8)."""
    with open(filepath, "a", encoding="utf8") as file:
        file.write(json.dumps(results, ensure_ascii=False))
        file.write("\n")