hydrogym.core
ActuatorBase Objects
class ActuatorBase()
step
def step(u: float, dt: float)
Update the state of the actuator
PDEBase Objects
class PDEBase(metaclass=abc.ABCMeta)
Basic configuration of the state of the PDE model
Will contain any time-varying flow fields, boundary conditions, actuation models, etc. Does not contain any information about solving the time-varying equations.
num_inputs
@property
@abc.abstractmethod
def num_inputs() -> int
Length of the control vector (number of actuators)
num_outputs
@property
@abc.abstractmethod
def num_outputs() -> int
Number of scalar observed variables
load_mesh
@abc.abstractmethod
def load_mesh(name: str) -> MeshType
Load mesh from the file name
initialize_state
@abc.abstractmethod
def initialize_state()
Set up mesh, function spaces, state vector, etc
init_bcs
@abc.abstractmethod
def init_bcs()
Initialize any boundary conditions for the PDE.
set_state
def set_state(q: StateType)
Set the current state fields
Should be overridden if a different assignment
mechanism is used (e.g. Function.assign)
Arguments:
q (StateType): State to be assigned
state
def state() -> StateType
Return current state field(s) of the PDE
copy_state
@abc.abstractmethod
def copy_state(deepcopy=True)
Return a copy of the flow state
reset
def reset(q0: StateType = None, t: float = 0.0)
Reset the PDE to an initial state
Arguments:
q0 (StateType, optional): State to which the PDE fields will be assigned. Defaults to None.
reset_controls
def reset_controls()
Reset the controls to a zero state
Note that this is broken out from reset because
the two are not necessarily called together (e.g.
for linearization or deriving the control vector)
collect_bcs
def collect_bcs() -> Iterable[BCType]
Return the set of boundary conditions
get_observations
@abc.abstractmethod
def get_observations() -> Iterable[ArrayLike]
Return the set of measurements/observations
evaluate_objective
@abc.abstractmethod
def evaluate_objective(q: StateType = None) -> ArrayLike
Return the objective function to be minimized
Arguments:
q (StateType, optional): State to evaluate the objective of, if not the current PDE state. Defaults to None.
Returns:
ArrayLike: objective function (negative of reward)
enlist
def enlist(x: Any) -> Iterable[Any]
Convert scalar or array-like to a list
set_control
def set_control(act: ArrayLike = None)
Directly set the control state
advance_time
def advance_time(dt: float, act: list[float] = None) -> list[float]
Update the current controls state. May involve integrating
a dynamics model rather than directly setting the controls state.
Here, if actual control is u and input is v, effectively
du/dt = (1/tau)*(v - u)
Arguments:
act (Iterable[ArrayLike]): Action inputs
dt (float): Time step
Returns:
Iterable[ArrayLike]: Updated actuator state
dot
def dot(q1: StateType, q2: StateType) -> float
Inner product between states q1 and q2
render
@abc.abstractmethod
def render(**kwargs)
Plot the current PDE state (called by gymnasium.Env)
CallbackBase Objects
class CallbackBase()
__init__
def __init__(interval: int = 1)
Base class for things that happen every so often in the simulation (e.g. save output for visualization or write some info to a log file).
Arguments:
interval (int, optional): How often to take action. Defaults to 1.
TODO: Add a ControllerCallback
__call__
def __call__(iter: int, t: float, flow: PDEBase) -> bool
Check if this is an 'iostep' by comparing to self.interval
Arguments:
iter (int): Iteration number
t (float): Time value
flow (PDEBase): Underlying PDE model
Returns:
bool: whether or not to do the thing in this iteration
close
def close()
Close any open files, etc.
TransientSolver Objects
class TransientSolver()
Time-stepping code for updating the transient PDE
solve
def solve(
t_span: Tuple[float, float] = None,
num_steps: int = None,
callbacks: Iterable[CallbackBase] = [],
controller: Callable = None,
collect_rewards: bool = False
) -> Union[PDEBase, Tuple[PDEBase, np.ndarray]]
Solve the initial-value problem for the PDE.
Supports both time-span and fixed-step modes:
- If t_span is provided: solve from t_span[0] to t_span[1] with self.dt
- If num_steps is provided: solve for exactly num_steps iterations
Arguments:
t_span (Tuple[float, float], optional): Tuple of start and end times (mutually exclusive with num_steps)
num_steps (int, optional): Number of steps to take (mutually exclusive with t_span)
callbacks (Iterable[CallbackBase], optional): List of callbacks to evaluate throughout the solve. Defaults to [].
controller (Callable, optional): Feedback/forward controller `u = ctrl(t, y)`
collect_rewards (bool, optional): If True, collect and return rewards from each step. Defaults to False.
Returns:
PDEBase: The state of the PDE at the end, OR
Tuple[PDEBase, np.ndarray]: (state, rewards) if collect_rewards=True
step
def step(iter: int, control: Iterable[float] = None, **kwargs)
Advance the transient simulation by one time step
Arguments:
iter (int): Iteration count
control (Iterable[float], optional): Actuation input. Defaults to None.
reset
def reset()
Reset variables for the timestepper
FlowEnv Objects
class FlowEnv(gym.Env)
step
def step(
action: Iterable[ArrayLike] = None
) -> Tuple[ArrayLike, float, bool, bool, dict]
Advance the state of the environment. See gymnasium.Env documentation
Arguments:
action (Iterable[ArrayLike], optional): Control inputs. Defaults to None.
Returns:
Tuple[ArrayLike, float, bool, bool, dict]: obs, reward, terminated, truncated, info
stack_observations
def stack_observations(obs)
Convert observations to numpy array format.
Arguments:
obs: Observations in various formats (tuple, list, ndarray, scalar)
Returns:
np.ndarray: Observations as a numpy array
reset
def reset(seed=None, options=None) -> Tuple[ArrayLike, dict]
Reset the environment to initial state.
Arguments:
seed: Random seed for reproducibility (gymnasium API).
options: Additional options (gymnasium API).
Returns:
Tuple[ArrayLike, dict]: (observation, info)