Skip to content

Commit

Permalink
Add stereo matching validator for multiple camera calibration
Browse files Browse the repository at this point in the history
  • Loading branch information
HViktorTsoi committed Mar 30, 2024
1 parent 1f60227 commit 63d5c7f
Showing 1 changed file with 369 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,369 @@
#!/usr/bin/env python
# coding=utf-8
from __future__ import division, print_function

import argparse
import thread
import time

import aslam_cv as acv
import cv2
import igraph
import message_filters
import numpy as np
import rospy
import sm
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import Image, CompressedImage

import kalibr_common as kc

# make numpy print prettier
np.set_printoptions(suppress=True)


class CameraChainValidator(object):
def __init__(self, chainConfig):

self.current_cam_msgs = None

self.chainConfig = chainConfig
self.numCameras = chainConfig.numCameras()
self.bridge = CvBridge()

# initialize the cameras in the chain
self.G = igraph.Graph(self.numCameras)
self.monovalidators = []
for cidx in range(0, self.numCameras):
camConfig = chainConfig.getCameraParameters(cidx)

# create a mono instance for each cam (detection and mono view)
monovalidator = MonoCameraValidator(camConfig)
self.monovalidators.append(monovalidator)

# add edges to overlap graph
overlaps = chainConfig.getCamOverlaps(cidx)
for overlap in overlaps:
# add edge if it isn't existing yet
try:
edge_idx = self.G.get_eid(cidx, overlap)
except:
self.G.add_edges([(cidx, overlap)])

# prepare the rectification maps
for edge in self.G.es:
cidx_src = edge.source
cidx_dest = edge.target

edge["rect_map"] = dict();
edge["R"] = dict();
edge["A"] = dict();

edge["rect_map"][cidx_src], \
edge["rect_map"][cidx_dest], \
edge["R"][cidx_src], \
edge["R"][cidx_dest], \
edge["A"][cidx_src], \
edge["A"][cidx_dest] = self.prepareStereoRectificationMaps(cidx_src, cidx_dest)

# register the callback for the synchronized images
sync_sub = message_filters.TimeSynchronizer([val.image_sub for val in self.monovalidators], 1000)
sync_sub.registerCallback(self.synchronizedCallback)

# initialize message throttler
self.timeLast = 0

thread.start_new_thread(self.processing_thread, ())

def processing_thread(self):
while True:
cam_msgs = self.current_cam_msgs
if cam_msgs is None:
continue

# process the images of all cameras
for cam_nr, msg in enumerate(cam_msgs):

# convert image to numpy
try:
if (type(msg) is CompressedImage):
cv_image = cv2.imdecode(np.fromstring(msg.data, np.uint8), cv2.IMREAD_COLOR)
else:
if (msg.encoding == "rgb8"):
cv_image = np.squeeze(np.array(self.bridge.imgmsg_to_cv2(msg, "mono8")))
else:
cv_image = self.bridge.imgmsg_to_cv2(msg)
np_image = np.array(cv_image)
if np_image.shape[1] > 1:
np_image = cv2.cvtColor(np_image, cv2.COLOR_RGB2GRAY)
except CvBridgeError, e:
print(e)

# get the corresponding monovalidator instance
validator = self.monovalidators[cam_nr]

# undistort the image
if type(validator.camera.geometry) == acv.DistortedOmniCameraGeometry:
validator.undist_image = validator.undistorter.undistortImageToPinhole(np_image)
else:
validator.undist_image = validator.undistorter.undistortImage(np_image)

# generate all rectification views
for edge in self.G.es:
cidx_src = edge.source
cidx_dest = edge.target
self.generatePairView(cidx_src, cidx_dest)

cv2.waitKey(1)

def synchronizedCallback(self, *cam_msgs):
# throttle image processing
rate = 100 # Hz
timeNow = time.time()
if (timeNow - self.timeLast < 1.0 / rate) and self.timeLast != 0:
return
self.timeLast = timeNow
self.current_cam_msgs = cam_msgs

# returns transformation T_to_from
def getTransformationCamFromTo(self, cidx_from, cidx_to):
# build pose chain (target->cam0->baselines->camN)
lowid = min((cidx_from, cidx_to))
highid = max((cidx_from, cidx_to))

T_high_low = sm.Transformation()
for cidx in range(lowid, highid):
baseline_HL = self.chainConfig.getExtrinsicsLastCamToHere(cidx + 1)
T_high_low = baseline_HL * T_high_low

if cidx_from < cidx_to:
T_BA = T_high_low
else:
T_BA = T_high_low.inverse()

return T_BA

def rectifyAndStereoMatching(self, imageA, mapA, imageB, mapB):
# rectify images
rect_image_A = cv2.remap(imageA,
mapA[0],
mapA[1],
cv2.INTER_LINEAR)

rect_image_B = cv2.remap(imageB,
mapB[0],
mapB[1],
cv2.INTER_LINEAR)

# stereo match
scale_downsample = parsed.scale
rect_image_A = cv2.resize(rect_image_A, dsize=(
rect_image_A.shape[1] // scale_downsample,
rect_image_A.shape[0] // scale_downsample,
))
rect_image_B = cv2.resize(rect_image_B, dsize=(
rect_image_B.shape[1] // scale_downsample,
rect_image_B.shape[0] // scale_downsample,
))
# flip
rect_image_A = cv2.rotate(rect_image_A, cv2.ROTATE_180)
rect_image_B = cv2.rotate(rect_image_B, cv2.ROTATE_180)

if parsed.matcher == 'bm':
# BM Matching
stereo = cv2.StereoBM_create(numDisparities=32, blockSize=15)
elif parsed.matcher == 'sgbm':
# SGBM Matching
window_size = 5
stereo = cv2.StereoSGBM_create(
minDisparity=0,
numDisparities=48, # max_disp has to be dividable by 16 f. E. HH 192, 256
blockSize=5,
P1=8 * 3 * window_size ** 2,
# wsize default 3; 5; 7 for SGBM reduced size image; 15 for SGBM full size image (1300px and above); 5 Works nicely
P2=32 * 3 * window_size ** 2,
disp12MaxDiff=1,
uniquenessRatio=15,
speckleWindowSize=0,
speckleRange=2,
preFilterCap=63,
mode=cv2.STEREO_SGBM_MODE_SGBM_3WAY
)
else:
raise NotImplementedError('Stereo matching method {} not supported'.format(parsed.method))

# stereo compute
disparity = stereo.compute(rect_image_A, rect_image_B)
# normalize
disparity = np.int_(256 * np.float_(disparity - disparity.min()) / (disparity.max() - disparity.min())) \
.astype(np.uint8)
# print(disparity.max(), disparity.min())
depth_map = cv2.applyColorMap((255 - disparity).astype(np.uint8), cv2.COLORMAP_JET)
# remove useless boundary
depth_map[np.where((disparity == 0) | (rect_image_A == 0) | (rect_image_B == 0))] = 0
# combine the images
np_rect_image = np.hstack((rect_image_A, rect_image_B))

return np_rect_image, depth_map

def generatePairView(self, camAnr, camBnr):
# prepare the window
windowName = "Rectified view (cam{0} and cam{1})".format(camAnr, camBnr)
windowNameDisparity = "Disparity from (cam{0} and cam{1})".format(camAnr, camBnr)
# cv2.namedWindow(windowName, 1)

# get the mono validators for each cam
camA = self.monovalidators[camAnr]
camB = self.monovalidators[camBnr]

# rectify the undistorted images
edge_idx = self.G.get_eid(camAnr, camBnr)
edge = self.G.es[edge_idx]

np_image_rect, depth_map = self.rectifyAndStereoMatching(camA.undist_image,
edge["rect_map"][camAnr],
camB.undist_image,
edge["rect_map"][camBnr])

# draw some epilines
np_image_rect = cv2.cvtColor(np_image_rect, cv2.COLOR_GRAY2BGR)
n = 10
for i in range(0, n):
y = int(np_image_rect.shape[0] * i / n)
cv2.line(np_image_rect, (0, y), (2 * np_image_rect.shape[1], y), (0, 255, 0))

# cv2.imshow(windowName, cv2.resize(np_image_rect, (np_image_rect.shape[1] // 2, np_image_rect.shape[0] // 2)))

# depth_map[np.where(rect_image_B == 0)] = 0
cv2.imshow(windowName, np_image_rect)
cv2.imshow(windowNameDisparity, depth_map)

def prepareStereoRectificationMaps(self, camAnr, camBnr):
# get the camera parameters for the undistorted cameras
camIdealA = self.monovalidators[camAnr].undist_camera.projection().getParameters().flatten()
camIdealB = self.monovalidators[camBnr].undist_camera.projection().getParameters().flatten()
camIdealA = np.array([[camIdealA[0], 0, camIdealA[2]], [0, camIdealA[1], camIdealA[3]], [0, 0, 1]])
camIdealB = np.array([[camIdealB[0], 0, camIdealB[2]], [0, camIdealB[1], camIdealB[3]], [0, 0, 1]])
imageSize = (self.monovalidators[camAnr].undist_camera.projection().ru(),
self.monovalidators[camAnr].undist_camera.projection().rv())

# get the baseline between the cams
baseline_BA = self.getTransformationCamFromTo(camAnr, camBnr)

##
# A.Fusiello, E. Trucco, A. Verri: A compact algorithm for recification of stereo pairs, 1999
##
Poa = np.matrix(camIdealA) * np.hstack(
(np.matrix(np.eye(3)), np.matrix(np.zeros((3, 1))))) # use camA coords as world frame...
Pob = np.matrix(camIdealB) * np.hstack((np.matrix(baseline_BA.C()), np.matrix(baseline_BA.t()).T))

# optical centers (in camA's coord sys)
c1 = -np.linalg.inv(Poa[:, 0:3]) * Poa[:, 3]
c2 = -np.linalg.inv(Pob[:, 0:3]) * Pob[:, 3]

# get "mean" rotation between cams
old_z_mean = (baseline_BA.C()[2, :].flatten() + sm.Transformation().T()[2, 0:3]) / 2.0
v1 = c1 - c2 # newx-axis = direction of baseline
v2 = np.cross(np.matrix(old_z_mean).flatten(), v1.flatten()).T # new y axis orthogonal to new x and mean old z
v3 = np.cross(v1.flatten(), v2.flatten()).T # orthogonal to baseline and new y

# normalize
v1 = v1 / np.linalg.norm(v1)
v2 = v2 / np.linalg.norm(v2)
v3 = v3 / np.linalg.norm(v3)

# create rotation matrix
R = np.hstack((np.hstack((v1, v2)), v3)).T

# new intrinsic parameters
A = (camIdealA + camIdealB) / 2.0

# new projection matrices
Pna = A * np.hstack((R, -R * c1))
Pnb = A * np.hstack((R, -R * c2))

# rectyfing transforms
Ta = Pna[0:3, 0:3] * np.linalg.inv(Poa[0:3, 0:3])
Tb = Pnb[0:3, 0:3] * np.linalg.inv(Pob[0:3, 0:3])

Ra = R # camA=world, then to rectified coords
Rb = R * baseline_BA.inverse().C() # to world then to rectified coords

# create the rectification maps
rect_map_x_a, rect_map_y_a = cv2.initUndistortRectifyMap(camIdealA,
np.zeros((4, 1)),
Ra,
A,
imageSize,
cv2.CV_16SC2)

rect_map_x_b, rect_map_y_b = cv2.initUndistortRectifyMap(camIdealB,
np.zeros((4, 1)),
Rb,
A,
imageSize,
cv2.CV_16SC2)

return (rect_map_x_a, rect_map_y_a), (rect_map_x_b, rect_map_y_b), Ra, Rb, A, A


class MonoCameraValidator(object):
def __init__(self, camConfig):

print("initializing camera geometry")
self.camera = kc.ConfigReader.AslamCamera.fromParameters(camConfig)

# print details
print("Camera {0}:".format(camConfig.getRosTopic()))
camConfig.printDetails();

self.topic = camConfig.getRosTopic()
self.windowName = "Camera: {0}".format(self.topic)
# cv2.namedWindow(self.windowName, 0)
# register the cam topic to the message synchronizer
if parsed.compressed_image:
self.image_sub = message_filters.Subscriber(self.topic, CompressedImage)
else:
self.image_sub = message_filters.Subscriber(self.topic, Image)

# create image undistorter
alpha = 1.0
scale = 1.0
self.undistorter = self.camera.undistorterType(self.camera.geometry, cv2.INTER_LINEAR, alpha, scale)

if type(self.camera.geometry) == acv.DistortedOmniCameraGeometry:
# convert omni image to pinhole image aswell
self.undist_camera = self.undistorter.getIdealPinholeGeometry()
else:
self.undist_camera = self.undistorter.getIdealGeometry()


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Validate the intrinsics of a camera.')
parser.add_argument('--cam', dest='chainYaml', help='Camera configuration as yaml file', required=True)
parser.add_argument('--verbose', action='store_true', dest='verbose', help='Verbose output')
parser.add_argument('--compressed_image', action='store_true', help='Using CompressedImage msg', required=False)
parser.add_argument('--scale', type=int, default=2,
help='Downsample scale, using larger downsample scale to speed up stereo matching',
required=False)
parser.add_argument('--matcher', default='sgbm', help='Stereo matching algorithm [sgbm|bm]', required=False)
parsed = parser.parse_args()

if parsed.verbose:
sm.setLoggingLevel(sm.LoggingLevel.Debug)
else:
sm.setLoggingLevel(sm.LoggingLevel.Info)

camchain = kc.ConfigReader.CameraChainParameters(parsed.chainYaml)

# create the validator
chain_validator = CameraChainValidator(camchain)

# ros message loops
rospy.init_node('kalibr_validator', anonymous=True)
try:
rospy.spin()
except KeyboardInterrupt:
print("Shutting down")

cv2.destroyAllWindows()

0 comments on commit 63d5c7f

Please sign in to comment.