"""
This submodule has all the classes to query data from RT layer in Carol.
"""
import json
import itertools
from joblib import Parallel, delayed
import dask
import pandas as pd
from datetime import datetime
from .connectors import Connectors
from .named_query import NamedQuery
from .filter import Filter, MAXIMUM, MINIMUM, TYPE_FILTER, TERM_FILTER
from .filter import RANGE_FILTER as RF
from .utils.miscellaneous import ranges
import copy
def delete_golden(carol, dm_name, now=None):
    """
    Delete Golden records.

    Deletes the golden and master records of a data model, and then its rejected
    records in the staging index, selecting them with a range filter on
    ``mdmLastUpdated`` whose upper bound is `now`.

    Args:

        carol: `pycarol.carol.Carol`
            Carol instance
        dm_name: `str`
            Data model name
        now: `str`, default `None`
            Upper bound for ``mdmLastUpdated``. Any date time ISO format is accepted.
            When `None`, the current UTC time (second precision) is used.

    Usage:

    .. code:: python

        from pycarol.query import delete_golden
        from pycarol.auth.PwdAuth import PwdAuth
        from pycarol.carol import Carol

        login = Carol()
        delete_golden(login, dm_name=my_dm)
        # To delete based on a date.
        delete_golden(login, dm_name=my_dm, now='2018-11-16')

    Attention:
        This API will delete all data in the DataModel, and if there is a DataModel View related to this DataModel
        one needs to reprocess it.
    """
    if now is None:
        now = datetime.utcnow().isoformat(timespec='seconds')

    golden_query = Filter.Builder() \
        .should(TYPE_FILTER(value=dm_name + "Golden")) \
        .should(TYPE_FILTER(value=dm_name + "Master")) \
        .must(RF("mdmLastUpdated", [None, now])) \
        .build().to_json()
    try:
        Query(carol).delete(golden_query)
    except Exception as exc:
        # Best effort: with too many records the request can time out while the
        # deletion still goes through, so the failure is reported but not raised.
        # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        print(f'Delete request for {dm_name} golden/master records did not complete: {exc!r}')

    rejected_query = Filter.Builder() \
        .type(dm_name + "Rejected") \
        .must(RF("mdmLastUpdated", [None, now])) \
        .build().to_json()
    try:
        Query(carol, index_type='STAGING').delete(rejected_query)
    except Exception as exc:
        # Same best-effort policy as above for the rejected records.
        print(f'Delete request for {dm_name} rejected records did not complete: {exc!r}')
class Query:
    """
    Class to query data from Carol.

    This class can be used to query data from data models and stagings tables, query using named queries and delete
    records.

    Args:

        carol: Carol object
            Carol object.
        max_hits: `int`, default float('inf')
            Number of records that will be downloaded.
        offset: `int`, default 0
            Offset for pagination. Must be 0 when scroll is used for pagination.
        page_size: `int`, default 100
            Number of records downloaded in each pagination. The maximum value is 1000
        sort_order: `str`, default 'ASC'
            Sort ascending ('ASC') vs. descending ('DESC').
        sort_by: `str`, default `None`
            Name to sort by.
        scrollable: `bool`, default True
            Use scroll for pagination. This should be the main way of doing, unless you are querying few data.
        index_type: `str`, default 'MASTER'
            Query data from 'MASTER', 'STAGING'
        only_hits: `bool`, default 'True'
            Return only results in the response path $hits.mdmGoldenFieldAndValues
        fields: `list`, default `None`
            Fields to return in response. e.g., ["mdmGoldenFieldAndValues.mdmtaxid", "mdmGoldenFieldAndValues.date"]
        get_aggs: `bool`, default `False`
            To be used if the query/named query has aggregations
        save_results: `bool`, default `False`
            If save the result of the query in the file specified in `filename`
        filename: `str`, default `query_result.json`
            File path to save the response.
        print_status: `bool`, default `True`
            Print the number of records in each iteration.
        safe_check: `bool`, default `False`
            To be used if there are repeated records (same mdmId)
        get_errors: `bool`, default `False`
            To get the errors in the goldenRecords, if any.
        flush_result: `bool`, default `False`
            To be used with save_results, it will not copy the result to memory, only to the file.
        use_stream: `bool`, default `False`
            Use the stream of data.
        get_times: `bool`, default `False`
            It will create a list of times that each pagination took.
    """

    # HTTP methods handed to `call_api` as `method_whitelist` for query and count requests.
    _ALL_METHODS = frozenset(['HEAD', 'TRACE', 'GET', 'PUT', 'OPTIONS', 'DELETE', 'POST'])

    def __init__(self, carol, max_hits=float('inf'), offset=0, page_size=100, sort_order='ASC', sort_by=None,
                 scrollable=True, index_type='MASTER', only_hits=True, fields=None, get_aggs=False,
                 save_results=False, filename='query_result.json', print_status=True, safe_check=False,
                 get_errors=False, flush_result=False, use_stream=False, get_times=False):
        self.carol = carol
        self.max_hits = max_hits
        self.offset = offset
        self.page_size = page_size
        self.sort_order = sort_order
        self.sort_by = sort_by
        self.scrollable = scrollable
        self.index_type = index_type
        self.only_hits = only_hits
        self.fields = fields
        self.get_aggs = get_aggs
        self.use_stream = use_stream
        self.save_results = save_results
        self.filename = filename
        self.print_status = print_status
        self.safe_check = safe_check
        self.get_errors = get_errors
        self.flush_result = flush_result
        self.get_times = get_times
        self.named_query = None
        self.callback = None

        # Created to send to the Rest API
        self.query_params = None
        self.drop_list = None
        self.json_query = None
        self.total_hits = None
        self.query_times = []
        self.results = []

        # An unbounded max_hits means "download every record the query matches".
        self.get_all = self.max_hits == float('inf')

    def _build_query_params(self):
        """Assemble the query-string parameters sent with every page request."""
        self.query_params = {"offset": self.offset, "pageSize": self.page_size, "sortOrder": self.sort_order,
                             "indexType": self.index_type}
        if self.sort_by is not None:
            self.query_params["sortBy"] = self.sort_by
        if self.scrollable:
            self.query_params["scrollable"] = self.scrollable
        if self.fields:
            self.query_params["fields"] = self.fields

    def _build_return_fields(self):
        """Normalise `self.fields` (a str, a list of str or None) into a comma separated string."""
        if isinstance(self.fields, str):
            self.fields = [self.fields]
        if self.fields is not None:
            self.fields = ','.join(self.fields)

    def go(self, callback=None):
        """
        Run the query previously set with `query()`, `all()` or `named()`.

        Args:

            callback: `callable` object
                This function will receive the current batch of records from the filter made.

        Returns: `Query`
            This same instance; the downloaded records are in `self.results`.

        Raises:
            ValueError: if no query was set before calling `go()`.
        """
        self.results = []
        if self.json_query is None:
            raise ValueError("You must call all() or filter() or named() before calling go()")

        self._build_return_fields()
        self._build_query_params()
        self._scrollable_query_handler(callback)
        return self

    @staticmethod
    def _write_page(out_file, page):
        """Append one page to the output file as a single JSON line and flush it."""
        out_file.write(json.dumps(page, ensure_ascii=False))
        out_file.write('\n')
        out_file.flush()

    def _scrollable_query_handler(self, callback=None):
        """
        Download the query page by page, following the `scrollId` returned by each response.

        Each page is optionally stored in `self.results`, passed to `callback` and written
        to `self.filename`, depending on the flags given to the constructor.
        """
        if self.offset != 0:
            raise ValueError('It is not possible to use offset when using scroll for pagination')
        # Reject a bad callback before any request is made or the output file is truncated.
        if callback and not callable(callback):
            raise Exception(
                f'"{callback}" is a {type(callback)} and is not callable. This variable must be a function.')

        if self.named_query is None:
            url_filter = "v2/queries/filter"
        else:
            url_filter = "v2/queries/named/{}".format(self.named_query)

        first_page = True
        count = self.offset
        to_get = float("inf")
        downloaded = 0

        out_file = open(self.filename, 'w', encoding='utf8') if self.save_results else None
        try:
            while count < to_get:
                result = self.carol.call_api(url_filter, data=self.json_query, params=self.query_params,
                                             timeout=240, method_whitelist=self._ALL_METHODS)

                if first_page:
                    # The first response tells how many records exist; cap it by max_hits.
                    self.total_hits = result["totalHits"]
                    if self.get_all or self.max_hits > result["totalHits"]:
                        to_get = result["totalHits"]
                    else:
                        to_get = self.max_hits
                    first_page = False
                    if self.safe_check:
                        self.mdmId_list = []
                    if self.get_errors:
                        self.query_errors = {}

                count += result['count']
                downloaded += result['count']

                scroll_id = result.get('scrollId', None)
                if (scroll_id is not None) or self.get_aggs:
                    # Following pages are requested through the scroll endpoint.
                    url_filter = "v2/queries/filter/{}".format(scroll_id)
                elif result['count'] == 0:
                    if count < self.total_hits:
                        print(f'Total records downloaded: {count}/{self.total_hits}')
                        print('Something is wrong, no scrollId to continue \n')
                        break
                else:
                    raise Exception('No Scroll Id to use. Something is wrong')

                if self.get_times:
                    self.query_times.append(result.pop('took'))

                if self.only_hits:
                    result = result['hits']
                    if self.safe_check:
                        self.mdmId_list.extend([hit['mdmId'] for hit in result])
                        if len(self.mdmId_list) > len(set(self.mdmId_list)):
                            raise Exception('There are repeated records')

                    if self.get_errors:
                        # `.get` so hits without an `mdmErrors` key are skipped instead of raising KeyError.
                        self.query_errors.update(
                            {elem.get('mdmId', elem): elem.get('mdmErrors', elem)
                             for elem in result if elem.get('mdmErrors')})

                    # get mdmGoldenFieldAndValues if not empty and if it exists
                    result = [elem.get('mdmGoldenFieldAndValues', elem) for elem in result
                              if elem.get('mdmGoldenFieldAndValues', None)]
                    if not self.flush_result:
                        self.results.extend(result)
                else:
                    result.pop('count')
                    result.pop('totalHits')
                    if not self.flush_result:
                        self.results.append(result)

                    if self.get_aggs:
                        # Aggregation responses are read from a single page only.
                        if out_file is not None:
                            self._write_page(out_file, result)
                        break

                if callback:
                    callback(result)

                if self.print_status:
                    print('{}/{}'.format(downloaded, to_get), end='\r')
                if out_file is not None:
                    self._write_page(out_file, result)
        finally:
            # Close the file even when a request or a per-page step raises.
            if out_file is not None:
                out_file.close()

    def check_total_hits(self, json_query, index_type="MASTER"):
        """
        Check the total hits for a given query

        :param json_query: Json object with the query to use
        :param index_type: Index type to query.
        :return: number of records for this query
        """
        querystring = {"indexType": index_type}
        self.json_query = json_query
        url_filter = "v2/queries/count"
        result = self.carol.call_api(url_filter, data=self.json_query, method='POST', params=querystring,
                                     method_whitelist=self._ALL_METHODS)
        self.total_hits = result
        return result

    def named(self, named_query, params=None, json_query=None):
        """
        Select a named query to be executed by `go()`.

        :param named_query: Name of the named query.
        :param params: Parameters sent as the request body. Mutually exclusive with `json_query`.
        :param json_query: Request body to send instead of `params`.
        :return: this same instance.
        """
        if json_query is not None and params is not None:
            raise Exception("You can only specify either params or json_query, but not both!")

        self.results = []
        self.named_query = named_query
        if json_query is not None:
            self.json_query = json_query
        else:
            self.json_query = {} if params is None else params
        return self

    def named_query_params(self, named_query):
        """Return the `param_dict` of the named query `named_query`, looked up through `NamedQuery`."""
        named = NamedQuery(self.carol)
        named.by_name(named_query=named_query)
        return named.param_dict

    def query(self, json_query):
        """Set the filter query to be executed by `go()` and return this same instance."""
        self.json_query = json_query
        return self

    def all(self, dm_name):
        """
        Select every golden record of a data model.

        :param dm_name: Data model name; the 'Golden' suffix is appended when missing.
        :return: this same instance.
        """
        if not dm_name.endswith('Golden'):
            dm_name = dm_name + 'Golden'
        self.json_query = Filter.Builder().type(dm_name).build().to_json()
        self.index_type = 'MASTER'
        return self

    def delete(self, json_query):
        """
        Delete the records matched by `json_query` in the index `self.index_type`.

        :param json_query: Json object with the filter selecting the records to delete.
        """
        # TODO: we should check the number of records to be deleted. If too many,
        # it can be a problem.
        self.json_query = json_query
        self.querystring = {"indexType": self.index_type}
        url_filter = "v2/queries/filter"
        # NOTE(review): unlike the query calls, DELETE and POST are left out of the
        # whitelist here - presumably so a delete is not retried; confirm with call_api.
        result = self.carol.call_api(url_filter, data=self.json_query,
                                     params=self.querystring, method='DELETE',
                                     method_whitelist=frozenset(['HEAD', 'TRACE', 'GET',
                                                                 'PUT', 'OPTIONS']))
        print('Deleted: ', result)
class ParQuery:
    """
    Download records from Carol in parallel.

    The minimum and maximum of a counter field are fetched with an aggregation query,
    that interval is split into chunks with `ranges`, and one `_par_query` per chunk is
    executed through the selected backend.
    """

    def __init__(self, carol, backend='dask', return_df=True, verbose=50, n_jobs=4):
        """
        :param carol: Carol object used for every request.
        :param backend: 'dask' or 'joblib'.
        :param return_df: if True the methods return a `pandas.DataFrame`, otherwise a list of records.
        :param verbose: verbosity forwarded to joblib.
        :param n_jobs: number of jobs forwarded to joblib.
        """
        self._stag_mdm_key_range = None
        self._multiplier = None
        self.carol = carol
        self.return_df = return_df
        self.backend = backend
        self.verbose = verbose
        self.n_jobs = n_jobs
        assert self.backend == 'dask' or self.backend == 'joblib'

    def _run_backend(self, chunks, index_type, only_hits, mdm_key):
        """Run one `_par_query` per chunk on the configured backend and return the parts as a list."""
        if self.backend not in ('dask', 'joblib'):
            raise KeyError(self.backend)

        common = dict(carol=self.carol, chunks=chunks, datamodel_name=self.datamodel_name,
                      page_size=self.page_size, index_type=index_type, fields=self.fields,
                      only_hits=only_hits, mdm_key=mdm_key, return_df=self.return_df,
                      fields_to_get=self.fields_to_get, custom_filter=self.custom_filter)
        if self.backend == 'dask':
            parts = _dask_backend(**common)
        else:
            parts = _joblib_backend(n_jobs=self.n_jobs, verbose=self.verbose, **common)
        # dask.compute returns a tuple; callers need a list they can extend.
        return list(parts)

    def _combine(self, parts):
        """Merge the per-chunk results into one DataFrame or one flat list, per `return_df`."""
        if self.return_df:
            return pd.concat(parts, ignore_index=True, sort=True)
        return list(itertools.chain(*parts))

    def _get_min_max(self, datamodel_name, mdm_key, index_type, custom_filter=None, multiplier=None):
        """
        Query the MINIMUM/MAXIMUM aggregations of `mdm_key`.

        :return: `(min, max, sample_record)`, or `(None, None, None)` when the response
            has no aggregations or no hits.
        """
        if custom_filter is not None:
            agg_query = custom_filter
        else:
            agg_query = Filter.Builder() \
                .type(datamodel_name) \
                .aggregation_list([MINIMUM(name='MINIMUM', params=mdm_key),
                                   MAXIMUM(name='MAXIMUM', params=mdm_key)]) \
                .build().to_json()

        query = Query(self.carol, index_type=index_type, only_hits=False, get_aggs=True, save_results=False,
                      print_status=True, page_size=1, ).query(agg_query).go()

        first = query.results[0]
        if first.get('aggs') is None:
            return None, None, None
        hits = first.get('hits')
        if not hits:
            # Nothing matched: there is no sample record and no range to slice.
            return None, None, None

        sample = hits[0]
        min_v = first['aggs']['MINIMUM']['value']
        max_v = first['aggs']['MAXIMUM']['value']
        print(f"Total Hits to download: {query.total_hits}")
        if multiplier is not None:
            min_v = int(min_v * multiplier) - 10
            max_v = int(max_v * multiplier) + 10
        return min_v, max_v, sample

    def _staging_fields_to_get(self, sample, fields):
        """Pick the keys of the sample's staging record that contain `<field>_` for any requested field."""
        return [self.fields + '.' + key
                for key in sample.get(self.fields).keys()
                for wanted in fields
                if wanted + '_' in key]

    def _get_staging_from_golden_rejected(self, datamodel_name, connector_id, staging_name, fields):
        """
        Download the staging records stored in the rejected records of a data model.

        :return: list with one result per chunk, or `[]` when there is nothing to download.
        """
        index_type = 'STAGING'
        self.datamodel_name = f"{datamodel_name}Rejected"
        self.fields = 'mdmStagingRecord'
        self.filter_stag = f"{connector_id}_{staging_name}"
        only_hits = False
        mdm_key = 'mdmStagingRecord.mdmCounterForEntity'

        agg_query = Filter.Builder() \
            .type(self.datamodel_name) \
            .must(TERM_FILTER(key='mdmStagingEntityName.raw',
                              value=self.filter_stag)) \
            .aggregation_list([MINIMUM(name='MINIMUM', params=mdm_key),
                               MAXIMUM(name='MAXIMUM', params=mdm_key)]) \
            .build().to_json()

        min_v, max_v, sample = self._get_min_max(datamodel_name=self.datamodel_name, mdm_key=mdm_key,
                                                 index_type=index_type, custom_filter=agg_query)
        if (min_v is None) and (max_v is None):
            return []

        chunks = ranges(min_v, max_v, self.slices)
        # rejected
        print(f"Number of chunks for rejected: {len(chunks)}")

        self.fields_to_get = self._staging_fields_to_get(sample, fields)
        # Left un-built on purpose: `_par_query` adds the range filter of each chunk to it.
        self.custom_filter = Filter.Builder() \
            .type(self.datamodel_name) \
            .must(TERM_FILTER(key='mdmStagingEntityName.raw',
                              value=self.filter_stag))

        return self._run_backend(chunks=chunks, index_type=index_type, only_hits=only_hits, mdm_key=mdm_key)

    def _resolve_staging_mapping(self, datamodel_name, staging_name, connector_id, connector_name):
        """
        Fill in whichever of data model, staging and connector id was not given, using the
        data model mappings of all connectors.

        :return: `(datamodel_name, staging_name, connector_id)`
        """
        mappings = Connectors(self.carol).get_dm_mappings(all_connectors=True)

        if datamodel_name is None:
            assert staging_name is not None, "staging_name should be set."
            if (connector_id is None) and (connector_name is not None):
                connector_id = Connectors(self.carol).get_by_name(connector_name)['mdmId']
                names = [m['mdmMasterEntityName'] for m in mappings
                         if (m.get('mdmConnectorId') == connector_id) and
                         (m.get('mdmStagingType') == staging_name)]
                assert len(names) == 1, (f'No data model mapped for {staging_name}')
                datamodel_name = names[0]
            elif connector_id is None:
                candidates = [m for m in mappings if (m.get('mdmStagingType') == staging_name)]
                if len(candidates) > 1:
                    raise KeyError(f'There are more than one connector for staging {staging_name}')
                elif len(candidates) < 1:
                    raise KeyError(f'No data model mapped for {staging_name}')
                mapping = candidates[0]
                connector_id = mapping['mdmConnectorId']
                datamodel_name = mapping['mdmMasterEntityName']
            elif connector_id:
                names = [m['mdmMasterEntityName'] for m in mappings
                         if (m.get('mdmConnectorId') == connector_id) and
                         (m.get('mdmStagingType') == staging_name)]
                assert len(names) == 1, (f'No data model mapped for {staging_name}')
                datamodel_name = names[0]
        elif staging_name is None:
            candidates = [m for m in mappings
                          if (m.get('mdmMasterEntityName') == datamodel_name)]
            if len(candidates) > 1:
                raise KeyError(
                    f'There are more than one staging mapped for the data model {datamodel_name}\n Use "staging_name" for disambiguation ')
            elif len(candidates) < 1:
                raise KeyError(f'No mapping fo data model {datamodel_name}')
            mapping = candidates[0]
            connector_id = mapping['mdmConnectorId']
            staging_name = mapping['mdmStagingType']
        else:
            candidates = [m for m in mappings if (m.get('mdmStagingType') == staging_name)]
            if connector_id:
                # Honour the connector the caller passed, as the error below instructs.
                candidates = [m for m in candidates if m.get('mdmConnectorId') == connector_id]
            if len(candidates) > 1:
                raise KeyError(
                    'There are more than one connector with this staging name\n Use "connector_id" for disambiguation ')
            elif len(candidates) < 1:
                raise KeyError(f'No mapping for data model {datamodel_name}')
            connector_id = candidates[0]['mdmConnectorId']

        return datamodel_name, staging_name, connector_id

    def _get_staging_from_golden(self, datamodel_name=None, staging_name=None, fields=None,
                                 connector_id=None, connector_name=None):
        """
        Download the staging records stored in the master records of a data model and
        then those stored in its rejected records.

        :return: DataFrame or list of records per `return_df`; `[]` when the master
            records yield no range to download.
        """
        datamodel_name, staging_name, connector_id = self._resolve_staging_mapping(
            datamodel_name=datamodel_name, staging_name=staging_name,
            connector_id=connector_id, connector_name=connector_name)

        index_type = 'MASTER'
        self.datamodel_name = f"{datamodel_name}Master"
        self.fields = 'mdmStagingRecord'
        self.filter_stag = f"{connector_id}_{staging_name}"
        only_hits = False
        mdm_key = 'mdmCounterForEntity'

        agg_query = Filter.Builder() \
            .type(self.datamodel_name) \
            .must(TERM_FILTER(key='mdmStagingEntityName.raw',
                              value=self.filter_stag)) \
            .aggregation_list([MINIMUM(name='MINIMUM', params=mdm_key),
                               MAXIMUM(name='MAXIMUM', params=mdm_key)]) \
            .build().to_json()

        min_v, max_v, sample = self._get_min_max(datamodel_name=self.datamodel_name, mdm_key=mdm_key,
                                                 index_type=index_type, custom_filter=agg_query)
        if (min_v is None) and (max_v is None):
            return []

        chunks = ranges(min_v, max_v, self.slices)
        print(f"Number of chunks: {len(chunks)}")
        print("Getting staging from Golden, after will get from rejected too. ")

        self.fields_to_get = self._staging_fields_to_get(sample, fields)
        self.custom_filter = Filter.Builder() \
            .type(self.datamodel_name) \
            .must(TERM_FILTER(key='mdmStagingEntityName.raw',
                              value=self.filter_stag))

        list_to_compute = self._run_backend(chunks=chunks, index_type=index_type, only_hits=only_hits,
                                            mdm_key=mdm_key)

        list_to_compute_rejected = self._get_staging_from_golden_rejected(datamodel_name=datamodel_name,
                                                                          connector_id=connector_id,
                                                                          staging_name=staging_name,
                                                                          fields=fields)
        print("Getting staging from rejected")
        # `list_to_compute` is always a list here, so extend works for both backends.
        list_to_compute.extend(list_to_compute_rejected)
        return self._combine(list_to_compute)

    def _get_golden(self, datamodel_name=None, fields=None):
        """
        Download the golden records of a data model.

        :return: DataFrame or list of records per `return_df`; `[]` when there is nothing to download.
        """
        index_type = 'MASTER'
        self.datamodel_name = f"{datamodel_name}Golden"
        self.fields = 'mdmGoldenFieldAndValues'
        # Prefix each requested field unless it already carries the prefix.
        self.fields_to_get = [name if self.fields in name else self.fields + '.' + name for name in fields]
        only_hits = True
        mdm_key = 'mdmCounterForEntity'

        min_v, max_v, sample = self._get_min_max(datamodel_name=self.datamodel_name, mdm_key=mdm_key,
                                                 index_type=index_type)
        if (min_v is None) and (max_v is None):
            return []

        chunks = ranges(min_v, max_v, self.slices)
        print(f"Number of chunks: {len(chunks)}")

        list_to_compute = self._run_backend(chunks=chunks, index_type=index_type, only_hits=only_hits,
                                            mdm_key=mdm_key)
        return self._combine(list_to_compute)

    def _get_staging(self, connector_id=None, connector_name=None, staging_name=None, fields=None):
        """
        Download the records of a staging table.

        :return: DataFrame or list of records per `return_df`; `[]` when there is nothing to download.
        """
        if not connector_id:
            connector_id = Connectors(self.carol).get_by_name(connector_name)['mdmId']

        index_type = 'STAGING'
        self.datamodel_name = f"{connector_id}_{staging_name}"
        self.fields_to_get = fields
        self.fields = None
        only_hits = False
        mdm_key = 'mdmCounterForEntity'
        if self._stag_mdm_key_range is not None:
            # A different field may be configured on the instance to slice staging data.
            mdm_key = self._stag_mdm_key_range

        min_v, max_v, sample = self._get_min_max(datamodel_name=self.datamodel_name, mdm_key=mdm_key,
                                                 index_type=index_type, multiplier=self._multiplier)
        if (min_v is None) and (max_v is None):
            return []

        chunks = ranges(min_v, max_v, self.slices)
        print(f"Number of chunks: {len(chunks)}")

        list_to_compute = self._run_backend(chunks=chunks, index_type=index_type, only_hits=only_hits,
                                            mdm_key=mdm_key)
        return self._combine(list_to_compute)

    def go(self, datamodel_name=None, slices=1000, page_size=1000, staging_name=None, connector_id=None,
           connector_name=None,
           get_staging_from_golden=False, fields=None):
        """
        Download golden records, staging records, or the staging records kept inside a data model.

        :param datamodel_name: Data model to download. When None, a staging table is downloaded.
        :param slices: value forwarded to `ranges` to split the key interval; must be below 9999.
        :param page_size: page size used by each chunk query.
        :param staging_name: Staging table name.
        :param connector_id: Connector id owning the staging table.
        :param connector_name: Connector name, used to look up the id when `connector_id` is not given.
        :param get_staging_from_golden: if True, download the staging records stored in the
            master and rejected records of the data model.
        :param fields: list of fields to return; defaults to an empty list.
        :return: DataFrame or list of records per `return_df`; `[]` when there is nothing to download.
        """
        assert slices < 9999, '10k is the largest slice possible'
        self.slices = slices
        self.page_size = page_size
        if fields is None:
            fields = []
        self.custom_filter = None

        if get_staging_from_golden:
            return self._get_staging_from_golden(datamodel_name=datamodel_name, staging_name=staging_name,
                                                 connector_id=connector_id, connector_name=connector_name,
                                                 fields=fields)

        if datamodel_name is None:
            assert connector_id or connector_name
            assert staging_name
            return self._get_staging(connector_id=connector_id, connector_name=connector_name,
                                     staging_name=staging_name, fields=fields)

        return self._get_golden(datamodel_name=datamodel_name, fields=fields)
def _dask_backend(carol, chunks, datamodel_name, page_size, index_type, fields,
                  only_hits, mdm_key, return_df, fields_to_get, custom_filter):
    """
    Run one `_par_query` per chunk through dask.

    :param carol: Carol object, passed to `_par_query` as `login`.
    :param chunks: iterable of range-filter values, one per task.
    :return: list with the result of each task, in the order of `chunks`.

    The remaining parameters are forwarded unchanged to `_par_query`.
    """
    tasks = [
        dask.delayed(_par_query)(
            datamodel_name=datamodel_name,
            RANGE_FILTER=chunk,
            page_size=page_size,
            login=carol,
            index_type=index_type,
            fields=fields,
            only_hits=only_hits,
            mdm_key=mdm_key,
            return_df=return_df,
            fields_to_get=fields_to_get,
            custom_filter=custom_filter,
        )
        for chunk in chunks
    ]
    # dask.compute returns a tuple; hand back a list so the result matches
    # `_joblib_backend` and callers can extend it.
    return list(dask.compute(*tasks))
def _joblib_backend(carol, chunks, datamodel_name, page_size, index_type, fields,
                    only_hits, mdm_key, return_df, fields_to_get, custom_filter, n_jobs, verbose, ):
    """
    Run one `_par_query` per chunk through joblib.

    :param carol: Carol object, passed to `_par_query` as `login`.
    :param chunks: iterable of range-filter values, one per task.
    :param n_jobs: forwarded to `joblib.Parallel`.
    :param verbose: forwarded to `joblib.Parallel`.
    :return: whatever `joblib.Parallel` returns for the submitted tasks.

    The remaining parameters are forwarded unchanged to `_par_query`.
    """
    runner = Parallel(n_jobs=n_jobs, verbose=verbose)
    # Lazily describe one task per chunk; joblib consumes the generator.
    tasks = (
        delayed(_par_query)(
            datamodel_name=datamodel_name,
            RANGE_FILTER=chunk,
            page_size=page_size,
            login=carol,
            index_type=index_type,
            fields=fields,
            only_hits=only_hits,
            mdm_key=mdm_key,
            return_df=return_df,
            fields_to_get=fields_to_get,
            custom_filter=custom_filter,
        )
        for chunk in chunks
    )
    return runner(tasks)
def _par_query(datamodel_name, RANGE_FILTER, page_size=1000, login=None, index_type='MASTER', fields=None, mdm_key=None,
               only_hits=True, return_df=True, fields_to_get=None, custom_filter=None):
    """
    Download the records of one chunk.

    :param datamodel_name: type used in the filter when no `custom_filter` is given.
    :param RANGE_FILTER: value of the range filter applied on `mdm_key`.
    :param page_size: page size of the query.
    :param login: Carol object used for the request.
    :param index_type: index to query.
    :param fields: key extracted from each hit when `only_hits` is False.
    :param mdm_key: field the range filter is applied on.
    :param only_hits: forwarded to `Query`; when False the hits are unpacked here.
    :param return_df: if True return a `pandas.DataFrame`, otherwise a list.
    :param fields_to_get: fields requested from the API.
    :param custom_filter: un-built filter builder to which the range filter is added.
    :return: DataFrame or list with the records of the chunk.
    """
    chunk_range = RF(key=mdm_key, value=RANGE_FILTER)
    if custom_filter is None:
        builder = Filter.Builder().type(datamodel_name)
    else:
        # Work on a copy so the shared builder is not modified by each chunk.
        builder = copy.deepcopy(custom_filter)
    json_query = builder.must(chunk_range).build().to_json()

    runner = Query(login, page_size=page_size, save_results=False, print_status=False, index_type=index_type,
                   only_hits=only_hits, fields=fields_to_get)
    records = runner.query(json_query).go().results

    if not only_hits:
        # Each entry is a raw response page; flatten their hits into one list.
        pages = [page['hits'] for page in records]
        records = list(itertools.chain(*pages))
        if fields:
            records = [hit.get(fields, hit) for hit in records if hit.get(fields, None)]

    return pd.DataFrame(records) if return_df else records