import ClarkePark
import networkx
import numpy
import pandapower
import pandas
import sympy
import scipy.stats as stats
from filterpy.kalman import KalmanFilter
from Application import pandapower_model
[docs]
def update_nxgraph(cts: dict, network: pandapower.pandapowerNet
) -> (networkx.MultiGraph, list):
"""
In this method, the network of the power system is cut off at
the balancing boundaries and exposed nodes are deleted. This is
illustrated in the network graph update. The boundary nodes
should become red, for which they must first be collected in
the “nodes” data structure.
:param cts: dictionary containing the in the kalman filter \
consideration necessary current transformer
:type cts: dict
:param network: Network instance of pandapower module
:type network: pandapower.pandapowerNet
:return: - **nxg_network** (networkx.MultiGraph) - Updated \
networkx MultiGraph instance
- **nodes** (list) - List holding the balancing \
boundary nodes
"""
# get the nxgraph of the complete network incl. the parts outside
# the consideration
nxg_network, pos = pandapower_model.create_nxgraph(network=network)
nodes = []
# iterate threw the dictionary of current transformer
for ct in cts:
# create a local copy of the nxgraph of the network
nxg_network_copy = nxg_network.copy()
# iterate threw network nodes
for node in nxg_network.nodes:
# remove the nodes outside the area of consideration
if node == int(cts[ct]["current_source"][-1]):
nxg_network_copy.remove_node(node)
# make the copy the new networkx graph
nxg_network = nxg_network_copy.copy()
# collect the nodes of incoming currents
nodes.append(int(cts[ct]["current_sink"][-1]))
return nxg_network, nodes
[docs]
def get_temporal_resolution(measurement_data: str) -> float:
"""
Calculate the temporal resolution of the time series to be
considered within the Kalman Filter simulation.
:param measurement_data: path where the measurement data is \
stored. User Input in the streamlit GUI
:type measurement_data: str
:return: **dt** (float) - temporal resolution of the \
investigated time series
"""
# read csv data to pandas data frame
measurement_df = pandas.read_csv(measurement_data)
# reset data frame index column
measurement_df = measurement_df.reset_index(drop=False)
# calculate temporal resolution
dt = abs(float(measurement_df["Alle Berechnungsarten"][2])
- float(measurement_df["Alle Berechnungsarten"][1]))
# return temporal resolution
return dt
[docs]
def calc_inductance_from_reactance(reactance: float,
frequency: float) -> float:
"""
Since the line parameters in Powerfactory consist of impedance
and susceptance, the reactance will be converted into a
inductance in this method.
.. math::
&
L = \\frac{X}{2 \\cdot \\pi \\cdot f}
:param reactance: reactance of the transmission line in ohm
:type reactance: float
:param frequency: frequency of the considered power system
:type frequency: float
:return: **capacity** (float): calculated capacity of the \
considered transmission line
"""
inductance = reactance / (2 * numpy.pi * frequency)
return inductance
[docs]
def get_vertices_parameters(
buses: list, lines: list, network: pandapower.pandapowerNet
) -> (dict, dict, dict):
"""
Method to extract the vertex parameters from the pandapower
network instance for easier handling in later kalman filter
methods.
:param buses: list holding all power system buses
:type buses: list
:param lines: list holding all power system lines
:type lines: list
:param network: pandapower network instance of the considered \
power system
:type network: pandapower.pandapowerNet
:return: - **vertices_capacities** (dict): dictionary holding \
the vertices capacities which are calculate within this \
method since parallel capacitors of the transmission lines \
add up in the denominator in the kalman filter matrices.
- **edges_inductances** (dict): dictionary holding \
the combination of the edge label and its calculated
inductance.
- **edges_resistance** (dict): dictionary holding \
the combination of the edge label and its resistance.
"""
# create empty dicts for data collection
vertice_capacities = {}
edges_resistance = {}
edges_inductance = {}
# iterate over the power system buses
for bus in buses:
# get a list of edges connected to the currently considered bus
edges = [s for s in lines if str(bus) in s]
# iterate over the collected edges
for edge in edges:
# get the two vertices at the ends of the considered edge
node_1 = int(edge.split("_")[0])
node_2 = int(edge.split("_")[1])
# try if the vertices are in the correct order
line = network["line"].query(
"from_bus == @node_1 and to_bus == @node_2"
)
# if not use the opposite direction
if line.empty:
line = network["line"].query(
"from_bus == @node_2 and to_bus == @node_1"
)
# unit conversion of line capacity
cap = list(line["c_nf_per_km"])[0] * 0.000000001
res = list(line["r_ohm_per_km"])[0]
# calculation of the line inductance from its reactance
ind = calc_inductance_from_reactance(
reactance=list(line["x_ohm_per_km"])[0],
frequency=50
)
# collection of bus capacities by adding a new entry or
# adding the new capacity to the already stored one
if bus in vertice_capacities.keys():
vertice_capacities[bus] += cap
else:
vertice_capacities.update({bus: cap})
# collection of the line inductance and resistance
edges_resistance.update({edge: res})
edges_inductance.update({edge: ind})
return vertice_capacities, edges_inductance, edges_resistance
[docs]
def voltage_differential_equations(
buses: list, lines: list, vertice_capacities: dict,
matrix_A: sympy.MutableDenseMatrix, dt: float
) -> sympy.MutableDenseMatrix:
"""
Method to fill in the relevant matrix cells for the voltage
differential equations in the system matrix A. The voltage
differential equation for continuous systems is given by:
.. math::
&
\\frac{dv}{dt} = \\frac{2}{\\sum{C}} \\cdot (\\sum{i})
thereby incoming currents are represented by positive entries
and outgoing currents are represented by negative entries.
The conversion of this equation to a discrete representation
by using the euler conversion leads to the following equation.
.. math::
&
\\frac{dv}{dt} = \\frac{2 \\cdot dt}{\\sum{C}} \\cdot (\\sum{i})
:param buses: list of all power system buses
:type buses: list
:param lines: list of all power system lines
:type lines: list
:param vertice_capacities: dictionary holding the vertex \
specific sum of all its connected lines capacities
:type vertice_capacities: dict
:param matrix_A: system matrix of the upcoming kalman filter \
simulations. Since the voltage differential equations are \
the first step of the system matrix creation the system \
matrix shall consist of zeros.
:type matrix_A: sympy.MutuableDenseMatrix
:param dt: temporal resolution of the investigated measurement \
data
:type dt: float
:return: - **matrix_A** (sympy.MutuableDenseMatrix) - power \
system system matrix for the state space representation \
now holding the voltage differential equation information.
"""
index = len(buses) - 1
# iterate threw all buses
for bus in buses:
# get edges where bus is in edge label
edges = [s for s in lines if str(bus) in s]
# since a discrete time series is analyzed add a 1 for each
# voltage entry
if len(edges) != 0:
matrix_A[bus, bus] = 1
# iterate threw the edges
for edge in edges:
sending_end = int(edge.split("_")[0])
receving_end = int(edge.split("_")[1])
# separate the from and to bus and calc the difference to
# get the entries position
col = index + (receving_end - sending_end)
if sending_end == bus:
matrix_A[bus, col] = -(2 * dt) / (vertice_capacities[bus])
bus2 = bus + (receving_end - sending_end)
matrix_A[bus2, col] = ((2 * dt)
/ (vertice_capacities[receving_end]))
index += (8 - bus)
return matrix_A
[docs]
def current_differential_equations(
matrix_A: sympy.MutableDenseMatrix, lines: list, edge_resistance: dict,
edge_inductance: dict, dt: float, buses: list
):
"""
Method to fill in the relevant matrix cells for the current
differential equations in the system matrix A. The current
differential equation for continuous systems is given by:
.. math::
&
\\frac{di}{dt} = \\frac{1}{L} \\cdot v1 - \\frac{R}{L} \\cdot i - \\frac{1}{L} \\cdot v2
thereby v1 represents the sending end which is simply set to
the lower bus index and v2 represents the receiving end.
The conversion of this equation to a discrete representation
by using the euler conversion leads to the following equation.
.. math::
&
\\frac{di}{dt} = \\frac{dt}{L} \\cdot v1 - \\frac{R \\cdot dt}{L} \\cdot i - \\frac{dt}{L} \\cdot v2
:param matrix_A: system matrix of the upcoming kalman filter \
simulations.
:type matrix_A: sympy.MutuableDenseMatrix
:param lines: list of all power system lines
:type lines: list
:param edge_resistance: dictionary holding the edge \
specific resistance
:type edge_resistance: dict
:param edge_inductance: dictionary holding the edge \
specific inductance
:type edge_inductance: dict
:param dt: temporal resolution of the investigated measurement \
data
:type dt: float
:param buses: list of all power system buses
:type buses: list
:return: **matrix_A** (sympy.MutuableDenseMatrix) - power \
system system matrix for the state space representation \
now holding the current differential equation information.
"""
index = len(buses) - 1
# iterate over all power system buses
for bus in buses:
# iterate over all power system lines
for line in lines:
# get the line sending and receiving ends from its key
sending_end = int(line.split("_")[0])
receiving_end = int(line.split("_")[1])
# check if the sending end equals the currently considered
# bus
if sending_end == bus:
# calculate the row of the new entries in the system
# matrix A
row = index + (receiving_end - sending_end)
# set the sending end part of the equation
matrix_A[row, sending_end] = dt/edge_inductance[line]
# set the receiving end part of the equation
matrix_A[row, receiving_end] = -dt/edge_inductance[line]
# set the current part of the equation, 1 results from
# the discretization of the differential equation
matrix_A[row, row] = 1 - ((edge_resistance[line]*dt)
/ edge_inductance[line])
# set the index for the next bus entries
index += (len(buses) - 1 - bus)
return matrix_A
[docs]
def calculate_system_matrix_A(
network: pandapower.pandapowerNet, dt: float
) -> (sympy.MutableDenseMatrix, dict):
"""
Main method to create the kalman filter system matrix of a
system of overhead lines. Therefore the set of voltage and
current differential equations is calculated automatically to
avoid creation errors.
:param network: pandapower network instance representing the \
investigated power system
:type network: pandapower.pandapowerNet
:param dt: temporal resolution of the investigated simulation \
measurement data
:type dt: float
:return: - **matrix_A** (sympy.MutableDenseMatrix) - kalman \
filter system matrix filled with current and voltage \
differential equations to represent the state space.
- **vertice_capacities** (dict) - dictionary holding \
the vertex specific sum of all its connected lines \
capacities
"""
# vertices
buses = list(network["bus"].index)
# edges = OH-Lines
lines = []
for index, line in network["line"].iterrows():
if line["from_bus"] < line["to_bus"]:
lines.append(str(line["from_bus"]) + "_" + str(line["to_bus"]))
else:
lines.append(str(line["to_bus"]) + "_" + str(line["from_bus"]))
# extract line parameters from pandapower pandas DataFrames
vertice_capacities, edge_inductance, edge_resistance = (
get_vertices_parameters(buses=buses, lines=lines, network=network)
)
# calculate the dimension of the system matrix since the maximum
# size is a complete graph where each vertex is conected to each
# over
dim_matrix = len(buses) + int((len(buses) * (len(buses) - 1)) / 2)
# create an empty sympy matrix of the calculated size
matrix_A = sympy.zeros(dim_matrix, dim_matrix)
# calc voltage differential equations
matrix_A = voltage_differential_equations(
buses=buses,
lines=lines,
vertice_capacities=vertice_capacities,
matrix_A=matrix_A,
dt=dt
)
# calc current differential equations
matrix_A = current_differential_equations(
matrix_A=matrix_A,
lines=lines,
edge_resistance=edge_resistance,
edge_inductance=edge_inductance,
dt=dt,
buses=buses
)
return matrix_A, vertice_capacities
[docs]
def calculate_system_matrix_B(
measured_nodes: list, vertice_capacities: dict, rows_A: int, dt: float
) -> sympy.MutableDenseMatrix:
"""
Main method to create the kalman filter control matrix of a
system of overhead lines. Therefore the set of voltage
differential equations is completed by adding the term for
the extern current influence.
:param measured_nodes: list holding all measured nodes
:type measured_nodes: list
:param vertice_capacities: dictionary holding the vertex \
specific sum of all its connected lines capacities
:type vertice_capacities: dict
:param rows_A: dimension of the previously create system matrix
:type rows_A: int
:param dt: temporal resolution of the investigated simulation \
measurement data
:type dt: float
:return: - **matrix_B** (sympy.MutableDenseMatrix) - kalman \
filter control matrix of the considered power system
"""
# sort the measured nodes list
measured_nodes.sort()
# create an empty matrix for matrix B
matrix_B = sympy.zeros(rows_A, len(measured_nodes))
index = 0
# iterate threw the measured nodes
for node in measured_nodes:
# add the voltage differential equation part for each measured
# current
matrix_B[node, index] = (2*dt)/vertice_capacities[node]
index += 1
return matrix_B
[docs]
def calculate_system_matrix_H(
measured_nodes: list, rows_A: int
) -> sympy.MutableDenseMatrix:
"""
Main method to create the kalman filter observation matrix of a
system of overhead lines. Therefore measured nodes are
evaluated and their corresponding matrix cells are set to one.
:param measured_nodes: list of nodes where the voltages are \
measured by voltage transformers
:type measured_nodes: list
:param rows_A: dimension of the system matrix
:type rows_A: int
:return: - **matrix_H** (sympy.MutableDenseMatrix) - filled \
kalman filter observation matrix
"""
# sort the list of voltage measurement nodes
measured_nodes.sort()
# create an empty observation matrix
matrix_H = sympy.zeros(len(measured_nodes), rows_A)
index = 0
# iterate over all voltage measurements
for node in measured_nodes:
# set the corresponding cells to one
matrix_H[index, node] = 1
index += 1
return matrix_H
[docs]
def convert_matrices_to_clarke(
matrix_A: sympy.MutableDenseMatrix,
matrix_B: sympy.MutableDenseMatrix,
matrix_H: sympy.MutableDenseMatrix
) -> (numpy.ndarray, numpy.ndarray, numpy.ndarray):
"""
Convert discrete kalman matrices to matrices usable with clarke
converted signals. Therefore the one line matrices need to be
doubled since alpha and beta component of the voltage or
current signals need to be represented.
:param matrix_A: Kalman filter system matrix one line \
representation
:type matrix_A: sympy.MutableDenseMatrix
:param matrix_B: Kalman filter control matrix one line \
representation
:type matrix_B: sympy.MutableDenseMatrix
:param matrix_H: Kalman filter observation matrix one line \
representation
:type matrix_H: sympy.MutableDenseMatrix
:return: - **matrix_A** (numpy.ndarray) - numpy converted \
kalman filter system matrix for discrete clark transformed \
measurement data
- **matrix_B** (numpy.ndarray) - numpy converted \
kalman filter control matrix for discrete clark \
transformed measurement data
- **matrix_H** (numpy.ndarray) - numpy converted \
kalman filter observation matrix for discrete clark \
transformed measurement data
"""
# get the dimension of the system matrix of the one line
# representation
dim_A = matrix_A.rows
# create a zeros matrix of the size of the system matrix
matrix_zeros = sympy.zeros(dim_A, dim_A)
# append the zeros matrix to the system matrix horizontally
matrix_A_alpha = matrix_A.col_join(matrix_zeros)
matrix_A_beta = matrix_zeros.col_join(matrix_A)
# append the two new matrices vertically to each other
matrix_A = matrix_A_alpha.row_join(matrix_A_beta)
# create a zeros matrix of the size of the control matrix
matrix_zeros = sympy.zeros(dim_A, matrix_B.cols)
# append the zeros matrix to the system matrix horizontally
matrix_B_alpha = matrix_B.col_join(matrix_zeros)
matrix_B_beta = matrix_zeros.col_join(matrix_B)
# append the two new matrices vertically to each other
matrix_B = matrix_B_alpha.row_join(matrix_B_beta)
# create a zeros matrix of the size of the observation matrix
matrix_zeros = sympy.zeros(matrix_H.rows, dim_A)
# append the zeros matrix to the system matrix horizontally
matrix_H_alpha = matrix_H.col_join(matrix_zeros)
matrix_H_beta = matrix_zeros.col_join(matrix_H)
# append the two new matrices vertically to each other
matrix_H = matrix_H_alpha.row_join(matrix_H_beta)
# convert the resulting matrices to numpy arrays for later usage
matrix_A = numpy.array(matrix_A.tolist()).astype(numpy.float64)
matrix_B = numpy.array(matrix_B.tolist()).astype(numpy.float64)
matrix_H = numpy.array(matrix_H.tolist()).astype(numpy.float64)
return matrix_A, matrix_B, matrix_H
[docs]
def array_from_measurement_data(
measurement_df: pandas.DataFrame,
measured_nodes: list,
network: pandapower.pandapowerNet
) -> (numpy.ndarray, numpy.ndarray):
"""
Converts the measurement data exported from powerfactory from
their RST Form into Alpha, Beta, 0 Form using the clarke
transformation.
TODO: currently the label of voltage transformer can only be \
the german one "Spannungswandler"
:param measurement_df: Dataframe containing the measurement \
data exported from powerfactory
:type measurement_df: pandas.DataFrame
:param measured_nodes: List of the considered buses labels \
e.g. [5, 7]
:type measured_nodes: list
:param network: pandapower network instance representing the \
investigated power system
:type network: pandapower.pandapowerNet
:returns: - **measurement_u** (numpy.ndarray): Containing the \
Alpha Beta Form of the powerfactory current measurement data
- **measurement_y** (numpy.ndarray): Containing the \
Alpha Beta Form of the powerfactory voltage measurement data
"""
# measurement u represents the input values for the prediction step
measurements_u = []
# measurement y represents the input values for the correction in
# the control step
measurements_y = []
buses = []
# sort the inserted measured buses and get their pandas data frame
# entry from the dataframe of power system buses
measured_nodes.sort()
for node in measured_nodes:
buses.append(network["bus"].iloc[node]["name"].replace(" ", ""))
# iterate threw the imported measured data set
for _, row in measurement_df.iterrows():
y_alpha = []
y_beta = []
u_alpha = []
u_beta = []
for bus in buses:
# get the bus specific VT by its power factory label
voltages_bus_entry = row[row.index.str.contains(
'{}'.format("Spannungswandler_" + bus))]
# get the voltages of the current bus in alpha beta
# coordinates
alpha, beta = clarke_transformation(measurement=voltages_bus_entry)
# add the results to a list of alpha and a list of beta
# values
y_alpha.append(alpha)
y_beta.append(beta)
# get the bus specific CT by its power factory label
current_bus_entry = row[
row.index.str.contains('{}'.format("_" + bus))
& ~row.index.str.contains('Spannungswandler')
]
# get the currents of the current bus in alpha beta
# coordinates
alpha, beta = clarke_transformation(measurement=current_bus_entry)
# add the results to a list of alpha and a list of beta
# values
u_alpha.append(alpha)
u_beta.append(beta)
# append the new timestep on the measurement y list
measurements_y.append(y_alpha + y_beta)
measurements_u.append(u_alpha + u_beta)
# convert the measurement lists into numpy array for later
# processing
measurements_u = numpy.array(measurements_u)
measurements_y = numpy.array(measurements_y)
return measurements_u, measurements_y
[docs]
def initialize_matrices(
process_covariance_volt: float,
process_covariance_curr: float,
measurement_covariance: float,
dim_x: int, dim_z: int,
network: pandapower.pandapowerNet
) -> (numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray):
"""
Method for initializing the matrices representing the system
uncertainties.
:param process_covariance_volt: Uncertainty of the voltage \
estimation of the kalman filter
:type process_covariance_volt: float
:param process_covariance_curr: Uncertainty of the current \
estimation of the kalman filter
:type process_covariance_curr: float
:param measurement_covariance: Uncertainty of the measurement \
resulting from mean free gaussian noise.
:type measurement_covariance: float
:param dim_x: dimension of the state space vector of the \
investigated system
:type dim_x: int
:param dim_z: dimension of the measurement vector z of the \
investigated system
:type dim_z: int
:param network: pandapower network instance representing the \
investigated power system
:type network: pandapower.pandapowerNet
:return: - **matrix_P** (numpy.ndarray) - kalman filter \
covariance matrix. Holding dimx x dimx zeros.
- **matrix_R** (numpy.ndarray) - kalman filter \
measurement noise covariance matrix.
- **matrix_Q** (numpy.ndarray) - kalman filter \
process noise covariance matrix.
- **vector_x** (numpy.ndarray) - kalman filter \
state space vector consisting of dimx zeros.
"""
# get the number of power system buses
buses = len(network["bus"])
# create zero matrices for process noise covariance and the central
# covariance matrix
matrix_P = numpy.zeros((dim_x, dim_x))
matrix_Q = numpy.zeros((dim_x, dim_x))
# create an identity matrix for the measurement vector covariance
# matrix since all values are of the same kind (here: voltage)
matrix_R = numpy.eye(dim_z) * float(measurement_covariance)
# process noise covariance matrix creation process
# a distinction between current and voltage entries is necessary
num_curr = ((buses * (buses - 1)) / 2)
# iterate over voltage and currents in alpha and beta form
for i in range(int(2 * (buses + num_curr))):
# alpha voltages
if i < buses:
matrix_Q[i, i] = float(process_covariance_volt)
# alpha currents
elif buses <= i < (buses + num_curr):
matrix_Q[i, i] = float(process_covariance_curr)
# beta voltages
elif (buses + num_curr) <= i < (2 * buses + num_curr):
matrix_Q[i, i] = float(process_covariance_volt)
# beta currents
else:
matrix_Q[i, i] = float(process_covariance_curr)
# create an empty state space vector x
vector_x = numpy.array([0] * dim_x)
return matrix_P, matrix_R, matrix_Q, vector_x
[docs]
def compute_results(
network: pandapower.pandapowerNet, timestep: int,
return_dict: dict, vec_x: numpy.ndarray
) -> dict:
"""
Method to compute the state space vector to easy interpretable
dictionary for easier result visualization.
:param network: pandapower network instance representing the \
investigated power system
:type network: pandapower.pandapowerNet
:param timestep: currently considered timestep
:type timestep: int
:param return_dict: dictionary holding the result data for \
later visualization
:type return_dict: dict
:param vec_x: current iteration step state space vector
:type vec_x: numpy.ndarray
:return: - **return_dict** (dict) - dictionary holding the \
result data for later visualization. Within this method \
the new entry timestep has been appended.
"""
# calculate number of voltage entries and currents per clarke
# component
num_volt = len(list(network["bus"]))
num_cur = ((num_volt * (num_volt - 1)) / 2)
# initialize counter for the correct labeling of the resulting
# state vector
send_end = 0
counter = 0
counter2 = 1
# iterate threw the state vector
for row in vec_x:
# if no entry for the current timestep exists create one
if timestep not in return_dict.keys():
return_dict.update({timestep: {}})
# extract the entry corresponding line entry from the data
# frame of lines
recv_end = counter2
# filter the line data frame on entries where from and
# to bus are in sending and receiving end
filtered_df = network["line"][
network["line"]['to_bus'].isin([send_end, recv_end])
| network["line"]['from_bus'].isin([send_end, recv_end])
]
# check if there exists a dataframe entry after the
# usage of the filter
if not filtered_df.empty:
# get the line from sending end to receiving end
line = filtered_df[(filtered_df["from_bus"] == send_end)
& (filtered_df["to_bus"] == recv_end)]
# check if this direction results in a dataframe
# entry else try the other way around
if line.empty:
line = filtered_df[
(filtered_df["to_bus"] == send_end)
& (filtered_df["from_bus"] == recv_end)
]
else:
line = None
# handling of the alpha voltages
if counter < num_volt:
return_dict[timestep].update(
{network["bus"].iloc[counter]["name"] + "_alpha": row})
# handling of the alpha currents
elif num_volt <= counter < (num_volt + num_cur):
# check if this direction results in a dataframe
# entry if yes append its value to the return dict
if line is not None and not line.empty:
return_dict[timestep].update(
{list(line["name"])[0] + "_alpha": row})
if counter2 == (num_volt - 1):
counter2 = send_end + 1
send_end += 1
counter2 += 1
# handling beta voltages
elif (num_volt + num_cur) <= counter < (2 * num_volt + num_cur):
bus_num = int(counter - (num_volt + num_cur))
return_dict[timestep].update(
{network["bus"].iloc[bus_num]["name"] + "_beta": row})
counter2 = 1
# resetting send_end for the upcoming beta currents
send_end = 0
# handling beta currents
else:
if line is not None and not line.empty:
return_dict[timestep].update(
{list(line["name"])[0] + "_beta": row})
if counter2 == (num_volt - 1):
counter2 = send_end + 1
send_end += 1
counter2 += 1
counter += 1
return return_dict
[docs]
def run(
system_matrices: dict, measurements: dict,
kalman_matrices: dict, network: pandapower.pandapowerNet
) -> dict:
"""
Method to start the kalman filter simulation. Therefore all
matrices previously calculated are stored in the kalman filter
equivalent variables and a iteration over each time step is
rolled out.
:param system_matrices: dict holding the kalman filter \
matrices (A, B and H)
:type system_matrices: dict
:param measurements: dict holding the measurement vectors (u, y)
:type measurements: dict
:param kalman_matrices: dict holding the covariance matrices \
(P, R and Q) as well as the state space vector (x)
:type kalman_matrices: dict
:param network: pandapower network instance representing the \
considered power system
:type network: pandapower.pandapowerNet
:return: - **return_dict** (dict) - dictionary holding \
timestep wise extracted kalman filter results
"""
# initalize the KalmanFiler class of the python package filterpy
kf = KalmanFilter(
dim_x=system_matrices["matrix_A"].shape[0],
dim_z=measurements["measurement_y"][0].shape[0],
dim_u=measurements["measurement_u"][0].shape[0]
)
# place the input parameters in their corresponding kalman filter
# variable
kf.F = system_matrices["matrix_A"]
kf.B = system_matrices["matrix_B"]
kf.H = system_matrices["matrix_H"]
kf.x = kalman_matrices["vector_x"]
kf.P = kalman_matrices["matrix_P"]
kf.R = kalman_matrices["matrix_R"]
kf.Q = kalman_matrices["matrix_Q"]
# create empty return dict as well as a timestep counter
timestep = 0
return_dict = {}
# iterate threw the simulated time steps
for vec_u, vec_y in zip(measurements["measurement_u"],
measurements["measurement_y"]):
# run kalman filter prediction step
kf.update(z=vec_y)
# compute the state space vector
return_dict = compute_results(
network=network,
timestep=timestep,
return_dict=return_dict,
vec_x=kf.x
)
# run kalman filter prediction
kf.predict(u=vec_u)
timestep += 1
return return_dict
[docs]
def calc_wse(
results: dict, measured_nodes: list, measurement_y: numpy.ndarray,
network: pandapower.pandapowerNet, weighting_factor: float
) -> dict:
"""
In the Kalman filter protection algorithm, the protection
decision is based on a hypothesis test. For this purpose, the
difference between the variables measured in the system and the
state space vector must be converted to a parameter. This is
possible with the help of the weighted squared error (wse),
which can then be used to test the hypothesis with a chi2
distribution. In this method, the weighted squared errors (wse)
are calculated.
:param results: holding the dictionary of human readable data \
the state space vector from the kalman filter simulation
:type results: dict
:param measured_nodes: list of the buses where the voltage is \
measured by a voltage transformer
:type measured_nodes: list
:param measurement_y: measurement vectors of the real signals \
resulting from the powerfactory simulation
:type measurement_y: numpy.ndarray
:param network: pandapower network instance representing the \
investigated power system
:type network: pandapower.pandapowerNet
:param weighting_factor: weight factor for the calculation of \
the weighted squared error
:type weighting_factor: float
:return: - **return_dict** (dict) - dictionary holding the \
weighted squared error foreach timestep simulated
"""
# create an empty return dictionary for data collection
return_dict = {}
# sort the measured nodes indices ascending
measured_nodes.sort()
# collect the measured nodes bus names
buses = []
for i in measured_nodes:
buses.append(network["bus"]["name"][i])
# create the weighting matrix
matrix_W = numpy.eye(2*len(measured_nodes)) * float(weighting_factor)
# iterate over the result dictionary
for timestep in results.keys():
diffs = []
# iterate over alpha and beta results of the considered buses
for index in range(2 * len(measured_nodes)):
# for the first len nodes the resulting entries are alpha
# voltage entries
if index < len(measured_nodes):
# calculate the difference of the state space result
# and the powerfactory simulation
row = (results[timestep]["{}_alpha".format(
buses[index])]
- measurement_y[timestep][index])
# for the second len nodes the resulting entries are beta
# voltage entries
else:
# calculate the difference of the state space result
# and the powerfactory simulation
row = (results[timestep]["{}_beta".format(
buses[index - len(measured_nodes)])]
- measurement_y[timestep][index]
)
# append the calculated values to the list of differences
diffs.append(row)
# create a vector of the list of differences
vec_epsilon = numpy.array(diffs)
# calculate the weighted squared error
wse = numpy.dot(
numpy.dot(vec_epsilon, matrix_W),
vec_epsilon.T
)
# append the weighted squared error to the return dict for
# later visualization
return_dict.update({timestep: [wse]})
return return_dict
[docs]
def calc_chi2(wse: dict, df: int) -> dict:
"""
Within this method the evaluation of the hypothesis is carried
out by evaluating the weighted squared error on the basis of
a chi-squared distribution (chi2). The resulting values are
return in a dictionary for later visualization.
:param wse: dictionary holding the weighted squared errors \
foreach timestep calculated before.
:type wse: dict
:param df: number of the degrees of freedom (df). In the \
context of chi2 distributions the df is the number of not \
tracked state space vector entries vary from 0.
:type df: int
:return: - **return_dict** (dict) - dictionary holding the \
chi2 result foreach timestep
"""
# create an empty dictionary for data collection
result_dict = {}
# iterrate threw all entries of the weighted squared error dict
for timestep in wse:
# evaluate the value of chi2 for the given wse and substract
# it from one to get the confidential in the hypothesis no error
chi2 = 1 - stats.chi2.cdf(wse[timestep], df=df)
# append the result to the return dict for later visualization
result_dict.update({timestep: chi2})
return result_dict