diff --git a/custom-recipes/pi-system-retrieve-list/recipe.json b/custom-recipes/pi-system-retrieve-list/recipe.json
index 01cf6e4..a742514 100644
--- a/custom-recipes/pi-system-retrieve-list/recipe.json
+++ b/custom-recipes/pi-system-retrieve-list/recipe.json
@@ -101,6 +101,23 @@
             "description": "",
             "defaultValue": false
         },
+        {
+            "name": "use_batch_mode",
+            "label": "Use batch mode",
+            "type": "BOOLEAN",
+            "description": "",
+            "visibilityCondition": "model.show_advanced_parameters==true",
+            "defaultValue": false
+        },
+        {
+            "name": "batch_size",
+            "label": "Batch size",
+            "type": "INT",
+            "description": "",
+            "visibilityCondition": "model.show_advanced_parameters==true && model.use_batch_mode==true",
+            "minI": 1,
+            "defaultValue": 500
+        },
         {
             "type": "SEPARATOR",
             "description": "Source"
         },
@@ -200,7 +217,7 @@
         {
             "name": "boundary_type",
             "label": "Boundary type",
-            "visibilityCondition": "((model.must_retrieve_metrics) && ['InterpolatedData'].includes(model.data_type))",
+            "visibilityCondition": "(['InterpolatedData'].includes(model.data_type))",
             "type": "SELECT",
             "selectChoices":[
                 {"value": "Inside", "label": "Inside"},
@@ -211,7 +228,7 @@
         {
             "name": "record_boundary_type",
             "label": "Boundary type",
-            "visibilityCondition": "((model.must_retrieve_metrics) && ['RecordedData'].includes(model.data_type))",
+            "visibilityCondition": "(['RecordedData'].includes(model.data_type))",
            "type": "SELECT",
             "selectChoices":[
                 {"value": "Inside", "label": "Inside"},
diff --git a/custom-recipes/pi-system-retrieve-list/recipe.py b/custom-recipes/pi-system-retrieve-list/recipe.py
index 1508ddf..88b4fbb 100644
--- a/custom-recipes/pi-system-retrieve-list/recipe.py
+++ b/custom-recipes/pi-system-retrieve-list/recipe.py
@@ -7,7 +7,7 @@
     get_credentials, get_interpolated_parameters, normalize_af_path,
     get_combined_description, get_base_for_data_type, check_debug_mode,
     PerformanceTimer, get_max_count, check_must_convert_object_to_string,
-    convert_schema_objects_to_string, get_summary_parameters
+    convert_schema_objects_to_string, get_summary_parameters, get_advanced_parameters
 )
 from osisoft_client import OSIsoftClient
 from osisoft_constants import OSIsoftConstants
@@ -58,6 +58,7 @@ def get_step_value(item):
 use_end_time_column = config.get("use_end_time_column", False)
 end_time_column = config.get("end_time_column")
 server_url_column = config.get("server_url_column")
+use_batch_mode, batch_size = get_advanced_parameters(config)
 interval, sync_time, boundary_type = get_interpolated_parameters(config)
 record_boundary_type = config.get("record_boundary_type") if data_type == "RecordedData" else None
 summary_type, summary_duration = get_summary_parameters(config)
@@ -81,12 +82,17 @@ def get_step_value(item):
 with output_dataset.get_writer() as writer:
     first_dataframe = True
+    absolute_index = 0
+    batch_buffer_size = 0
+    buffer = []
     for index, input_parameters_row in input_parameters_dataframe.iterrows():
+        absolute_index += 1
         server_url = input_parameters_row.get(server_url_column, server_url) if use_server_url_column else server_url
         start_time = input_parameters_row.get(start_time_column, start_time) if use_start_time_column else start_time
         end_time = input_parameters_row.get(end_time_column, end_time) if use_end_time_column else end_time
         row_name = input_parameters_row.get("Name")
         duplicate_initial_row = {}
+        nb_rows_to_process = input_parameters_dataframe.shape[0]
 
         for input_column in input_columns:
             duplicate_initial_row[input_column] = input_parameters_row.get(input_column)
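The recipe reads its two new advanced options through get_advanced_parameters, added to the osisoft_plugin_common import above. The helper itself is not part of this diff, so the sketch below is only an assumption of its shape, inferred from the recipe.json defaults (batch mode off, batch size 500).

# Hypothetical sketch of the helper imported above (not shown in this diff);
# it presumably just reads the two advanced options with safe defaults.
def get_advanced_parameters(config):
    use_batch_mode = config.get("use_batch_mode", False)
    batch_size = config.get("batch_size", 500)
    return use_batch_mode, batch_size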
@@ -127,6 +133,29 @@
                 summary_type=summary_type, summary_duration=summary_duration
             )
+        elif use_batch_mode:
+            buffer.append({"WebId": object_id})
+            batch_buffer_size += 1
+            if (batch_buffer_size >= batch_size) or (absolute_index == nb_rows_to_process):
+                rows = client.get_rows_from_webids(
+                    buffer, data_type, max_count=max_count,
+                    start_date=start_time,
+                    end_date=end_time,
+                    interval=interval,
+                    sync_time=sync_time,
+                    boundary_type=boundary_type,
+                    record_boundary_type=record_boundary_type,
+                    can_raise=False,
+                    batch_size=batch_size,
+                    object_id=object_id,
+                    summary_type=summary_type,
+                    summary_duration=summary_duration,
+                    endpoint_type="AF"
+                )
+                batch_buffer_size = 0
+                buffer = []
+            else:
+                continue
         else:
             rows = client.recursive_get_rows_from_webid(
                 object_id,
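In batch mode the recipe no longer makes one Web API call per input row: WebIDs are buffered and flushed through get_rows_from_webids once batch_size rows have accumulated or the last input row has been reached. A minimal sketch of that accumulate-and-flush pattern, with hypothetical names (webids, process_batch) standing in for the recipe's own variables:

# Illustrative only: collect items, flush on a full buffer or on the last row.
buffer = []
for absolute_index, webid in enumerate(webids, start=1):
    buffer.append({"WebId": webid})
    if len(buffer) >= batch_size or absolute_index == len(webids):
        process_batch(buffer)  # one batched request instead of one request per row
        buffer = []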
"max_requests_buffer_size", + "label": "Size of batch buffer", + "visibilityCondition": "model.show_advanced_parameters==true", + "type": "INT", + "minI": 1, + "defaultValue": 500, + "description": "" + }, + { + "type": "SEPARATOR", + "description": "Source" + }, + { + "name": "path_column", + "label": "Path column", + "description": "", + "type": "COLUMN", + "columnRole": "input_dataset" + }, + { + "name": "webid_column", + "label": "WebID column", + "description": "(faster than using path)", + "type": "COLUMN", + "columnRole": "input_dataset" + }, + { + "name": "time_column", + "label": "Time column", + "description": "", + "type": "COLUMN", + "columnRole": "input_dataset" + }, + { + "name": "value_column", + "label": "Value column", + "description": "", + "type": "COLUMN", + "columnRole": "input_dataset" + } + ], + "resourceKeys": [] +} diff --git a/custom-recipes/pi-system-write/recipe.py b/custom-recipes/pi-system-write/recipe.py new file mode 100644 index 0000000..4a88f3c --- /dev/null +++ b/custom-recipes/pi-system-write/recipe.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +import dataiku +from dataiku.customrecipe import get_input_names_for_role, get_recipe_config, get_output_names_for_role +from safe_logger import SafeLogger +from osisoft_plugin_common import ( + get_credentials, normalize_af_path, check_debug_mode, + PerformanceTimer, get_max_count, +) +from osisoft_client import OSIsoftClient, OSIsoftBatchWriter +from osisoft_constants import OSIsoftConstants +import pandas + +DEFAULT_BATCH_SIZE = 500 + +logger = SafeLogger("pi-system plugin", forbiden_keys=["token", "password"]) + +logger.info("Write to tag recipe v{}".format( + OSIsoftConstants.PLUGIN_VERSION +)) + +input_dataset = get_input_names_for_role('input_dataset') +output_names_stats = get_output_names_for_role('api_output') +config = get_recipe_config() +dku_flow_variables = dataiku.get_flow_variables() + +logger.info("Initialization with config config={}".format(logger.filter_secrets(config))) + +auth_type, username, password, server_url, is_ssl_check_disabled = get_credentials(config) +is_debug_mode = check_debug_mode(config) +max_count = get_max_count(config) +summary_type = config.get("summary_type") + +use_server_url_column = config.get("use_server_url_column", False) +if not server_url and not use_server_url_column: + raise ValueError("Server domain not set") + +path_column = config.get("path_column", "") +webid_column = config.get("webid_column", "") +if not path_column and not webid_column: + raise ValueError("There is no path nor webid column selected.") + +time_column = config.get("time_column", "") +if not time_column: + raise ValueError("There is no time column selected.") + +value_column = config.get("value_column", "") +if not value_column: + raise ValueError("There is no value column selected.") + +max_streak_buffer_size = config.get("max_streak_buffer_size", DEFAULT_BATCH_SIZE) +max_requests_buffer_size = config.get("max_requests_buffer_size", DEFAULT_BATCH_SIZE) + +server_url_column = config.get("server_url_column") + +network_timer = PerformanceTimer() +processing_timer = PerformanceTimer() +processing_timer.start() + +input_parameters_dataset = dataiku.Dataset(input_dataset[0]) +output_dataset = dataiku.Dataset(output_names_stats[0]) +input_parameters_dataframe = input_parameters_dataset.get_dataframe() + +results = [] +time_last_request = None +client = None +pi_writer = None +previous_server_url = "" +time_not_parsed = True + +input_columns = list(input_parameters_dataframe.columns) + 
+output_schema = []
+output_schema.append({'name': 'Path', 'type': 'string'})
+output_schema.append({'name': 'Timestamp', 'type': 'string'})
+output_schema.append({'name': 'Value', 'type': 'string'})
+output_schema.append({'name': 'Result', 'type': 'string'})
+output_schema.append({'name': 'Error', 'type': 'string'})
+output_dataset.write_schema(output_schema)
+
+with output_dataset.get_writer() as output_writer:
+    first_dataframe = True
+    pi_writer = None
+    initial_requests = []  # Storing initial requests to display in the output dataset
+    instant_responses = []
+    previous_path = None
+    previous_object_id = None
+    for index, input_parameters_row in input_parameters_dataframe.iterrows():
+        server_url = input_parameters_row.get(server_url_column, server_url) if use_server_url_column else server_url
+        time = input_parameters_row.get(time_column)
+        if isinstance(time, pandas.Timestamp):
+            time = time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+        value = input_parameters_row.get(value_column)
+
+        row_name = input_parameters_row.get("Name")
+        duplicate_initial_row = {}
+        for input_column in input_columns:
+            duplicate_initial_row[input_column] = input_parameters_row.get(input_column)
+
+        if client is None or previous_server_url != server_url:
+            client = OSIsoftClient(
+                server_url, auth_type, username, password,
+                is_ssl_check_disabled=is_ssl_check_disabled,
+                is_debug_mode=is_debug_mode, network_timer=network_timer
+            )
+            previous_server_url = server_url
+        if time_not_parsed:
+            # make sure all OSIsoft time string formats are evaluated at the same time
+            # rather than at every request, at least for start / end times set in the UI
+            time_not_parsed = False
+            time = client.parse_pi_time(time)
+        if not pi_writer:
+            pi_writer = OSIsoftBatchWriter(
+                client,
+                max_requests_buffer_size=max_requests_buffer_size,
+                max_streak_buffer_size=max_streak_buffer_size
+            )
+
+        if webid_column:
+            object_id = input_parameters_row.get(webid_column)
+        else:
+            path = input_parameters_row.get(path_column)
+            path = normalize_af_path(path)
+            if previous_path != path:
+                object_id = client.get_item_from_path(path).get("WebId", "WebId could not be found")
+                previous_path = path
+                previous_object_id = object_id
+            else:
+                object_id = previous_object_id
+        row = (time, value)
+        instant_response = pi_writer.write_row(object_id, time, value)  # usually None, may contain an error
+        instant_responses.append(instant_response)
+        initial_requests.append((object_id, time, value))
+    responses = pi_writer.close()
+    for initial_request, response, instant_response in zip(initial_requests, responses, instant_responses):
+        row = {}
+        row["Path"] = initial_request[0]
+        row["Timestamp"] = initial_request[1]
+        row["Value"] = initial_request[2]
+        row["Result"] = response.get("Status")
+        content = response.get("Content")
+        if content and isinstance(content, dict) and "Errors" in content:
+            row["Error"] = content.get("Errors")
+        if instant_response:
+            row["Error"] = instant_response.get("Error")
+        output_writer.write_row_dict(row)
+
+processing_timer.stop()
+logger.info("Overall timer:{}".format(processing_timer.get_report()))
+logger.info("Network timer:{}".format(network_timer.get_report()))
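Each input row ends up as exactly one output row that pairs the initial request with its batch response; note that the Path column actually carries the WebId that was used for the write. Roughly what one written row can look like, with invented values and assuming the matching batch sub-response reported HTTP 202 Accepted:

# Illustrative only (values are made up): one output row as assembled by the loop above.
example_row = {
    "Path": "F1DPExampleWebId",       # WebId resolved from the path or WebID column
    "Timestamp": "2024-01-01T00:00:00.000000Z",
    "Value": "42",
    "Result": 202,                    # "Status" of the matching batch sub-response
}
# "Error" is only filled when the sub-response content carries "Errors",
# or when write_row rejected the timestamp before anything was sent.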
diff --git a/python-lib/osisoft_client.py b/python-lib/osisoft_client.py
index cae86c8..3536026 100644
--- a/python-lib/osisoft_client.py
+++ b/python-lib/osisoft_client.py
@@ -241,12 +241,8 @@ def get_rows_from_webid(self, webid, data_type, **kwargs):
             yield item
 
     def get_rows_from_webids(self, input_rows, data_type, **kwargs):
-        search_full_hierarchy = kwargs.get("search_full_hierarchy")
-        max_count = kwargs.get("max_count")
         endpoint_type = kwargs.get("endpoint_type", "event_frames")
         batch_size = kwargs.get("batch_size", 500)
-        summary_type = kwargs.get("summary_type")
-        summary_duration = kwargs.get("summary_duration")
 
         batch_requests_parameters = []
         number_processed_webids = 0
@@ -263,10 +259,7 @@ def get_rows_from_webids(self, input_rows, data_type, **kwargs):
             else:
                 webid = input_row
             url = self.endpoint.get_data_from_webid_url(endpoint_type, data_type, webid)
-            requests_kwargs = self.generic_get_kwargs(
-                search_full_hierarchy=search_full_hierarchy, max_count=max_count,
-                summary_type=summary_type, summary_duration=summary_duration
-            )
+            requests_kwargs = self.generic_get_kwargs(**kwargs)
             requests_kwargs['url'] = build_query_string(url, requests_kwargs.get("params"))
             web_ids.append(webid)
             event_start_times.append(event_start_time)
@@ -278,38 +271,46 @@ def get_rows_from_webids(self, input_rows, data_type, **kwargs):
                 batch_requests_parameters = []
                 response_index = 0
                 for json_response in json_responses:
+                    response_content = json_response.get("Content", {})
                     webid = web_ids[response_index]
                     event_start_time = event_start_times[response_index]
                     event_end_time = event_end_times[response_index]
-                    if OSIsoftConstants.DKU_ERROR_KEY in json_response:
-                        json_response['event_frame_webid'] = "{}".format(webid)
-                        yield json_response
-                    items = json_response.get(OSIsoftConstants.API_ITEM_KEY, [])
+                    if OSIsoftConstants.DKU_ERROR_KEY in response_content:
+                        if endpoint_type == "event_frames":
+                            response_content['event_frame_webid'] = "{}".format(webid)
+                        yield response_content
+                    items = response_content.get(OSIsoftConstants.API_ITEM_KEY, [])
                     for item in items:
                         if event_start_time:
                             item['StartTime'] = event_start_time
                         if event_end_time:
                             item['EndTime'] = event_end_time
-                        item['event_frame_webid'] = "{}".format(webid)
+                        if endpoint_type == "event_frames":
+                            item['event_frame_webid'] = "{}".format(webid)
                         yield item
                     response_index += 1
                 web_ids = []
 
-    def _batch_requests(self, batch_requests_parameters):
+    def _batch_requests(self, batch_requests_parameters, method=None):
+        method = method or "GET"
         batch_endpoint = self.endpoint.get_batch_endpoint()
         batch_body = {}
         index = 0
         for row_request_parameters in batch_requests_parameters:
-            batch_body["{}".format(index)] = {
-                "Method": "GET",
-                "Resource": "{}".format(row_request_parameters.get("url"))
-            }
+            batch_request = {}
+            batch_request["Method"] = method
+            batch_request["Resource"] = "{}".format(row_request_parameters.get("url"))
+            if "data" in row_request_parameters:
+                batch_request["Content"] = "{}".format(row_request_parameters.get("data"))
+            if "json" in row_request_parameters:
+                batch_request["Content"] = "{}".format(row_request_parameters.get("json"))
+            batch_body["{}".format(index)] = batch_request
             index += 1
         response = self.post_value(url=batch_endpoint, data=batch_body)
         json_response = simplejson.loads(response.content)
         for index in range(0, len(batch_requests_parameters)):
             batch_section = json_response.get("{}".format(index), {})
-            yield batch_section.get("Content", {})
+            yield batch_section
 
     def generic_get_kwargs(self, **kwargs):
         headers = self.get_requests_headers()
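_batch_requests now lets callers choose the HTTP method and, whenever the prepared request kwargs carry a data or json payload, forwards it as the sub-request Content, which is what the new write path relies on. The body posted to the PI Web API batch endpoint then looks roughly like this (WebIds, URLs and values below are illustrative only):

# Illustrative batch body (made-up values); each keyed sub-request is executed by
# the PI Web API batch controller, which answers under the same keys, e.g.
# {"0": {"Status": 202, "Content": ...}, "1": {"Status": 200, "Content": {...}}}.
batch_body = {
    "0": {
        "Method": "POST",
        "Resource": "https://my_server:8082/piwebapi/streams/F1DPExampleWebId/recorded",
        "Content": "[{'Timestamp': '2024-01-01T00:00:00Z', 'Value': '42'}]"
    },
    "1": {
        "Method": "GET",
        "Resource": "https://my_server:8082/piwebapi/streams/F1DPExampleWebId/interpolated?startTime=*-1d"
    }
}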
@@ -540,6 +541,15 @@ def post_value(self, url, data):
         )
         return response
 
+    def prepare_post_all_values(self, webid, buffer):
+        url = self.endpoint.get_stream_record_url(webid)
+        headers = OSIsoftConstants.WRITE_HEADERS
+        params = {}
+        requests_kwargs = self.generic_get_kwargs(url=url, headers=headers, params=params, data=buffer)
+        requests_kwargs['url'] = url
+        requests_kwargs['json'] = buffer
+        return requests_kwargs
+
     def post(self, url, headers, params, data, can_raise=True, error_source=None):
         url = build_query_string(url, params)
         logger.info("Trying to post to {}".format(url))
@@ -878,11 +888,11 @@ def write_row(self, row):
             "Value": row[self.value_rank]
         }
         if self.value_url:
-            self.client.post_value(self.path, data)
+            return self.client.post_value(self.path, data)
         else:
-            self.client.post_stream_value(self.webid, data)
+            return self.client.post_stream_value(self.webid, data)
 
-    def timestamp_convertion(self, timestamp):
+    def timestamp_conversion(self, timestamp):
         return timestamp
 
     def close(self):
@@ -896,6 +906,103 @@ def close(self):
         # }
 
+
+class OSIsoftBatchWriter(object):
+    # Each row of data added (write_row) first goes in a streak buffer
+    # streak buffer is meant to be used to push a flow of values to one AF path / webid
+    # If the new row concerns another webid, the current streak buffer is flushed into the request buffer
+    # When request buffer is full, it is flushed into the batch endpoint
+
+    # Each write_row call adds an entry to the responses list
+    # The pointer into that list is passed as a _dku_counter parameter, extracted from the requests just before sending the batch
+    # Upon receiving the batch response, each individual response is reordered
+    # and the status code / error messages are stored into the responses list at the right pointer
+
+    # Possible improvement: flush the responses list (and result writing in the recipe.py) at each _flush_requests to keep memory from going up.
+
+    # My Regards to the unsung Hero who had to review this code
+
+    def __init__(self, client, max_streak_buffer_size=500, max_requests_buffer_size=500):
+        logger.info("Initializing OSIsoftBatchWriter, msbs={}, mrbs={}".format(max_streak_buffer_size, max_requests_buffer_size))
+        self.client = client
+        self.streak_buffer = []  # list of points from a single webid -> can be all sent in one request
+        self.requests_buffer = []  # list of independent requests -> sent one batch at a time
+        self.responses = []  # building a list of responses in the same order as write_row calls
+        self.current_webid = None
+        self.max_buffer_size = max_streak_buffer_size
+        self.max_requests_buffer_size = max_requests_buffer_size
+        self.current_streak = 0
+        self.row_number = 0
+
+    def write_row(self, webid, timestamp, value):
+        response = None
+        if not validate_timestamp(timestamp):
+            error_message = "Timestamp '{}' has an invalid format".format(
+                timestamp
+            )
+            logger.error(error_message)
+            # No valid timestamp so we skip this row
+            return {
+                "Error": error_message
+            }
+        # mark in self.responses that status of this write depends on result of streak X
+        if self.current_webid is None:
+            logger.info("webid now is {}".format(webid))
+            self.current_webid = webid
+        if webid != self.current_webid or len(self.streak_buffer) >= self.max_buffer_size:
+            logger.info("webid: {} / {}".format(webid, self.current_webid))
+            self._flush_streak()
+            self.current_webid = webid
+        self.streak_buffer.append(
+            {
+                "Timestamp": "{}".format(timestamp),
+                "Value": "{}".format(value),
+                "_dku_counter": self.row_number
+            }
+        )
+        self.row_number += 1
+        logger.info("Pushed in buffer, now {}".format(len(self.streak_buffer)))
+        self.responses.append({"streak": self.current_streak})
+        return response
+
+    def _flush_streak(self):
+        # streak buffer must be flushed every time the webid changes or before closing the session
+        logger.info("flushing streak")
+        kwargs = self.client.prepare_post_all_values(self.current_webid, self.streak_buffer)
+        self.requests_buffer.append(kwargs)
+        logger.info("pushed to requests buffer {}".format(len(self.requests_buffer)))
+        if len(self.requests_buffer) >= self.max_requests_buffer_size:
+            logger.info("Buffer is full, flushing current requests")
+            self._flush_requests()
+        self.streak_buffer = []
+        self.current_streak += 1
+
+    def _flush_requests(self):
+        logger.info("flushing current requests")
+        request_buffer, streaks_pointers = prepare_request_buffer(self.requests_buffer)
+        json_responses = self.client._batch_requests(request_buffer, method='POST')
+        for json_response, streak_pointers in zip(json_responses, streaks_pointers):
+            for streak_pointer in streak_pointers:
+                self.responses[streak_pointer] = json_response
+        self.requests_buffer = []
+
+    def close(self):
+        logger.info("closing")
+        self._flush_streak()
+        self._flush_requests()
+        return self.responses
+
+
+def validate_timestamp(timestamp):
+    valid_formats = ["%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"]
+    for valid_format in valid_formats:
+        try:
+            datetime.strptime(timestamp, valid_format)
+            return True
+        except Exception:
+            pass
+    return False
+
+
 def formatted_error_source(error_source):
     return "({}) ".format(error_source) if error_source else ""
@@ -951,3 +1058,18 @@ def apply_manual_inputs(kwargs):
 
 def is_parameter_greater_than_max_allowed(error_message):
     return "Error 400" in "{}".format(error_message) and "is greater than the maximum allowed" in "{}".format(error_message)
+
+
+def prepare_request_buffer(request_buffer):
+    # remove _dku_counter from the request,
+    # produce a list of write_row call numbers per streak
+    # later used to determine which response status code / error applies to which write_row call
+    row_counter = []
+    for streak in request_buffer:
+        json = streak.get("json", [])
+        dku_counter = []
+        for row in json:
+            _dku_counter = row.pop("_dku_counter", None)
+            dku_counter.append(_dku_counter)
+        row_counter.append(dku_counter)
+    return request_buffer, row_counter
diff --git a/python-lib/osisoft_endpoints.py b/python-lib/osisoft_endpoints.py
index 1d2bd03..8c9ff96 100644
--- a/python-lib/osisoft_endpoints.py
+++ b/python-lib/osisoft_endpoints.py
@@ -43,6 +43,10 @@ def get_stream_value_url(self, webid):
         url = self.get_base_url() + "/streams/{webid}/value".format(webid=webid)
         return url
 
+    def get_stream_record_url(self, webid):
+        url = self.get_base_url() + "/streams/{webid}/recorded".format(webid=webid)
+        return url
+
     def get_asset_servers_url(self):
         url = self.get_base_url() + "/assetservers"
         return url
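To make the response bookkeeping concrete, here is a small worked example with made-up data: three write_row calls over two WebIds produce two streaks, and prepare_request_buffer reports which call numbers each streak carries, so every batch sub-response can be copied back into responses in the original row order.

# Hypothetical data: rows 0 and 1 went to one WebId (streak 0), row 2 to another (streak 1).
requests_buffer = [
    {"json": [{"Timestamp": "t0", "Value": "1", "_dku_counter": 0},
              {"Timestamp": "t1", "Value": "2", "_dku_counter": 1}]},
    {"json": [{"Timestamp": "t2", "Value": "3", "_dku_counter": 2}]},
]
cleaned_buffer, row_counter = prepare_request_buffer(requests_buffer)
assert row_counter == [[0, 1], [2]]
# _flush_requests then stores the sub-response of streak 0 at responses[0] and responses[1],
# and the sub-response of streak 1 at responses[2], matching the write_row call order.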