Source code for Application.kalman_filter

import ClarkePark
import networkx
import numpy
import pandapower
import pandas
import sympy

import scipy.stats as stats

from filterpy.kalman import KalmanFilter

from Application import pandapower_model


[docs] def translate_current_transformers(cts: str) -> dict: """ Since the user input in the GUI is a string list of the considered current transformers (cts) in the investigations, its information needs to be split up and stored separately. Current transformer input information needs to follow the following scheme: "<measured bus>: <measured direction>." e.g. "bus4: -bus6" means that the current from bus 6 to bus 4 is measured and the direction from bus 6 to bus 4 is measured positive. Removing the minus leads to a conversion of the positive measurement direction. :param cts: String holding the current transformer information \ inserted by the user in the GUI :type cts: str :return: **ct_translated** (dict) - dictionary holding the \ separated current transformer information """ index = 0 # dictionary to collect the separated information ct_translated = {} # iterate threw the users entries which are separated by commas for ct in cts.split(","): # remove spaces used after the separation comma while ct[0] == " ": ct = ct[1:] # get the measuring spot measuring_facility = ct.split(":")[0] direction = ct.split(":")[1] # remove leading spaces while direction[0] == " ": direction = direction[1:] while measuring_facility[-1] == " ": measuring_facility = measuring_facility[:-1] # check if direction starts with a - identifying that the # measured positive current equals a current in the # opposite direction e.g. bus8: - bus4 means currents from # bus4 to bus8 are measured positive if direction[0] == "-": current_source = direction[1:] current_sink = measuring_facility else: current_source = measuring_facility current_sink = direction # create a dictionary entry holding the considered ct data ct_translated.update({ "CT {}".format(index): { "facility": measuring_facility, "current_source": current_source, "current_sink": current_sink } }) index += 1 return ct_translated
[docs] def translate_voltage_transformers(vts: str) -> list: """ Translate the user input string holding the considered vts into their pandapower equivalent index. :param vts: the user input string holding the considered vts :type vts: str :return: **vt_nodes** (list) - list of the pandapower indices \ of the considered vts """ vt_nodes = [] # iterate threw the user inputs of vts (seperated by comma) for vt in vts.split(","): # remove leading spaces in list while vt[0] == " ": vt = vt[1:] # add dict entry of the pandapower equivalent index vt_nodes.append(int(vt[-1])) return vt_nodes
[docs] def update_nxgraph(cts: dict, network: pandapower.pandapowerNet ) -> (networkx.MultiGraph, list): """ In this method, the network of the power system is cut off at the balancing boundaries and exposed nodes are deleted. This is illustrated in the network graph update. The boundary nodes should become red, for which they must first be collected in the “nodes” data structure. :param cts: dictionary containing the in the kalman filter \ consideration necessary current transformer :type cts: dict :param network: Network instance of pandapower module :type network: pandapower.pandapowerNet :return: - **nxg_network** (networkx.MultiGraph) - Updated \ networkx MultiGraph instance - **nodes** (list) - List holding the balancing \ boundary nodes """ # get the nxgraph of the complete network incl. the parts outside # the consideration nxg_network, pos = pandapower_model.create_nxgraph(network=network) nodes = [] # iterate threw the dictionary of current transformer for ct in cts: # create a local copy of the nxgraph of the network nxg_network_copy = nxg_network.copy() # iterate threw network nodes for node in nxg_network.nodes: # remove the nodes outside the area of consideration if node == int(cts[ct]["current_source"][-1]): nxg_network_copy.remove_node(node) # make the copy the new networkx graph nxg_network = nxg_network_copy.copy() # collect the nodes of incoming currents nodes.append(int(cts[ct]["current_sink"][-1])) return nxg_network, nodes
[docs] def get_temporal_resolution(measurement_data: str) -> float: """ Calculate the temporal resolution of the time series to be considered within the Kalman Filter simulation. :param measurement_data: path where the measurement data is \ stored. User Input in the streamlit GUI :type measurement_data: str :return: **dt** (float) - temporal resolution of the \ investigated time series """ # read csv data to pandas data frame measurement_df = pandas.read_csv(measurement_data) # reset data frame index column measurement_df = measurement_df.reset_index(drop=False) # calculate temporal resolution dt = abs(float(measurement_df["Alle Berechnungsarten"][2]) - float(measurement_df["Alle Berechnungsarten"][1])) # return temporal resolution return dt
[docs] def calc_inductance_from_reactance(reactance: float, frequency: float) -> float: """ Since the line parameters in Powerfactory consist of impedance and susceptance, the reactance will be converted into a inductance in this method. .. math:: & L = \\frac{X}{2 \\cdot \\pi \\cdot f} :param reactance: reactance of the transmission line in ohm :type reactance: float :param frequency: frequency of the considered power system :type frequency: float :return: **capacity** (float): calculated capacity of the \ considered transmission line """ inductance = reactance / (2 * numpy.pi * frequency) return inductance
[docs] def get_vertices_parameters( buses: list, lines: list, network: pandapower.pandapowerNet ) -> (dict, dict, dict): """ Method to extract the vertex parameters from the pandapower network instance for easier handling in later kalman filter methods. :param buses: list holding all power system buses :type buses: list :param lines: list holding all power system lines :type lines: list :param network: pandapower network instance of the considered \ power system :type network: pandapower.pandapowerNet :return: - **vertices_capacities** (dict): dictionary holding \ the vertices capacities which are calculate within this \ method since parallel capacitors of the transmission lines \ add up in the denominator in the kalman filter matrices. - **edges_inductances** (dict): dictionary holding \ the combination of the edge label and its calculated inductance. - **edges_resistance** (dict): dictionary holding \ the combination of the edge label and its resistance. """ # create empty dicts for data collection vertice_capacities = {} edges_resistance = {} edges_inductance = {} # iterate over the power system buses for bus in buses: # get a list of edges connected to the currently considered bus edges = [s for s in lines if str(bus) in s] # iterate over the collected edges for edge in edges: # get the two vertices at the ends of the considered edge node_1 = int(edge.split("_")[0]) node_2 = int(edge.split("_")[1]) # try if the vertices are in the correct order line = network["line"].query( "from_bus == @node_1 and to_bus == @node_2" ) # if not use the opposite direction if line.empty: line = network["line"].query( "from_bus == @node_2 and to_bus == @node_1" ) # unit conversion of line capacity cap = list(line["c_nf_per_km"])[0] * 0.000000001 res = list(line["r_ohm_per_km"])[0] # calculation of the line inductance from its reactance ind = calc_inductance_from_reactance( reactance=list(line["x_ohm_per_km"])[0], frequency=50 ) # collection of bus capacities by adding a new entry or # adding the new capacity to the already stored one if bus in vertice_capacities.keys(): vertice_capacities[bus] += cap else: vertice_capacities.update({bus: cap}) # collection of the line inductance and resistance edges_resistance.update({edge: res}) edges_inductance.update({edge: ind}) return vertice_capacities, edges_inductance, edges_resistance
[docs] def voltage_differential_equations( buses: list, lines: list, vertice_capacities: dict, matrix_A: sympy.MutableDenseMatrix, dt: float ) -> sympy.MutableDenseMatrix: """ Method to fill in the relevant matrix cells for the voltage differential equations in the system matrix A. The voltage differential equation for continuous systems is given by: .. math:: & \\frac{dv}{dt} = \\frac{2}{\\sum{C}} \\cdot (\\sum{i}) thereby incoming currents are represented by positive entries and outgoing currents are represented by negative entries. The conversion of this equation to a discrete representation by using the euler conversion leads to the following equation. .. math:: & \\frac{dv}{dt} = \\frac{2 \\cdot dt}{\\sum{C}} \\cdot (\\sum{i}) :param buses: list of all power system buses :type buses: list :param lines: list of all power system lines :type lines: list :param vertice_capacities: dictionary holding the vertex \ specific sum of all its connected lines capacities :type vertice_capacities: dict :param matrix_A: system matrix of the upcoming kalman filter \ simulations. Since the voltage differential equations are \ the first step of the system matrix creation the system \ matrix shall consist of zeros. :type matrix_A: sympy.MutuableDenseMatrix :param dt: temporal resolution of the investigated measurement \ data :type dt: float :return: - **matrix_A** (sympy.MutuableDenseMatrix) - power \ system system matrix for the state space representation \ now holding the voltage differential equation information. """ index = len(buses) - 1 # iterate threw all buses for bus in buses: # get edges where bus is in edge label edges = [s for s in lines if str(bus) in s] # since a discrete time series is analyzed add a 1 for each # voltage entry if len(edges) != 0: matrix_A[bus, bus] = 1 # iterate threw the edges for edge in edges: sending_end = int(edge.split("_")[0]) receving_end = int(edge.split("_")[1]) # separate the from and to bus and calc the difference to # get the entries position col = index + (receving_end - sending_end) if sending_end == bus: matrix_A[bus, col] = -(2 * dt) / (vertice_capacities[bus]) bus2 = bus + (receving_end - sending_end) matrix_A[bus2, col] = ((2 * dt) / (vertice_capacities[receving_end])) index += (8 - bus) return matrix_A
[docs] def current_differential_equations( matrix_A: sympy.MutableDenseMatrix, lines: list, edge_resistance: dict, edge_inductance: dict, dt: float, buses: list ): """ Method to fill in the relevant matrix cells for the current differential equations in the system matrix A. The current differential equation for continuous systems is given by: .. math:: & \\frac{di}{dt} = \\frac{1}{L} \\cdot v1 - \\frac{R}{L} \\cdot i - \\frac{1}{L} \\cdot v2 thereby v1 represents the sending end which is simply set to the lower bus index and v2 represents the receiving end. The conversion of this equation to a discrete representation by using the euler conversion leads to the following equation. .. math:: & \\frac{di}{dt} = \\frac{dt}{L} \\cdot v1 - \\frac{R \\cdot dt}{L} \\cdot i - \\frac{dt}{L} \\cdot v2 :param matrix_A: system matrix of the upcoming kalman filter \ simulations. :type matrix_A: sympy.MutuableDenseMatrix :param lines: list of all power system lines :type lines: list :param edge_resistance: dictionary holding the edge \ specific resistance :type edge_resistance: dict :param edge_inductance: dictionary holding the edge \ specific inductance :type edge_inductance: dict :param dt: temporal resolution of the investigated measurement \ data :type dt: float :param buses: list of all power system buses :type buses: list :return: **matrix_A** (sympy.MutuableDenseMatrix) - power \ system system matrix for the state space representation \ now holding the current differential equation information. """ index = len(buses) - 1 # iterate over all power system buses for bus in buses: # iterate over all power system lines for line in lines: # get the line sending and receiving ends from its key sending_end = int(line.split("_")[0]) receiving_end = int(line.split("_")[1]) # check if the sending end equals the currently considered # bus if sending_end == bus: # calculate the row of the new entries in the system # matrix A row = index + (receiving_end - sending_end) # set the sending end part of the equation matrix_A[row, sending_end] = dt/edge_inductance[line] # set the receiving end part of the equation matrix_A[row, receiving_end] = -dt/edge_inductance[line] # set the current part of the equation, 1 results from # the discretization of the differential equation matrix_A[row, row] = 1 - ((edge_resistance[line]*dt) / edge_inductance[line]) # set the index for the next bus entries index += (len(buses) - 1 - bus) return matrix_A
[docs] def calculate_system_matrix_A( network: pandapower.pandapowerNet, dt: float ) -> (sympy.MutableDenseMatrix, dict): """ Main method to create the kalman filter system matrix of a system of overhead lines. Therefore the set of voltage and current differential equations is calculated automatically to avoid creation errors. :param network: pandapower network instance representing the \ investigated power system :type network: pandapower.pandapowerNet :param dt: temporal resolution of the investigated simulation \ measurement data :type dt: float :return: - **matrix_A** (sympy.MutableDenseMatrix) - kalman \ filter system matrix filled with current and voltage \ differential equations to represent the state space. - **vertice_capacities** (dict) - dictionary holding \ the vertex specific sum of all its connected lines \ capacities """ # vertices buses = list(network["bus"].index) # edges = OH-Lines lines = [] for index, line in network["line"].iterrows(): if line["from_bus"] < line["to_bus"]: lines.append(str(line["from_bus"]) + "_" + str(line["to_bus"])) else: lines.append(str(line["to_bus"]) + "_" + str(line["from_bus"])) # extract line parameters from pandapower pandas DataFrames vertice_capacities, edge_inductance, edge_resistance = ( get_vertices_parameters(buses=buses, lines=lines, network=network) ) # calculate the dimension of the system matrix since the maximum # size is a complete graph where each vertex is conected to each # over dim_matrix = len(buses) + int((len(buses) * (len(buses) - 1)) / 2) # create an empty sympy matrix of the calculated size matrix_A = sympy.zeros(dim_matrix, dim_matrix) # calc voltage differential equations matrix_A = voltage_differential_equations( buses=buses, lines=lines, vertice_capacities=vertice_capacities, matrix_A=matrix_A, dt=dt ) # calc current differential equations matrix_A = current_differential_equations( matrix_A=matrix_A, lines=lines, edge_resistance=edge_resistance, edge_inductance=edge_inductance, dt=dt, buses=buses ) return matrix_A, vertice_capacities
[docs] def calculate_system_matrix_B( measured_nodes: list, vertice_capacities: dict, rows_A: int, dt: float ) -> sympy.MutableDenseMatrix: """ Main method to create the kalman filter control matrix of a system of overhead lines. Therefore the set of voltage differential equations is completed by adding the term for the extern current influence. :param measured_nodes: list holding all measured nodes :type measured_nodes: list :param vertice_capacities: dictionary holding the vertex \ specific sum of all its connected lines capacities :type vertice_capacities: dict :param rows_A: dimension of the previously create system matrix :type rows_A: int :param dt: temporal resolution of the investigated simulation \ measurement data :type dt: float :return: - **matrix_B** (sympy.MutableDenseMatrix) - kalman \ filter control matrix of the considered power system """ # sort the measured nodes list measured_nodes.sort() # create an empty matrix for matrix B matrix_B = sympy.zeros(rows_A, len(measured_nodes)) index = 0 # iterate threw the measured nodes for node in measured_nodes: # add the voltage differential equation part for each measured # current matrix_B[node, index] = (2*dt)/vertice_capacities[node] index += 1 return matrix_B
[docs] def calculate_system_matrix_H( measured_nodes: list, rows_A: int ) -> sympy.MutableDenseMatrix: """ Main method to create the kalman filter observation matrix of a system of overhead lines. Therefore measured nodes are evaluated and their corresponding matrix cells are set to one. :param measured_nodes: list of nodes where the voltages are \ measured by voltage transformers :type measured_nodes: list :param rows_A: dimension of the system matrix :type rows_A: int :return: - **matrix_H** (sympy.MutableDenseMatrix) - filled \ kalman filter observation matrix """ # sort the list of voltage measurement nodes measured_nodes.sort() # create an empty observation matrix matrix_H = sympy.zeros(len(measured_nodes), rows_A) index = 0 # iterate over all voltage measurements for node in measured_nodes: # set the corresponding cells to one matrix_H[index, node] = 1 index += 1 return matrix_H
[docs] def convert_matrices_to_clarke( matrix_A: sympy.MutableDenseMatrix, matrix_B: sympy.MutableDenseMatrix, matrix_H: sympy.MutableDenseMatrix ) -> (numpy.ndarray, numpy.ndarray, numpy.ndarray): """ Convert discrete kalman matrices to matrices usable with clarke converted signals. Therefore the one line matrices need to be doubled since alpha and beta component of the voltage or current signals need to be represented. :param matrix_A: Kalman filter system matrix one line \ representation :type matrix_A: sympy.MutableDenseMatrix :param matrix_B: Kalman filter control matrix one line \ representation :type matrix_B: sympy.MutableDenseMatrix :param matrix_H: Kalman filter observation matrix one line \ representation :type matrix_H: sympy.MutableDenseMatrix :return: - **matrix_A** (numpy.ndarray) - numpy converted \ kalman filter system matrix for discrete clark transformed \ measurement data - **matrix_B** (numpy.ndarray) - numpy converted \ kalman filter control matrix for discrete clark \ transformed measurement data - **matrix_H** (numpy.ndarray) - numpy converted \ kalman filter observation matrix for discrete clark \ transformed measurement data """ # get the dimension of the system matrix of the one line # representation dim_A = matrix_A.rows # create a zeros matrix of the size of the system matrix matrix_zeros = sympy.zeros(dim_A, dim_A) # append the zeros matrix to the system matrix horizontally matrix_A_alpha = matrix_A.col_join(matrix_zeros) matrix_A_beta = matrix_zeros.col_join(matrix_A) # append the two new matrices vertically to each other matrix_A = matrix_A_alpha.row_join(matrix_A_beta) # create a zeros matrix of the size of the control matrix matrix_zeros = sympy.zeros(dim_A, matrix_B.cols) # append the zeros matrix to the system matrix horizontally matrix_B_alpha = matrix_B.col_join(matrix_zeros) matrix_B_beta = matrix_zeros.col_join(matrix_B) # append the two new matrices vertically to each other matrix_B = matrix_B_alpha.row_join(matrix_B_beta) # create a zeros matrix of the size of the observation matrix matrix_zeros = sympy.zeros(matrix_H.rows, dim_A) # append the zeros matrix to the system matrix horizontally matrix_H_alpha = matrix_H.col_join(matrix_zeros) matrix_H_beta = matrix_zeros.col_join(matrix_H) # append the two new matrices vertically to each other matrix_H = matrix_H_alpha.row_join(matrix_H_beta) # convert the resulting matrices to numpy arrays for later usage matrix_A = numpy.array(matrix_A.tolist()).astype(numpy.float64) matrix_B = numpy.array(matrix_B.tolist()).astype(numpy.float64) matrix_H = numpy.array(matrix_H.tolist()).astype(numpy.float64) return matrix_A, matrix_B, matrix_H
[docs] def clarke_transformation(measurement: pandas.Series) -> (float, float): """ Transform the RST values to alpha beta 0 values using the clark transformation matrix. :param measurement: voltages or currents of a power system \ measurement in RST :type measurement: pandas.Series :returns: - **alpha** (float) : Alpha component of voltage or \ current. - **beta** (float) : Beta component of voltage or \ current. """ # create the alpha beta 0 vector alpha, beta, null = ClarkePark.abc_to_alphaBeta0( a=float(measurement.iloc[0]), b=float(measurement.iloc[1]), c=float(measurement.iloc[2])) return alpha, beta
[docs] def array_from_measurement_data( measurement_df: pandas.DataFrame, measured_nodes: list, network: pandapower.pandapowerNet ) -> (numpy.ndarray, numpy.ndarray): """ Converts the measurement data exported from powerfactory from their RST Form into Alpha, Beta, 0 Form using the clarke transformation. TODO: currently the label of voltage transformer can only be \ the german one "Spannungswandler" :param measurement_df: Dataframe containing the measurement \ data exported from powerfactory :type measurement_df: pandas.DataFrame :param measured_nodes: List of the considered buses labels \ e.g. [5, 7] :type measured_nodes: list :param network: pandapower network instance representing the \ investigated power system :type network: pandapower.pandapowerNet :returns: - **measurement_u** (numpy.ndarray): Containing the \ Alpha Beta Form of the powerfactory current measurement data - **measurement_y** (numpy.ndarray): Containing the \ Alpha Beta Form of the powerfactory voltage measurement data """ # measurement u represents the input values for the prediction step measurements_u = [] # measurement y represents the input values for the correction in # the control step measurements_y = [] buses = [] # sort the inserted measured buses and get their pandas data frame # entry from the dataframe of power system buses measured_nodes.sort() for node in measured_nodes: buses.append(network["bus"].iloc[node]["name"].replace(" ", "")) # iterate threw the imported measured data set for _, row in measurement_df.iterrows(): y_alpha = [] y_beta = [] u_alpha = [] u_beta = [] for bus in buses: # get the bus specific VT by its power factory label voltages_bus_entry = row[row.index.str.contains( '{}'.format("Spannungswandler_" + bus))] # get the voltages of the current bus in alpha beta # coordinates alpha, beta = clarke_transformation(measurement=voltages_bus_entry) # add the results to a list of alpha and a list of beta # values y_alpha.append(alpha) y_beta.append(beta) # get the bus specific CT by its power factory label current_bus_entry = row[ row.index.str.contains('{}'.format("_" + bus)) & ~row.index.str.contains('Spannungswandler') ] # get the currents of the current bus in alpha beta # coordinates alpha, beta = clarke_transformation(measurement=current_bus_entry) # add the results to a list of alpha and a list of beta # values u_alpha.append(alpha) u_beta.append(beta) # append the new timestep on the measurement y list measurements_y.append(y_alpha + y_beta) measurements_u.append(u_alpha + u_beta) # convert the measurement lists into numpy array for later # processing measurements_u = numpy.array(measurements_u) measurements_y = numpy.array(measurements_y) return measurements_u, measurements_y
[docs] def initialize_matrices( process_covariance_volt: float, process_covariance_curr: float, measurement_covariance: float, dim_x: int, dim_z: int, network: pandapower.pandapowerNet ) -> (numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray): """ Method for initializing the matrices representing the system uncertainties. :param process_covariance_volt: Uncertainty of the voltage \ estimation of the kalman filter :type process_covariance_volt: float :param process_covariance_curr: Uncertainty of the current \ estimation of the kalman filter :type process_covariance_curr: float :param measurement_covariance: Uncertainty of the measurement \ resulting from mean free gaussian noise. :type measurement_covariance: float :param dim_x: dimension of the state space vector of the \ investigated system :type dim_x: int :param dim_z: dimension of the measurement vector z of the \ investigated system :type dim_z: int :param network: pandapower network instance representing the \ investigated power system :type network: pandapower.pandapowerNet :return: - **matrix_P** (numpy.ndarray) - kalman filter \ covariance matrix. Holding dimx x dimx zeros. - **matrix_R** (numpy.ndarray) - kalman filter \ measurement noise covariance matrix. - **matrix_Q** (numpy.ndarray) - kalman filter \ process noise covariance matrix. - **vector_x** (numpy.ndarray) - kalman filter \ state space vector consisting of dimx zeros. """ # get the number of power system buses buses = len(network["bus"]) # create zero matrices for process noise covariance and the central # covariance matrix matrix_P = numpy.zeros((dim_x, dim_x)) matrix_Q = numpy.zeros((dim_x, dim_x)) # create an identity matrix for the measurement vector covariance # matrix since all values are of the same kind (here: voltage) matrix_R = numpy.eye(dim_z) * float(measurement_covariance) # process noise covariance matrix creation process # a distinction between current and voltage entries is necessary num_curr = ((buses * (buses - 1)) / 2) # iterate over voltage and currents in alpha and beta form for i in range(int(2 * (buses + num_curr))): # alpha voltages if i < buses: matrix_Q[i, i] = float(process_covariance_volt) # alpha currents elif buses <= i < (buses + num_curr): matrix_Q[i, i] = float(process_covariance_curr) # beta voltages elif (buses + num_curr) <= i < (2 * buses + num_curr): matrix_Q[i, i] = float(process_covariance_volt) # beta currents else: matrix_Q[i, i] = float(process_covariance_curr) # create an empty state space vector x vector_x = numpy.array([0] * dim_x) return matrix_P, matrix_R, matrix_Q, vector_x
[docs] def compute_results( network: pandapower.pandapowerNet, timestep: int, return_dict: dict, vec_x: numpy.ndarray ) -> dict: """ Method to compute the state space vector to easy interpretable dictionary for easier result visualization. :param network: pandapower network instance representing the \ investigated power system :type network: pandapower.pandapowerNet :param timestep: currently considered timestep :type timestep: int :param return_dict: dictionary holding the result data for \ later visualization :type return_dict: dict :param vec_x: current iteration step state space vector :type vec_x: numpy.ndarray :return: - **return_dict** (dict) - dictionary holding the \ result data for later visualization. Within this method \ the new entry timestep has been appended. """ # calculate number of voltage entries and currents per clarke # component num_volt = len(list(network["bus"])) num_cur = ((num_volt * (num_volt - 1)) / 2) # initialize counter for the correct labeling of the resulting # state vector send_end = 0 counter = 0 counter2 = 1 # iterate threw the state vector for row in vec_x: # if no entry for the current timestep exists create one if timestep not in return_dict.keys(): return_dict.update({timestep: {}}) # extract the entry corresponding line entry from the data # frame of lines recv_end = counter2 # filter the line data frame on entries where from and # to bus are in sending and receiving end filtered_df = network["line"][ network["line"]['to_bus'].isin([send_end, recv_end]) | network["line"]['from_bus'].isin([send_end, recv_end]) ] # check if there exists a dataframe entry after the # usage of the filter if not filtered_df.empty: # get the line from sending end to receiving end line = filtered_df[(filtered_df["from_bus"] == send_end) & (filtered_df["to_bus"] == recv_end)] # check if this direction results in a dataframe # entry else try the other way around if line.empty: line = filtered_df[ (filtered_df["to_bus"] == send_end) & (filtered_df["from_bus"] == recv_end) ] else: line = None # handling of the alpha voltages if counter < num_volt: return_dict[timestep].update( {network["bus"].iloc[counter]["name"] + "_alpha": row}) # handling of the alpha currents elif num_volt <= counter < (num_volt + num_cur): # check if this direction results in a dataframe # entry if yes append its value to the return dict if line is not None and not line.empty: return_dict[timestep].update( {list(line["name"])[0] + "_alpha": row}) if counter2 == (num_volt - 1): counter2 = send_end + 1 send_end += 1 counter2 += 1 # handling beta voltages elif (num_volt + num_cur) <= counter < (2 * num_volt + num_cur): bus_num = int(counter - (num_volt + num_cur)) return_dict[timestep].update( {network["bus"].iloc[bus_num]["name"] + "_beta": row}) counter2 = 1 # resetting send_end for the upcoming beta currents send_end = 0 # handling beta currents else: if line is not None and not line.empty: return_dict[timestep].update( {list(line["name"])[0] + "_beta": row}) if counter2 == (num_volt - 1): counter2 = send_end + 1 send_end += 1 counter2 += 1 counter += 1 return return_dict
[docs] def run( system_matrices: dict, measurements: dict, kalman_matrices: dict, network: pandapower.pandapowerNet ) -> dict: """ Method to start the kalman filter simulation. Therefore all matrices previously calculated are stored in the kalman filter equivalent variables and a iteration over each time step is rolled out. :param system_matrices: dict holding the kalman filter \ matrices (A, B and H) :type system_matrices: dict :param measurements: dict holding the measurement vectors (u, y) :type measurements: dict :param kalman_matrices: dict holding the covariance matrices \ (P, R and Q) as well as the state space vector (x) :type kalman_matrices: dict :param network: pandapower network instance representing the \ considered power system :type network: pandapower.pandapowerNet :return: - **return_dict** (dict) - dictionary holding \ timestep wise extracted kalman filter results """ # initalize the KalmanFiler class of the python package filterpy kf = KalmanFilter( dim_x=system_matrices["matrix_A"].shape[0], dim_z=measurements["measurement_y"][0].shape[0], dim_u=measurements["measurement_u"][0].shape[0] ) # place the input parameters in their corresponding kalman filter # variable kf.F = system_matrices["matrix_A"] kf.B = system_matrices["matrix_B"] kf.H = system_matrices["matrix_H"] kf.x = kalman_matrices["vector_x"] kf.P = kalman_matrices["matrix_P"] kf.R = kalman_matrices["matrix_R"] kf.Q = kalman_matrices["matrix_Q"] # create empty return dict as well as a timestep counter timestep = 0 return_dict = {} # iterate threw the simulated time steps for vec_u, vec_y in zip(measurements["measurement_u"], measurements["measurement_y"]): # run kalman filter prediction step kf.update(z=vec_y) # compute the state space vector return_dict = compute_results( network=network, timestep=timestep, return_dict=return_dict, vec_x=kf.x ) # run kalman filter prediction kf.predict(u=vec_u) timestep += 1 return return_dict
[docs] def calc_wse( results: dict, measured_nodes: list, measurement_y: numpy.ndarray, network: pandapower.pandapowerNet, weighting_factor: float ) -> dict: """ In the Kalman filter protection algorithm, the protection decision is based on a hypothesis test. For this purpose, the difference between the variables measured in the system and the state space vector must be converted to a parameter. This is possible with the help of the weighted squared error (wse), which can then be used to test the hypothesis with a chi2 distribution. In this method, the weighted squared errors (wse) are calculated. :param results: holding the dictionary of human readable data \ the state space vector from the kalman filter simulation :type results: dict :param measured_nodes: list of the buses where the voltage is \ measured by a voltage transformer :type measured_nodes: list :param measurement_y: measurement vectors of the real signals \ resulting from the powerfactory simulation :type measurement_y: numpy.ndarray :param network: pandapower network instance representing the \ investigated power system :type network: pandapower.pandapowerNet :param weighting_factor: weight factor for the calculation of \ the weighted squared error :type weighting_factor: float :return: - **return_dict** (dict) - dictionary holding the \ weighted squared error foreach timestep simulated """ # create an empty return dictionary for data collection return_dict = {} # sort the measured nodes indices ascending measured_nodes.sort() # collect the measured nodes bus names buses = [] for i in measured_nodes: buses.append(network["bus"]["name"][i]) # create the weighting matrix matrix_W = numpy.eye(2*len(measured_nodes)) * float(weighting_factor) # iterate over the result dictionary for timestep in results.keys(): diffs = [] # iterate over alpha and beta results of the considered buses for index in range(2 * len(measured_nodes)): # for the first len nodes the resulting entries are alpha # voltage entries if index < len(measured_nodes): # calculate the difference of the state space result # and the powerfactory simulation row = (results[timestep]["{}_alpha".format( buses[index])] - measurement_y[timestep][index]) # for the second len nodes the resulting entries are beta # voltage entries else: # calculate the difference of the state space result # and the powerfactory simulation row = (results[timestep]["{}_beta".format( buses[index - len(measured_nodes)])] - measurement_y[timestep][index] ) # append the calculated values to the list of differences diffs.append(row) # create a vector of the list of differences vec_epsilon = numpy.array(diffs) # calculate the weighted squared error wse = numpy.dot( numpy.dot(vec_epsilon, matrix_W), vec_epsilon.T ) # append the weighted squared error to the return dict for # later visualization return_dict.update({timestep: [wse]}) return return_dict
[docs] def calc_chi2(wse: dict, df: int) -> dict: """ Within this method the evaluation of the hypothesis is carried out by evaluating the weighted squared error on the basis of a chi-squared distribution (chi2). The resulting values are return in a dictionary for later visualization. :param wse: dictionary holding the weighted squared errors \ foreach timestep calculated before. :type wse: dict :param df: number of the degrees of freedom (df). In the \ context of chi2 distributions the df is the number of not \ tracked state space vector entries vary from 0. :type df: int :return: - **return_dict** (dict) - dictionary holding the \ chi2 result foreach timestep """ # create an empty dictionary for data collection result_dict = {} # iterrate threw all entries of the weighted squared error dict for timestep in wse: # evaluate the value of chi2 for the given wse and substract # it from one to get the confidential in the hypothesis no error chi2 = 1 - stats.chi2.cdf(wse[timestep], df=df) # append the result to the return dict for later visualization result_dict.update({timestep: chi2}) return result_dict