import argparse
from sic_framework import SICComponentManager, SICService, utils
from sic_framework.core.connector import SICConnector
from sic_framework.core.message_python2 import (
CompressedImageMessage,
SICConfMessage,
SICMessage,
)
from sic_framework.core.sensor_python2 import SICSensor
if utils.PYTHON_VERSION_IS_2:
import random
import cv2
import numpy as np
import qi
from naoqi import ALProxy
from PIL import Image
[docs]
class NaoqiCameraConf(SICConfMessage):
[docs]
def __init__(
self,
naoqi_ip="127.0.0.1",
port=9559,
cam_id=0,
res_id=2,
fps=30,
brightness=None,
contrast=None,
saturation=None,
hue=None,
gain=None,
hflip=None,
vflip=None,
auto_exposition=None,
auto_white_bal=None,
manual_exposure_val=None,
auto_exp_algo=None,
sharpness=None,
back_light_comp=None,
auto_focus=None,
manual_focus_value=None,
):
"""
Initialize camera configuration and optional device parameters.
For parameter meaning and defaults, see:
- http://doc.aldebaran.com/2-8/family/nao_technical/video_naov6.html#naov6-video
- http://doc.aldebaran.com/2-1/family/robots/video_robot.html
:param str naoqi_ip: NAOqi host IP.
:param int port: NAOqi TCP port.
:param int cam_id: Camera ID to use.
:param int res_id: Resolution ID.
:param int fps: Target frames per second.
:param Optional[int] brightness: Camera brightness.
:param Optional[int] contrast: Camera contrast.
:param Optional[int] saturation: Camera color saturation.
:param Optional[int] hue: Camera hue adjustment.
:param Optional[int] gain: Camera gain level.
:param Optional[int] hflip: Horizontal flip toggle.
:param Optional[int] vflip: Vertical flip toggle.
:param Optional[int] auto_exposition: Auto exposure toggle.
:param Optional[int] auto_white_bal: Auto white balance toggle.
:param Optional[int] manual_exposure_val: Manual exposure value.
:param Optional[int] auto_exp_algo: Auto exposure algorithm.
:param Optional[int] sharpness: Image sharpness.
:param Optional[int] back_light_comp: Backlight compensation toggle.
:param Optional[int] auto_focus: Auto focus toggle.
:param Optional[int] manual_focus_value: Manual focus value.
Parameter Defaults:
brightness: 55
contrast: 32
saturation: 128
hue: 0
gain: 32
hflip: 0
vflip: 0
auto_exposition: 1
auto_white_bal: 1
auto_exp_algo: 1
sharpness: 0
back_light_comp: 1
auto_focus: 0
manual_focus_value: 0
"""
SICConfMessage.__init__(self)
self.naoqi_ip = naoqi_ip
self.port = port
self.cam_id = cam_id
self.res_id = res_id
self.color_id = 11 # RGB
self.fps = fps
self.brightness = brightness
self.contrast = contrast
self.saturation = saturation
self.hue = hue
self.gain = gain
self.hflip = hflip
self.vflip = vflip
self.auto_exposition = auto_exposition
self.auto_white_bal = auto_white_bal
self.manual_exposure_val = manual_exposure_val
self.auto_exp_algo = auto_exp_algo
self.sharpness = sharpness
self.back_light_comp = back_light_comp
self.auto_focus = auto_focus
self.manual_focus_value = manual_focus_value
[docs]
class BaseNaoqiCameraSensor(SICSensor):
[docs]
def __init__(self, *args, **kwargs):
super(BaseNaoqiCameraSensor, self).__init__(*args, **kwargs)
self.session = qi.Session()
self.session.connect("tcp://{}:{}".format(self.params.naoqi_ip, self.params.port))
self.video_service = self.session.service("ALVideoDevice")
# Dont actively set default parameters, this causes weird behaviour because the parameters are ususally not at the documented default.
if self.params.brightness is not None:
self.video_service.setParameter(
self.params.cam_id, 0, self.params.brightness
)
if self.params.contrast is not None:
self.video_service.setParameter(self.params.cam_id, 1, self.params.contrast)
if self.params.saturation is not None:
self.video_service.setParameter(
self.params.cam_id, 2, self.params.saturation
)
if self.params.hue is not None:
self.video_service.setParameter(self.params.cam_id, 3, self.params.hue)
if self.params.gain is not None:
self.video_service.setParameter(self.params.cam_id, 6, self.params.gain)
if self.params.hflip is not None:
self.video_service.setParameter(self.params.cam_id, 7, self.params.hflip)
if self.params.vflip is not None:
self.video_service.setParameter(self.params.cam_id, 8, self.params.vflip)
if self.params.auto_exposition is not None:
self.video_service.setParameter(
self.params.cam_id, 11, self.params.auto_exposition
)
if self.params.auto_white_bal is not None:
self.video_service.setParameter(
self.params.cam_id, 12, self.params.auto_white_bal
)
if self.params.manual_exposure_val is not None:
self.video_service.setParameter(
self.params.cam_id, 12, self.params.manual_exposure_val
)
if self.params.auto_exp_algo is not None:
self.video_service.setParameter(
self.params.cam_id, 22, self.params.auto_exp_algo
)
if self.params.sharpness is not None:
self.video_service.setParameter(
self.params.cam_id, 24, self.params.sharpness
)
if self.params.back_light_comp is not None:
self.video_service.setParameter(
self.params.cam_id, 34, self.params.back_light_comp
)
if self.params.auto_focus is not None:
self.video_service.setParameter(
self.params.cam_id, 40, self.params.auto_focus
)
if self.params.manual_focus_value is not None:
self.video_service.setParameter(
self.params.cam_id, 43, self.params.manual_focus_value
)
self.video_service.setParameter(0, 35, 1) # Keep Alive parameter
self.videoClient = self.video_service.subscribeCamera(
"Camera_{}".format(random.randint(0, 100000)),
self.params.cam_id,
self.params.res_id,
self.params.color_id,
self.params.fps,
)
[docs]
@staticmethod
def get_conf():
"""
Return the default configuration for a single camera sensor.
:returns: Camera configuration instance.
:rtype: NaoqiCameraConf
"""
return NaoqiCameraConf()
[docs]
@staticmethod
def get_output():
"""
Declare the output message type produced by this sensor.
:returns: Compressed image message class.
"""
return CompressedImageMessage
[docs]
def execute(self):
"""
Grab one image frame from the NAOqi camera and return it.
:returns: Compressed image containing the RGB frame as a NumPy array.
:rtype: CompressedImageMessage
"""
try:
# get the actual image from the NaoImage type
naoImage = self.video_service.getImageRemote(self.videoClient)
imageWidth = naoImage[0]
imageHeight = naoImage[1]
array = naoImage[6]
image_string = str(bytearray(array))
# Create a PIL Image from our pixel array.
im = Image.frombytes("RGB", (imageWidth, imageHeight), image_string)
return CompressedImageMessage(np.asarray(im))
except Exception as e:
if self._stopped.is_set() or self._signal_to_stop.is_set():
return
else:
raise e
[docs]
def _cleanup(self):
"""
Release NAOqi camera resources after the sensor thread has stopped.
"""
try:
if hasattr(self, "videoClient") and self.videoClient:
self.video_service.unsubscribe(self.videoClient)
except Exception:
pass
try:
self.session.close()
except Exception:
pass
##################
# Top Camera
##################
[docs]
class NaoqiTopCameraSensor(BaseNaoqiCameraSensor):
[docs]
def __init__(self, *args, **kwargs):
super(NaoqiTopCameraSensor, self).__init__(*args, **kwargs)
[docs]
@staticmethod
def get_conf():
"""
Return the default configuration for the top camera.
:returns: Configuration with cam_id=0 and res_id=1.
:rtype: NaoqiCameraConf
"""
return NaoqiCameraConf(cam_id=0, res_id=1)
[docs]
class NaoqiTopCamera(SICConnector):
component_class = NaoqiTopCameraSensor
##################
# Bottom Camera
##################
[docs]
class NaoqiBottomCameraSensor(BaseNaoqiCameraSensor):
"""
Sensor for the NAO bottom camera (cam_id=1).
"""
[docs]
def __init__(self, *args, **kwargs):
super(NaoqiBottomCameraSensor, self).__init__(*args, **kwargs)
[docs]
@staticmethod
def get_conf():
"""
Return the default configuration for the bottom camera.
:returns: Configuration with cam_id=1 and res_id=1.
:rtype: NaoqiCameraConf
"""
return NaoqiCameraConf(cam_id=1, res_id=1)
[docs]
class NaoqiBottomCamera(SICConnector):
component_class = NaoqiBottomCameraSensor
##################
# Stereo Pepper Camera
##################
[docs]
class StereoImageMessage(SICMessage):
_compress_images = True
[docs]
def __init__(self, left, right):
"""
Create a stereo image message.
:param numpy.ndarray left: Left image array.
:param numpy.ndarray right: Right image array.
"""
self.left_image = left
self.right_image = right
[docs]
class NaoStereoCameraConf(NaoqiCameraConf):
[docs]
def __init__(
self,
calib_params=None,
naoqi_ip="127.0.0.1",
port=9559,
cam_id=0,
res_id=2,
color_id=11,
fps=30,
convert_bw=True,
use_calib=True,
):
super(NaoStereoCameraConf, self).__init__(
naoqi_ip, port, cam_id, res_id, color_id, fps
) # TODO: correct?
if calib_params is None:
calib_params = {}
self.cameramtrx = calib_params.get("cameramtrx", None)
self.K = calib_params.get("K", None)
self.D = calib_params.get("D", None)
self.H1 = calib_params.get("H1", None)
self.H2 = calib_params.get("H2", None)
self.convert_bw = convert_bw # Convert images to b&w before sending
self.use_calib = (
use_calib # We don't want to rectify the images if we are calibrating
)
[docs]
class StereoPepperCameraSensor(BaseNaoqiCameraSensor):
[docs]
def __init__(self, *args, **kwargs):
super(StereoPepperCameraSensor, self).__init__(*args, **kwargs)
[docs]
@staticmethod
def get_conf():
# TODO: by default read calibration from disk
return NaoStereoCameraConf(
calib_params={
"cameramtrx": None,
"K": None,
"D": None,
"H1": None,
"H2": None,
},
cam_id=3,
res_id=15,
convert_bw=True,
use_calib=False,
)
[docs]
def undistort(self, img):
"""
Remove lens distortion using intrinsic matrix and distortion coefficients.
:param numpy.ndarray img: Image to undistort.
:returns: Undistorted image.
:rtype: numpy.ndarray
"""
assert self.params.K is not None, "Calibration parameter K not set"
assert self.params.D is not None, "Calibration parameter D not set"
return cv2.undistort(
img, self.params.K, self.params.D, None, self.params.cameramtrx
)
[docs]
def warp(self, img, is_left):
"""
Apply perspective warp using rectification homography.
:param numpy.ndarray img: Image to warp.
:param bool is_left: Selects H1 for left and H2 for right.
:returns: Warped image.
:rtype: numpy.ndarray
:raises AssertionError: If `H1` or `H2` is missing.
"""
H_matrix = self.params.H1 if is_left else self.params.H2
assert H_matrix is not None, "Calibration parameter H1 or H2 not set"
return cv2.warpPerspective(img, H_matrix, img.shape[::-1])
[docs]
def rectify(self, img, is_left):
"""
Undistort and warp an image for rectification.
:param numpy.ndarray img: Image to rectify.
:param bool is_left: Whether this is a left frame.
:returns: Rectified image.
:rtype: numpy.ndarray
"""
if len(img.shape) == 2:
return self.warp(self.undistort(img), is_left)
img = np.concatenate(
[
self.rectify(img[..., i], is_left)[..., np.newaxis]
for i in range(img.shape[-1])
],
axis=2,
)
return img
[docs]
def execute(self):
# Get the regular stereo image
img_message = super(StereoPepperCameraSensor, self).execute().image
if self.params.convert_bw:
img_message = cv2.cvtColor(img_message, cv2.COLOR_BGR2GRAY)
# Split the stereo image into separate left and right images
left, right = (
img_message[:, : img_message.shape[1] // 2, ...],
img_message[:, img_message.shape[1] // 2 :, ...],
)
# Rectify the images to account for lens distortion and camera mis-alignment
if self.params.use_calib:
left = self.rectify(left, is_left=True)
right = self.rectify(right, is_left=False)
return StereoImageMessage(left, right)
[docs]
@staticmethod
def get_output():
"""
Declare the output message type for this sensor.
:returns: StereoImageMessage type.
"""
return StereoImageMessage
[docs]
class StereoPepperCamera(SICConnector):
component_class = StereoPepperCameraSensor
##################
# Depth Pepper Camera
##################
[docs]
class DepthPepperCameraSensor(BaseNaoqiCameraSensor):
[docs]
def __init__(self, *args, **kwargs):
super(DepthPepperCameraSensor, self).__init__(*args, **kwargs)
[docs]
@staticmethod
def get_conf():
"""
Return the default configuration for the depth camera.
:returns: Configuration with cam_id=2 and res_id=10.
:rtype: NaoqiCameraConf
"""
return NaoqiCameraConf(cam_id=2, res_id=10)
[docs]
class DepthPepperCamera(SICConnector):
component_class = DepthPepperCameraSensor
# Example: run the top and bottom camera sensors directly.
if __name__ == "__main__":
SICComponentManager([NaoqiTopCameraSensor, NaoqiBottomCameraSensor])