Source code for pyinduct.simulation

"""
Simulation infrastructure with helpers and data structures for preprocessing of the given equations
and functions for postprocessing of simulation data.
"""

import warnings
from abc import ABCMeta, abstractmethod
from collections import OrderedDict, Callable
from copy import copy
from itertools import chain

import numpy as np
from scipy.integrate import ode
from scipy.interpolate import interp1d
from scipy.linalg import block_diag

from .core import (Domain, Parameters, Function,
                   domain_intersection, integrate_function,
                   calculate_scalar_product_matrix,
                   vectorize_scalar_product, sanitize_input,
                   StackedBase, get_weight_transformation,
                   get_transformation_info,
                   EvalData, project_on_bases)
from .placeholder import (Scalars, TestFunction, Input, FieldVariable,
                          EquationTerm, get_common_target, get_common_form,
                          ObserverGain, ScalarTerm, IntegralTerm,
                          ScalarProductTerm)
from .registry import get_base, register_base

__all__ = ["SimulationInput", "SimulationInputSum", "WeakFormulation",
           "parse_weak_formulation",
           "create_state_space", "StateSpace", "simulate_state_space",
           "simulate_system", "simulate_systems",
           "get_sim_result", "evaluate_approximation",
           "parse_weak_formulations",
           "get_sim_results", "set_dominant_labels", "CanonicalEquation",
           "CanonicalForm", "SimulationInputVector"]


[docs]class SimulationInput(object, metaclass=ABCMeta): """ Base class for all objects that want to act as an input for the time-step simulation. The calculated values for each time-step are stored in internal memory and can be accessed by :py:meth:`.get_results` (after the simulation is finished). Note: Due to the underlying solver, this handle may get called with time arguments, that lie outside of the specified integration domain. This should not be a problem for a feedback controller but might cause problems for a feedforward or trajectory implementation. """ def __init__(self, name=""): self._time_storage = [] self._value_storage = {} self.name = name self._res = np.array([0]) def __call__(self, **kwargs): """ handle that is used by the simulator to retrieve input. """ out = self._calc_output(**kwargs) self._time_storage.append(kwargs["time"]) for key, value in out.items(): entries = self._value_storage.get(key, []) entries.append(copy(value)) self._value_storage[key] = entries return np.atleast_1d(out["output"]) @abstractmethod def _calc_output(self, **kwargs): """ Handle that has to be implemented for output calculation. Keyword Args: time: The current simulation time. weights: The current weight vector. weight_lbl: The label of the weights used. Returns: dict: Dictionary with mandatory key ``output``. """ return dict(output=self._res)
[docs] def get_results(self, time_steps, result_key="output", interpolation="nearest", as_eval_data=False): """ Return results from internal storage for given time steps. Raises: Error: If calling this method before a simulation was run. Args: time_steps: Time points where values are demanded. result_key: Type of values to be returned. interpolation: Interpolation method to use if demanded time-steps are not covered by the storage, see :func:`scipy.interpolate.interp1d` for all possibilities. as_eval_data (bool): Return results as :py:class:`.EvalData` object for straightforward display. Return: Corresponding function values to the given time steps. """ t_data = np.array(self._time_storage) res_data = np.array(self._value_storage[result_key]) invalid_idxs = np.logical_not(np.isnan(res_data)) mask = [np.all(a) for a in invalid_idxs] func = interp1d(t_data[mask], res_data[mask], kind=interpolation, assume_sorted=False, bounds_error=False, fill_value=(res_data[mask][0], res_data[mask][-1]), axis=0) values = func(time_steps) if as_eval_data: return EvalData([time_steps], values, name=".".join([self.name, result_key]), fill_axes=True) return values
[docs] def clear_cache(self): """ Clear the internal value storage. When the same *SimulationInput* is used to perform various simulations, there is no possibility to distinguish between the different runs when :py:meth:`.get_results` gets called. Therefore this method can be used to clear the cache. """ self._time_storage.clear() self._value_storage.clear()
[docs]class EmptyInput(SimulationInput): def __init__(self, dim): SimulationInput.__init__(self) self.dim = dim def _calc_output(self, **kwargs): return dict(output=np.zeros((len(np.atleast_1d(kwargs['time'])), self.dim)))
[docs]class SimulationInputSum(SimulationInput): """ Helper that represents a signal mixer. """ def __init__(self, inputs): SimulationInput.__init__(self) self.inputs = inputs def _calc_output(self, **kwargs): outs = [handle(**kwargs) for handle in self.inputs] return dict(output=np.sum(outs, axis=0))
[docs]class WeakFormulation(object): r""" This class represents the weak formulation of a spatial problem. It can be initialized with several terms (see children of :py:class:`.EquationTerm`). The equation is interpreted as .. math:: term_0 + term_1 + ... + term_N = 0. Args: terms (list): List of object(s) of type EquationTerm. name (string): Name of this weak form. dominant_lbl (string): Name of the variable that dominates this weak form. """ def __init__(self, terms, name, dominant_lbl=None): self.terms = sanitize_input(terms, EquationTerm) self.name = name self.dominant_lbl = dominant_lbl
[docs]class StateSpace(object): r""" Wrapper class that represents the state space form of a dynamic system where .. math:: \boldsymbol{\dot{x}}(t) &= \sum\limits_{k=0}^{L}\boldsymbol{A}_{k} \boldsymbol{x}^{p_k}(t) + \sum\limits_{j=0}^{V} \sum\limits_{k=0}^{L}\boldsymbol{B}_{j, k} \frac{\mathrm{d}^j u^{p_k}}{\mathrm{d}t^j}(t) + \boldsymbol{L}\tilde{\boldsymbol{y}}(t)\\ \boldsymbol{y}(t) &= \boldsymbol{C}\boldsymbol{x}(t) + \boldsymbol{D}u(t) which has been approximated by projection on a base given by weight_label. Args: a_matrices (dict): State transition matrices :math:`\boldsymbol{A}_{p_k}` for the corresponding powers of :math:`\boldsymbol{x}`. b_matrices (dict): Cascaded dictionary for the input matrices :math:`\boldsymbol{B}_{j, k}` in the sequence: temporal derivative order, exponent. input_handle (:py:class:`.SimulationInput`): System input :math:`u(t)`. c_matrix: :math:`\boldsymbol{C}` d_matrix: :math:`\boldsymbol{D}` """ def __init__(self, a_matrices, b_matrices, base_lbl=None, input_handle=None, c_matrix=None, d_matrix=None, obs_fb_handle=None): self.C = c_matrix self.D = d_matrix self.base_lbl = base_lbl self.observer_fb = obs_fb_handle # mandatory if isinstance(a_matrices, np.ndarray): self.A = {1: a_matrices} else: self.A = a_matrices if 0 not in self.A: # this is the constant term (power 0) aka the f-vector self.A[0] = np.zeros((self.A[1].shape[0],)) # optional if isinstance(b_matrices, np.ndarray): # fake import order and power for backward compatibility self.B = {0: {1: b_matrices}} else: self.B = b_matrices # TODO calculate available order available_power = 1 if self.B is None: self.B = {0: {available_power: np.zeros((self.A[available_power].shape[0], available_power))}} if self.C is None: self.C = np.zeros((available_power, self.A[available_power].shape[1])) if self.D is None: self.D = np.zeros((self.C.shape[0], np.atleast_2d(self.B[0][available_power]).T.shape[1])) if input_handle is None: self.input = EmptyInput(self.B[0][available_power].shape[1]) elif isinstance(input_handle, SimulationInput): self.input = input_handle else: raise NotImplementedError # TODO export cython code?
[docs] def rhs(self, _t, _q): r""" Callback for the integration of the dynamic system, described by this object. Args: _t (float): timestamp _q (array): weight vector Returns: (array): :math:`\boldsymbol{\dot{x}}(t)` """ state_part = self.A[0] for power, a_mat in self.A.items(): state_part = state_part + a_mat @ np.power(_q, power) input_part = np.zeros_like(state_part) inputs = np.atleast_2d( self.input(time=_t, weights=_q, weight_lbl=self.base_lbl)) for der_order, power_dict in self.B.items(): for power, b_mat in power_dict.items(): for idx, col in enumerate(b_mat.T): input_part = input_part + col * inputs[idx][der_order] q_t = state_part + input_part if self.observer_fb is not None: q_t = q_t + self.observer_fb( time=_t, weights=_q, weight_lbl=self.base_lbl) return q_t
[docs]def simulate_system(weak_form, initial_states, temporal_domain, spatial_domain, derivative_orders=(0, 0), settings=None): r""" Convenience wrapper for :py:func:`.simulate_systems`. Args: weak_form (:py:class:`.WeakFormulation`): Weak formulation of the system to simulate. initial_states (numpy.ndarray): Array of core.Functions for :math:`x(t=0, z), \dot{x}(t=0, z), \dotsc, x^{(n)}(t=0, z)`. temporal_domain (:py:class:`.Domain`): Domain object holding information for time evaluation. spatial_domain (:py:class:`.Domain`): Domain object holding information for spatial evaluation. derivative_orders (tuple): tuples of derivative orders (time, spat) that shall be evaluated additionally as values settings: Integrator settings, see :py:func:`.simulate_state_space`. """ ics = sanitize_input(initial_states, Function) initial_states = {weak_form.name: ics} spatial_domains = {weak_form.name: spatial_domain} derivative_orders = {weak_form.name: derivative_orders} res = simulate_systems([weak_form], initial_states, temporal_domain, spatial_domains, derivative_orders, settings) return res
[docs]def simulate_systems(weak_forms, initial_states, temporal_domain, spatial_domains, derivative_orders=None, settings=None, out=list()): """ Convenience wrapper that encapsulates the whole simulation process. Args: weak_forms ((list of) :py:class:`.WeakFormulation`): (list of) Weak formulation(s) of the system(s) to simulate. initial_states (dict, numpy.ndarray): Array of core.Functions for :math:`x(t=0, z), \dot{x}(t=0, z), \dotsc, x^{(n)}(t=0, z)`. temporal_domain (:py:class:`.Domain`): Domain object holding information for time evaluation. spatial_domains (dict): Dict with :py:class:`.Domain` objects holding information for spatial evaluation. derivative_orders (dict): Dict, containing tuples of derivative orders (time, spat) that shall be evaluated additionally as values settings: Integrator settings, see :py:func:`.simulate_state_space`. out (list): List from user namespace, where the following intermediate results will be appended: - canonical equations (list of types: :py:class:`.CanocialEquation`) - state space object (type: :py:class:`.StateSpace`) - initial weights (type: :py:class:`numpy.array`) - simulation results/weights (type: :py:class:`numpy.array`) Note: The *name* attributes of the given weak forms must be unique! Return: list: List of :py:class:`.EvalData` objects, holding the results for the FieldVariable and demanded derivatives. """ if derivative_orders is None: derivative_orders = dict([(lbl, (0, 0))for lbl in spatial_domains]) weak_forms = sanitize_input(weak_forms, WeakFormulation) print("simulate systems: {}".format([f.name for f in weak_forms])) print(">>> parse weak formulations") canonical_equations = parse_weak_formulations(weak_forms) out.append(canonical_equations) print(">>> create state space system") state_space_form = create_state_space(canonical_equations) out.append(state_space_form) print(">>> derive initial conditions") q0 = project_on_bases(initial_states, canonical_equations) out.append(q0) print(">>> perform time step integration") sim_domain, q = simulate_state_space(state_space_form, q0, temporal_domain, settings=settings) out.append(q) print(">>> perform postprocessing") results = get_sim_results(sim_domain, spatial_domains, q, state_space_form, derivative_orders=derivative_orders) print(">>> finished simulation") return results
[docs]def get_sim_result(weight_lbl, q, temp_domain, spat_domain, temp_order, spat_order, name=""): """ Create handles and evaluate at given points. Args: weight_lbl (str): Label of Basis for reconstruction. temp_order: Order or temporal derivatives to evaluate additionally. spat_order: Order or spatial derivatives to evaluate additionally. q: weights spat_domain (:py:class:`.Domain`): Domain object providing values for spatial evaluation. temp_domain (:py:class:`.Domain`): Time steps on which rows of q are given. name (str): Name of the WeakForm, used to generate the data set. """ data = [] # temporal ini_funcs = get_base(weight_lbl).fractions for der_idx in range(temp_order + 1): name = "{0}{1}".format(name, "_" + "".join(["d" for x in range(der_idx)] + ["t"]) if der_idx > 0 else "") data.append(evaluate_approximation(weight_lbl, q[:, der_idx * ini_funcs.size:(der_idx + 1) * ini_funcs.size], temp_domain, spat_domain, name=name)) # spatial (0th derivative is skipped since this is already handled above) for der_idx in range(1, spat_order + 1): name = "{0}{1}".format(name, "_" + "".join(["d" for x in range(der_idx)] + ["z"]) if der_idx > 0 else "") data.append( evaluate_approximation(weight_lbl, q[:, :ini_funcs.size], temp_domain, spat_domain, der_idx, name=name)) return data
[docs]def get_sim_results(temp_domain, spat_domains, weights, state_space, names=None, derivative_orders=None): """ Convenience wrapper for :py:func:`.get_sim_result`. Args: temp_domain (:py:class:`.Domain`): Time domain spat_domains (dict): Spatial domain from all subsystems which belongs to *state_space* as values and name of the systems as keys. weights (numpy.array): Weights gained through simulation. For example with :py:func:`.simulate_state_space`. state_space (:py:class:`.StateSpace`): Simulated state space instance. names: List of names of the desired systems. If not given all available subssystems will be processed. derivative_orders (dict): Desired derivative orders. Returns: List of :py:class:`.EvalData` objects. """ ss_base = get_base(state_space.base_lbl) if names is None: if isinstance(ss_base, StackedBase): labels = ss_base.base_lbls names = ss_base.system_names else: names = list(spat_domains) labels = [state_space.base_lbl] else: if isinstance(ss_base, StackedBase): labels = [ss_base.base_lbls[ss_base.system_names.index(name)] for name in names] else: labels = [state_space.base_lbl] if derivative_orders is None: derivative_orders = dict([(name, (0, 0)) for name in names]) results = [] for nm, lbl in zip(names, labels): # if derivative_orders[n] is None derivatives of the # corresponding variables are not provided if derivative_orders[nm][0] is None: derivative_orders[nm][0] = 0 if derivative_orders[nm][1] is None: derivative_orders[nm][1] = 0 # acquire a transformation into the original weights src_order = int(weights.shape[1] / ss_base.fractions.size) - 1 info = get_transformation_info(state_space.base_lbl, lbl, src_order, derivative_orders[nm][0]) transformation = get_weight_transformation(info) # project back data = get_sim_result(info.dst_lbl, np.apply_along_axis(transformation, 1, weights), temp_domain, spat_domains[nm], info.dst_order, derivative_orders[nm][1], name=nm) results += data return results
[docs]class CanonicalForm(object): """ The canonical form of an nth order ordinary differential equation system. """ def __init__(self, name=None): self.name = name self.matrices = {} # self._max_idx = dict(E=0, f=0, G=0) self._weights = None self._input_function = None self._observer_feedback = list() self._finalized = False self.powers = None self.max_power = None self.max_temp_order = None self.dim_u = 0 self.dim_x = None self.dim_xb = None self.e_n_pb = None self.e_n_pb_inv = None self.singular = True # @staticmethod # def _build_name(term): # return "_" + term[0] + str(term[1]) # def __add__(self, other): # for name, names in other._matrices.items(): # for der, derivatives in names.items(): # for p, pow in derivatives.items(): # self._matrices[name][der][p] += pow @property def input_function(self): return self._input_function
[docs] def set_input_function(self, func): if not isinstance(func, SimulationInput): raise TypeError("Inputs must be of type `SimulationInput`.") if self._input_function is None: self._input_function = func elif self._input_function is not func: raise ValueError("already defined input is overridden!")
# @property # def weights(self): # return self._weights # # @weights.setter # def weights(self, weight_lbl): # if not isinstance(weight_lbl, str): # raise TypeError("only string allowed as weight label!") # if self._weights is None: # self._weights = weight_lbl # if self._weights != weight_lbl: # raise ValueError("already defined target weights are overridden!")
[docs] def add_to(self, term, value, column=None): """ Adds the value :py:obj:`value` to term :py:obj:`term`. :py:obj:`term` is a dict that describes which coefficient matrix of the canonical form the value shall be added to. Args: term (dict): Targeted term in the canonical form h. It has to contain: - name: Type of the coefficient matrix: 'E', 'f', or 'G'. - order: Temporal derivative order of the assigned weights. - exponent: Exponent of the assigned weights. value (:py:obj:`numpy.ndarray`): Value to add. column (int): Add the value only to one column of term (useful if only one dimension of term is known). """ if self._finalized: raise RuntimeError("Object has already been finalized, you are trying some nasty stuff there.") if term["name"] == "L": self._observer_feedback.append(value) return if not isinstance(value, np.ndarray): raise TypeError("val must be numpy.ndarray") if column and not isinstance(column, int): raise TypeError("column index must be int") # get entry if term["name"] == "f": if ("order" in term) \ or ("exponent" in term and term["exponent"] != 0): warnings.warn("order and exponent are ignored for f_vector!") f_vector = self.matrices.get("f", np.zeros_like(value)) self.matrices["f"] = value + f_vector return type_group = self.matrices.get(term["name"], {}) derivative_group = type_group.get(term["order"], {}) target_matrix = derivative_group.get(term["exponent"], np.zeros_like(value)) if target_matrix.shape != value.shape and column is None: msg = "{0}{1}{2} was already initialized with dimensions {3} but " \ "value to add has dimension {4}".format(term["name"], term["order"], term["exponent"], target_matrix.shape, value.shape) raise ValueError(msg) if column is not None: # check whether the dimensions fit or if the matrix must be extended if column >= target_matrix.shape[1]: new_target_matrix = np.zeros((target_matrix.shape[0], column + 1)) new_target_matrix[ :target_matrix.shape[0], :target_matrix.shape[1] ] = target_matrix target_matrix = new_target_matrix target_matrix[:, column:column + 1] += value else: target_matrix += value # store changes derivative_group[term["exponent"]] = target_matrix type_group[term["order"]] = derivative_group self.matrices[term["name"]] = type_group
[docs] def finalize(self): """ Finalizes the object. This method must be called after all terms have been added by :py:meth:`.add_to` and before :py:meth:`.convert_to_state_space` can be called. This functions makes sure that the formulation can be converted into state space form (highest time derivative only comes in one power) and collects information like highest derivative order, it's power and the sizes of current and state-space state vector (`dim_x` resp. `dim_xb`). Furthermore, the coefficient matrix of the highest derivative order `e_n_pb` and it's inverse are made accessible. """ if self._finalized: return # get highest power self.powers = set(chain.from_iterable([list(mat) for mat in self.matrices["E"].values()])) self.max_power = max(self.powers) # check whether the system can be formulated in an explicit form self.max_temp_order = max(self.matrices["E"]) if len(self.matrices["E"][self.max_temp_order]) > 1: # more than one power of the highest derivative -> implicit formulation raise NotImplementedError pb = next(iter(self.matrices["E"][self.max_temp_order])) if pb != 1: # TODO raise the resulting last blocks to 1/pb raise NotImplementedError self.e_n_pb = self.matrices["E"][self.max_temp_order][pb] self.dim_x = self.e_n_pb.shape[0] # length of the weight vector rank_e_n_pb = np.linalg.matrix_rank(self.e_n_pb) if rank_e_n_pb != max(self.e_n_pb.shape) or self.e_n_pb.shape[0] != self.e_n_pb.shape[1]: # this form cannot be used as dominant form self.singular = True else: self.singular = False self.e_n_pb_inv = np.linalg.inv(self.e_n_pb) self.dim_xb = self.max_temp_order * self.dim_x # dimension of the new system # input for derivatives in self.matrices.get("G", {}).values(): for power in derivatives.values(): self.dim_u = max(self.dim_u, power.shape[1])
[docs] def get_terms(self): """ Return all coefficient matrices of the canonical formulation. Return: Cascade of dictionaries: Structure: Type > Order > Exponent. """ return self.matrices
[docs] def convert_to_state_space(self): """ Convert the canonical ode system of order n a into an ode system of order 1. Note: This will only work if the highest derivative order of the given form can be isolated. This is the case if the highest order is only present in one power and the equation system can therefore be solved for it. Return: :py:class:`.StateSpace` object: """ if not self._finalized: self.finalize() # system matrices A_* a_matrices = {} for p in self.powers: a_mat = np.zeros((self.dim_xb, self.dim_xb)) # add integrator chain a_mat[:-self.dim_x:, self.dim_x:] = block_diag( *[np.eye(self.dim_x) for a in range(self.max_temp_order - 1)]) # add "block-line" with feedback entries a_mat[-self.dim_x:, :] = -self._build_feedback("E", p, self.e_n_pb_inv) a_matrices.update({p: a_mat}) # input matrices B_* if "G" in self.matrices: max_temp_input_order = max(iter(self.matrices["G"])) input_powers = set(chain.from_iterable( [list(mat) for mat in self.matrices["G"].values()]) ) dim_u = next(iter( self.matrices["G"][max_temp_input_order].values())).shape[1] # generate nested dict of B_o_p matrices where o is # derivative order and p is power b_matrices = {} for order in range(max_temp_input_order + 1): if order in self.matrices["G"]: b_powers = {} for q in input_powers: b_mat = np.zeros((self.dim_xb, dim_u)) # overwrite the last "block-line" in the matrices # with input entries b_mat[-self.dim_x:, :] = \ - self.e_n_pb_inv @ self.matrices["G"][order][q] b_powers.update({q: b_mat}) b_matrices.update({order: b_powers}) else: b_matrices = None # the f vector aka the A matrix corresponding to the power zero f_mat = np.zeros((self.dim_xb,)) if "f" in self.matrices: f_mat[-self.dim_x:] = self.matrices["f"] a_matrices.update({0: f_mat}) ss = StateSpace(a_matrices, b_matrices, input_handle=self.input_function) return ss
def _build_feedback(self, entry, power, product_mat): max_order = max(sorted(self.matrices[entry])) entry_shape = next(iter(self.matrices[entry][max_order].values())).shape if entry == "G": # include highest order for system input max_order += 1 blocks = [np.dot(product_mat, self.matrices[entry].get(order, {}).get(power, np.zeros(entry_shape))) for order in range(max_order)] return np.hstack(blocks)
[docs]class CanonicalEquation(object): """ Wrapper object, holding several entities of canonical forms for different weight-sets that form an equation when summed up. After instantiation, this object can be filled with information by passing the corresponding coefficients to :py:meth:`.add_to`. When the parsing process is completed and all coefficients have been collected, calling :py:meth:`.finalize` is required to compute all necessary information for further processing. When finalized, this object provides access to the dominant form of this equation. Args: name (str): Unique identifier of this equation. dominant_lbl (str): Label of the variable that dominates this equation. """ def __init__(self, name, dominant_lbl=None): self.name = name self.dominant_lbl = dominant_lbl self.dynamic_forms = {} self._static_form = CanonicalForm(self.name + "_static") self._finalized = False self._finalized_dynamic_forms = False
[docs] def add_to(self, weight_label, term, val, column=None): """ Add the provided *val* to the canonical form for *weight_label*, see :py:meth:`.CanonicalForm.add_to` for further information. Args: weight_label (str): Basis to add onto. term: Coefficient to add onto, see :py:func:`~CanonicalForm.add_to`. val: Values to add. column (int): passed to :py:func:`~CanonicalForm.add_to`. """ if self._finalized: raise RuntimeError("Object has already been finalized, you are trying some nasty stuff there.") if term["name"] in "fGL": # hold f and g vector separately self._static_form.add_to(term, val, column) return if weight_label is None: raise ValueError("weight_label can only be none if target is f or G.") if weight_label not in list(self.dynamic_forms.keys()): self.dynamic_forms[weight_label] = CanonicalForm("_".join([self.name + weight_label])) self.dynamic_forms[weight_label].add_to(term, val)
[docs] def finalize(self): """ Finalize the Object. After the complete formulation has been parsed and all terms have been sorted into this Object via :py:meth:`.add_to` this function has to be called to inform this object about it. Furthermore, the f and G parts of the static_form will be copied to the dominant form for easier state-space transformation. Note: This function must be called to use the :py:attr:`dominant_form` attribute. """ if self.dominant_lbl is None: raise ValueError("You have to set the dominant labels of the\n" "canonical equation (weak form), for example\n" "with pyinduct.simulation.set_dominant_labels().") if not self._finalized_dynamic_forms: self.finalize_dynamic_forms() if self.dynamic_forms[self.dominant_lbl].singular: raise ValueError("The form that has to be chosen is singular.") # copy static terms to dominant form to transform them correctly for letter in "fG": if letter in self._static_form.matrices: self.dynamic_forms[self.dominant_lbl].matrices.update({letter: self._static_form.matrices[letter]}) self._finalized = True
[docs] def finalize_dynamic_forms(self): """ Finalize all dynamic forms. See method :py:meth:`.CanonicalForm.finalize`. """ for lbl, form in self.dynamic_forms.items(): form.finalize() self._finalized_dynamic_forms = True
@property def static_form(self): """ :py:class:`.WeakForm` that does not depend on any weights. :return: """ return self._static_form @property def dominant_form(self): """ direct access to the dominant :py:class:`.CanonicalForm`. Note: :py:meth:`.finalize` must be called first. Returns: :py:class:`.CanonicalForm`: the dominant canonical form """ if self.dominant_lbl is None: raise RuntimeError("Dominant label is not defined! Use for\n" "expample pyinduct.simulation." "set_dominant_label or set it manually.") return self.dynamic_forms[self.dominant_lbl]
[docs] def get_static_terms(self): """ Return: Terms that do not depend on a certain weight set. """ return self._static_form.get_terms()
[docs] def get_dynamic_terms(self): """ Return: dict: Dictionary of terms for each weight set. """ return {label: val.get_terms() for label, val in self.dynamic_forms.items()}
@property def input_function(self): """ The input handles for the equation. """ return self._static_form.input_function
[docs] def set_input_function(self, func): self._static_form.set_input_function(func)
[docs]def create_state_space(canonical_equations): """ Create a state-space system constituted by several :py:class:`.CanonicalEquations` (created by :py:func:`.parse_weak_formulation`) Args: canonical_equations: List of :py:class:`.CanonicalEquation`'s. Raises: ValueError: If compatibility criteria cannot be fulfilled Return: :py:class:`.StateSpace`: State-space representation of the approximated system """ set_dominant_labels(canonical_equations) if isinstance(canonical_equations, CanonicalEquation): # backward compatibility canonical_equations = [canonical_equations] # check whether the formulations are compatible for eq in canonical_equations: for lbl, form in eq.dynamic_forms.items(): coupling_order = form.max_temp_order # search corresponding dominant form in other equations for _eq in canonical_equations: # check uniqueness of name - dom_lbl mappings if eq.name != _eq.name and eq.dominant_lbl == _eq.dominant_lbl: raise ValueError("A dominant form has to be unique over all given Equations") # identify coupling terms if lbl == eq.dominant_lbl: break # identify corresponding dominant form if _eq.dominant_lbl != lbl: continue dominant_order = _eq.dominant_form.max_temp_order if dominant_order <= coupling_order: # dominant order has to be at least one higher than # the coupling order raise ValueError("Formulations are not compatible") # transform dominant forms into state-space representation # and collect information dominant_state_spaces = {} state_space_props = Parameters(size=0, parts=OrderedDict(), powers=set(), input_powers=set(), dim_u=0, input=None) for eq in canonical_equations: dom_lbl = eq.dominant_lbl dom_form = eq.dominant_form dom_ss = dom_form.convert_to_state_space() dominant_state_spaces.update({dom_lbl: dom_ss}) # collect some information state_space_props.parts[dom_lbl] = dict(start=copy(state_space_props.size), orig_size=dom_form.dim_x, size=dom_form.dim_xb, order=dom_form.max_temp_order - 1, sys_name=eq.name) state_space_props.powers.update(dom_form.powers) state_space_props.size += dom_form.dim_xb state_space_props.dim_u = max(state_space_props.dim_u, dom_form.dim_u) # update input handles if state_space_props.input is None: state_space_props.input = eq.input_function elif eq.input_function is not None: if not state_space_props.input is eq.input_function: raise ValueError("Only one input object allowed.") # build new basis by concatenating the dominant bases of every equation if len(canonical_equations) == 1: new_name = next(iter(canonical_equations)).dominant_lbl else: base_info = copy(state_space_props.parts) base_lbls = state_space_props.parts.keys() for lbl in base_lbls: base_info[lbl].update({"base": get_base(lbl)}) new_base = StackedBase(base_info) new_name = "_".join(base_lbls) register_base(new_name, new_base) # build new state transition matrices A_p_k for corresponding powers p_k of the state vector a_matrices = {} for p in state_space_props.powers: a_mat = np.zeros((state_space_props.size, state_space_props.size)) for row_eq in canonical_equations: row_dom_lbl = row_eq.dominant_lbl row_dom_dim = state_space_props.parts[row_dom_lbl]["size"] row_dom_trans_mat = row_eq.dominant_form.e_n_pb_inv row_dom_sys_mat = dominant_state_spaces[row_dom_lbl].A.get(p, None) row_idx = state_space_props.parts[row_dom_lbl]["start"] for col_eq in canonical_equations: col_dom_lbl = col_eq.dominant_lbl # main diagonal if col_eq.name == row_eq.name: if row_dom_sys_mat is not None: a_mat[row_idx:row_idx + row_dom_dim, row_idx:row_idx + row_dom_dim] = row_dom_sys_mat continue # coupling terms if col_dom_lbl in row_eq.dynamic_forms: for order, mats in row_eq.dynamic_forms[col_dom_lbl].matrices["E"].items(): orig_mat = mats.get(p, None) if orig_mat is not None: # transform matrix with row-transformation matrix and add to last "row" # since it's not the dominant entry, revert sign change cop_mat = row_dom_trans_mat @ -orig_mat v_idx = row_idx + row_dom_dim - state_space_props.parts[row_dom_lbl]["orig_size"] col_idx = state_space_props.parts[col_dom_lbl]["start"] h_idx = col_idx + order * state_space_props.parts[col_dom_lbl]["orig_size"] a_mat[v_idx: v_idx + cop_mat.shape[0], h_idx: h_idx + cop_mat.shape[1]] = cop_mat a_matrices.update({p: a_mat}) # build new state input matrices b_matrices = {} for name, dom_ss in dominant_state_spaces.items(): for order, order_mats in dom_ss.B.items(): b_order_mats = b_matrices.get(order, {}) for p, power_mat in order_mats.items(): b_power_mat = b_order_mats.get(p, np.zeros((state_space_props.size, state_space_props.dim_u))) # add entry to the last "row" r_idx = state_space_props.parts[name]["start"] # - state_space_props.parts[name]["orig_size"] b_power_mat[r_idx: r_idx + power_mat.shape[0], :power_mat.shape[1]] = power_mat b_order_mats.update({p: b_power_mat}) b_matrices.update({order: b_order_mats}) # build observer feedback handle def observer_feedback(**kwargs): res = np.zeros(state_space_props.size) for ce in canonical_equations: for fb in ce._static_form._observer_feedback: idx_a = (state_space_props.parts[ce.dominant_lbl]["start"] + state_space_props.parts[ce.dominant_lbl]["orig_size"] * state_space_props.parts[ce.dominant_lbl]["order"]) idx_b = (idx_a + state_space_props.parts[ce.dominant_lbl]["orig_size"]) kwargs.update(obs_weight_lbl=ce.dominant_lbl) res[idx_a: idx_b] += ce.dominant_form.e_n_pb_inv @ np.squeeze( fb._calc_output(**kwargs)["output"], 1) kwargs.pop("obs_weight_lbl") return res dom_ss = StateSpace(a_matrices, b_matrices, base_lbl=new_name, input_handle=state_space_props.input, obs_fb_handle=observer_feedback) return dom_ss
[docs]def parse_weak_formulation(weak_form, finalize=False, is_observer=False): r""" Parses a :py:class:`.WeakFormulation` that has been derived by projecting a partial differential equation an a set of test-functions. Within this process, the separating approximation :math:`x^n(z, t) = \sum_{i=1}^n c_i^n(t) \varphi_i^n(z)` is plugged into the equation and the separated spatial terms are evaluated, leading to a ordinary equation system for the weights :math:`c_i^n(t)`. Args: weak_form: Weak formulation of the pde. finalize (bool): Default: False. If you have already defined the dominant labels of the weak formulations you can set this to True. See :py:meth:`.CanonicalEquation.finalize` Return: :py:class:`.CanonicalEquation`: The spatially approximated equation in a canonical form. """ if not isinstance(weak_form, WeakFormulation): raise TypeError("Only able to parse WeakFormulation") ce = CanonicalEquation(weak_form.name, weak_form.dominant_lbl) # handle each term for term in weak_form.terms: # extract Placeholders placeholders = dict( scalars=term.arg.get_arg_by_class(Scalars), functions=term.arg.get_arg_by_class(TestFunction), field_variables=term.arg.get_arg_by_class(FieldVariable), observer_fb=term.arg.get_arg_by_class(ObserverGain), inputs=term.arg.get_arg_by_class(Input)) if is_observer: if placeholders["observer_fb"]: raise ValueError( "The weak formulation for an observer gain can not hold \n" "the 'Placeholder' ObserverGain.") if placeholders["field_variables"]: raise ValueError( "The weak formulation for an observer gain can not hold \n" "the 'Placeholder' FieldVariable.") if placeholders["scalars"]: if any([plh.target_term["name"] == 'E' for plh in placeholders["scalars"]]): raise ValueError( "The weak formulation for an observer gain can not \n" "hold a 'Placeholder' Scalars with target_term == 'E'.") # field variable terms: sort into E_np, E_n-1p, ..., E_0p if placeholders["field_variables"]: assert isinstance(term, IntegralTerm) if len(placeholders["field_variables"]) != 1: raise NotImplementedError field_var = placeholders["field_variables"][0] if not field_var.simulation_compliant: msg = "Shape- and test-function labels of FieldVariable must " \ "match for simulation purposes." raise ValueError(msg) temp_order = field_var.order[0] exponent = field_var.data["exponent"] term_info = dict(name="E", order=temp_order, exponent=exponent) base = get_base(field_var.data["func_lbl"]).derive(field_var.order[1]) shape_funcs = base.raise_to(exponent) if placeholders["inputs"]: # essentially, this means that parts of the state-transition # matrix will be time dependent raise NotImplementedError if placeholders["functions"]: # is the integrand a product? if len(placeholders["functions"]) != 1: raise NotImplementedError func1 = placeholders["functions"][0] base1 = get_base(func1.data["func_lbl"]).derive(func1.order[1]) result = calculate_scalar_product_matrix(base1, shape_funcs) else: # extract constant term and compute integral part1 = [] for func1 in shape_funcs.fractions: from pyinduct.core import ComposedFunctionVector if isinstance(func1, ComposedFunctionVector): res = 0 for f in func1.members["funcs"]: area = domain_intersection(term.limits, f.nonzero) r, err = integrate_function(f, area) res += r for s in func1.members["scalars"]: res += s else: area = domain_intersection(term.limits, func1.nonzero) res, err = integrate_function(func1, area) part1.append(res) a = Scalars(np.atleast_2d(part1)) if placeholders["scalars"]: b = placeholders["scalars"][0] result = _compute_product_of_scalars([a, b]) else: result = a.data ce.add_to(weight_label=field_var.data["weight_lbl"], term=term_info, val=result * term.scale) continue # TestFunctions or pre evaluated terms, those can end up in E, f or G if placeholders["functions"]: if not 1 <= len(placeholders["functions"]) <= 2: raise NotImplementedError func1 = placeholders["functions"][0] base1 = get_base(func1.data["func_lbl"]).derive(func1.order[1]) prod = base1.scalar_product_hint() if len(placeholders["functions"]) == 1: # product of one function and something else, solve integral # first by faking 2nd factor base2 = [f.mul_neutral_element() for f in base1] else: func2 = placeholders["functions"][1] base2 = get_base(func2.data["func_lbl"]).derive(func2.order[1]) # resolve equation term if isinstance(term, ScalarProductTerm): int_res = vectorize_scalar_product(base1, base2, prod) elif isinstance(term, IntegralTerm): from pyinduct.core import Base, ComposedFunctionVector # create base with multiplied fractions s_base = Base([f1.scale(f2) for f1, f2 in zip(base1, base2)]) int_res = [] for frac in s_base: # WARN I don't think that this case actually makes sense. if isinstance(frac, ComposedFunctionVector): res = 0 for f in frac.members["funcs"]: area = domain_intersection(term.limits, f.nonzero) r, err = integrate_function(f, area) res += r for s in frac.members["scalars"]: res += s else: area = domain_intersection(term.limits, frac.nonzero) res, err = integrate_function(frac, area) int_res.append(res) else: raise NotImplementedError() # create column vector int_res = np.atleast_2d(int_res).T * term.scale # integral of the product of two functions if len(placeholders["functions"]) == 2: term_info = dict(name="f", exponent=0) ce.add_to(weight_label=None, term=term_info, val=int_res) continue if placeholders["scalars"]: a = placeholders["scalars"][0] b = Scalars(int_res) result = _compute_product_of_scalars([a, b]) ce.add_to(weight_label=a.target_form, term=get_common_target(placeholders["scalars"]), val=result) continue if placeholders["inputs"]: if len(placeholders["inputs"]) != 1: raise NotImplementedError input_var = placeholders["inputs"][0] input_func = input_var.data["input"] input_index = input_var.data["index"] input_exp = input_var.data["exponent"] input_order = input_var.order[0] term_info = dict(name="G", order=input_order, exponent=input_exp) ce.add_to(weight_label=None, term=term_info, val=int_res, column=input_index) ce.set_input_function(input_func) continue if is_observer: result = np.vstack([integrate_function(func, func.nonzero)[0] for func in base1]) ce.add_to(weight_label=func1.data["appr_lbl"], term=dict(name="E", order=0, exponent=1), val=result * term.scale) continue # pure scalar terms, sort into corresponding matrices if placeholders["scalars"]: assert isinstance(term, ScalarTerm) result = _compute_product_of_scalars(placeholders["scalars"]) target = get_common_target(placeholders["scalars"]) target_form = get_common_form(placeholders) if placeholders["inputs"]: input_var = placeholders["inputs"][0] input_func = input_var.data["input"] input_index = input_var.data["index"] input_exp = input_var.data["exponent"] input_order = input_var.order[0] term_info = dict(name="G", order=input_order, exponent=input_exp) if target["name"] == "E": # this would mean that the input term should appear in a # matrix like E1 or E2, again leading to a time dependant # state transition matrix raise NotImplementedError ce.add_to(weight_label=None, term=term_info, val=result * term.scale, column=input_index) ce.set_input_function(input_func) continue if is_observer: ce.add_to( weight_label=placeholders["scalars"][0].target_term["test_appr_lbl"], term=dict(name="E", order=0, exponent=1), val=result * term.scale) else: ce.add_to(weight_label=target_form, term=target, val=result * term.scale) continue if placeholders["observer_fb"]: ce.add_to(weight_label=None, term=dict(name="L"), val=placeholders["observer_fb"][0].data["obs_fb"]) continue # inform object that the parsing process is complete if finalize: ce.finalize() return ce
[docs]def parse_weak_formulations(weak_forms): """ Convenience wrapper for :py:func:`.parse_weak_formulation`. Args: weak_forms: List of :py:class:`.WeakFormulation`'s. Returns: List of :py:class:`.CanonicalEquation`'s. """ canonical_equations = list() for form in weak_forms: print(">>> parse formulation {}".format(form.name)) ce = parse_weak_formulation(form) if ce.name in [ceq.name for ceq in canonical_equations]: raise ValueError(("Name {} for CanonicalEquation already assigned, " "names must be unique.").format(form.name)) canonical_equations.append(ce) return canonical_equations
def _compute_product_of_scalars(scalars): """ Compute products for scalar terms while paying attention to some caveats Depending on how the data (coefficients for the lumped equations) of the terms were generated, it is either a column or a row vector. Special cases contain a simple scaling of all equations shape = (1, 1) and products of row and column vectors if two terms are provided. Args: scalars: Returns: """ data_shape1 = scalars[0].data.shape if len(scalars) < 1 or len(scalars) > 2: raise NotImplementedError() if len(scalars) == 1: # simple scaling of all terms if sum(data_shape1) > (max(data_shape1) + 1): # print("Workaround 1: Summing up all entries") res = np.sum(scalars[0].data, axis=0, keepdims=True).T else: assert data_shape1[0] == 1 or data_shape1[1] == 1 res = scalars[0].data return res # two arguments data_shape2 = scalars[1].data.shape if data_shape1 == data_shape2 and data_shape2[1] == 1: # element wise multiplication res = np.prod(np.array([scalars[0].data, scalars[1].data]), axis=0) elif data_shape1 == (1, 1) or data_shape2 == (1, 1): # a lumped term is present res = scalars[0].data * scalars[1].data else: # dyadic product try: if data_shape1[1] == 1: res = scalars[0].data @ scalars[1].data elif data_shape2[1] == 1: res = scalars[1].data @ scalars[0].data # TODO: handle dyadic product ComposedFunctionVector and Base in the same way elif data_shape1[1] == data_shape2[0]: # print("Workaround 2: Matrix product") res = np.transpose(scalars[1].data) @ np.transpose(scalars[0].data) else: raise NotImplementedError except ValueError as e: raise ValueError("provided entries do not form a dyadic product") return res
[docs]def simulate_state_space(state_space, initial_state, temp_domain, settings=None): r""" Wrapper to simulate a system given in state space form: .. math:: \dot{q} = A_pq^p + A_{p-1}q^{p-1} + \dotsb + A_0q + Bu. Args: state_space (:py:class:`.StateSpace`): State space formulation of the system. initial_state: Initial state vector of the system. temp_domain (:py:class:`.Domain`): Temporal domain object. settings (dict): Parameters to pass to the :py:func:`set_integrator` method of the :class:`scipy.ode` class, with the integrator name included under the key :obj:`name`. Return: tuple: Time :py:class:`.Domain` object and weights matrix. """ # if not isinstance(state_space, StateSpace): # raise TypeError q = [initial_state] t = [temp_domain[0]] r = ode(state_space.rhs) # TODO check for complex-valued matrices and use 'zvode' if settings: r.set_integrator(settings.pop("name"), **settings) else: # use some sane defaults r.set_integrator( "vode", max_step=temp_domain.step, method="adams", nsteps=1e3 ) r.set_initial_value(q[0], t[0]) for t_step in temp_domain[1:]: qn = r.integrate(t_step) if not r.successful(): warnings.warn("*** Error: Simulation aborted at t={} ***".format(r.t)) break t.append(r.t) q.append(qn) # create results q = np.array(q) return Domain(points=np.array(t), step=temp_domain.step), q
[docs]def evaluate_approximation(base_label, weights, temp_domain, spat_domain, spat_order=0, name=""): """ Evaluate an approximation given by weights and functions at the points given in spatial and temporal steps. Args: weights: 2d np.ndarray where axis 1 is the weight index and axis 0 the temporal index. base_label (str): Functions to use for back-projection. temp_domain (:py:class:`.Domain`): For steps to evaluate at. spat_domain (:py:class:`.Domain`): For points to evaluate at (or in). spat_order: Spatial derivative order to use. name: Name to use. Return: :py:class:`.EvalData` """ funcs = get_base(base_label).derive(spat_order).fractions if weights.shape[1] != funcs.shape[0]: raise ValueError("weights (len={0}) have to fit provided functions " "(len={1})!".format(weights.shape[1], funcs.size)) # evaluate shape functions at given points shape_vals = np.array([func.evaluation_hint(spat_domain) for func in funcs]).T if shape_vals.ndim == 2: res = weights @ shape_vals.T else: # get extra dims to the front in both arrays extra_axes = range(1, shape_vals.ndim - 1) axes_idxs = np.array(extra_axes) b_shape_vals = np.swapaxes(shape_vals, 0, -1) b_shape_vals = np.moveaxis(b_shape_vals, axes_idxs, axes_idxs-1) w_shape = (*np.array(shape_vals.shape)[axes_idxs], *weights.shape) b_weights = np.broadcast_to(weights, w_shape) b_res = b_weights @ b_shape_vals res = np.moveaxis(b_res, axes_idxs-1, axes_idxs+1) ed = EvalData([temp_domain.points, spat_domain.points], res, name=name, fill_axes=True) return ed
[docs]def set_dominant_labels(canonical_equations, finalize=True): """ Set the dominant label (*dominant_lbl*) member of all given canonical equations and check if the problem formulation is valid (see background section: http://pyinduct.readthedocs.io/en/latest/). If the dominant label of one or more :py:class:`.CanonicalEquation` is already defined, the function raise a UserWarning if the (pre)defined dominant label(s) are not valid. Args: canonical_equations: List of :py:class:`.CanonicalEquation` instances. finalize (bool): Finalize the equations? Default: True. """ if isinstance(canonical_equations, CanonicalEquation): canonical_equations = [canonical_equations] # collect all involved labels labels = set( chain(*[list(ce.dynamic_forms.keys()) for ce in canonical_equations])) if len(labels) != len(canonical_equations): raise ValueError("The N defined canonical equations (weak forms)\n" "must hold exactly N different weight labels!\n" "But your {} canonical equation(s) (weak form(s))\n" "hold {} weight label(s)!" "".format(len(canonical_equations), len(labels))) max_orders = dict() for ce in canonical_equations: ce.finalize_dynamic_forms() for lbl in list(ce.dynamic_forms.keys()): max_order = dict( (("max_order", ce.dynamic_forms[lbl].max_temp_order), ("can_eqs", [ce]))) if lbl not in max_orders or \ max_orders[lbl]["max_order"] < max_order["max_order"]: max_orders[lbl] = max_order elif max_orders[lbl]["max_order"] == max_order["max_order"]: max_orders[lbl]["can_eqs"].append( max_order["can_eqs"][0]) non_valid1 = [(lbl, max_orders[lbl]) for lbl in labels if len(max_orders[lbl]["can_eqs"]) > 1] if non_valid1: raise ValueError("The highest time derivative from a certain weight\n" "label may only occur in one canonical equation. But\n" "each of the canonical equations {} holds the\n" "weight label '{}' with order {} in time." "".format(non_valid1[0][1]["can_eqs"][0].name, non_valid1[0][0], non_valid1[0][1]["max_order"])) non_valid2 = [lbl for lbl in labels if max_orders[lbl]["max_order"] == 0] if non_valid2: raise ValueError("The defined problem leads to an differential\n" "algebraic equation, since there is no time\n" "derivative for the weights {}. Such problems are\n" "not considered in pyinduct, yet." "".format(non_valid2)) # set/check dominant labels for lbl in labels: pre_lbl = max_orders[lbl]["can_eqs"][0].dominant_lbl max_orders[lbl]["can_eqs"][0].dominant_lbl = lbl if pre_lbl is not None and pre_lbl != lbl: warnings.warn("\n Predefined dominant label '{}' from\n" "canonical equation / weak form '{}' not valid!\n" "It will be overwritten with the label '{}'." "".format(pre_lbl, max_orders[lbl]["can_eqs"][0].name, lbl), UserWarning) if finalize: for ce in canonical_equations: ce.finalize()
[docs]class SimulationInputVector(SimulationInput): """ A simulation input which combines :py:class:`.SimulationInput` objects into a column vector. Args: input_vector (array_like): Simulation inputs to stack. """ def __init__(self, input_vector): SimulationInput.__init__(self) self._input_vector = self._sanitize_input_vector(input_vector) def _sanitize_input_vector(self, input_vector): if hasattr(input_vector, "__len__") and len(input_vector) == 0: return list() else: return sanitize_input(input_vector, SimulationInput) def __iter__(self): return iter(self._input_vector) def __getitem__(self, item): return self._input_vector[item]
[docs] def append(self, input_vector): """ Add an input to the vector. """ inputs = self._sanitize_input_vector(input_vector) self._input_vector = np.hstack((self._input_vector, inputs))
def _calc_output(self, **kwargs): output = list() for input in self._input_vector: output.append(input(**kwargs)) return dict(output=output)