toddlerbot.utils package

Submodules

toddlerbot.utils.array_utils module

Array manipulation utilities for both JAX and NumPy compatibility.

Provides unified array operations that work with either JAX or NumPy based on the USE_JAX environment variable.

toddlerbot.utils.array_utils.conditional_update(condition: bool | Array | ndarray[Any, dtype[bool_]], true_func: Callable[[], Array | ndarray[Any, dtype[float32]]], false_func: Callable[[], Array | ndarray[Any, dtype[float32]]]) Array | ndarray[Any, dtype[float32]]

Performs a conditional update using jax.lax.cond if USE_JAX is True, or a standard if-else statement for NumPy.

Parameters:
  • condition – The condition to check.

  • true_func – Function to execute if the condition is True.

  • false_func – Function to execute if the condition is False.

Returns:

The result of true_func if condition is True, otherwise the result of false_func.
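
The dispatch described above might look like the following minimal sketch. How the USE_JAX environment variable is parsed is an assumption, not taken from the package source, and JAX is imported only on the JAX path.

```python
import os

# Assumption: the flag is read once from the environment; the real parsing may differ.
USE_JAX = os.environ.get("USE_JAX", "false").lower() == "true"


def conditional_update(condition, true_func, false_func):
    """Return true_func() if condition holds, else false_func()."""
    if USE_JAX:
        import jax  # imported lazily so the NumPy path has no JAX dependency

        # lax.cond keeps the branch selection compatible with jit-compiled code.
        return jax.lax.cond(condition, true_func, false_func)
    return true_func() if condition else false_func()


result = conditional_update(3 > 2, lambda: 1.0, lambda: -1.0)
```

Both branches are passed as zero-argument callables so that only the selected one runs on the NumPy path.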

toddlerbot.utils.array_utils.inplace_add(array: Array | ndarray[Any, dtype[float32]], idx: int | slice | tuple[int | slice, ...], value: Any) Array | ndarray[Any, dtype[float32]]

Performs an in-place addition to an array at specified indices.

Parameters:
  • array (ArrayType) – The array to be updated.

  • idx (int | slice | tuple[int | slice, ...]) – The index or indices where the addition should occur.

  • value (Any) – The value to add to the specified indices.

Returns:

The updated array after performing the in-place addition.

Return type:

ArrayType

toddlerbot.utils.array_utils.inplace_update(array: Array | ndarray[Any, dtype[float32]], idx: int | slice | Array | ndarray[Any, dtype[float32]] | tuple[int | slice | Array | ndarray[Any, dtype[float32]], ...], value: Any) Array | ndarray[Any, dtype[float32]]

Updates the specified elements of an array in place with a given value.

Parameters:
  • array (ArrayType) – The array to be updated.

  • idx (int | slice | ArrayType | tuple[int | slice | ArrayType, ...]) – The indices of the elements to update. Can be an integer, slice, array, or a tuple of these.

  • value (Any) – The value to set at the specified indices.

Returns:

The updated array.

Return type:

ArrayType
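
A rough sketch of how inplace_update and inplace_add could differ between backends. The use_jax argument is hypothetical (the package selects the backend through USE_JAX), and a plain list stands in for a NumPy array so the example has no dependencies.

```python
def inplace_update(array, idx, value, use_jax=False):
    """Set array[idx] = value and return the resulting array."""
    if use_jax:
        # JAX arrays are immutable; .at[idx].set returns an updated copy.
        return array.at[idx].set(value)
    array[idx] = value  # mutable containers are modified in place
    return array


def inplace_add(array, idx, value, use_jax=False):
    """Add value to array[idx] and return the resulting array."""
    if use_jax:
        return array.at[idx].add(value)
    array[idx] += value
    return array


# A plain list stands in for a NumPy array here.
buf = [0.0, 1.0, 2.0]
buf = inplace_update(buf, 1, 5.0)
buf = inplace_add(buf, 2, 0.5)
```

Reassigning the return value, as in the last two lines, keeps calling code correct on both backends, since the JAX path cannot mutate its input.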

toddlerbot.utils.array_utils.loop_update(update_step: Callable[[Tuple[Array | ndarray[Any, dtype[float32]], Array | ndarray[Any, dtype[float32]]], int], Tuple[Tuple[Array | ndarray[Any, dtype[float32]], Array | ndarray[Any, dtype[float32]]], Array | ndarray[Any, dtype[float32]]]], x: Array | ndarray[Any, dtype[float32]], u: Array | ndarray[Any, dtype[float32]], index_range: Tuple[int, int]) Array | ndarray[Any, dtype[float32]]

A general function to perform loop updates compatible with both JAX and NumPy.

Parameters:
  • update_step – A function that takes the current (x, u) pair and a step index, and returns the updated (x, u) pair together with a per-step output array.

  • x – The state trajectory array.

  • u – The control input trajectory array.

  • index_range – A tuple of two integers bounding the step indices to loop over.

Returns:

The updated trajectory array.
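
The loop shape implied by the signature can be sketched as below for the non-JAX path. This is an assumption-laden illustration: the half-open index range, the choice to return the state array from the final carry, and the step function are all hypothetical, and the JAX path would presumably use a structured loop primitive instead of a Python for loop.

```python
def loop_update(update_step, x, u, index_range):
    """Run update_step over [start, end) and return the final state trajectory."""
    start, end = index_range
    carry = (x, u)
    for i in range(start, end):
        # update_step returns the new carry and a per-step output (unused here).
        carry, _ = update_step(carry, i)
    return carry[0]


def step(carry, i):
    """Hypothetical dynamics: x[i + 1] = x[i] + u[i]."""
    x, u = carry
    x = list(x)  # copy so the caller's list is left untouched
    x[i + 1] = x[i] + u[i]
    return (x, u), x[i + 1]


traj = loop_update(step, [0.0, 0.0, 0.0, 0.0], [1.0, 2.0, 3.0], (0, 3))
```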

toddlerbot.utils.array_utils.random_uniform(low: float, high: float, rng: Array | None = None, shape: Tuple[int, ...] | None = None) Array | ndarray[Any, dtype[float32]]

Generates random uniform values compatible with both JAX and NumPy.

Parameters:
  • low – Lower bound of the uniform distribution.

  • high – Upper bound of the uniform distribution.

  • rng – JAX random key (required when USE_JAX is True).

  • shape – Shape of the output array.

Returns:

Random uniform array.

toddlerbot.utils.comm_utils module

Communication utilities for network time sync and ZeroMQ messaging.

Provides ZMQ-based communication between sender and receiver nodes and network time synchronization functionality.

class toddlerbot.utils.comm_utils.ZMQMessage(time: float, control_inputs: Dict[str, float] | None = None, action: ndarray[Any, dtype[float32]] | None = None, fsr: ndarray[Any, dtype[float32]] | None = None, camera_frame: ndarray[Any, dtype[uint8]] | None = None)

Bases: object

Data class for ZMQ messages.

action: ndarray[Any, dtype[float32]] | None = None
camera_frame: ndarray[Any, dtype[uint8]] | None = None
control_inputs: Dict[str, float] | None = None
fsr: ndarray[Any, dtype[float32]] | None = None
time: float

class toddlerbot.utils.comm_utils.ZMQNode(type: str = 'sender', ip: str = '', queue_len: int = 1)

Bases: object

A class for handling ZMQ communication between sender and receiver nodes.

get_msg(return_last: bool = True)

Retrieves messages from a ZMQ socket buffer until it is empty.

This method is designed to handle cases where reading from the buffer is too slow, causing issues with simple get operations. It reads all available messages from the buffer and returns either the last message or all messages, depending on the return_last parameter.

Parameters:

return_last (bool) – If True, returns only the last message received. If False, returns all messages. Defaults to True.

Returns:

The last message if return_last is True, a list of all messages if return_last is False, or None if no messages are available.

Return type:

ZMQMessage or List[ZMQMessage] or None

Raises:

ValueError – If the ZMQ socket type is not ‘receiver’.
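
The drain-until-empty behaviour of get_msg can be illustrated without a network connection. In this sketch a standard-library queue.Queue stands in for the ZMQ socket, and the function name drain is hypothetical; with a real socket the loop would instead perform non-blocking receives until none are pending.

```python
import queue


def drain(buffer, return_last=True):
    """Read every pending item without blocking; return the last one or all of them."""
    messages = []
    while True:
        try:
            messages.append(buffer.get_nowait())
        except queue.Empty:
            break  # buffer is empty, stop reading
    if not messages:
        return None
    return messages[-1] if return_last else messages


buf = queue.Queue()
for k in range(3):
    buf.put({"time": float(k)})

latest = drain(buf)
```

Returning only the newest message is what lets a slow consumer skip stale data instead of falling further behind.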

send_msg(msg: ZMQMessage)

Sends a serialized ZMQMessage if the instance type is ‘sender’.

Parameters:

msg (ZMQMessage) – The message to be sent, which will be serialized.

Raises:

ValueError – If the instance type is not ‘sender’.

start_zmq()

Initialize a ZeroMQ context and socket for data exchange based on the specified type.

Sets up a ZeroMQ context and configures a socket as either a sender or receiver. For a sender, it connects to a specified IP and port, setting options to manage message queue length and non-blocking behavior. For a receiver, it binds to a port and configures options to manage message queue length and ensure only the latest message is kept.

toddlerbot.utils.comm_utils.sync_time(ip: str)

Synchronizes the system time with a network time server.

This function connects to a network time server specified by the given IP address and adjusts the system clock to match the server’s time.

Parameters:

ip (str) – The IP address of the network time server to synchronize with.

toddlerbot.utils.dataset_utils module

Dataset utilities for logging and saving teleoperation data.

Provides data logging functionality for capturing and persisting teleoperation episodes to compressed files.

class toddlerbot.utils.dataset_utils.Data(time: float, action: ndarray[Any, dtype[float32]], motor_pos: ndarray[Any, dtype[float32]], image: ndarray[Any, dtype[uint8]] | None = None)

Bases: object

Data class for logging teleoperation data during an episode.

action: ndarray[Any, dtype[float32]]
image: ndarray[Any, dtype[uint8]] | None = None
motor_pos: ndarray[Any, dtype[float32]]
time: float

class toddlerbot.utils.dataset_utils.DatasetLogger

Bases: object

A class for logging teleoperation data during an episode and saving it to disk.

log_entry(data: Data)

Adds a data entry to the internal list.

Parameters:

data (Data) – The data entry to be added to the list.

move_files_to_exp_folder(exp_folder_path: str)

Moves files with a specific naming pattern from the temporary directory to a specified experiment folder.

Parameters:

exp_folder_path (str) – The destination directory path where the files will be moved.

save()

Saves the current data list to a compressed file and resets the data list.

This method converts the attributes of each data object in self.data_list into a dictionary of numpy arrays, adds a start time, and saves the dictionary to a file in LZ4 compressed format. The file is named using the current number of episodes. After saving, the episode count is incremented, and the data list is cleared.

data_list

A list of data objects to be saved.

Type:

list

n_episodes

The current number of episodes logged.

Type:

int

Side Effects:

Increments self.n_episodes. Clears self.data_list. Prints a log message indicating the number of episodes logged and their length.

toddlerbot.utils.io_utils module

Input/output utilities for file operations and system interactions.

Provides functionality for XML formatting, serial port detection, file discovery, and environment path resolution.

toddlerbot.utils.io_utils.find_last_result_dir(result_dir: str, prefix: str = '') str | None

Find the latest (most recent) result directory within a given directory.

Parameters:
  • result_dir – The path to the directory containing result subdirectories.

  • prefix – The prefix of result directory names to consider.

Returns:

The path to the latest result directory, or None if no matching directory is found.

toddlerbot.utils.io_utils.find_latest_file_with_time_str(directory: str, file_prefix: str = '') str | None

Finds the file with the latest timestamp (YYYYMMDD_HHMMSS) in the given directory, among files matching file_prefix.

Parameters:
  • directory (str) – Directory to search for files.

  • file_prefix (str) – The file name pattern to match. Defaults to an empty string, which matches every file.

Returns:

Full path of the latest file or None if no matching file is found.

Return type:

str | None
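
A minimal sketch of timestamp-based file selection. The function name find_latest, the substring match on the file name, and the example file names are assumptions made for illustration.

```python
import os
import re
import tempfile
from datetime import datetime

TIME_RE = re.compile(r"\d{8}_\d{6}")  # YYYYMMDD_HHMMSS


def find_latest(directory, pattern=""):
    """Return the path whose name holds the newest timestamp, or None."""
    latest_path, latest_time = None, None
    for name in os.listdir(directory):
        match = TIME_RE.search(name)
        if pattern not in name or match is None:
            continue  # skip files without the pattern or without a timestamp
        stamp = datetime.strptime(match.group(), "%Y%m%d_%H%M%S")
        if latest_time is None or stamp > latest_time:
            latest_path, latest_time = os.path.join(directory, name), stamp
    return latest_path


with tempfile.TemporaryDirectory() as tmp:
    for name in ("run_20240101_120000.pkl", "run_20240315_083000.pkl", "notes.txt"):
        open(os.path.join(tmp, name), "w").close()
    newest = os.path.basename(find_latest(tmp, "run_"))
    missing = find_latest(tmp, "absent_")
```

Parsing the stamp into a datetime, instead of relying on file modification times, keeps the result stable when files are copied between machines.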

toddlerbot.utils.io_utils.find_ports(target: str) List[str]

Find open network ports on a specified target.

Parameters:

target – The IP address or hostname of the target to scan for open ports.

Returns:

A list of strings representing the open ports on the target.

toddlerbot.utils.io_utils.get_conda_path()

Determines the path of the current Python environment.

Returns:

The path to the current Python environment. If a virtual environment is active, returns the virtual environment’s path. If a conda environment is active, returns the conda environment’s path. Otherwise, returns the system environment’s path.

Return type:

str
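
The resolution order described above could be sketched as follows. VIRTUAL_ENV and CONDA_PREFIX are the variables that virtual environment and conda activation normally set; whether the package reads exactly these is an assumption, as are the function name get_env_path and its environ argument.

```python
import os
import sys


def get_env_path(environ=None):
    """Resolve the active environment root: venv, then conda, then the interpreter prefix."""
    environ = os.environ if environ is None else environ
    if environ.get("VIRTUAL_ENV"):
        return environ["VIRTUAL_ENV"]
    if environ.get("CONDA_PREFIX"):
        return environ["CONDA_PREFIX"]
    return sys.prefix  # fall back to the running interpreter's prefix


path = get_env_path({"CONDA_PREFIX": "/opt/conda/envs/demo"})
```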

toddlerbot.utils.io_utils.pretty_write_xml(root: Element, file_path: str)

Formats an XML Element into a pretty-printed XML string and writes it to a specified file.

Parameters:
  • root (ET.Element) – The root element of the XML tree to be formatted.

  • file_path (str) – The path to the file where the formatted XML will be written.
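
One common way to pretty-print an ElementTree element is to round-trip it through xml.dom.minidom, as in this sketch. The two-space indent and the element names are illustrative assumptions, not the package's actual choices.

```python
import os
import tempfile
import xml.etree.ElementTree as ET
from xml.dom import minidom


def pretty_write_xml(root, file_path):
    """Serialize root with two-space indentation and write it to file_path."""
    rough = ET.tostring(root, encoding="utf-8")
    pretty = minidom.parseString(rough).toprettyxml(indent="  ")
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(pretty)


root = ET.Element("mujoco")  # hypothetical example tree
ET.SubElement(root, "body", name="torso")
out_path = os.path.join(tempfile.mkdtemp(), "robot.xml")
pretty_write_xml(root, out_path)
```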

toddlerbot.utils.math_utils module

Mathematical utilities for signal processing, interpolation, and filtering.

Provides functions for signal generation, trajectory interpolation, coordinate transforms, and various mathematical operations used throughout the robot control system.

Performs a binary search on a sorted array to find the index of a target value.

Parameters:
  • arr (ArrayType) – A sorted array of numbers.

  • t (ArrayType | float) – The target value to search for.

Returns:

The index of the target value if found; otherwise, the index of the largest element less than the target.

Return type:

int
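
The search described above can be sketched in plain Python. The entry does not show the function's name, so binary_search is a placeholder, and returning -1 when the target is below every element is an assumption.

```python
def binary_search(arr, t):
    """Return the index of t in sorted arr, or of the largest element below t."""
    low, high = 0, len(arr) - 1
    while low <= high:
        mid = (low + high) // 2
        if arr[mid] < t:
            low = mid + 1
        elif arr[mid] > t:
            high = mid - 1
        else:
            return mid
    return high  # -1 when t is smaller than every element


times = [0.0, 0.5, 1.0, 1.5]
exact = binary_search(times, 1.0)
between = binary_search(times, 1.2)
```

The "largest element below the target" fallback is what makes the result usable as the left end of an interpolation interval.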

toddlerbot.utils.math_utils.butterworth(b: Array | ndarray[Any, dtype[float32]], a: Array | ndarray[Any, dtype[float32]], x: Array | ndarray[Any, dtype[float32]], past_inputs: Array | ndarray[Any, dtype[float32]], past_outputs: Array | ndarray[Any, dtype[float32]]) Tuple[Array | ndarray[Any, dtype[float32]], Array | ndarray[Any, dtype[float32]], Array | ndarray[Any, dtype[float32]]]

Apply Butterworth filter to input data using filter coefficients.

Supports both scalar and multi-dimensional inputs.

Parameters:
  • b – Filter numerator coefficients (b0, b1, …, bm).

  • a – Filter denominator coefficients (a0, a1, …, an) with a[0] = 1.

  • x – Current input value (scalar or array).

  • past_inputs – Past input values with shape (filter_order-1, x.shape).

  • past_outputs – Past output values with shape (filter_order-1, x.shape).

Returns:

A tuple containing:
  • y: Filtered output with same shape as x

  • new_past_inputs: Updated past inputs

  • new_past_outputs: Updated past outputs

Return type:

tuple
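
A single filter step of this kind is usually the difference equation y[n] = b0 x[n] + b1 x[n-1] + ... - a1 y[n-1] - ..., with a[0] = 1. The scalar sketch below illustrates that update; the name iir_step and the newest-first ordering of the history lists are assumptions.

```python
def iir_step(b, a, x, past_inputs, past_outputs):
    """Apply one step of the difference equation and shift the histories."""
    y = b[0] * x
    for k in range(1, len(b)):
        y += b[k] * past_inputs[k - 1]
    for k in range(1, len(a)):
        y -= a[k] * past_outputs[k - 1]
    # Shift histories: the newest sample goes to the front, the oldest is dropped.
    new_past_inputs = [x] + list(past_inputs[:-1])
    new_past_outputs = [y] + list(past_outputs[:-1])
    return y, new_past_inputs, new_past_outputs


# First-order example: y[n] = 0.5 x[n] + 0.5 x[n-1], a two-sample average.
b, a = [0.5, 0.5], [1.0, 0.0]
y, xs, ys = iir_step(b, a, 2.0, [0.0], [0.0])
```

Returning the updated histories instead of mutating them keeps the function free of hidden state, which is the pattern the three-element return value suggests.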

toddlerbot.utils.math_utils.exponential_moving_average(alpha: Array | ndarray[Any, dtype[float32]] | float, current_value: Array | ndarray[Any, dtype[float32]] | float, previous_filtered_value: Array | ndarray[Any, dtype[float32]] | float | None = None) Array | ndarray[Any, dtype[float32]] | float

Calculate the exponential moving average of a current value.

This function computes the exponential moving average (EMA) for a given current value using a specified smoothing factor, alpha. If a previous filtered value is provided, it is used to compute the EMA; otherwise, the current value is used as the initial EMA.

Parameters:
  • alpha (ArrayType | float) – The smoothing factor, where 0 < alpha <= 1.

  • current_value (ArrayType | float) – The current data point to be filtered.

  • previous_filtered_value (Optional[ArrayType | float]) – The previous EMA value. If None, the current value is used as the initial EMA.

Returns:

The updated exponential moving average.

Return type:

ArrayType | float
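
The update described above is the standard recurrence alpha * current + (1 - alpha) * previous. A scalar sketch, which also works elementwise on arrays:

```python
def exponential_moving_average(alpha, current_value, previous_filtered_value=None):
    """Blend the new sample with the previous estimate; seed with the sample itself."""
    if previous_filtered_value is None:
        return current_value
    return alpha * current_value + (1 - alpha) * previous_filtered_value


filtered = None
for sample in [1.0, 3.0, 5.0]:
    filtered = exponential_moving_average(0.5, sample, filtered)
```

Larger alpha values track the input more closely, while smaller values smooth more heavily at the cost of added lag.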

toddlerbot.utils.math_utils.gaussian_basis_functions(phase: Array | ndarray[Any, dtype[float32]], N: int = 50)

Evaluates N Gaussian basis functions at the given phase.

Parameters:
  • phase (ArrayType) – The phase value(s) at which to evaluate the basis functions.

  • N (int, optional) – The number of basis functions. Defaults to 50.

Returns:

The basis function values at the given phase.

toddlerbot.utils.math_utils.get_action_traj(time_curr: float, action_curr: Array | ndarray[Any, dtype[float32]], action_next: Array | ndarray[Any, dtype[float32]], duration: float, control_dt: float, end_time: float = 0.0)

Calculates the trajectory of an action over a specified duration, interpolating between current and next actions.

Parameters:
  • time_curr (float) – The current time from which the trajectory starts.

  • action_curr (ArrayType) – The current action.

  • action_next (ArrayType) – The next action.

  • duration (float) – The total duration over which the action should be interpolated.

  • control_dt (float) – The time step between consecutive trajectory points.

  • end_time (float, optional) – The time at the end of the duration where the action should remain constant. Defaults to 0.0.

Returns:

A tuple containing the time steps and the corresponding interpolated positions.

Return type:

Tuple[ArrayType, ArrayType]

toddlerbot.utils.math_utils.get_chirp_signal(duration: float, control_dt: float, mean: float, initial_frequency: float, final_frequency: float, amplitude: float, decay_rate: float, method: str = 'linear') Tuple[Array | ndarray[Any, dtype[float32]], Array | ndarray[Any, dtype[float32]]]

Generate a chirp signal over a specified duration with varying frequency and amplitude.

Parameters:
  • duration – Total duration of the chirp signal in seconds.

  • control_dt – Time step for the signal generation.

  • mean – Mean value of the signal.

  • initial_frequency – Starting frequency of the chirp in Hz.

  • final_frequency – Ending frequency of the chirp in Hz.

  • amplitude – Amplitude of the chirp signal.

  • decay_rate – Rate at which the amplitude decays over time.

  • method – Method of frequency variation, e.g., “linear”, “quadratic”, “logarithmic”.

Returns:

A tuple containing:
  • Time array for the chirp signal.

  • Generated chirp signal array.

Return type:

Tuple[ArrayType, ArrayType]
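
For the "linear" method, a chirp's phase is the time integral of a frequency that ramps from the initial to the final value. The sketch below shows that case only; the exponential amplitude envelope, the sample count, and the name linear_chirp are assumptions.

```python
import math


def linear_chirp(duration, control_dt, mean, f0, f1, amplitude, decay_rate):
    """Return (times, signal) for a linear sweep from f0 to f1 with decaying amplitude."""
    n_steps = int(round(duration / control_dt))
    times = [i * control_dt for i in range(n_steps)]
    signal = []
    for t in times:
        # Phase is the integral of the instantaneous frequency f0 + (f1 - f0) * t / duration.
        phase = 2 * math.pi * (f0 * t + 0.5 * (f1 - f0) * t * t / duration)
        envelope = amplitude * math.exp(-decay_rate * t)
        signal.append(mean + envelope * math.sin(phase))
    return times, signal


times, signal = linear_chirp(2.0, 0.02, mean=0.1, f0=0.5, f1=5.0, amplitude=0.3, decay_rate=0.2)
```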

toddlerbot.utils.math_utils.get_local_vec(world_vec: Array | ndarray[Any, dtype[float32]], world_quat: Array | ndarray[Any, dtype[float32]]) Array | ndarray[Any, dtype[float32]]

Transforms a world-frame vector to local frame using quaternion rotation.

Parameters:
  • world_vec – Vector in world coordinates.

  • world_quat – Quaternion rotation (w, x, y, z format).

Returns:

Vector transformed to local frame.
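
A dependency-free sketch of the transform, assuming world_quat is a unit quaternion giving the body's orientation in the world frame, so that the local vector is obtained by applying the inverse (conjugate) rotation. The helper cross is introduced only for this example.

```python
import math


def cross(a, b):
    """Cross product of two 3-vectors."""
    return (a[1] * b[2] - a[2] * b[1], a[2] * b[0] - a[0] * b[2], a[0] * b[1] - a[1] * b[0])


def get_local_vec(world_vec, world_quat):
    """Rotate world_vec by the inverse of the unit quaternion world_quat = (w, x, y, z)."""
    w = world_quat[0]
    u = (-world_quat[1], -world_quat[2], -world_quat[3])  # conjugate = inverse rotation
    uv = cross(u, world_vec)
    uuv = cross(u, uv)
    # Rotation formula: v' = v + 2 w (u x v) + 2 u x (u x v)
    return tuple(v + 2 * w * c1 + 2 * c2 for v, c1, c2 in zip(world_vec, uv, uuv))


# Body yawed 90 degrees about z: the world x-axis points along the body's -y axis.
s = math.sqrt(0.5)
local = get_local_vec((1.0, 0.0, 0.0), (s, 0.0, 0.0, s))
```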

toddlerbot.utils.math_utils.get_random_sine_signal_config(duration: float, control_dt: float, mean: float, frequency_range: List[float], amplitude_range: List[float])

Generates a random sinusoidal signal configuration based on specified parameters.

Parameters:
  • duration (float) – The total duration of the signal in seconds.

  • control_dt (float) – The time step for signal generation.

  • mean (float) – The mean value around which the sinusoidal signal oscillates.

  • frequency_range (List[float]) – A list containing the minimum and maximum frequency values for the signal.

  • amplitude_range (List[float]) – A list containing the minimum and maximum amplitude values for the signal.

Returns:

A tuple containing the time array and the generated sinusoidal signal array.

Return type:

Tuple[ArrayType, ArrayType]

toddlerbot.utils.math_utils.get_sine_signal(sine_signal_config: Dict[str, float])

Generate a sine signal based on the provided configuration.

Parameters:

sine_signal_config (Dict[str, float]) – Configuration dictionary containing parameters for the sine signal, such as amplitude, frequency, and phase.

Returns:

Array representing the generated sine signal.

Return type:

np.ndarray

toddlerbot.utils.math_utils.interpolate(p_start: Array | ndarray[Any, dtype[float32]] | float, p_end: Array | ndarray[Any, dtype[float32]] | float, duration: Array | ndarray[Any, dtype[float32]] | float, t: Array | ndarray[Any, dtype[float32]] | float, interp_type: str = 'linear') Array | ndarray[Any, dtype[float32]] | float

Interpolate position at time t using specified interpolation type.

Parameters:
  • p_start – Initial position.

  • p_end – Desired end position.

  • duration – Total duration from start to end.

  • t – Current time (within 0 to duration).

  • interp_type – Type of interpolation (‘linear’, ‘quadratic’, ‘cubic’).

Returns:

Position at time t.
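
A scalar sketch of two of the interpolation types. The cubic case uses the common 3s² - 2s³ easing curve and the progress is clamped to [0, 1]; both are assumptions, and the quadratic variant is omitted because its definition is not given here.

```python
def interpolate(p_start, p_end, duration, t, interp_type="linear"):
    """Blend from p_start to p_end as t runs from 0 to duration."""
    s = min(max(t / duration, 0.0), 1.0)  # normalized, clamped progress
    if interp_type == "cubic":
        s = 3 * s**2 - 2 * s**3  # zero velocity at both ends
    elif interp_type != "linear":
        raise ValueError(f"unsupported interp_type: {interp_type}")
    return p_start + (p_end - p_start) * s


quarter = interpolate(0.0, 10.0, 2.0, 0.5)
midpoint = interpolate(0.0, 10.0, 2.0, 1.0, interp_type="cubic")
```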

toddlerbot.utils.math_utils.interpolate_action(t: Array | ndarray[Any, dtype[float32]] | float, time_arr: Array | ndarray[Any, dtype[float32]], action_arr: Array | ndarray[Any, dtype[float32]], interp_type: str = 'linear')

Interpolates an action value at a given time using specified interpolation method.

Parameters:
  • t (ArrayType | float) – The time at which to interpolate the action.

  • time_arr (ArrayType) – An array of time points corresponding to the action values.

  • action_arr (ArrayType) – An array of action values corresponding to the time points.

  • interp_type (str, optional) – The type of interpolation to use. Defaults to “linear”.

Returns:

The interpolated action value at time t.

toddlerbot.utils.math_utils.resample_trajectory(time_arr: Array | ndarray[Any, dtype[float32]], trajectory: Array | ndarray[Any, dtype[float32]], desired_interval: float = 0.02, interp_type: str = 'linear') Array | ndarray[Any, dtype[float32]]

Resamples a trajectory to a specified time interval using interpolation.

Parameters:
  • time_arr (ArrayType) – The time stamps of the original trajectory points.

  • trajectory (ArrayType) – The trajectory values (for example, joint angles) at those time stamps.

  • desired_interval (float, optional) – The desired time interval between resampled points. Defaults to 0.02.

  • interp_type (str, optional) – The type of interpolation to use (‘linear’, etc.). Defaults to ‘linear’.

Returns:

The resampled trajectory.

Return type:

ArrayType

toddlerbot.utils.math_utils.round_floats(obj: Any, precision: int = 6) Any

Recursively round floats in a list-like structure to a given precision.

Parameters:
  • obj – The list, tuple, or numpy array to round.

  • precision (int) – The number of decimal places to round to.

Returns:

The rounded list, tuple, or numpy array.

toddlerbot.utils.math_utils.round_to_sig_digits(x: float, digits: int)

Round a floating-point number to a specified number of significant digits.

Parameters:
  • x – The number to be rounded.

  • digits – The number of significant digits to round to.

Returns:

The number rounded to the specified number of significant digits.
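
Rounding to significant digits can be reduced to ordinary decimal rounding once the magnitude of the number is known. A minimal sketch, where the handling of zero is an assumption:

```python
import math


def round_to_sig_digits(x, digits):
    """Round x to the given number of significant digits."""
    if x == 0:
        return 0.0  # log10 is undefined at zero
    exponent = math.floor(math.log10(abs(x)))  # position of the leading digit
    return round(x, digits - 1 - exponent)


large = round_to_sig_digits(123456.0, 2)
small = round_to_sig_digits(0.001234, 2)
```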

toddlerbot.utils.misc_utils module

Miscellaneous utility functions for the ToddlerBot framework.

This module provides various helper functions for common operations including:
  • Logging with colored output and different severity levels

  • Function profiling and performance analysis

  • Asynchronous operation utilities

  • Data structure manipulation and serialization

  • File system operations and process management

These utilities are used throughout the ToddlerBot codebase to provide consistent functionality for debugging, monitoring, and general operations.

toddlerbot.utils.misc_utils.camel2snake(camel_str: str) str

Converts a CamelCase string to snake_case.

Parameters:

camel_str (str) – The CamelCase string to be converted.

Returns:

The converted snake_case string.

Return type:

str

toddlerbot.utils.misc_utils.dataclass2dict(obj)

Converts a dataclass instance to a dictionary, including nested dataclasses.

Parameters:

obj – A dataclass instance to be converted.

Returns:

A dictionary representation of the dataclass, with nested dataclasses also converted to dictionaries.

Return type:

dict
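
A sketch of the recursive conversion using the standard dataclasses module. The Gains and MotorConfig classes are hypothetical examples, not types from the package.

```python
from dataclasses import dataclass, fields, is_dataclass


def dataclass2dict(obj):
    """Convert a dataclass instance to a dict, descending into nested dataclasses."""
    result = {}
    for f in fields(obj):
        value = getattr(obj, f.name)
        result[f.name] = dataclass2dict(value) if is_dataclass(value) else value
    return result


@dataclass
class Gains:  # hypothetical example type
    kp: float
    kd: float


@dataclass
class MotorConfig:  # hypothetical example type
    name: str
    gains: Gains


cfg = dataclass2dict(MotorConfig("knee", Gains(20.0, 0.5)))
```

The standard library's dataclasses.asdict performs a similar recursion but also deep-copies field values, which can be costly for large arrays.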

toddlerbot.utils.misc_utils.dump_profiling_data(prof_path: str = 'profile_output.lprof')

Save profiling data to a specified file path.

Parameters:

prof_path (str) – The file path where the profiling data will be saved. Defaults to “profile_output.lprof”.

toddlerbot.utils.misc_utils.log(message: str, header: str = '', level: str = 'info')

Logs a message with an optional header and severity level.

Parameters:
  • message (str) – The message to log.

  • header (str, optional) – An optional header for the log message. Defaults to an empty string.

  • level (str, optional) – The severity level of the log message (e.g., ‘info’, ‘warning’, ‘error’). Defaults to ‘info’.

toddlerbot.utils.misc_utils.parse_value(value: str)

Parses the string representation of a value into the corresponding Python object.

Parameters:

value (str) – The string representation of the value to be parsed.

Returns:

The parsed value.

toddlerbot.utils.misc_utils.precise_sleep(duration: float)

Sleeps for a specified duration with high precision.

Parameters:

duration (float) – The time to sleep in seconds.
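
High-precision sleeping is often done by combining a coarse time.sleep with a short busy-wait on a monotonic clock. The sketch below shows that pattern; the spin_margin parameter and its default are hypothetical, and the package may use a different technique.

```python
import time


def precise_sleep(duration, spin_margin=0.002):
    """Sleep coarsely, then busy-wait the last spin_margin seconds for accuracy."""
    deadline = time.perf_counter() + duration
    remaining = duration - spin_margin
    if remaining > 0:
        time.sleep(remaining)  # cheap on the CPU, but its wake-up time is imprecise
    while time.perf_counter() < deadline:
        pass  # spin until the deadline is reached


start = time.perf_counter()
precise_sleep(0.01)
elapsed = time.perf_counter() - start
```

The busy-wait trades a small amount of CPU time for tighter timing, which matters in fixed-rate control loops.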

toddlerbot.utils.misc_utils.profile() Callable[[F], F]

Creates a decorator that profiles the decorated function.

Returns:

A decorator that takes a function and returns a function with the same signature.

toddlerbot.utils.misc_utils.set_seed(seed: int)

Sets the random seed for various libraries to ensure reproducibility.

This function sets the seed for Python’s random module, NumPy, and the environment variable PYTHONHASHSEED. If the provided seed is -1, a random seed is generated and used.

Parameters:

seed (int) – The seed value to set. If -1, a random seed is generated.
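
A sketch of the seeding steps listed above. Returning the seed that was actually used is a hypothetical addition for this example, and NumPy is treated as optional so the block runs without it.

```python
import os
import random


def set_seed(seed):
    """Seed Python's RNG (and NumPy's, if installed); -1 picks a random seed."""
    if seed == -1:
        seed = random.SystemRandom().randrange(2**32)
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    try:
        import numpy as np

        np.random.seed(seed)
    except ImportError:
        pass  # NumPy is optional in this sketch
    return seed


set_seed(42)
first = random.random()
set_seed(42)
second = random.random()
```

Note that PYTHONHASHSEED is read at interpreter startup, so setting it at runtime affects subprocesses rather than the hashing of the already-running process.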

toddlerbot.utils.misc_utils.snake2camel(snake_str: str) str

Converts a snake_case string to CamelCase.

Parameters:

snake_str (str) – The snake_case string to convert.

Returns:

The CamelCase string.

Return type:

str
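
Both case conversions can be sketched with a few lines of standard-library code. The treatment of consecutive capitals is an assumption: this version splits an acronym such as "ZMQ" letter by letter.

```python
import re


def camel2snake(camel_str):
    """Insert an underscore before each interior capital, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", camel_str).lower()


def snake2camel(snake_str):
    """Capitalize each underscore-separated word and join them."""
    return "".join(word.capitalize() for word in snake_str.split("_"))


snake = camel2snake("MotorConfig")
camel = snake2camel("motor_config")
```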

toddlerbot.utils.terrain_utils module

Utility functions for terrain generation and heightmap processing.

Includes:
  • Smooth Perlin noise generation for bumpy surfaces.

  • Edge fade mask for blending heightfields smoothly.

  • Frustum-shaped heightmap with adjustable flat top.

  • Center flattening mask for spawn-safe regions.

Used by terrain_types.py.

toddlerbot.utils.terrain_utils.center_flat_mask(size, flat_radius_frac=0.05)

Generate a radial mask that flattens the center of a heightmap.

Parameters:
  • size (int) – Output resolution (square).

  • flat_radius_frac (float) – Radius of the flat region as a fraction of size.

Returns:

A radial mask with a smooth transition from center to edges.

Return type:

ndarray

toddlerbot.utils.terrain_utils.edge_slope(size, border_width=5, blur_iterations=20)

Create a fade-out mask along the edges of a heightmap.

Parameters:
  • size (int) – Size of the heightmap (square).

  • border_width (int) – Width of the zeroed edge border.

  • blur_iterations (int) – Number of blurring steps to smooth the mask.

Returns:

Edge mask that gradually fades to zero.

Return type:

ndarray

toddlerbot.utils.terrain_utils.frustum_with_flat_top(size, peak=0.05, flat_ratio=0.3, falloff='quadratic', edge_flat_width=1)

Generate a frustum-shaped heightmap with a flat top.

Parameters:
  • size (int) – Size of the output heightmap (square).

  • peak (float) – Height of the peak.

  • flat_ratio (float) – Radius ratio of the flat top (0 to 1).

  • falloff (str) – Type of falloff function (“quadratic”, “linear”, “cosine”).

  • edge_flat_width (int) – Force a flat border of this width around edges.

Returns:

Frustum-shaped heightmap.

Return type:

ndarray

toddlerbot.utils.terrain_utils.interpolant(t)

Smoothstep interpolant used by Perlin noise. Helps create smooth transitions between gradient cells.

Parameters:

t (ndarray) – fractional position grid

Returns:

smoothed position weights

Return type:

ndarray
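
Perlin noise implementations commonly use the quintic fade curve 6t⁵ - 15t⁴ + 10t³. Whether this module uses that curve or the cubic 3t² - 2t³ smoothstep is an assumption; the sketch below shows the quintic form, which also works elementwise on arrays.

```python
def interpolant(t):
    """Quintic fade curve with zero slope at t = 0 and t = 1."""
    return t * t * t * (t * (t * 6 - 15) + 10)


weights = [interpolant(t) for t in (0.0, 0.5, 1.0)]
```

The flat slope at both ends is what hides the seams between neighbouring gradient cells.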

toddlerbot.utils.terrain_utils.perlin(shape, res, tileable=(False, False))

Generate a 2D Perlin noise grid.

Parameters:
  • shape (tuple) – Output array shape (H, W).

  • res (tuple) – Number of periods along each axis (y, x).

  • tileable (tuple) – Whether the noise tiles along each axis.

Returns:

2D Perlin noise map normalized to [-sqrt(2), sqrt(2)]

Return type:

ndarray

Module contents

Utility modules for the ToddlerBot framework.

This package contains various utility functions and classes for:
  • Mathematical operations and signal processing

  • Array and data structure manipulation

  • I/O operations and data serialization

  • Communication protocols and interfaces

  • Terrain generation and analysis

  • Miscellaneous helper functions

These utilities support both simulation and real robot operations, providing common functionality across the entire ToddlerBot ecosystem.