"""
This submodule contains the classes used to query data from the RT layer in Carol.
"""
import json
import itertools
from datetime import datetime
from .connectors import Connectors
from .named_query import NamedQuery
from .filter import Filter, MAXIMUM, MINIMUM, TYPE_FILTER, TERM_FILTER
from .filter import RANGE_FILTER as RF
from .utils.miscellaneous import ranges
import copy
def delete_golden(carol, dm_name, now=None):
    """
    Delete Golden records.

    It will delete all golden records of a given data model based on lastUpdate. The data model's
    `<dm_name>Golden` and `<dm_name>Master` records (MASTER index) and `<dm_name>Rejected` records
    (STAGING index) whose `mdmLastUpdated` is up to `now` are deleted, one request per record type.

    Args:

        carol: `pycarol.carol.Carol`
            Carol instance
        dm_name: `str`
            Data model name
        now: `str`
            Delete records where last update is less than `now`. Any date time ISO format is accepted.
            Defaults to the current UTC time, to the second.

    Usage:

    .. code:: python

        from pycarol.query import delete_golden
        from pycarol.carol import Carol
        login = Carol()
        delete_golden(login, dm_name=my_dm)
        #To delete based on a date.
        delete_golden(login, dm_name=my_dm, now='2018-11-16')

    Attention:
        This API will delete all data in the DataModel, and if there is a DataModel View related to this DataModel
        one needs to reprocess it.
    """
    if now is None:
        now = datetime.utcnow().isoformat(timespec='seconds')

    # Each delete is scoped with `.type()` (the same form used for every other typed query in this module).
    # The previous single query combined the Golden/Master types as `should` clauses next to a `must` range;
    # in Elasticsearch bool semantics `should` becomes optional once a `must` is present, which could match
    # records of *other* data models. Separate type-scoped deletes can never reach beyond this data model.
    targets = (
        (dm_name + "Golden", 'MASTER'),
        (dm_name + "Master", 'MASTER'),
        (dm_name + "Rejected", 'STAGING'),
    )
    for type_name, index_type in targets:
        json_query = Filter.Builder() \
            .type(type_name) \
            .must(RF("mdmLastUpdated", [None, now])) \
            .build().to_json()
        try:
            Query(carol, index_type=index_type).delete(json_query)
        except Exception as err:
            # With too many records the request may time out, but the records are deleted anyway,
            # so this stays best-effort; report it instead of hiding it.
            print(f'Delete of {type_name} did not finish cleanly ({err!r}); records are usually deleted anyway.')
class Query:
    """
    Class to query data from Carol.

    This class can be used to query data from data models and staging tables, query using named queries and delete
    records.

    Args:

        carol: `pycarol.carol.Carol`
            Carol object.
        max_hits: `int`, default float('inf')
            Number of records that will be downloaded.
        offset: `int`, default 0
            Offset for pagination. Only used by `page()`; `go()` requires it to be 0.
        page_size: `int`, default 100
            Number of records downloaded in each pagination. The maximum value is 1000
        sort_order: `str`, default 'ASC'
            Sort ascending ('ASC') vs. descending ('DESC').
        sort_by: `str`, default `None`
            Name to sort by.
        scrollable: `bool`, default True
            Use scroll for pagination. This should be the main way of doing it, unless you are querying little data.
        index_type: `str`, default 'MASTER'
            Query data from 'MASTER' or 'STAGING'.
        only_hits: `bool`, default `True`
            Return only the non-empty `mdmGoldenFieldAndValues` of each record in `hits`.
        fields: `list` or `str`, default `None`
            Fields to return in response. e.g., ["mdmGoldenFieldAndValues.mdmtaxid", "mdmGoldenFieldAndValues.date"]
        get_aggs: `bool`, default `False`
            To be used if the query/named query has aggregations. With `only_hits=False` only the first page is
            fetched.
        save_results: `bool`, default `False`
            Save each downloaded page as one JSON line in the file specified in `filename`.
        filename: `str`, default `query_result.json`
            File path to save the response.
        print_status: `bool`, default `True`
            Print the number of records downloaded after each page.
        safe_check: `bool`, default `False`
            Raise if the same mdmId is returned more than once (only checked when `only_hits=True`).
        get_errors: `bool`, default `False`
            Collect the `mdmErrors` of the records into `query_errors` (only when `only_hits=True`).
        flush_result: `bool`, default `False`
            To be used with save_results, it will not copy the result to memory, only to the file.
        use_stream: `bool`, default `False`
            Stored on the instance; not used by the methods of this class.
        get_times: `bool`, default `False`
            It will create a list of times that each pagination took.
        kwargs: `dict`
            Extra parameters to be passed to Carol.call_api
    """

    def __init__(self, carol, max_hits=float('inf'), offset=0, page_size=100, sort_order='ASC', sort_by=None,
                 scrollable=True, index_type='MASTER', only_hits=True, fields=None, get_aggs=False,
                 save_results=False, filename='query_result.json', print_status=True, safe_check=False,
                 get_errors=False, flush_result=False, use_stream=False, get_times=False, **kwargs):
        self.carol = carol
        self.max_hits = max_hits
        self.offset = offset
        self.page_size = page_size
        self.sort_order = sort_order
        self.sort_by = sort_by
        self.scrollable = scrollable
        self.index_type = index_type
        self.only_hits = only_hits
        self.fields = fields
        self.get_aggs = get_aggs
        self.use_stream = use_stream
        self.save_results = save_results
        self.filename = filename
        self.print_status = print_status
        self.safe_check = safe_check
        self.get_errors = get_errors
        self.flush_result = flush_result
        self.get_times = get_times
        self.named_query = None
        self.callback = None
        # Created to send to the Rest API
        self.query_params = None
        self.drop_list = None
        self.json_query = None
        self.total_hits = None
        self.query_times = []
        self.kwargs = kwargs
        self.results = []
        # Filled by go() when safe_check / get_errors are enabled; defined here so reading them never fails.
        self.mdmId_list = []
        self.query_errors = {}
        self.get_all = self.max_hits == float('inf')

    def _build_query_params(self):
        """Build the querystring sent with every request from the current instance settings."""
        self.query_params = {"offset": self.offset, "pageSize": self.page_size, "sortOrder": self.sort_order,
                             "indexType": self.index_type}
        if self.sort_by is not None:
            self.query_params["sortBy"] = self.sort_by
        if self.scrollable:
            self.query_params["scrollable"] = self.scrollable
        if self.fields:
            self.query_params["fields"] = self.fields

    def _build_return_fields(self):
        """Normalize `fields` (str or list) into the comma-separated string the API expects. Idempotent."""
        if isinstance(self.fields, str):
            self.fields = [self.fields]
        if self.fields is not None:
            self.fields = ','.join(self.fields)

    def _url_filter(self):
        """Return the endpoint of the first request: the plain filter endpoint or the named query one."""
        if self.named_query is None:
            return "v2/queries/filter"
        return "v2/queries/named/{}".format(self.named_query)

    @staticmethod
    def _golden_values(hits):
        """Return the non-empty `mdmGoldenFieldAndValues` of each hit, skipping hits without it."""
        return [hit['mdmGoldenFieldAndValues'] for hit in hits if hit.get('mdmGoldenFieldAndValues')]

    @staticmethod
    def _write_result(file, result):
        """Append `result` as one JSON line to `file` (no-op when `file` is None)."""
        if file is None:
            return
        file.write(json.dumps(result, ensure_ascii=False))
        file.write('\n')
        file.flush()

    def page(self, offset=0):
        """
        Get only one page of the result using offset.

        Side effect: sets `self.offset = offset` and `self.scrollable = False`, which also applies to later calls.

        Args:

            offset: `int`, default 0
                Offset to get. To properly paginate manually, offset should be `offset + page_size`.

        Returns:
            The list of `mdmGoldenFieldAndValues` when `only_hits=True`, otherwise the raw json response.

        Raises:
            ValueError: if no query was set with `query()`, `named()` or `all()`.
        """
        self.offset = offset
        self.scrollable = False
        self.results = []
        if self.json_query is None:
            raise ValueError("You must call query() or named() before calling page()")
        self._build_return_fields()
        self._build_query_params()
        result = self.carol.call_api(self._url_filter(), data=self.json_query, params=self.query_params,
                                     timeout=240, method_whitelist=frozenset(['POST']), **self.kwargs)
        if self.only_hits:
            return self._golden_values(result['hits'])
        return result

    def go(self, callback=None):
        """
        Run the query, scrolling through all pages, and store the records in `self.results`.

        Args:

            callback: `callable` object
                This function will receive the current batch of records from the filter made.

        Returns:
            `self`, so calls can be chained.

        Raises:
            ValueError: if no query was set, or if `offset` is not 0.
            Exception: if `callback` is given but is not callable.
        """
        self.results = []
        if self.json_query is None:
            raise ValueError("You must call all() or query() or named() before calling go()")
        self._build_return_fields()
        self._build_query_params()
        self._scrollable_query_handler(callback)
        return self

    def _scrollable_query_handler(self, callback=None):
        """Download every page through the scroll API, honoring `max_hits` and the output options."""
        if self.offset != 0:
            raise ValueError('It is not possible to use offset when using scroll for pagination')
        # Validate before any request or file is opened, so a bad callback fails fast.
        if callback and not callable(callback):
            raise Exception(
                f'"{callback}" is a {type(callback)} and is not callable. This variable must be a function.')

        url_filter = self._url_filter()
        first_page = True
        count = 0
        downloaded = 0
        to_get = float("inf")
        seen_ids = set()  # mirrors mdmId_list so the duplicate check is O(1) per record

        file = open(self.filename, 'w', encoding='utf8') if self.save_results else None
        try:
            while count < to_get:
                result = self.carol.call_api(url_filter, data=self.json_query, params=self.query_params,
                                             timeout=240, method_whitelist=frozenset(['POST']), **self.kwargs)
                if first_page:
                    self.total_hits = result["totalHits"]
                    if self.get_all:
                        to_get = result["totalHits"]
                    else:
                        to_get = min(self.max_hits, result["totalHits"])
                    first_page = False
                    if self.safe_check:
                        self.mdmId_list = []
                    if self.get_errors:
                        self.query_errors = {}

                count += result['count']
                downloaded += result['count']
                scroll_id = result.get('scrollId', None)
                if (scroll_id is not None) or self.get_aggs:
                    url_filter = "v2/queries/filter/{}".format(scroll_id)
                    if result['count'] == 0 and count < self.total_hits and not self.get_aggs:
                        # An empty scroll page means nothing more will come; without this the loop would
                        # keep requesting the same scroll forever.
                        print(f'Total records downloaded: {count}/{self.total_hits}')
                        print('Something is wrong, the scroll returned no records \n')
                        break
                elif result['count'] == 0:
                    if count < self.total_hits:
                        print(f'Total records downloaded: {count}/{self.total_hits}')
                        print(f'Something is wrong, no scrollId to continue \n')
                        break
                else:
                    raise Exception('No Scroll Id to use. Something is wrong')

                if self.get_times:
                    self.query_times.append(result.pop('took'))

                if self.only_hits:
                    hits = result['hits']
                    if self.safe_check:
                        page_ids = [hit['mdmId'] for hit in hits]
                        self.mdmId_list.extend(page_ids)
                        seen_ids.update(page_ids)
                        if len(seen_ids) < len(self.mdmId_list):
                            raise Exception('There are repeated records')
                    if self.get_errors:
                        self.query_errors.update(
                            {hit.get('mdmId', hit): hit['mdmErrors'] for hit in hits if hit.get('mdmErrors')})
                    result = self._golden_values(hits)
                    if not self.flush_result:
                        self.results.extend(result)
                else:
                    result.pop('count')
                    result.pop('totalHits')
                    if not self.flush_result:
                        self.results.append(result)
                    if self.get_aggs:
                        # Aggregations come with the first page; there is nothing to scroll.
                        self._write_result(file, result)
                        break

                if callback:
                    callback(result)
                if self.print_status:
                    print('{}/{}'.format(downloaded, to_get), end='\r')
                self._write_result(file, result)
        finally:
            if file is not None:
                file.close()

    def check_total_hits(self, json_query, index_type="MASTER"):
        """
        Check the total hits for a given query.

        Also stores `json_query` in `self.json_query` and the result in `self.total_hits`.

        Args:

            json_query: `dict`
                Json object with the query to use.
            index_type: `str`, default 'MASTER'
                Index type to query.

        Returns:
            The number of records for this query, as returned by the API.
        """
        querystring = {"indexType": index_type}
        self.json_query = json_query
        url_filter = "v2/queries/count"
        result = self.carol.call_api(url_filter, data=self.json_query, method='POST', params=querystring,
                                     method_whitelist=frozenset(['HEAD', 'TRACE', 'GET',
                                                                 'PUT', 'OPTIONS', 'DELETE', 'POST']))
        self.total_hits = result
        return result

    def named(self, named_query, params=None, json_query=None):
        """
        Set a named query to be run by `go()` or `page()`.

        Args:

            named_query: `str`
                Name of the named query.
            params: `dict`, default `None`
                Parameters of the named query. Mutually exclusive with `json_query`.
            json_query: `dict`, default `None`
                Body to send instead of `params`.

        Returns:
            `self`, so calls can be chained.
        """
        if json_query is not None and params is not None:
            raise Exception("You can only specify either params or json_query, but not both!")
        self.results = []
        self.named_query = named_query
        if json_query is not None:
            self.json_query = json_query
        else:
            self.json_query = {} if params is None else params
        return self

    def named_query_params(self, named_query):
        """Return the parameter dictionary of the named query `named_query`."""
        named = NamedQuery(self.carol)
        named.by_name(named_query=named_query)
        return named.param_dict

    def query(self, json_query):
        """Set the filter query to be run by `go()` or `page()`. Returns `self`."""
        self.json_query = json_query
        return self

    def all(self, dm_name):
        """Query every golden record of data model `dm_name` (MASTER index). Returns `self`."""
        if not dm_name.endswith('Golden'):
            dm_name = dm_name + 'Golden'
        self.json_query = Filter.Builder().type(dm_name).build().to_json()
        self.index_type = 'MASTER'
        return self

    def delete(self, json_query):
        """
        Delete every record matching `json_query` in `self.index_type`.

        Returns:
            The API response, which is also printed.
        """
        # TODO: we should check the number of records to be deleted. If too many,
        # it can be a problem.
        self.json_query = json_query
        self.querystring = {"indexType": self.index_type}
        url_filter = "v2/queries/filter"
        # DELETE is deliberately left out of the retry whitelist.
        result = self.carol.call_api(url_filter, data=self.json_query,
                                     params=self.querystring, method='DELETE',
                                     method_whitelist=frozenset(['HEAD', 'TRACE', 'GET',
                                                                 'PUT', 'OPTIONS']))
        print('Deleted: ', result)
        return result
class ParQuery:
    """
    Download data model / staging records in parallel by slicing on a numeric key.

    The minimum and maximum of the key (``mdmCounterForEntity`` by default) are read with an aggregation query,
    the interval is split with ``ranges(min, max, slices)`` and each chunk is downloaded by `_par_query` using
    the dask or joblib backend.

    Args:

        carol: `pycarol.carol.Carol`
            Carol instance.
        backend: `str`, default 'dask'
            Parallel backend, 'dask' or 'joblib'.
        return_df: `bool`, default True
            Return a single pandas DataFrame instead of a flat list of records.
        verbose: `int`, default 50
            Verbosity passed to joblib's `Parallel`.
        n_jobs: `int`, default 4
            Number of joblib workers.
    """

    def __init__(self, carol, backend='dask', return_df=True, verbose=50, n_jobs=4):
        self._stag_mdm_key_range = None
        self._multiplier = None
        self.carol = carol
        self.return_df = return_df
        if return_df:
            # pandas is only needed to build DataFrames; importing here fails fast when it is missing.
            import pandas as pd  # noqa: F401
        self.backend = backend
        self.verbose = verbose
        self.n_jobs = n_jobs
        assert self.backend == 'dask' or self.backend == 'joblib'

    def _get_min_max(self, datamodel_name, mdm_key, index_type, custom_filter=None, multiplier=None):
        """
        Return ``(min, max, sample_record)`` of `mdm_key`, or ``(None, None, None)`` when nothing matches.

        When `multiplier` is given, the bounds are scaled by it and widened by 10 on each side.
        """
        if custom_filter is not None:
            j = custom_filter
        else:
            j = Filter.Builder() \
                .type(datamodel_name) \
                .aggregation_list([MINIMUM(name='MINIMUM', params=mdm_key), MAXIMUM(name='MAXIMUM', params=mdm_key)]) \
                .build().to_json()
        query = Query(self.carol, index_type=index_type, only_hits=False, get_aggs=True, save_results=False,
                      print_status=True, page_size=1, ).query(j).go()
        if not query.results:
            return None, None, None
        first_page = query.results[0]
        aggs = first_page.get('aggs')
        if aggs is None:
            return None, None, None
        hits = first_page.get('hits') or []
        min_v = aggs['MINIMUM']['value']
        max_v = aggs['MAXIMUM']['value']
        if not hits or min_v is None or max_v is None:
            # No record matched the filter: there is no range (nor sample) to work with.
            return None, None, None
        sample = hits[0]
        print(f"Total Hits to download: {query.total_hits}")
        if multiplier is not None:
            min_v = int(min_v * multiplier) - 10
            max_v = int(max_v * multiplier) + 10
        return min_v, max_v, sample

    def _run_backend(self, chunks, index_type, only_hits, mdm_key):
        """Download every chunk with the configured backend; always returns a list (one entry per chunk)."""
        common = dict(carol=self.carol, chunks=chunks, datamodel_name=self.datamodel_name,
                      page_size=self.page_size, index_type=index_type, fields=self.fields,
                      only_hits=only_hits, mdm_key=mdm_key, return_df=self.return_df,
                      fields_to_get=self.fields_to_get, custom_filter=self.custom_filter)
        if self.backend == 'dask':
            return list(_dask_backend(**common))
        if self.backend == 'joblib':
            return list(_joblib_backend(n_jobs=self.n_jobs, verbose=self.verbose, **common))
        raise KeyError(self.backend)

    def _merge_chunks(self, list_to_compute):
        """Combine per-chunk results into one DataFrame (`return_df=True`) or one flat list."""
        if self.return_df:
            import pandas as pd
            return pd.concat(list_to_compute, ignore_index=True, sort=True)
        return list(itertools.chain.from_iterable(list_to_compute))

    def _staging_entity_filter(self):
        """Return a new Filter.Builder restricted to ``self.datamodel_name`` and staging ``self.filter_stag``."""
        return Filter.Builder() \
            .type(self.datamodel_name) \
            .must(TERM_FILTER(key='mdmStagingEntityName.raw',
                              value=self.filter_stag))

    def _staging_fields_to_get(self, sample, fields):
        """Return the ``self.fields`` sub-keys of `sample` whose name contains ``<field>_`` for a requested field."""
        return [self.fields + '.' + key for key in sample.get(self.fields).keys()
                for wanted in fields if wanted + '_' in key]

    @staticmethod
    def _resolve_golden_mapping(mappings, datamodel_name=None, staging_name=None, connector_id=None):
        """
        Resolve ``(datamodel_name, connector_id, staging_name)`` from Carol's data model mappings.

        Args:
            mappings: `list` of `dict`
                Mappings with keys 'mdmConnectorId', 'mdmStagingType' and 'mdmMasterEntityName'.

        Raises:
            KeyError: when the mapping is missing or ambiguous.
            AssertionError: when neither a data model nor a staging is given, or a connector mapping is missing.
        """
        if datamodel_name is None:
            assert staging_name is not None, "staging_name should be set."
            if connector_id is None:
                entity = [i for i in mappings if (i.get('mdmStagingType') == staging_name)]
                if len(entity) > 1:
                    raise KeyError(f'There are more than one connector for staging {staging_name}')
                elif len(entity) < 1:
                    raise KeyError(f'No data model mapped for {staging_name}')
                connector_id = entity[0]['mdmConnectorId']
                datamodel_name = entity[0]['mdmMasterEntityName']
            else:
                entity = [i['mdmMasterEntityName'] for i in mappings
                          if (i.get('mdmConnectorId') == connector_id) and
                          (i.get('mdmStagingType') == staging_name)]
                assert len(entity) == 1, (f'No data model mapped for {staging_name}')
                datamodel_name = entity[0]
        elif staging_name is None:
            entity = [i for i in mappings
                      if (i.get('mdmMasterEntityName') == datamodel_name)]
            if len(entity) > 1:
                raise KeyError(
                    f'There are more than one staging mapped for the data model {datamodel_name}\n Use "staging_name" for disambiguation ')
            elif len(entity) < 1:
                raise KeyError(f'No mapping fo data model {datamodel_name}')
            connector_id = entity[0]['mdmConnectorId']
            staging_name = entity[0]['mdmStagingType']
        else:
            # Honor connector_id here too: the error below tells the user to pass it for disambiguation.
            entity = [i for i in mappings
                      if (i.get('mdmStagingType') == staging_name) and
                      (connector_id is None or i.get('mdmConnectorId') == connector_id)]
            if len(entity) > 1:
                raise KeyError(
                    f'There are more than one connector with this staging name\n Use "connector_id" for disambiguation ')
            elif len(entity) < 1:
                raise KeyError(f'No mapping for data model {datamodel_name}')
            connector_id = entity[0]['mdmConnectorId']
        return datamodel_name, connector_id, staging_name

    def _get_staging_from_golden_rejected(self, datamodel_name, connector_id, staging_name, fields):
        """Download the staging records of `connector_id`/`staging_name` stored in ``<datamodel_name>Rejected``."""
        index_type = 'STAGING'
        self.datamodel_name = f"{datamodel_name}Rejected"
        self.fields = 'mdmStagingRecord'
        self.filter_stag = f"{connector_id}_{staging_name}"
        only_hits = False
        mdm_key = 'mdmStagingRecord.mdmCounterForEntity'
        j = self._staging_entity_filter() \
            .aggregation_list([MINIMUM(name='MINIMUM', params=mdm_key),
                               MAXIMUM(name='MAXIMUM', params=mdm_key)]) \
            .build().to_json()
        min_v, max_v, sample = self._get_min_max(datamodel_name=self.datamodel_name, mdm_key=mdm_key,
                                                 index_type=index_type, custom_filter=j)
        if (min_v is None) and (max_v is None):
            return []
        chunks = ranges(min_v, max_v, self.slices)
        # rejected
        print(f"Number of chunks for rejected: {len(chunks)}")
        self.fields_to_get = self._staging_fields_to_get(sample, fields)
        self.custom_filter = self._staging_entity_filter()
        return self._run_backend(chunks, index_type, only_hits, mdm_key)

    def _get_staging_from_golden(self, datamodel_name=None, staging_name=None, fields=None,
                                 connector_id=None, connector_name=None):
        """Rebuild staging records from ``<dm>Master`` and then ``<dm>Rejected``."""
        mappings = Connectors(self.carol).get_dm_mappings(all_connectors=True)
        if (connector_id is None) and (connector_name is not None) and (staging_name is not None):
            connector_id = Connectors(self.carol).get_by_name(connector_name)['mdmId']
        datamodel_name, connector_id, staging_name = self._resolve_golden_mapping(
            mappings, datamodel_name=datamodel_name, staging_name=staging_name, connector_id=connector_id)

        index_type = 'MASTER'
        self.datamodel_name = f"{datamodel_name}Master"
        self.fields = 'mdmStagingRecord'
        self.filter_stag = f"{connector_id}_{staging_name}"
        only_hits = False
        mdm_key = 'mdmCounterForEntity'
        j = self._staging_entity_filter() \
            .aggregation_list([MINIMUM(name='MINIMUM', params=mdm_key),
                               MAXIMUM(name='MAXIMUM', params=mdm_key)]) \
            .build().to_json()
        min_v, max_v, sample = self._get_min_max(datamodel_name=self.datamodel_name, mdm_key=mdm_key,
                                                 index_type=index_type, custom_filter=j)
        if (min_v is None) and (max_v is None):
            # NOTE(review): rejected records are not fetched when the Master part is empty — confirm intended.
            return []
        chunks = ranges(min_v, max_v, self.slices)
        print(f"Number of chunks: {len(chunks)}")
        print("Getting staging from Golden, after will get from rejected too. ")
        self.fields_to_get = self._staging_fields_to_get(sample, fields)
        self.custom_filter = self._staging_entity_filter()
        # _run_backend always returns a list, so extending works for the dask backend as well.
        list_to_compute = self._run_backend(chunks, index_type, only_hits, mdm_key)
        list_to_compute_rejected = self._get_staging_from_golden_rejected(datamodel_name=datamodel_name,
                                                                          connector_id=connector_id,
                                                                          staging_name=staging_name,
                                                                          fields=fields)
        print("Getting staging from rejected")
        list_to_compute.extend(list_to_compute_rejected)
        return self._merge_chunks(list_to_compute)

    def _get_golden(self, datamodel_name=None, fields=None):
        """Download the golden records of `datamodel_name`."""
        index_type = 'MASTER'
        self.datamodel_name = f"{datamodel_name}Golden"
        self.fields = 'mdmGoldenFieldAndValues'
        self.fields_to_get = [self.fields + '.' + i if self.fields not in i else i for i in fields]
        only_hits = True
        mdm_key = 'mdmCounterForEntity'
        min_v, max_v, sample = self._get_min_max(datamodel_name=self.datamodel_name, mdm_key=mdm_key,
                                                 index_type=index_type)
        if (min_v is None) and (max_v is None):
            return []
        chunks = ranges(min_v, max_v, self.slices)
        print(f"Number of chunks: {len(chunks)}")
        return self._merge_chunks(self._run_backend(chunks, index_type, only_hits, mdm_key))

    def _get_staging(self, connector_id=None, connector_name=None, staging_name=None, fields=None):
        """Download the records of staging ``<connector_id>_<staging_name>``."""
        if not connector_id:
            connector_id = Connectors(self.carol).get_by_name(connector_name)['mdmId']
        index_type = 'STAGING'
        self.datamodel_name = f"{connector_id}_{staging_name}"
        self.fields_to_get = fields
        self.fields = None
        only_hits = False
        mdm_key = 'mdmCounterForEntity'
        if self._stag_mdm_key_range is not None:
            mdm_key = self._stag_mdm_key_range
        min_v, max_v, sample = self._get_min_max(datamodel_name=self.datamodel_name, mdm_key=mdm_key,
                                                 index_type=index_type, multiplier=self._multiplier)
        if (min_v is None) and (max_v is None):
            return []
        chunks = ranges(min_v, max_v, self.slices)
        print(f"Number of chunks: {len(chunks)}")
        return self._merge_chunks(self._run_backend(chunks, index_type, only_hits, mdm_key))

    def go(self, datamodel_name=None, slices=1000, page_size=1000, staging_name=None, connector_id=None,
           connector_name=None,
           get_staging_from_golden=False, fields=None):
        """
        Download golden records, staging records, or staging records rebuilt from golden/rejected.

        Args:

            datamodel_name: `str`, default `None`
                Data model to download. When `None`, a staging is downloaded instead.
            slices: `int`, default 1000
                Number of key ranges to split the download into. Must be smaller than 9999.
            page_size: `int`, default 1000
                Page size of each chunk query.
            staging_name: `str`, default `None`
                Staging table name.
            connector_id: `str`, default `None`
                Connector id of the staging.
            connector_name: `str`, default `None`
                Connector name, used to look up the id when `connector_id` is not given.
            get_staging_from_golden: `bool`, default `False`
                Rebuild the staging records from the data model's Master and Rejected records.
            fields: `list`, default `None`
                Fields to download.

        Returns:
            A pandas DataFrame when `return_df=True`, otherwise a list of records; `[]` when nothing matched.
        """
        assert slices < 9999, '10k is the largest slice possible'
        self.slices = slices
        self.page_size = page_size
        if fields is None:
            fields = []
        self.custom_filter = None
        if get_staging_from_golden:
            return self._get_staging_from_golden(datamodel_name=datamodel_name, staging_name=staging_name,
                                                 connector_id=connector_id, connector_name=connector_name,
                                                 fields=fields)
        if datamodel_name is None:
            assert connector_id or connector_name
            assert staging_name
            return self._get_staging(connector_id=connector_id, connector_name=connector_name,
                                     staging_name=staging_name, fields=fields)
        return self._get_golden(datamodel_name=datamodel_name, fields=fields)
def _dask_backend(carol, chunks, datamodel_name, page_size, index_type, fields,
                  only_hits, mdm_key, return_df, fields_to_get, custom_filter):
    """
    Run `_par_query` once per range chunk using `dask.delayed` and compute them all.

    Args mirror `_par_query`; each element of `chunks` is passed as its `RANGE_FILTER`.

    Returns:
        `list` with one `_par_query` result per chunk, in the order of `chunks`.
    """
    # dask is an optional dependency; import it lazily (like joblib in `_joblib_backend`).
    # It was previously referenced without ever being imported.
    import dask

    tasks = [
        dask.delayed(_par_query)(
            datamodel_name=datamodel_name,
            RANGE_FILTER=chunk,
            page_size=page_size,
            login=carol,
            index_type=index_type,
            fields=fields,
            only_hits=only_hits,
            mdm_key=mdm_key,
            return_df=return_df,
            fields_to_get=fields_to_get,
            custom_filter=custom_filter,
        )
        for chunk in chunks
    ]
    # dask.compute returns a tuple; a list lets callers treat both backends alike (e.g. `.extend`).
    return list(dask.compute(*tasks))
def _joblib_backend(carol, chunks, datamodel_name, page_size, index_type, fields,
                    only_hits, mdm_key, return_df, fields_to_get, custom_filter, n_jobs, verbose, ):
    """
    Run `_par_query` once per range chunk with joblib's `Parallel`.

    Args mirror `_par_query`; each element of `chunks` is passed as its `RANGE_FILTER`.
    `n_jobs` and `verbose` are forwarded to `Parallel`.

    Returns:
        The list produced by `Parallel`: one `_par_query` result per chunk.
    """
    # joblib is imported lazily so it is only required when this backend is chosen.
    from joblib import Parallel, delayed

    shared_kwargs = dict(
        datamodel_name=datamodel_name,
        page_size=page_size,
        login=carol,
        index_type=index_type,
        fields=fields,
        only_hits=only_hits,
        mdm_key=mdm_key,
        return_df=return_df,
        fields_to_get=fields_to_get,
        custom_filter=custom_filter,
    )
    runner = Parallel(n_jobs=n_jobs, verbose=verbose)
    jobs = (delayed(_par_query)(RANGE_FILTER=chunk, **shared_kwargs) for chunk in chunks)
    return runner(jobs)
def _par_query(datamodel_name, RANGE_FILTER, page_size=1000, login=None, index_type='MASTER', fields=None, mdm_key=None,
               only_hits=True, return_df=True, fields_to_get=None, custom_filter=None):
    """
    Download the records whose `mdm_key` falls in `RANGE_FILTER`.

    Args:
        datamodel_name: `str`
            Type to query when `custom_filter` is not given.
        RANGE_FILTER: range value passed to the range filter on `mdm_key`.
        page_size: `int`, default 1000
            Page size of the scrolling query.
        login: `pycarol.carol.Carol`
            Carol instance.
        index_type: `str`, default 'MASTER'
            Index to query.
        fields: `str`, default `None`
            When `only_hits=False`, keep only this (non-empty) sub-object of each hit.
        mdm_key: `str`
            Key the range filter applies to.
        only_hits: `bool`, default True
            Forwarded to `Query`.
        return_df: `bool`, default True
            Return a pandas DataFrame instead of a list.
        fields_to_get: `list`, default `None`
            Fields requested from the API.
        custom_filter: `Filter.Builder`, default `None`
            Base filter; the range filter is added to a deep copy of it.

    Returns:
        A DataFrame (`return_df=True`) or a list of records.
    """
    if custom_filter is not None:
        # Deep copy: the same builder is shared by every chunk, and `.must()` presumably adds to it in place.
        builder = copy.deepcopy(custom_filter)
    else:
        builder = Filter.Builder().type(datamodel_name)
    json_query = builder.must(RF(key=mdm_key, value=RANGE_FILTER)).build().to_json()

    query = Query(login, page_size=page_size, save_results=False, print_status=False, index_type=index_type,
                  only_hits=only_hits,
                  fields=fields_to_get).query(json_query).go()
    records = query.results
    if not only_hits:
        # Each stored result is a whole page; flatten their hits.
        records = list(itertools.chain.from_iterable(page['hits'] for page in records))
        if fields:
            records = [elem[fields] for elem in records if elem.get(fields)]
    if return_df:
        # pandas is not imported at module level; import it where it is needed.
        import pandas as pd
        return pd.DataFrame(records)
    return records