[Redesign] Add type annotations to operator and component creator#278
[Redesign] Add type annotations to operator and component creator#278HivaMohammadzadeh1 wants to merge 3 commits intoerdos-project:redesignfrom
Conversation
pschafhalter
left a comment
There was a problem hiding this comment.
Left some feedback. It might be a good idea to break this PR into smaller pieces.
| def add_prediction(obstacles_tracking_stream: Stream[ObstaclesMessageTuple], | ||
| vehicle_id_stream: Stream[int], | ||
| release_sensor_stream: Stream[None], | ||
| point_cloud_stream: Stream[PointCloud], | ||
| lidar_setup: Stream[LidarSetup], | ||
| pose_stream: Optional[Stream[pylot.utils.Pose]] = None, | ||
| Camera_transform = None, | ||
| time_to_decision_stream: Optional[Stream[float]] = None | ||
| ) -> Tuple[Stream[prediction], Stream[int], Stream]: |
There was a problem hiding this comment.
Could you make sure that the arguments remain in the same order as before?
There was a problem hiding this comment.
Also, the return type should be Tuple[Stream[List[ObstaclePrediction]], Stream[SegmentedFrame], Stream[None]]
|
|
||
| def add_traffic_light_invasion_sensor(ground_vehicle_id_stream: Stream, | ||
| pose_stream: Stream) -> Stream: | ||
| pose_stream: Stream[pylot.utils.Pose]) -> Stream: |
There was a problem hiding this comment.
Please add an annotation the returned stream with the message type.
| camera_setup, | ||
| name='center_track'): | ||
| def add_center_track_tracking(bgr_camera_stream: Stream[CameraFrame], | ||
| camera_setup: Stream[CameraSetup], |
There was a problem hiding this comment.
| camera_setup: Stream[CameraSetup], | |
| camera_setup: CameraSetup, |
| global_trajectory_stream, | ||
| prediction_stream: Stream[prediction], | ||
| time_to_decision_stream: Optional[Stream[float]] = None, | ||
| pose_stream: Optional[Stream[pylot.utils.Pose]] = None) -> Stream: |
There was a problem hiding this comment.
Please annotate the return type of the stream -- it should carry waypoints.
| from absl import flags | ||
|
|
||
| from erdos import Stream | ||
| from pylot import prediction |
There was a problem hiding this comment.
| from pylot import prediction |
|
|
||
| import erdos | ||
| from erdos import Stream | ||
| from pylot import prediction |
There was a problem hiding this comment.
| from pylot import prediction |
|
|
||
| from pylot.loggers.pose_logger_operator import PoseLoggerOperator | ||
| from pylot.loggers.imu_logger_operator import IMULoggerOperator | ||
| from pylot.prediction import obstacle_prediction | ||
|
|
There was a problem hiding this comment.
Why were these imports added?
|
|
||
| def add_linear_prediction(tracking_stream: Stream, | ||
| time_to_decision_stream: Stream) -> Stream: | ||
| time_to_decision_stream: Stream) -> Stream[prediction]: |
There was a problem hiding this comment.
| time_to_decision_stream: Stream) -> Stream[prediction]: | |
| time_to_decision_stream: Stream[float]) -> Stream[List[ObstaclePrediction]]: |
| def add_r2p2_prediction(point_cloud_stream, obstacles_tracking_stream, | ||
| time_to_decision_stream, lidar_setup): | ||
| def add_r2p2_prediction(point_cloud_stream: Stream[PointCloud], obstacles_tracking_stream: Stream[ObstaclesMessageTuple], | ||
| time_to_decision_stream, lidar_setup: Stream[LidarSetup]) -> Stream[prediction]: |
There was a problem hiding this comment.
| time_to_decision_stream, lidar_setup: Stream[LidarSetup]) -> Stream[prediction]: | |
| time_to_decision_stream: Stream[float], lidar_setup: LidarSetup) -> Stream[List[ObstaclePrediction]]: |
| tracking_stream: Stream, | ||
| prediction_stream: Stream, |
There was a problem hiding this comment.
Please annotate the types of these streams.
No description provided.