apollo
apollo.ApolloContainer
- class apollo.ApolloContainer.ApolloContainer(apollo_root: str, username: str)
Class to represent Apollo container
- Parameters
apollo_root (str) – root directory of Baidu Apollo
username (str) – unique ID used to identify container
- property container_name: str
Gets the name of the container
- Type
str
- property ip: str
Gets the ip address of the container
- Type
str
- is_running() bool
Checks if the container is running
- Returns
True if running, False otherwise
- Return type
bool
- reset()
Resets the container (e.g., stopps and restarts all related modules)
- reset_bridge_connection()
Close any existing connection to bridge and reconnect
- restart_dreamview()
Restart Dreamview
- restart_modules()
Restart all the necessary modules
- start_bridge()
Start cyber bridge
- start_dreamview()
Start Dreamview
- start_instance(restart=False)
Starts an Apollo instance
- Parameters
restart (bool) – force container to restart
- start_modules()
Start all the necessary modules
- start_recorder(record_id: str)
Starts cyber_recorder
- Parameters
record_id (str) – the name of the record file
- start_sim_control_standalone()
Starts SimControlStandalone module
- stop_all()
Stops SimControl and all other AD related modules
- stop_dreamview()
Stop Dreamview
- stop_modules()
Stop all the necessary modules
- stop_recorder()
Stops cyber_recorder
- stop_sim_control_standalone()
Stops SimControlStandalone module
apollo.ApolloRunner
- class apollo.ApolloRunner.ApolloRunner(nid: int, ctn: apollo.ApolloContainer.ApolloContainer, start: apollo.utils.PositionEstimate, start_time: float, waypoints: List[apollo.utils.PositionEstimate])
Class to manage and run an Apollo instance
- Parameters
nid (int) – an unique ID assigned to this container runner
ctn (ApolloContainer) – the Apollo container controlled by this runner
start (PositionEstimate) – the initial location of this Apollo instance
start_time (float) – the amount of time this Apollo instance waits before starts moving
waypoints (List[PositionEstimate]) – the expected route this Apollo instance should complete
- get_decisions() Set[Tuple]
Get the decisions made by the Apollo instance
- Returns
list of decisions made
- Return type
Set[Tuple]
- get_min_distance() float
Get the minimum distance this instance ever reached w.r.t. another object. e.g., 0 if a collision occurred
- Returns
the minimum distance between this Apollo instance and another object
- Return type
float
- get_trajectory() List[Tuple]
Get the points traversed by this Apollo instance
- Returns
list of coordinates traversed by this Apollo instance
- Return type
List[Tuple[float, float]]
- initialize()
Resets and initializes all necessary modules of Apollo
- register_publishers()
Register publishers for the cyberRT communication
- register_subscribers()
Register subscribers for the cyberRT communication
- send_initial_localization()
Send the instance’s initial location to cyberRT
- send_routing()
Send the instance’s routing request to cyberRT
- set_min_distance(d: float)
Updates the minimum distance between this distance and another object if the argument passed in is smaller than the current min distance
- Parameters
d (float) – the distance between this instance and another object.
- should_send_routing(t: float) bool
Check if a routing request should be send to the Apollo instance
- Parameters
t (float) – the amount of time since the start of the simulation
- Returns
True if should send, False otherwise
- Return type
bool
- stop(stop_reason: str)
Stop the modules in the container
- Parameters
stop_reason (str) – a debug message indicating why the instance is stopped
apollo.CyberBridge
- class apollo.CyberBridge.BridgeOp
Class representing cyber bridge operations.
- AddReader = b'\x02'
Register a subscriber
- AddWriter = b'\x03'
Register a publisher
- Publish = b'\x04'
Publish a message
- class apollo.CyberBridge.Channel(channel: str, msg_type: str, msg_cls: any)
Class representing information regarding cyber bridge channels
- Parameters
channel (str) – the name of the channel
msg_type (str) – the protobuf data type of messages on this channel
msg_cls (any) – the compiled Python protobuf class
- class apollo.CyberBridge.CyberBridge(host: str, port=9090)
Class to represent CyberBridge
- Parameters
host (str) – IP address of cyber bridge running inside docker
port (int) – port of the cyber bridge
- add_publisher(channel: apollo.CyberBridge.Channel)
Adds a publisher to the bridge
- Parameters
channel (Channel) – the channel to publish message to
- add_subscriber(channel: apollo.CyberBridge.Channel, cb)
Adds a subscriber to the bridge
- Parameters
channel (Channel) – the channel to subscribe from
cb (Function) – callback function that takes parsed data from bridge
- on_read(data: bytes)
Function callback to notify bridge has published data
- Parameters
data (bytes) – data received from bridge
- publish(channel: apollo.CyberBridge.Channel, data: bytes)
Publish data to the bridge
- Parameters
channel (Channel) – channel to publish data to
data (bytes) – data to be published
- Note
You should register a publisher first before publishing any data
- receive_publish(data: bytes)
Receives data published by bridge and calls subscribers
- Parameters
data (bytes) – data received from bridge
- spin()
Starts to spin the cyber bridge client
- stop()
Stops the cyber bridge client
- class apollo.CyberBridge.Topics
Class representing channels used
- Chassis = Channel(channel='/apollo/canbus/chassis', msg_type='apollo.canbus.Chassis', msg_cls=<class 'modules.canbus.proto.chassis_pb2.Chassis'>)
Chassis channel
- Localization = Channel(channel='/apollo/localization/pose', msg_type='apollo.localization.LocalizationEstimate', msg_cls=<class 'modules.localization.proto.localization_pb2.LocalizationEstimate'>)
Localization Channel, typically includes vehicle’s location, velocity, acceleration, etc.
- Obstacles = Channel(channel='/apollo/perception/obstacles', msg_type='apollo.perception.PerceptionObstacles', msg_cls=<class 'modules.perception.proto.perception_obstacle_pb2.PerceptionObstacles'>)
Perception Channel, typically includes obstacles perceived from sensor or ground truth.
- Planning = Channel(channel='/apollo/planning/simplified', msg_type='apollo.planning.ADCTrajectory', msg_cls=<class 'modules.planning.proto.planning_pb2.ADCTrajectory'>)
Planning Channel, Apollo’s planning decisions are published to this channel
- RoutingRequest = Channel(channel='/apollo/routing_request', msg_type='apollo.routing.RoutingRequest', msg_cls=<class 'modules.routing.proto.routing_pb2.RoutingRequest'>)
Routing Request Channel, users send routing request to this channel
- TrafficLight = Channel(channel='/apollo/perception/traffic_light', msg_type='apollo.perception.TrafficLightDetection', msg_cls=<class 'modules.perception.proto.traffic_light_detection_pb2.TrafficLightDetection'>)
Traffic Light Channel, users can send color of any traffic signal to this channel
- apollo.CyberBridge.to_bytes(s: str) bytes
Converts string to bytes using ascii
- Parameters
s (str) – string to be converted
- Returns
bytes in ascii
- Return type
bytes
apollo.Dreamview
- class apollo.Dreamview.Dreamview(ip: str, port: int)
Class to wrap Dreamview connection
- Parameters
ip (str) – IP address of Dreamview websocket
port (int) – port of Dreamview websocket
- reset()
Resets Dreamview
- send_data(data: dict)
Helper function to send data to Dreamview
- Parameters
data (dict) – data to be sent
- start_sim_control()
Starts SimControl via websocket
- stop_sim_control()
Stops SimControl via websocket
apollo.MessageBroker
- class apollo.MessageBroker.MessageBroker(runners: List[apollo.ApolloRunner.ApolloRunner])
Class to represent MessageBroker, it tracks location of each ADS instance and broadcasts perception message to all ADS instances
- Parameters
runners (List[ApolloRunner]) – list of runners each controlling an ADS instance
- broadcast(channel: apollo.CyberBridge.Channel, data: bytes)
Sends data to specified channel of every instance
- Parameters
channel (Channel) – cyberRT channel to send data to
data (bytes) – data to be sent
- spin()
Starts to forward localization
- stop()
Stops forwarding localization
apollo.utils
- class apollo.utils.PositionEstimate(lane_id: str, s: float)
Class representing a location on a HD Map
- Parameters
lane_id (str) – id of the lane on a HD Map
s (float) – distance from the start of the lane
- is_too_close(rhs) bool
Check if 2 PositionEstimate objects are too close to each other. They are too close if their distance is less than 5 meters.
- Parameters
rhs (self) – right hand side object for comparison
- Returns
True if too close, False otherwise
- Return type
bool
- apollo.utils.calculate_velocity(linear_velocity: modules.common.proto.geometry_pb2.Point3D) float
Calculate velocity based on a given vector
- Parameters
linear_velocity (Point3D) – velocity in vector form
- Returns
speed calculated from the velocity
- Return type
float
- apollo.utils.clean_appolo_dir()
Removes Apollo’s log files to save disk space
- apollo.utils.construct_lane_boundary_linestring(lane_msg: modules.map.proto.map_lane_pb2.Lane) List[Tuple[shapely.geometry.linestring.LineString, shapely.geometry.linestring.LineString]]
Construct two linestrings for the lane’s left and right boundary
- Parameters
lane_msg (Lane) – Lane protobuf message extracted from HD Map
- Returns
2 LineString objects representing left and right boundary of the lane
- Return type
Tuple[LaneString, LaneString]
- apollo.utils.construct_lane_polygon(lane_msg: modules.map.proto.map_lane_pb2.Lane) shapely.geometry.polygon.Polygon
Construct the lane polygon based on their boundaries
- Parameters
lane_msg (Lane) – Lane protobuf message extracted from HD Map
- Returns
Polygon representing the lane
- Return type
Polygon
- apollo.utils.dynamic_obstacle_location_to_obstacle(_id: int, speed: float, loc: modules.common.proto.geometry_pb2.Point3D, heading: float) modules.perception.proto.perception_obstacle_pb2.PerceptionObstacle
Constructs a perception obstacle message for a dynamic obstacle, used for generating a simulated Apollo instance only
- Parameters
_id (int) – ID of the obstacle
speed (float) – speed of the obstacle
loc (Point3D) – location of the obstacle
heading (float) – heading of the obstacle
- Returns
a PerceptionObstacle protobuf message ready to be published to cyberRT
- Return type
PerceptionObstacle
- apollo.utils.extract_main_decision(data: modules.planning.proto.planning_pb2.ADCTrajectory) Set[Tuple]
Extracts the main decision from a Planning message
- Parameters
data (ADCTrajectory) – ADC’s planning module output
- Returns
a set containing the overall decision and main decision for each obstacle
- Return type
Set[Tuple]
- apollo.utils.generate_adc_polygon(position: modules.common.proto.geometry_pb2.Point3D, theta: float) List[modules.common.proto.geometry_pb2.Point3D]
Generate a polygon for the ADC based on its current position
- Parameters
position (Point3D) – position of the ADC
theta (float) – the heading of the ADC (in radians)
- Returns
a list consisting 4 Point3D objects to represent ADC polygon
- Return type
List[Point3D]
- apollo.utils.generate_adc_rear_vertices(position: modules.common.proto.geometry_pb2.Point3D, theta: float)
Generate the rear edge for the ADC
- Parameters
position (Point3D) – position of the ADC
theta (float) – heading of the ADC
- Returns
a list consisting 2 Point3D objects to represent the rear edge of the ADC
- Return type
List[Point3D]
- apollo.utils.generate_polygon(position: modules.common.proto.geometry_pb2.Point3D, theta: float, length: float, width: float) List[modules.common.proto.geometry_pb2.Point3D]
Generate polygon for a perception obstacle
- Parameters
position (Point3D) – the position of the obstacle
theta (float) – the heading of the obstacle
length (float) – the length of the obstacle
width (float) – the width of the obstacle
- Returns
List with 4 Point3D objects representing the polygon of the obstacle
- Return type
List[Point3D]
- apollo.utils.get_lane_boundary_points(boundary: modules.map.proto.map_lane_pb2.LaneBoundary) List[Tuple[float, float]]
Given a lane boundary (left/right), return a list of x, y coordinates of all points in the boundary
- Parameters
boundary (LaneBoundary) – LaneBoundary protobuf message
- Returns
list of boundary points
- Return type
List[Tuple[float, float]]
- apollo.utils.localization_to_obstacle(_id: int, data: modules.localization.proto.localization_pb2.LocalizationEstimate) modules.perception.proto.perception_obstacle_pb2.PerceptionObstacle
Converts LocalizationEstimate to PerceptionObstacle. The localization message of an ADS instance is used as part of the perception message for other ADS instances.
- Parameters
_id (int) – ID of the obstacle
data (LocalizationEstimate) – localization message of the ADC
- Returns
PerceptionObstacle message converted from localization of an ADC
- Return type
PerceptionObstacle
- apollo.utils.obstacle_to_polygon(obs: modules.perception.proto.perception_obstacle_pb2.PerceptionObstacle) shapely.geometry.polygon.Polygon
Constructs a polygon for an obstacle
- Parameters
obs (PerceptionObstacle) – the perception obstacle protobuf message
- Returns
a Polygon object representing the obstacle
- Return type
Polygon
- apollo.utils.pedestrian_location_to_obstacle(_id: int, speed: float, loc: modules.common.proto.geometry_pb2.Point3D, heading: float) modules.perception.proto.perception_obstacle_pb2.PerceptionObstacle
Constructs a perception obstacle message for a pedestrian
- Parameters
_id (int) – ID of the obstacle
speed (float) – speed of the obstacle
loc (Point3D) – location of the obstacle
heading (float) – heading of the obstacle
- Returns
a PerceptionObstacle protobuf message ready to be published to cyberRT
- Return type
PerceptionObstacle
- apollo.utils.to_Point3D(data: modules.common.proto.geometry_pb2.Point3D) modules.common.proto.geometry_pb2.Point3D
Replaces NaN that may occur in Apollo to 0.0
- Parameters
data (Point3D) – Point3D object to be cleaned
- Returns
cleaned up version of the original Point3D object
- Return type
Point3D