how to create annots/nuScenes.yaml #30

Open
venshow-w opened this issue Sep 5, 2024 · 3 comments

Comments

@venshow-w

Thank you for sharing your work.
I would like to know how to create the annots/nuScenes.yaml file, in particular the action data it contains.

@Little-Podi
Collaborator

Do you mean the JSON files that provide nuScenes annotations? Please refer to issue #3 and issue #25 for more details. I'm happy to help if you have any other questions.

@xiaowenhe

@Little-Podi, can you provide a complete script for generating the nuScenes.json file? Thank you!

@Little-Podi
Collaborator

@xiaowenhe Sorry for the late reply. Here is the complete script to generate nuScenes.json.

import json

import numpy as np
from nuscenes.can_bus.can_bus_api import NuScenesCanBus
from nuscenes.nuscenes import NuScenes
from nuscenes.utils import splits
from nuscenes.utils.geometry_utils import transform_matrix
from pyquaternion import Quaternion

version = "v1.0-trainval"
data_root = "path_to/nuscenes"  # root directory of your nuScenes dataset
key_frame = 5  # number of key frames per sample (current frame + 4 future frames)
valid_scenes = splits.train  # use splits.val for the validation split


def get_global_pose(rec, nusc, inverse=False):
    """Return the 4x4 sensor-to-global transform of the sample's lidar.

    With inverse=True, return the global-to-sensor transform instead.
    """
    lidar_token = rec["data"]["LIDAR_TOP"]
    lidar_sample_data = nusc.get("sample_data", lidar_token)

    sd_ep = nusc.get("ego_pose", lidar_sample_data["ego_pose_token"])
    sd_cs = nusc.get("calibrated_sensor", lidar_sample_data["calibrated_sensor_token"])
    if inverse:
        sensor_from_ego = transform_matrix(sd_cs["translation"], Quaternion(sd_cs["rotation"]), inverse=True)
        ego_from_global = transform_matrix(sd_ep["translation"], Quaternion(sd_ep["rotation"]), inverse=True)
        pose = sensor_from_ego.dot(ego_from_global)
    else:
        global_from_ego = transform_matrix(sd_ep["translation"], Quaternion(sd_ep["rotation"]), inverse=False)
        ego_from_sensor = transform_matrix(sd_cs["translation"], Quaternion(sd_cs["rotation"]), inverse=False)
        pose = global_from_ego.dot(ego_from_sensor)
    return pose


def get_matrix(calibrated_data, inverse=False):
    """Build a 4x4 homogeneous transform from a translation + rotation record."""
    output = np.eye(4)
    output[:3, :3] = Quaternion(calibrated_data["rotation"]).rotation_matrix
    output[:3, 3] = calibrated_data["translation"]
    if inverse:
        output = np.linalg.inv(output)
    return output


dataset = NuScenes(version=version, dataroot=data_root, verbose=False)
can_bus = NuScenesCanBus(dataroot=data_root)
all_samples = dataset.sample

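# keep only samples that are followed by key_frame - 1 consecutive samples
# from the same scene, restricted to the chosen split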
all_samples.sort(key=lambda x: (x["scene_token"], x["timestamp"]))
valid_sample_indices = list()
for index in range(len(all_samples)):
    is_valid_data = True
    previous_sample = None
    for t in range(key_frame):
        index_t = index + t

        # exceed the dataset capacity
        if index_t >= len(all_samples):
            is_valid_data = False
            break

        current_sample = all_samples[index_t]

        # check if scene is the same
        if (
                dataset.get("scene", current_sample["scene_token"])["name"] not in valid_scenes or
                (previous_sample is not None and current_sample["scene_token"] != previous_sample["scene_token"])
        ):
            is_valid_data = False
            break

        previous_sample = current_sample

    if is_valid_data:
        valid_sample_indices.append(index)

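# collect 25 consecutive CAM_FRONT frames (key frames plus sweeps) per valid sample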
new_sample_indices = list()
new_samples = list()
for index in valid_sample_indices:
    sample_dict = dict()
    frame_files = list()
    enough_sweeps = True
    first_sample = all_samples[index]
    camera_token = first_sample["data"]["CAM_FRONT"]
    for i in range(25):
        if camera_token == "":
            enough_sweeps = False
            break
        camera_data = dataset.get("sample_data", camera_token)
        # if i % 6 == 0 and not camera_data["is_key_frame"]:
        #     enough_sweeps = False
        #     break
        frame_files.append(camera_data["filename"])
        camera_token = camera_data["next"]
    if enough_sweeps:
        new_sample_indices.append(index)
        sample_dict.update({"frames": frame_files})
        new_samples.append(sample_dict)

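# for each remaining sample: compute the future trajectory, CAN bus
# speed/steering, the high-level command, and the projected goal point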
for index, sample_dict in zip(new_sample_indices, new_samples):
    sample = all_samples[index]
    scene_name = dataset.get("scene", sample["scene_token"])["name"]
    try:
        vehicle_monitor = can_bus.get_messages(scene_name, "vehicle_monitor")
        has_can_bus = True
    except Exception:  # a few scenes ship without CAN bus data
        has_can_bus = False

    gt_trajectory = np.zeros((key_frame, 2), np.float64)

    current_ego_pose = get_global_pose(sample, dataset, inverse=True)

    camera_token = sample["data"]["CAM_FRONT"]
    camera_data = dataset.get("sample_data", camera_token)
    assert sample_dict["frames"][0] == camera_data["filename"]

    origin = None
    future_global_pose = None
    speed = list()
    angle = list()
    for i in range(key_frame):
        index_i = index + i
        if index_i < len(all_samples):
            future_sample = all_samples[index_i]

            future_global_pose = get_global_pose(future_sample, dataset, inverse=False)
            future_ego_pose = current_ego_pose.dot(future_global_pose)

            origin = np.array(future_ego_pose[:3, 3])

            gt_trajectory[i, :] = [origin[0], origin[1]]

            if has_can_bus:
                # find the CAN bus message closest in time to this sample;
                # messages are time-ordered, so stop once the gap starts growing
                time_diff = None
                for each_message in vehicle_monitor:
                    if time_diff is None:
                        time_diff = abs(each_message["utime"] - future_sample["timestamp"])
                        last_message = each_message
                    else:
                        if abs(each_message["utime"] - future_sample["timestamp"]) < time_diff:
                            time_diff = abs(each_message["utime"] - future_sample["timestamp"])
                            last_message = each_message
                        else:
                            break
                speed.append(last_message["vehicle_speed"])
                angle.append(last_message["steering"])

    sample_dict.update({"traj": gt_trajectory.flatten().tolist()})
    sample_dict.update({"speed": speed})
    sample_dict.update({"angle": angle})

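    # derive a high-level command from the trajectory endpoint
    # (lidar frame: x points roughly right, y roughly forward)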
    if origin[0] >= 2:  # turn right
        sample_dict.update({"cmd": 0})
    elif origin[0] <= -2:  # turn left
        sample_dict.update({"cmd": 1})
    elif origin[1] <= 2:  # stop
        sample_dict.update({"cmd": 2})
    else:  # go straight
        sample_dict.update({"cmd": 3})

    ego_pose = dataset.get("ego_pose", camera_data["ego_pose_token"])
    global_to_ego = get_matrix(ego_pose, True)  # global -> ego
    calibrated_sensor = dataset.get("calibrated_sensor", camera_data["calibrated_sensor_token"])
    ego_to_camera = get_matrix(calibrated_sensor, True)  # ego -> camera
    camera_intrinsic = np.eye(4)
    camera_intrinsic[:3, :3] = calibrated_sensor["camera_intrinsic"]
    # global -> camera_ego_pose -> image
    global_to_image = camera_intrinsic @ ego_to_camera @ global_to_ego

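    # project the last future position (taken at z = 0 in the global frame)
    # into the image to obtain the 2D goal point and its depth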
    goal_point = np.zeros(4, np.float64)
    goal_point[:2] = np.array(future_global_pose[:2, 3])
    goal_point[3] = 1

    image_points = goal_point @ global_to_image.T  # global -> image
    image_points[:2] /= image_points[[2]]

    sample_dict.update({"z": image_points[2]})
    sample_dict.update({"goal": image_points[:2].tolist()})

print(len(new_samples))
new_anno_file = "nuScenes.json"
with open(new_anno_file, "w") as new_anno_json:
    json.dump(new_samples, new_anno_json, indent=4)
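For reference, each entry written to nuScenes.json is a dictionary with the keys produced above. Here is a minimal sketch to load and inspect the result (the key list in the comment is taken directly from the script; the file path is whatever you passed to json.dump):

import json

with open("nuScenes.json") as anno_json:
    annotations = json.load(anno_json)

# expected keys per entry: "frames" (25 CAM_FRONT image paths),
# "traj" (flattened key_frame x 2 future trajectory), "speed" and "angle"
# (CAN bus readings, empty lists for scenes without CAN data), "cmd"
# (0 right, 1 left, 2 stop, 3 straight), "z" and "goal" (projected goal point)
print(len(annotations), sorted(annotations[0].keys()))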
