Source code for pyrobot.sensors.camera
from __future__ import print_function
import copy
import importlib
import os
import sys
import threading
import time
from abc import ABCMeta, abstractmethod
import numpy as np
import rospy
import tf
from geometry_msgs.msg import Twist, Pose, PoseStamped
from sensor_msgs.msg import JointState, CameraInfo, Image
import pyrobot.utils.util as prutil
from pyrobot.utils.util import try_cv2_import, append_namespace
cv2 = try_cv2_import()
from cv_bridge import CvBridge, CvBridgeError
import message_filters
[docs]class Camera(object):
"""
This is a parent class on which the robot
specific Camera classes would be built.
"""
__metaclass__ = ABCMeta
[docs] def __init__(self, configs, ns=""):
"""
Constructor for Camera parent class.
:param configs: configurations for camera
:type configs: YACS CfgNode
"""
self.configs = configs
self.ns = ns
self.cv_bridge = CvBridge()
self.camera_info_lock = threading.RLock()
self.camera_img_lock = threading.RLock()
self.rgb_img = None
self.depth_img = None
self.camera_info = None
self.camera_P = None
info_topic = append_namespace(self.ns, self.configs.ROSTOPIC_CAMERA_INFO_STREAM)
rospy.Subscriber(
self.configs.ROSTOPIC_CAMERA_INFO_STREAM,
CameraInfo,
self._camera_info_callback,
)
rgb_topic = append_namespace(self.ns, self.configs.ROSTOPIC_CAMERA_RGB_STREAM)
self.rgb_sub = message_filters.Subscriber(rgb_topic, Image)
depth_topic = append_namespace(self.ns, self.configs.ROSTOPIC_CAMERA_DEPTH_STREAM)
self.depth_sub = message_filters.Subscriber(depth_topic, Image)
img_subs = [self.rgb_sub, self.depth_sub]
self.sync = message_filters.ApproximateTimeSynchronizer(
img_subs, queue_size=10, slop=0.2
)
self.sync.registerCallback(self._sync_callback)
def _sync_callback(self, rgb, depth):
self.camera_img_lock.acquire()
try:
self.rgb_img = self.cv_bridge.imgmsg_to_cv2(rgb, "bgr8")
self.rgb_img = self.rgb_img[:, :, ::-1]
self.depth_img = self.cv_bridge.imgmsg_to_cv2(depth, "passthrough")
except CvBridgeError as e:
rospy.logerr(e)
self.camera_img_lock.release()
def _camera_info_callback(self, msg):
self.camera_info_lock.acquire()
self.camera_info = msg
self.camera_P = np.array(msg.P).reshape((3, 4))
self.camera_info_lock.release()
[docs] def get_rgb(self):
"""
This function returns the RGB image perceived by the camera.
:rtype: np.ndarray or None
"""
self.camera_img_lock.acquire()
rgb = copy.deepcopy(self.rgb_img)
self.camera_img_lock.release()
return rgb
[docs] def get_depth(self):
"""
This function returns the depth image perceived by the camera.
:rtype: np.ndarray or None
"""
self.camera_img_lock.acquire()
depth = copy.deepcopy(self.depth_img)
self.camera_img_lock.release()
depth = depth / self.configs.DEPTH_MAP_FACTOR
return depth
[docs] def get_rgb_depth(self):
"""
This function returns both the RGB and depth
images perceived by the camera.
:rtype: np.ndarray or None
"""
self.camera_img_lock.acquire()
rgb = copy.deepcopy(self.rgb_img)
depth = copy.deepcopy(self.depth_img)
self.camera_img_lock.release()
return rgb, depth
[docs] def get_intrinsics(self):
"""
This function returns the camera intrinsics.
:rtype: np.ndarray
"""
if self.camera_P is None:
return self.camera_P
self.camera_info_lock.acquire()
P = copy.deepcopy(self.camera_P)
self.camera_info_lock.release()
return P[:3, :3]