apollo

apollo.ApolloContainer

class apollo.ApolloContainer.ApolloContainer(apollo_root: str, username: str)

Class to represent Apollo container

Parameters
  • apollo_root (str) – root directory of Baidu Apollo

  • username (str) – unique ID used to identify container

property container_name: str

Gets the name of the container

Type

str

property ip: str

Gets the ip address of the container

Type

str

is_running() bool

Checks if the container is running

Returns

True if running, False otherwise

Return type

bool

reset()

Resets the container (e.g., stopps and restarts all related modules)

reset_bridge_connection()

Close any existing connection to bridge and reconnect

restart_dreamview()

Restart Dreamview

restart_modules()

Restart all the necessary modules

start_bridge()

Start cyber bridge

start_dreamview()

Start Dreamview

start_instance(restart=False)

Starts an Apollo instance

Parameters

restart (bool) – force container to restart

start_modules()

Start all the necessary modules

start_recorder(record_id: str)

Starts cyber_recorder

Parameters

record_id (str) – the name of the record file

start_sim_control_standalone()

Starts SimControlStandalone module

stop_all()

Stops SimControl and all other AD related modules

stop_dreamview()

Stop Dreamview

stop_modules()

Stop all the necessary modules

stop_recorder()

Stops cyber_recorder

stop_sim_control_standalone()

Stops SimControlStandalone module

apollo.ApolloRunner

class apollo.ApolloRunner.ApolloRunner(nid: int, ctn: apollo.ApolloContainer.ApolloContainer, start: apollo.utils.PositionEstimate, start_time: float, waypoints: List[apollo.utils.PositionEstimate])

Class to manage and run an Apollo instance

Parameters
  • nid (int) – an unique ID assigned to this container runner

  • ctn (ApolloContainer) – the Apollo container controlled by this runner

  • start (PositionEstimate) – the initial location of this Apollo instance

  • start_time (float) – the amount of time this Apollo instance waits before starts moving

  • waypoints (List[PositionEstimate]) – the expected route this Apollo instance should complete

get_decisions() Set[Tuple]

Get the decisions made by the Apollo instance

Returns

list of decisions made

Return type

Set[Tuple]

get_min_distance() float

Get the minimum distance this instance ever reached w.r.t. another object. e.g., 0 if a collision occurred

Returns

the minimum distance between this Apollo instance and another object

Return type

float

get_trajectory() List[Tuple]

Get the points traversed by this Apollo instance

Returns

list of coordinates traversed by this Apollo instance

Return type

List[Tuple[float, float]]

initialize()

Resets and initializes all necessary modules of Apollo

register_publishers()

Register publishers for the cyberRT communication

register_subscribers()

Register subscribers for the cyberRT communication

send_initial_localization()

Send the instance’s initial location to cyberRT

send_routing()

Send the instance’s routing request to cyberRT

set_min_distance(d: float)

Updates the minimum distance between this distance and another object if the argument passed in is smaller than the current min distance

Parameters

d (float) – the distance between this instance and another object.

should_send_routing(t: float) bool

Check if a routing request should be send to the Apollo instance

Parameters

t (float) – the amount of time since the start of the simulation

Returns

True if should send, False otherwise

Return type

bool

stop(stop_reason: str)

Stop the modules in the container

Parameters

stop_reason (str) – a debug message indicating why the instance is stopped

apollo.CyberBridge

class apollo.CyberBridge.BridgeOp

Class representing cyber bridge operations.

AddReader = b'\x02'

Register a subscriber

AddWriter = b'\x03'

Register a publisher

Publish = b'\x04'

Publish a message

class apollo.CyberBridge.Channel(channel: str, msg_type: str, msg_cls: any)

Class representing information regarding cyber bridge channels

Parameters
  • channel (str) – the name of the channel

  • msg_type (str) – the protobuf data type of messages on this channel

  • msg_cls (any) – the compiled Python protobuf class

class apollo.CyberBridge.CyberBridge(host: str, port=9090)

Class to represent CyberBridge

Parameters
  • host (str) – IP address of cyber bridge running inside docker

  • port (int) – port of the cyber bridge

add_publisher(channel: apollo.CyberBridge.Channel)

Adds a publisher to the bridge

Parameters

channel (Channel) – the channel to publish message to

add_subscriber(channel: apollo.CyberBridge.Channel, cb)

Adds a subscriber to the bridge

Parameters
  • channel (Channel) – the channel to subscribe from

  • cb (Function) – callback function that takes parsed data from bridge

on_read(data: bytes)

Function callback to notify bridge has published data

Parameters

data (bytes) – data received from bridge

publish(channel: apollo.CyberBridge.Channel, data: bytes)

Publish data to the bridge

Parameters
  • channel (Channel) – channel to publish data to

  • data (bytes) – data to be published

Note

You should register a publisher first before publishing any data

receive_publish(data: bytes)

Receives data published by bridge and calls subscribers

Parameters

data (bytes) – data received from bridge

spin()

Starts to spin the cyber bridge client

stop()

Stops the cyber bridge client

class apollo.CyberBridge.Topics

Class representing channels used

Chassis = Channel(channel='/apollo/canbus/chassis', msg_type='apollo.canbus.Chassis', msg_cls=<class 'modules.canbus.proto.chassis_pb2.Chassis'>)

Chassis channel

Localization = Channel(channel='/apollo/localization/pose', msg_type='apollo.localization.LocalizationEstimate', msg_cls=<class 'modules.localization.proto.localization_pb2.LocalizationEstimate'>)

Localization Channel, typically includes vehicle’s location, velocity, acceleration, etc.

Obstacles = Channel(channel='/apollo/perception/obstacles', msg_type='apollo.perception.PerceptionObstacles', msg_cls=<class 'modules.perception.proto.perception_obstacle_pb2.PerceptionObstacles'>)

Perception Channel, typically includes obstacles perceived from sensor or ground truth.

Planning = Channel(channel='/apollo/planning/simplified', msg_type='apollo.planning.ADCTrajectory', msg_cls=<class 'modules.planning.proto.planning_pb2.ADCTrajectory'>)

Planning Channel, Apollo’s planning decisions are published to this channel

RoutingRequest = Channel(channel='/apollo/routing_request', msg_type='apollo.routing.RoutingRequest', msg_cls=<class 'modules.routing.proto.routing_pb2.RoutingRequest'>)

Routing Request Channel, users send routing request to this channel

TrafficLight = Channel(channel='/apollo/perception/traffic_light', msg_type='apollo.perception.TrafficLightDetection', msg_cls=<class 'modules.perception.proto.traffic_light_detection_pb2.TrafficLightDetection'>)

Traffic Light Channel, users can send color of any traffic signal to this channel

apollo.CyberBridge.to_bytes(s: str) bytes

Converts string to bytes using ascii

Parameters

s (str) – string to be converted

Returns

bytes in ascii

Return type

bytes

apollo.Dreamview

class apollo.Dreamview.Dreamview(ip: str, port: int)

Class to wrap Dreamview connection

Parameters
  • ip (str) – IP address of Dreamview websocket

  • port (int) – port of Dreamview websocket

reset()

Resets Dreamview

send_data(data: dict)

Helper function to send data to Dreamview

Parameters

data (dict) – data to be sent

start_sim_control()

Starts SimControl via websocket

stop_sim_control()

Stops SimControl via websocket

apollo.MessageBroker

class apollo.MessageBroker.MessageBroker(runners: List[apollo.ApolloRunner.ApolloRunner])

Class to represent MessageBroker, it tracks location of each ADS instance and broadcasts perception message to all ADS instances

Parameters

runners (List[ApolloRunner]) – list of runners each controlling an ADS instance

broadcast(channel: apollo.CyberBridge.Channel, data: bytes)

Sends data to specified channel of every instance

Parameters
  • channel (Channel) – cyberRT channel to send data to

  • data (bytes) – data to be sent

spin()

Starts to forward localization

stop()

Stops forwarding localization

apollo.utils

class apollo.utils.PositionEstimate(lane_id: str, s: float)

Class representing a location on a HD Map

Parameters
  • lane_id (str) – id of the lane on a HD Map

  • s (float) – distance from the start of the lane

is_too_close(rhs) bool

Check if 2 PositionEstimate objects are too close to each other. They are too close if their distance is less than 5 meters.

Parameters

rhs (self) – right hand side object for comparison

Returns

True if too close, False otherwise

Return type

bool

apollo.utils.calculate_velocity(linear_velocity: modules.common.proto.geometry_pb2.Point3D) float

Calculate velocity based on a given vector

Parameters

linear_velocity (Point3D) – velocity in vector form

Returns

speed calculated from the velocity

Return type

float

apollo.utils.clean_appolo_dir()

Removes Apollo’s log files to save disk space

apollo.utils.construct_lane_boundary_linestring(lane_msg: modules.map.proto.map_lane_pb2.Lane) List[Tuple[shapely.geometry.linestring.LineString, shapely.geometry.linestring.LineString]]

Construct two linestrings for the lane’s left and right boundary

Parameters

lane_msg (Lane) – Lane protobuf message extracted from HD Map

Returns

2 LineString objects representing left and right boundary of the lane

Return type

Tuple[LaneString, LaneString]

apollo.utils.construct_lane_polygon(lane_msg: modules.map.proto.map_lane_pb2.Lane) shapely.geometry.polygon.Polygon

Construct the lane polygon based on their boundaries

Parameters

lane_msg (Lane) – Lane protobuf message extracted from HD Map

Returns

Polygon representing the lane

Return type

Polygon

apollo.utils.dynamic_obstacle_location_to_obstacle(_id: int, speed: float, loc: modules.common.proto.geometry_pb2.Point3D, heading: float) modules.perception.proto.perception_obstacle_pb2.PerceptionObstacle

Constructs a perception obstacle message for a dynamic obstacle, used for generating a simulated Apollo instance only

Parameters
  • _id (int) – ID of the obstacle

  • speed (float) – speed of the obstacle

  • loc (Point3D) – location of the obstacle

  • heading (float) – heading of the obstacle

Returns

a PerceptionObstacle protobuf message ready to be published to cyberRT

Return type

PerceptionObstacle

apollo.utils.extract_main_decision(data: modules.planning.proto.planning_pb2.ADCTrajectory) Set[Tuple]

Extracts the main decision from a Planning message

Parameters

data (ADCTrajectory) – ADC’s planning module output

Returns

a set containing the overall decision and main decision for each obstacle

Return type

Set[Tuple]

apollo.utils.generate_adc_polygon(position: modules.common.proto.geometry_pb2.Point3D, theta: float) List[modules.common.proto.geometry_pb2.Point3D]

Generate a polygon for the ADC based on its current position

Parameters
  • position (Point3D) – position of the ADC

  • theta (float) – the heading of the ADC (in radians)

Returns

a list consisting 4 Point3D objects to represent ADC polygon

Return type

List[Point3D]

apollo.utils.generate_adc_rear_vertices(position: modules.common.proto.geometry_pb2.Point3D, theta: float)

Generate the rear edge for the ADC

Parameters
  • position (Point3D) – position of the ADC

  • theta (float) – heading of the ADC

Returns

a list consisting 2 Point3D objects to represent the rear edge of the ADC

Return type

List[Point3D]

apollo.utils.generate_polygon(position: modules.common.proto.geometry_pb2.Point3D, theta: float, length: float, width: float) List[modules.common.proto.geometry_pb2.Point3D]

Generate polygon for a perception obstacle

Parameters
  • position (Point3D) – the position of the obstacle

  • theta (float) – the heading of the obstacle

  • length (float) – the length of the obstacle

  • width (float) – the width of the obstacle

Returns

List with 4 Point3D objects representing the polygon of the obstacle

Return type

List[Point3D]

apollo.utils.get_lane_boundary_points(boundary: modules.map.proto.map_lane_pb2.LaneBoundary) List[Tuple[float, float]]

Given a lane boundary (left/right), return a list of x, y coordinates of all points in the boundary

Parameters

boundary (LaneBoundary) – LaneBoundary protobuf message

Returns

list of boundary points

Return type

List[Tuple[float, float]]

apollo.utils.localization_to_obstacle(_id: int, data: modules.localization.proto.localization_pb2.LocalizationEstimate) modules.perception.proto.perception_obstacle_pb2.PerceptionObstacle

Converts LocalizationEstimate to PerceptionObstacle. The localization message of an ADS instance is used as part of the perception message for other ADS instances.

Parameters
  • _id (int) – ID of the obstacle

  • data (LocalizationEstimate) – localization message of the ADC

Returns

PerceptionObstacle message converted from localization of an ADC

Return type

PerceptionObstacle

apollo.utils.obstacle_to_polygon(obs: modules.perception.proto.perception_obstacle_pb2.PerceptionObstacle) shapely.geometry.polygon.Polygon

Constructs a polygon for an obstacle

Parameters

obs (PerceptionObstacle) – the perception obstacle protobuf message

Returns

a Polygon object representing the obstacle

Return type

Polygon

apollo.utils.pedestrian_location_to_obstacle(_id: int, speed: float, loc: modules.common.proto.geometry_pb2.Point3D, heading: float) modules.perception.proto.perception_obstacle_pb2.PerceptionObstacle

Constructs a perception obstacle message for a pedestrian

Parameters
  • _id (int) – ID of the obstacle

  • speed (float) – speed of the obstacle

  • loc (Point3D) – location of the obstacle

  • heading (float) – heading of the obstacle

Returns

a PerceptionObstacle protobuf message ready to be published to cyberRT

Return type

PerceptionObstacle

apollo.utils.to_Point3D(data: modules.common.proto.geometry_pb2.Point3D) modules.common.proto.geometry_pb2.Point3D

Replaces NaN that may occur in Apollo to 0.0

Parameters

data (Point3D) – Point3D object to be cleaned

Returns

cleaned up version of the original Point3D object

Return type

Point3D