Source code for pyrobot.robots.arm

from __future__ import print_function

import copy
import importlib
import os
import sys
import threading
import time
from abc import ABCMeta, abstractmethod

import numpy as np
import rospy
import tf
from geometry_msgs.msg import Twist, Pose, PoseStamped
from sensor_msgs.msg import JointState, CameraInfo, Image

import pyrobot.utils.util as prutil

from pyrobot.utils.util import try_cv2_import, append_namespace

cv2 = try_cv2_import()

from cv_bridge import CvBridge, CvBridgeError

import message_filters

import actionlib

from actionlib_msgs.msg import GoalStatus



[docs]class Arm(object): """ This is a parent class on which the robot specific Arm classes would be built. """ __metaclass__ = ABCMeta
[docs] def __init__( self, configs, ns="" ): """ Constructor for Arm parent class. :param configs: configurations for arm """ self.configs = configs self.ns = ns self.joint_state_lock = threading.RLock() self.arm_joint_names = self.configs.JOINT_NAMES self.arm_dof = len(self.arm_joint_names) # Subscribers self._joint_angles = dict() self._joint_velocities = dict() self._joint_efforts = dict() rospy.Subscriber( append_namespace(self.ns, configs.ROSTOPIC_JOINT_STATES), JointState, self._callback_joint_states ) # Publishers self.joint_pub = None self._setup_joint_pub()
[docs] @abstractmethod def go_home(self): """ Reset robot to default home position """ pass
[docs] def get_joint_angles(self): """ Return the joint angles :return: joint_angles :rtype: np.ndarray """ self.joint_state_lock.acquire() joint_angles = [] for joint in self.arm_joint_names: joint_angles.append(self.get_joint_angle(joint)) joint_angles = np.array(joint_angles).flatten() self.joint_state_lock.release() return joint_angles
[docs] def get_joint_velocities(self): """ Return the joint velocities :return: joint_vels :rtype: np.ndarray """ self.joint_state_lock.acquire() joint_vels = [] for joint in self.arm_joint_names: joint_vels.append(self.get_joint_velocity(joint)) joint_vels = np.array(joint_vels).flatten() self.joint_state_lock.release() return joint_vels
[docs] def get_joint_torques(self): """ Return the joint torques :return: joint_torques :rtype: np.ndarray """ self.joint_state_lock.acquire() joint_torques = [] for joint in self.arm_joint_names: try: joint_torques.append(self.get_joint_torque(joint)) except (ValueError, IndexError): rospy.loginfo("Torque value for joint " "[%s] not available!" % joint) joint_torques = np.array(joint_torques).flatten() self.joint_state_lock.release() return joint_torques
[docs] def get_joint_angle(self, joint): """ Return the joint angle of the 'joint' :param joint: joint name :type joint: string :return: joint angle :rtype: float """ if joint not in self.arm_joint_names: raise ValueError("%s not in arm joint list!" % joint) if joint not in self._joint_angles.keys(): raise ValueError("Joint angle for joint $s not available!" % joint) return self._joint_angles[joint]
[docs] def get_joint_velocity(self, joint): """ Return the joint velocity of the 'joint' :param joint: joint name :type joint: string :return: joint velocity :rtype: float """ if joint not in self.arm_joint_names: raise ValueError("%s not in arm joint list!" % joint) if joint not in self._joint_velocities.keys(): raise ValueError("Joint velocity for joint" " $s not available!" % joint) return self._joint_velocities[joint]
[docs] def get_joint_torque(self, joint): """ Return the joint torque of the 'joint' :param joint: joint name :type joint: string :return: joint torque :rtype: float """ if joint not in self.arm_joint_names: raise ValueError("%s not in arm joint list!" % joint) if joint not in self._joint_efforts.keys(): raise ValueError("Joint torque for joint $s" " not available!" % joint) return self._joint_efforts[joint]
[docs] def set_joint_positions(self, positions, wait=True, **kwargs): """ Sets the desired joint angles for all arm joints :param positions: list of length #of joints, angles in radians :param plan: whether to use moveit to plan a path. Without planning, there is no collision checking and each joint will move to its target joint position directly. :param wait: if True, it will wait until the desired joint positions are achieved :type positions: list :type plan: bool :type wait: bool :return: True if successful; False otherwise (timeout, etc.) :rtype: bool """ self._pub_joint_positions(positions) if wait: result = self._loop_angle_pub_cmd(self._pub_joint_positions, positions) return result
[docs] def set_joint_velocities(self, velocities, **kwargs): """ Sets the desired joint velocities for all arm joints :param velocities: target joint velocities :type velocities: list """ self._pub_joint_velocities(velocities)
[docs] def set_joint_torques(self, torques, **kwargs): """ Sets the desired joint torques for all arm joints :param torques: target joint torques :type torques: list """ self._pub_joint_torques(torques)
def _callback_joint_states(self, msg): """ ROS subscriber callback for arm joint state (position, velocity) :param msg: Contains message published in topic :type msg: sensor_msgs/JointState """ self.joint_state_lock.acquire() for idx, name in enumerate(msg.name): if name in self.arm_joint_names: if idx < len(msg.position): self._joint_angles[name] = msg.position[idx] if idx < len(msg.velocity): self._joint_velocities[name] = msg.velocity[idx] if idx < len(msg.effort): self._joint_efforts[name] = msg.effort[idx] self.joint_state_lock.release() def _pub_joint_positions(self, positions): joint_state = JointState() joint_state.position = tuple(positions) self.joint_pub.publish(joint_state) def _pub_joint_velocities(self, velocities): joint_state = JointState() joint_state.velocity = tuple(velocities) self.joint_pub.publish(joint_state) def _pub_joint_torques(self, torques): joint_state = JointState() joint_state.effort = tuple(torques) self.joint_pub.publish(joint_state) def _angle_error_is_small(self, target_joints): cur_joint_vals = self.get_joint_angles() joint_diff = cur_joint_vals - np.array(target_joints) error = np.max(np.abs(joint_diff)) if error <= self.configs.MAX_JOINT_ERROR: return joint_diff, error, True else: return joint_diff, error, False def _loop_angle_pub_cmd(self, cmd, joint_vals): start_time = time.time() vel_zero_times = 0 # wait 10s in worse case success = False for i in range(int(10 / self.configs.WAIT_MIN_TIME)): cmd(joint_vals) res = self._angle_error_is_small(joint_vals) joint_diff, error, eflag = res if eflag: success = True break vels = self.get_joint_velocities() es_time = time.time() - start_time if es_time > 0.5 and np.max(np.abs(vels)) < 0.01: vel_zero_times += 1 else: vel_zero_times = 0 if vel_zero_times > 10: success = False break rospy.sleep(self.configs.WAIT_MIN_TIME) return success def _setup_joint_pub(self): self.joint_pub = rospy.Publisher( append_namespace(self.ns, self.configs.ROSTOPIC_SET_JOINT), JointState, queue_size=1 )