toddlerbot.utils package¶
Submodules¶
toddlerbot.utils.array_utils module¶
toddlerbot.utils.comm_utils module¶
- class toddlerbot.utils.comm_utils.ZMQMessage(time: float, control_inputs: Dict[str, float] | None = None, action: ndarray[Any, dtype[float32]] | None = None, fsr: ndarray[Any, dtype[float32]] | None = None, camera_frame: ndarray[Any, dtype[uint8]] | None = None)¶
Bases:
object
Data class for ZMQ messages.
- action: ndarray[Any, dtype[float32]] | None = None¶
- camera_frame: ndarray[Any, dtype[uint8]] | None = None¶
- control_inputs: Dict[str, float] | None = None¶
- fsr: ndarray[Any, dtype[float32]] | None = None¶
- time: float¶
- class toddlerbot.utils.comm_utils.ZMQNode(type: str = 'sender', ip: str = '', queue_len: int = 1)¶
Bases:
object
A class for handling ZMQ communication between sender and receiver nodes.
- get_msg(return_last: bool = True)¶
Retrieves messages from a ZMQ socket buffer until it is empty.
This method is designed to handle cases where reading from the buffer is too slow, causing issues with simple get operations. It reads all available messages from the buffer and returns either the last message or all messages, depending on the return_last parameter.
- Parameters:
return_last (bool) – If True, returns only the last message received. If False, returns all messages. Defaults to True.
- Returns:
The last message if return_last is True, a list of all messages if return_last is False, or None if no messages are available.
- Return type:
ZMQMessage or List[ZMQMessage] or None
- Raises:
ValueError – If the ZMQ socket type is not ‘receiver’.
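The drain-until-empty behaviour described for get_msg can be illustrated without a real socket. The sketch below is an assumption about the general pattern, not the library's code: a standard-library queue.Queue stands in for the ZMQ receive buffer, and drain_queue is a hypothetical helper name.

```python
import queue
from typing import Any, List, Union


def drain_queue(
    buf: "queue.Queue[Any]", return_last: bool = True
) -> Union[Any, List[Any], None]:
    """Read every pending item from buf without blocking."""
    messages: List[Any] = []
    while True:
        try:
            messages.append(buf.get_nowait())
        except queue.Empty:
            break  # buffer is empty, stop reading
    if not messages:
        return None
    return messages[-1] if return_last else messages


buf: "queue.Queue[int]" = queue.Queue()
for i in range(3):
    buf.put(i)
latest = drain_queue(buf)   # keeps only the newest item
nothing = drain_queue(buf)  # the buffer was emptied by the previous call
```

Returning only the last item suits control loops that care about the freshest state. Passing return_last=False keeps the full backlog for consumers that must not drop messages.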
- send_msg(msg: ZMQMessage)¶
Sends a serialized ZMQMessage if the instance type is ‘sender’.
- Parameters:
msg (ZMQMessage) – The message to be sent, which will be serialized.
- Raises:
ValueError – If the instance type is not ‘sender’.
- start_zmq()¶
Initialize a ZeroMQ context and socket for data exchange based on the specified type.
Sets up a ZeroMQ context and configures a socket as either a sender or receiver. For a sender, it connects to a specified IP and port, setting options to manage message queue length and non-blocking behavior. For a receiver, it binds to a port and configures options to manage message queue length and ensure only the latest message is kept.
- toddlerbot.utils.comm_utils.sync_time(ip: str)¶
Synchronizes the system time with a network time server.
This function connects to a network time server specified by the given IP address and adjusts the system clock to match the server’s time.
- Parameters:
ip (str) – The IP address of the network time server to synchronize with.
toddlerbot.utils.dataset_utils module¶
- class toddlerbot.utils.dataset_utils.Data(time: float, action: ndarray[Any, dtype[float32]], motor_pos: ndarray[Any, dtype[float32]], image: ndarray[Any, dtype[uint8]] | None = None)¶
Bases:
object
Data class for logging teleoperation data during an episode.
- action: ndarray[Any, dtype[float32]]¶
- image: ndarray[Any, dtype[uint8]] | None = None¶
- motor_pos: ndarray[Any, dtype[float32]]¶
- time: float¶
- class toddlerbot.utils.dataset_utils.DatasetLogger¶
Bases:
object
A class for logging teleoperation data during an episode and saving it to disk.
- log_entry(data: Data)¶
Adds a data entry to the internal list.
- Parameters:
data (Data) – The data entry to be added to the list.
- move_files_to_exp_folder(exp_folder_path: str)¶
Moves files with a specific naming pattern from the temporary directory to a specified experiment folder.
- Parameters:
exp_folder_path (str) – The destination directory path where the files will be moved.
- save()¶
Saves the current data list to a compressed file and resets the data list.
This method converts the attributes of each data object in self.data_list into a dictionary of numpy arrays, adds a start time, and saves the dictionary to a file in LZ4 compressed format. The file is named using the current number of episodes. After saving, the episode count is incremented, and the data list is cleared.
- data_list¶
A list of data objects to be saved.
- Type:
list
- n_episodes¶
The current number of episodes logged.
- Type:
int
- Side Effects:
Increments self.n_episodes. Clears self.data_list. Prints a log message indicating the number of episodes logged and their length.
toddlerbot.utils.file_utils module¶
- toddlerbot.utils.file_utils.find_last_result_dir(result_dir: str, prefix: str = '') → str | None¶
Find the latest (most recent) result directory within a given directory.
- Parameters:
result_dir – The path to the directory containing result subdirectories.
prefix – The prefix of result directory names to consider.
- Returns:
The path to the latest result directory, or None if no matching directory is found.
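As a rough illustration of this kind of lookup, the sketch below picks the matching subdirectory whose name sorts last. It assumes directory names end in a sortable timestamp, which the documentation above does not state, and latest_subdir is a hypothetical name rather than the library function.

```python
import os
import tempfile
from typing import Optional


def latest_subdir(result_dir: str, prefix: str = "") -> Optional[str]:
    """Return the matching subdirectory whose name sorts last, or None."""
    candidates = [
        name
        for name in os.listdir(result_dir)
        if name.startswith(prefix)
        and os.path.isdir(os.path.join(result_dir, name))
    ]
    if not candidates:
        return None
    # Assumption: names embed a timestamp, so lexicographic max is the newest.
    return os.path.join(result_dir, max(candidates))


with tempfile.TemporaryDirectory() as root:
    for name in (
        "walk_20240101_120000",
        "walk_20240302_093000",
        "stand_20250101_000000",
    ):
        os.mkdir(os.path.join(root, name))
    newest_walk = os.path.basename(latest_subdir(root, prefix="walk_"))
    no_match = latest_subdir(root, prefix="crawl_")
```

If directory names are not sortable, comparing modification times with os.path.getmtime would be an alternative.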
- toddlerbot.utils.file_utils.find_ports(target: str) → List[str]¶
Find open network ports on a specified target.
- Parameters:
target – The IP address or hostname of the target to scan for open ports.
- Returns:
A list of strings representing the open ports on the target.
- toddlerbot.utils.file_utils.find_robot_file_path(robot_name: str, suffix: str = '.urdf') → str¶
Finds the path to a robot’s description file, by default its URDF file.
This function searches the directory corresponding to the given robot name for a file ending in suffix. It raises a FileNotFoundError if no matching file is found.
- Parameters:
robot_name – The name of the robot (e.g., ‘robotis_op3’).
suffix – The file suffix to search for. Defaults to ‘.urdf’.
- Returns:
The file path to the robot’s file with the given suffix.
- Raises:
FileNotFoundError – If no matching file is found in the robot’s directory.
Example
robot_urdf_path = find_robot_file_path("robotis_op3")
print(robot_urdf_path)
toddlerbot.utils.math_utils module¶
toddlerbot.utils.misc_utils module¶
- toddlerbot.utils.misc_utils.camel2snake(camel_str: str) → str¶
Converts a CamelCase string to snake_case.
- Parameters:
camel_str (str) – The CamelCase string to be converted.
- Returns:
The converted snake_case string.
- Return type:
str
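One common way to perform this conversion is a single regular-expression substitution. The sketch below is illustrative only; camel_to_snake is a hypothetical name and the library's implementation may treat edge cases differently.

```python
import re


def camel_to_snake(camel_str: str) -> str:
    """Insert "_" before every uppercase letter except the first, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", camel_str).lower()


converted = camel_to_snake("DatasetLogger")
```

With this particular pattern, runs of capitals are split letter by letter, so "ZMQNode" becomes "z_m_q_node". Acronym-aware handling would need a more elaborate pattern.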
- toddlerbot.utils.misc_utils.dataclass2dict(obj)¶
Converts a dataclass instance to a dictionary, including nested dataclasses.
- Parameters:
obj – A dataclass instance to be converted.
- Returns:
A dictionary representation of the dataclass, with nested dataclasses also converted to dictionaries.
- Return type:
dict
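The recursive conversion can be sketched with the standard dataclasses module. The helper to_dict and the two example classes below are hypothetical and only illustrate the idea; they are not taken from the library.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any


def to_dict(obj: Any) -> Any:
    """Convert a dataclass instance to a dict, recursing into nested dataclasses."""
    if is_dataclass(obj) and not isinstance(obj, type):
        return {f.name: to_dict(getattr(obj, f.name)) for f in fields(obj)}
    return obj  # non-dataclass values are returned unchanged


@dataclass
class Gains:
    kp: float
    kd: float


@dataclass
class MotorConfig:
    name: str
    gains: Gains


cfg = to_dict(MotorConfig(name="knee", gains=Gains(kp=2.0, kd=0.1)))
```

The standard library's dataclasses.asdict does the same job and additionally recurses into lists, tuples, and dicts, deep-copying the remaining values.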
- toddlerbot.utils.misc_utils.dump_profiling_data(prof_path: str = 'profile_output.lprof')¶
Save profiling data to a specified file path.
- Parameters:
prof_path (str) – The file path where the profiling data will be saved. Defaults to “profile_output.lprof”.
- toddlerbot.utils.misc_utils.find_latest_file_with_time_str(directory: str, file_prefix: str = '') → str | None¶
Finds the file with the latest timestamp (YYYYMMDD_HHMMSS) in the given directory, among files whose names start with the specified prefix.
- Parameters:
directory (str) – Directory to search for files.
file_prefix (str) – The file name prefix to match. Defaults to an empty string.
- Returns:
Full path of the latest file or None if no matching file is found.
- Return type:
str | None
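A minimal sketch of timestamp-based selection follows, filtering on the file_prefix argument shown in the signature. It assumes the YYYYMMDD_HHMMSS stamp may appear anywhere in the file name; latest_timestamped_file and the example file names are hypothetical.

```python
import os
import re
import tempfile
from datetime import datetime
from typing import Optional

TIME_RE = re.compile(r"\d{8}_\d{6}")  # matches YYYYMMDD_HHMMSS


def latest_timestamped_file(directory: str, file_prefix: str = "") -> Optional[str]:
    """Return the path of the prefixed file with the newest embedded timestamp."""
    latest_path, latest_time = None, None
    for name in os.listdir(directory):
        if not name.startswith(file_prefix):
            continue
        match = TIME_RE.search(name)
        if match is None:
            continue  # no timestamp in this file name
        try:
            stamp = datetime.strptime(match.group(), "%Y%m%d_%H%M%S")
        except ValueError:
            continue  # digits matched but do not form a valid date and time
        if latest_time is None or stamp > latest_time:
            latest_path, latest_time = os.path.join(directory, name), stamp
    return latest_path


with tempfile.TemporaryDirectory() as tmp:
    for name in ("calib_20240101_120000.pkl", "calib_20240615_083000.pkl", "notes.txt"):
        open(os.path.join(tmp, name), "w").close()
    newest = os.path.basename(latest_timestamped_file(tmp, "calib_"))
    missing = latest_timestamped_file(tmp, "other_")
```

Parsing the stamp with datetime.strptime, rather than comparing raw strings, also rejects names whose digits do not form a real date.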
- toddlerbot.utils.misc_utils.log(message: str, header: str = '', level: str = 'info')¶
Logs a message with an optional header and severity level.
- Parameters:
message (str) – The message to log.
header (str, optional) – An optional header for the log message. Defaults to an empty string.
level (str, optional) – The severity level of the log message (e.g., ‘info’, ‘warning’, ‘error’). Defaults to ‘info’.
- toddlerbot.utils.misc_utils.parse_value(value: str)¶
Parses the string representation of a value into a Python object.
- Parameters:
value (str) – The string representation of the value to be parsed.
- Returns:
The parsed value.
- toddlerbot.utils.misc_utils.precise_sleep(duration: float)¶
Sleeps for a specified duration with high precision.
- Parameters:
duration (float) – The time to sleep in seconds.
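High-precision sleeping is often implemented as a coarse time.sleep followed by a short busy-wait on a monotonic clock. The sketch below shows that general technique under assumed parameters; hybrid_sleep and spin_margin are hypothetical names, and the library may use a different approach.

```python
import time


def hybrid_sleep(duration: float, spin_margin: float = 0.002) -> None:
    """Sleep for most of the interval, then spin until the deadline."""
    deadline = time.perf_counter() + duration
    coarse = duration - spin_margin
    if coarse > 0:
        time.sleep(coarse)  # cheap, but may wake late by a scheduler tick
    while time.perf_counter() < deadline:
        pass  # busy-wait for the remaining fraction of the interval


start = time.perf_counter()
hybrid_sleep(0.01)
elapsed = time.perf_counter() - start
```

The busy-wait trades CPU time for accuracy, so the margin is kept small.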
- toddlerbot.utils.misc_utils.profile() → Callable[[F], F]¶
Returns a decorator for profiling the decorated function.
- Returns:
A decorator that takes a function and returns a function of the same type.
- toddlerbot.utils.misc_utils.set_seed(seed: int)¶
Sets the random seed for various libraries to ensure reproducibility.
This function sets the seed for Python’s random module, NumPy, and the environment variable PYTHONHASHSEED. If the provided seed is -1, a random seed is generated and used.
- Parameters:
seed (int) – The seed value to set. If -1, a random seed is generated.
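The seeding steps described above might look roughly like the following. This is a sketch under assumptions, not the library's code: seed_everything is a hypothetical name, returning the seed is an added convenience, and NumPy is seeded only if it is installed.

```python
import os
import random


def seed_everything(seed: int) -> int:
    """Seed Python's RNG (and NumPy if available); -1 picks a random seed."""
    if seed == -1:
        seed = random.randint(0, 2**31 - 1)
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    try:
        import numpy as np  # optional dependency in this sketch

        np.random.seed(seed)
    except ImportError:
        pass
    return seed


seed_everything(0)
first = random.random()
seed_everything(0)
second = random.random()  # same value as first, since the seed was reset
```

Setting PYTHONHASHSEED at runtime affects only child processes. Hash randomization in the running interpreter is fixed at startup.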
- toddlerbot.utils.misc_utils.snake2camel(snake_str: str) → str¶
Converts a snake_case string to CamelCase.
- Parameters:
snake_str (str) – The snake_case string to convert.
- Returns:
The CamelCase string.
- Return type:
str
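A minimal sketch of the reverse conversion is shown below. It assumes that "CamelCase" means the first letter is also capitalized; snake_to_camel is a hypothetical name, not the library function.

```python
def snake_to_camel(snake_str: str) -> str:
    """Capitalize the first letter of each underscore-separated part and join them."""
    return "".join(part[:1].upper() + part[1:] for part in snake_str.split("_"))


converted = snake_to_camel("dataset_logger")
```

Only the first character of each part is changed, so existing capitals inside a part are preserved.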