From 1499e737c1adaf5880c71a5de67ac7c944fbc02d Mon Sep 17 00:00:00 2001 From: Gentile G Date: Thu, 31 Aug 2023 17:31:43 +0200 Subject: [PATCH] Moving the mission and mission control in their own file. Adapting the rescue mission for the new shuttle --- main.py | 189 +----------------------------------- maneuver_scheduler.py | 2 +- maneuvers/approach.py | 22 +---- maneuvers/approach_rcs.py | 96 ++++++++++++++++++ maneuvers/docking.py | 39 +------- maneuvers/utils.py | 57 +++++++++++ mission/__init__.py | 130 +++++++++++++++++++++++++ mission_control/__init__.py | 161 ++++++++++++++++++++++++++++++ 8 files changed, 450 insertions(+), 246 deletions(-) create mode 100644 maneuvers/approach_rcs.py create mode 100644 mission/__init__.py create mode 100644 mission_control/__init__.py diff --git a/main.py b/main.py index c63cade..b7a0c41 100644 --- a/main.py +++ b/main.py @@ -1,195 +1,12 @@ import sys import signal -from enum import Enum + +from mission_control import ShuttleKerbin, ComsatKerbin +from mission import RescueMission, ComSatNetworkMission from maneuver_scheduler import ManeuverScheduler -from maneuvers.rendezvous import RendezvousManeuver -from maneuvers.approach import ApproachManeuver -from maneuvers.docking import DockingManeuver -from maneuvers.comsat import ComsatManeuver - from connector import get_connexion -from lib import get_contract, get_vessel, get_rescuee_vessel, get_body - - -class MissionType(Enum): - transport = 1 - rescue = 2 - comsat = 3 - - -class MissionStatus(Enum): - Backlog = 1 - Assigned = 2 - InProgress = 3 - Done = 4 - - -class MissionReport: - mission_status = None - new_vessel_id = None - new_missions = [] - - def __int__(self): - pass - - -class Mission: - pass - - -class TransportMission(Mission): - type = MissionType.transport - - def __init__(self, kerbal_name, destination): - self.kerbal_name = kerbal_name - self.destination = destination - - -class RescueMission(Mission): - type = MissionType.rescue - - def __init__(self, contract_title): - self.contract_title = contract_title - self.contract = get_contract(contract_title) - rescuee_first_name = contract_title.split()[1] - self.rescuee_vessel = get_rescuee_vessel(rescuee_first_name) - self.rescuee_vessel_name = self.rescuee_vessel.name - self.rescuee_name = self.rescuee_vessel.crew[0].name - - def execute(self, mission_control): - conn = get_connexion() - sc = conn.space_center - - if sc.active_vessel.name != mission_control.vessel.name: - sc.active_vessel = mission_control.vessel - - if sc.target_vessel is None or sc.target_vessel.name != self.rescuee_vessel.name: - sc.target_vessel = self.rescuee_vessel - - rendezvous_done = RendezvousManeuver(conn, mission_control).execute() - if rendezvous_done: - reference_frame = sc.ReferenceFrame.create_relative( - self.rescuee_vessel.reference_frame, - rotation=(1., 0., 0., 0.) - ) - sc.active_vessel.parts.controlling = mission_control.get_grappling() - approach_done = ApproachManeuver(conn, mission_control, reference_frame).start() - if approach_done: - mission_control.toggle_grappling(True) - docking_done = DockingManeuver(conn, - mission_control, - mission_control.get_grappling(), - reference_frame).start() - # Find new creat member name - # sc.transfer_crew(self.rescuee_name, target_part) - # Release Grapple - # Destroy Capsule - report = MissionReport() - report.mission_status = MissionStatus.Done - report.new_missions.append( - TransportMission(self.rescuee_name, 'kerbin_orbit') - ) - - return report - - -class ComSatNetworkMission: - type = MissionType.comsat - - def __init__(self, body_name): - self.body = get_body(body_name) - - def execute(self, mission_control): - conn = get_connexion() - sc = conn.space_center - - if sc.active_vessel.name != mission_control.vessel.name: - sc.active_vessel = mission_control.vessel - - mission_control.toggle_engines(True) - - comsat_done = ComsatManeuver(conn, mission_control, self.body).execute() - if comsat_done: - report = MissionReport() - report.mission_status = MissionStatus.Done - - return report - - -class MissionControl: - mission_types = [] - - def __init__(self): - self.current_mission = None - - def pick_missions(self, backlog): - current_body_name = self.vessel.orbit.body.name - for i, mission in enumerate(backlog.missions[current_body_name]): - if mission.type in self.mission_types: - if self.current_mission is None: - self.plan_mission(mission) - else: - self.current_orbit_missions.append(mission) - del (backlog.missions[current_body_name][i]) - - def plan_mission(self, mission): - self.current_mission = mission - planning_duration = 60 - ut = ManeuverScheduler.next_free_timeslot( - from_ut=get_connexion().space_center.ut + planning_duration, - duration=planning_duration) - ManeuverScheduler.book_timeslot(ut, self.vessel, duration=planning_duration) - - def execute_mission(self, alarm): - if self.current_mission is None: - if self.current_orbit_missions: - self.current_mission = self.current_orbit_missions.pop(0) - else: - return - report = self.current_mission.execute(self) - if report and report.mission_status == MissionStatus.Done: - self.current_mission = None - - -class ShuttleKerbin(MissionControl): - mission_types = [MissionType.rescue, MissionType.transport] - PISTON_MAX_EXTENSION = 2.4 - - def __init__(self, vessel_name): - super().__init__() - self.vessel_name = vessel_name - self.vessel = get_vessel(vessel_name) - for e in self.vessel.parts.engines: - e.active = True - - self.current_orbit_missions = [] - - def toggle_grappling(self, value=True): - piston = self.vessel.parts.robotic_pistons[0] - piston.target_extension = self.PISTON_MAX_EXTENSION if value else 0 - - arm = piston.part.children[0] - if arm.modules[1].has_event('Arm') and not value \ - or arm.modules[1].has_event('Disarm') and value: - arm.modules[1].set_action_by_id('ToggleAction', True) - - def get_grappling(self): - piston = self.vessel.parts.robotic_pistons[0] - return piston.part.children[0] - - -class ComsatKerbin(MissionControl): - mission_types = [MissionType.comsat] - - def __init__(self, vessel_name): - super().__init__() - self.vessel_name = vessel_name - self.vessel = get_vessel(vessel_name) - - def toggle_engines(self, value=True): - self.vessel.parts.engines[0].active = value class Backlog: diff --git a/maneuver_scheduler.py b/maneuver_scheduler.py index 73d5f8c..db05112 100644 --- a/maneuver_scheduler.py +++ b/maneuver_scheduler.py @@ -161,7 +161,7 @@ class ManeuverScheduler: except json.JSONDecodeError: continue - raise EOFError('Excepted to find a free timeslot at the end alarm list') + raise EOFError('Expected to find a free timeslot at the end alarm list') @classmethod def get_reservation(cls, ut_at) -> Timeslot: diff --git a/maneuvers/approach.py b/maneuvers/approach.py index 96388c7..5218b61 100644 --- a/maneuvers/approach.py +++ b/maneuvers/approach.py @@ -3,7 +3,7 @@ from krpc.services.spacecenter import SASMode import numpy as np from time import time, sleep -from .utils import magnitude, unitary, kill_relative_velocity, correct_course +from .utils import magnitude, unitary, kill_relative_velocity, correct_course, get_safety_radius, point_toward_direction from . import Maneuver @@ -52,26 +52,6 @@ class ApproachManeuver(Maneuver): return True -def get_safety_radius(vessel): - bbox = vessel.bounding_box(vessel.reference_frame) - return max(magnitude(bbox[0]), magnitude(bbox[1])) - - -def point_toward_direction(vessel, direction, reference_frame): - ap = vessel.auto_pilot - ap.reference_frame = reference_frame - ap.target_direction = unitary(direction) - ap.target_roll = 0 - ap.sas = False - ap.engage() - sleep(1) - ap.wait() - - ap.disengage() - ap.sas_mode = SASMode.stability_assist - ap.sas = True - - THROTTLE = .1 VELOCITY_TOLERANCE = .1 diff --git a/maneuvers/approach_rcs.py b/maneuvers/approach_rcs.py new file mode 100644 index 0000000..a4b9417 --- /dev/null +++ b/maneuvers/approach_rcs.py @@ -0,0 +1,96 @@ +from krpc.services.spacecenter import SASMode + +import numpy as np +from time import time, sleep + +from .utils import magnitude, unitary, kill_relative_velocity, kill_rcs_velocity, correct_course, get_safety_radius,\ + point_toward_direction + +from . import Maneuver + + +class ApproachRCSManeuver(Maneuver): + + SAFETY_RADIUS_MARGIN = 10 + + def __init__(self, conn, mission_control, reference_frame): + super().__init__(conn, mission_control) + self.reference_frame = reference_frame + + def start(self): + sc = self.conn.space_center + vessel = sc.active_vessel + target = sc.target_vessel + + kill_rcs_velocity(vessel, self.reference_frame) + + self.conn.drawing.add_direction((0, 1, 0), self.reference_frame) + + vessel.control.rcs = False + + pv = vessel.position(self.reference_frame) + + safety_radius = get_safety_radius(vessel) + get_safety_radius(target) + self.SAFETY_RADIUS_MARGIN + + # if under and inside safety cylinder's circle + if pv[1] < safety_radius and pow(pv[0], 2) + pow(pv[2], 2) <= pow(safety_radius, 2): + print("We're under the target and inside the safety cylinder, getting out") + # get out of the cylinder + planar_move_vector = unitary((pv[0], pv[2])) * (safety_radius - magnitude((pv[0], pv[2]))) + spacial_move_vector = np.array((planar_move_vector[0], 0, planar_move_vector[1])) + + pv = vessel.position(self.reference_frame) + move_to_waypoint(self.conn, vessel, pv + spacial_move_vector, self.reference_frame) + + print("We're outside of the safety cylinder, setting vertical distance") + pv = vessel.position(self.reference_frame) + move_to_waypoint(self.conn, vessel, (pv[0], safety_radius, pv[2]), self.reference_frame) + + # should be above and outside => get inside + print("We're at the right vertical distance to the target, setting horizontal position") + move_to_waypoint(self.conn, vessel, (0, safety_radius, 0), self.reference_frame) + + # point_toward_direction(vessel, - np.array(vessel.position(self.reference_frame)), self.reference_frame) + point_toward_direction(vessel, (0, -1, 0), self.reference_frame) + + return True + + +def move_to_waypoint(conn, vessel, waypoint, reference_frame): + mj = conn.mech_jeb + + kill_relative_velocity(conn, vessel, reference_frame) + + conn.drawing.add_line(vessel.position(reference_frame), waypoint, reference_frame) + waypoint = np.array(waypoint) + + start_position = np.array(vessel.position(reference_frame)) + vector = waypoint - start_position + distance = magnitude(vector) + direction = unitary(vector) + acceleration_distance = distance / 4 + + point_toward_direction(vessel, direction, reference_frame) + + print("Starting acceleration") + remaining_distance = distance + vessel.control.rcs = True + vessel.control.forward = 1 + while remaining_distance > 3 * acceleration_distance: + sleep(.1) + remaining_distance = magnitude(waypoint - vessel.position(reference_frame)) + vessel.control.forward = 0 + print("Target velocity achieved") + + while remaining_distance > acceleration_distance: + sleep(.1) + correct_course(conn, vessel, waypoint, reference_frame) + remaining_distance = magnitude(waypoint - vessel.position(reference_frame)) + print(remaining_distance) + + print("Starting deceleration") + kill_rcs_velocity(vessel, reference_frame) + print("Ship decelerated") + + print("destination position: {}".format(waypoint)) + print("end position: {}".format(np.array(vessel.position(reference_frame)))) diff --git a/maneuvers/docking.py b/maneuvers/docking.py index 0703c99..04a7e56 100644 --- a/maneuvers/docking.py +++ b/maneuvers/docking.py @@ -1,6 +1,6 @@ from time import sleep -from .utils import kill_relative_velocity, correct_course, magnitude +from .utils import kill_relative_velocity, kill_rcs_velocity, correct_course, magnitude from . import Maneuver @@ -83,43 +83,6 @@ def rcs_push(vessel, axis, duration): vessel.control.rcs = False -def kill_rcs_velocity(vessel, reference_frame): - print("Killing RCS velocity") - velo = vessel.velocity(reference_frame) - vessel.control.rcs = True - while any(abs(component) > .05 for component in velo) > .05: - if abs(velo[0]) > .05: - sign = -velo[0] / abs(velo[0]) - if abs(velo[0]) > .1: - vessel.control.up = 1 * sign - elif abs(velo[0]) > .05: - vessel.control.up = .1 * sign - else: - vessel.control.up = 0 - - if abs(velo[1]) > .05: - sign = -velo[1] / abs(velo[1]) - if abs(velo[1]) > .1: - vessel.control.forward = 1 * sign - elif abs(velo[1]) > .05: - vessel.control.forward = .1 * sign - else: - vessel.control.forward = 0 - - if abs(velo[2]) > .05: - sign = velo[2] / abs(velo[2]) - if abs(velo[2]) > .1: - vessel.control.right = 1 * sign - elif abs(velo[2]) > .05: - vessel.control.right = .1 * sign - else: - vessel.control.right = 0 - sleep(.1) - velo = vessel.velocity(reference_frame) - vessel.control.rcs = False - print("RCS velocity killed") - - def align_horizontally(conn, vessel, reference_frame): conn.drawing.add_direction((1, 0, 0), vessel.reference_frame) diff --git a/maneuvers/utils.py b/maneuvers/utils.py index b92e3a4..c0e3766 100644 --- a/maneuvers/utils.py +++ b/maneuvers/utils.py @@ -63,3 +63,60 @@ def correct_course(conn, vessel, waypoint, reference_frame): vessel.control.up = -.1 else: vessel.control.up = 0 + + +def kill_rcs_velocity(vessel, reference_frame): + print("Killing RCS velocity") + velo = vessel.velocity(reference_frame) + vessel.control.rcs = True + while any(abs(component) > .05 for component in velo) > .05: + if abs(velo[0]) > .05: + sign = -velo[0] / abs(velo[0]) + if abs(velo[0]) > .1: + vessel.control.up = 1 * sign + elif abs(velo[0]) > .05: + vessel.control.up = .1 * sign + else: + vessel.control.up = 0 + + if abs(velo[1]) > .05: + sign = -velo[1] / abs(velo[1]) + if abs(velo[1]) > .1: + vessel.control.forward = 1 * sign + elif abs(velo[1]) > .05: + vessel.control.forward = .1 * sign + else: + vessel.control.forward = 0 + + if abs(velo[2]) > .05: + sign = velo[2] / abs(velo[2]) + if abs(velo[2]) > .1: + vessel.control.right = 1 * sign + elif abs(velo[2]) > .05: + vessel.control.right = .1 * sign + else: + vessel.control.right = 0 + sleep(.1) + velo = vessel.velocity(reference_frame) + vessel.control.rcs = False + print("RCS velocity killed") + + +def get_safety_radius(vessel): + bbox = vessel.bounding_box(vessel.reference_frame) + return max(magnitude(bbox[0]), magnitude(bbox[1])) + + +def point_toward_direction(vessel, direction, reference_frame): + ap = vessel.auto_pilot + ap.reference_frame = reference_frame + ap.target_direction = unitary(direction) + ap.target_roll = 0 + ap.sas = False + ap.engage() + sleep(1) + ap.wait() + + ap.disengage() + ap.sas_mode = SASMode.stability_assist + ap.sas = True \ No newline at end of file diff --git a/mission/__init__.py b/mission/__init__.py new file mode 100644 index 0000000..78e8f45 --- /dev/null +++ b/mission/__init__.py @@ -0,0 +1,130 @@ +from time import sleep +from enum import Enum + +from maneuvers.rendezvous import RendezvousManeuver +from maneuvers.approach_rcs import ApproachRCSManeuver +from maneuvers.docking import DockingManeuver +from maneuvers.comsat import ComsatManeuver + +from lib import get_contract, get_rescuee_vessel, get_body, get_connexion + + +class MissionType(Enum): + transport = 1 + orbit_rescue = 2 + comsat = 3 + + +class MissionStatus(Enum): + Backlog = 1 + Assigned = 2 + InProgress = 3 + Done = 4 + + +class MissionReport: + mission_status = None + new_vessel_id = None + new_missions = [] + + def __int__(self): + pass + + +class Mission: + pass + + +class TransportMission(Mission): + type = MissionType.transport + + def __init__(self, kerbal_name, destination): + self.kerbal_name = kerbal_name + self.destination = destination + + +class RescueMission(Mission): + type = MissionType.orbit_rescue + + def __init__(self, contract_title): + self.contract_title = contract_title + self.contract = get_contract(contract_title) + rescuee_first_name = contract_title.split()[1] + self.rescuee_vessel = get_rescuee_vessel(rescuee_first_name) + self.rescuee_vessel_name = self.rescuee_vessel.name + self.rescuee_name = self.rescuee_vessel.crew[0].name + + def execute(self, mission_control): + conn = get_connexion() + sc = conn.space_center + + if sc.active_vessel.name != mission_control.vessel.name: + sc.active_vessel = mission_control.vessel + + if sc.target_vessel is None or sc.target_vessel.name != self.rescuee_vessel.name: + sc.target_vessel = self.rescuee_vessel + + rendezvous_done = RendezvousManeuver(conn, mission_control).execute() + if rendezvous_done: + reference_frame = sc.ReferenceFrame.create_relative( + self.rescuee_vessel.reference_frame, + rotation=(1., 0., 0., 0.) + ) + + sc.active_vessel = mission_control.deliver_probe + sc.active_vessel.parts.controlling = mission_control.get_grappling() + + target_approach_done = ApproachRCSManeuver(conn, mission_control, reference_frame).start() + if target_approach_done: + mission_control.toggle_grappling(True) + target_docking_done = DockingManeuver(conn, + mission_control, + mission_control.get_grappling(), + reference_frame).start() + if target_docking_done: + sc.active_vessel.parts.controlling = mission_control.get_probe_port() + reference_frame = mission_control.get_bay_port().reference_frame + mothership_approach_done = ApproachRCSManeuver(conn, mission_control, reference_frame).start() + if mothership_approach_done: + mothership_docking_done = DockingManeuver(conn, + mission_control, + mission_control.get_probe_port(), + reference_frame).start() + + if mothership_docking_done: + kerbal = mission_control.get_kerbal(self.rescuee_name) + sc.transfer_crew(kerbal, mission_control.get_part_with_free_seat()) + mission_control.toggle_grappling(False) + mission_control.recover_probe() + + # Destroy Capsule + report = MissionReport() + report.mission_status = MissionStatus.Done + report.new_missions.append( + TransportMission(self.rescuee_name, 'kerbin_orbit') + ) + + return report + + +class ComSatNetworkMission: + type = MissionType.comsat + + def __init__(self, body_name): + self.body = get_body(body_name) + + def execute(self, mission_control): + conn = get_connexion() + sc = conn.space_center + + if sc.active_vessel.name != mission_control.vessel.name: + sc.active_vessel = mission_control.vessel + + mission_control.toggle_engines(True) + + comsat_done = ComsatManeuver(conn, mission_control, self.body).execute() + if comsat_done: + report = MissionReport() + report.mission_status = MissionStatus.Done + + return report diff --git a/mission_control/__init__.py b/mission_control/__init__.py new file mode 100644 index 0000000..4dab6b0 --- /dev/null +++ b/mission_control/__init__.py @@ -0,0 +1,161 @@ +from time import sleep + +from krpc.services.spacecenter import CargoBayState, DockingPortState + +from maneuver_scheduler import ManeuverScheduler + +from mission import MissionStatus, MissionType + +from lib import get_connexion, get_vessel + + +class MissionControl: + mission_types = [] + + def __init__(self): + self.current_mission = None + self.current_orbit_missions = [] + self.vessel = NotImplemented + + def pick_missions(self, backlog): + current_body_name = self.vessel.orbit.body.name + for i, mission in enumerate(backlog.missions[current_body_name]): + if mission.type in self.mission_types: + if self.current_mission is None: + self.plan_mission(mission) + else: + self.current_orbit_missions.append(mission) + del (backlog.missions[current_body_name][i]) + + def plan_mission(self, mission): + self.current_mission = mission + planning_duration = 60 + ut = ManeuverScheduler.next_free_timeslot( + from_ut=get_connexion().space_center.ut + planning_duration, + duration=planning_duration) + ManeuverScheduler.book_timeslot(ut, self.vessel, duration=planning_duration) + + def execute_mission(self, alarm): + if self.current_mission is None: + if self.current_orbit_missions: + self.current_mission = self.current_orbit_missions.pop(0) + else: + return + report = self.current_mission.execute(self) + if report and report.mission_status == MissionStatus.Done: + self.current_mission = None + + def get_kerbal(self, name): + for k in self.vessel.crew: + if k.name == "name": + return k + + raise LookupError('Kerbal {} not found onboard vessel {}'.format(name, self.vessel.name)) + + def get_docking_port(self, name): + for p in self.vessel.parts.docking_ports: + for m in p.modules: + if m.name == "ModuleDockingNodeNamed" and m.get_field('portName') == name: + return p + + +class ShuttleKerbin(MissionControl): + mission_types = [MissionType.orbit_rescue, MissionType.transport] + PISTON_MAX_EXTENSION = 2.4 + + def __init__(self, vessel_name): + super().__init__() + self.vessel_name = vessel_name + self.mothership = get_vessel(vessel_name) + self.probe = None + for e in self.vessel.parts.engines: + e.active = True + + self.current_orbit_missions = [] + self.first_cabin = self.mothership.parts.with_title("Mk2 Inline Cockpit")[0] + self.second_cabin = self.mothership.parts.with_title("MK2 Crew Cabin")[0] + self.third_cabin = self.mothership.parts.with_title("MK2 Crew Cabin")[1] + + self.docking_ports = self.mothership.parts.docking_ports + + @property + def vessel(self): + if self.probe is None: + return self.mothership + else: + return self.probe + + def toggle_grappling(self, value=True): + arm = self.get_grappling() + if arm.modules[1].has_event('Arm') and not value \ + or arm.modules[1].has_event('Disarm') and value: + arm.modules[1].set_action_by_id('ToggleAction', True) + + def get_grappling(self): + if self.probe is None: + raise Exception('Probe is not ready') + + return self.probe.parts.with_title("Advanced Grabbing Unit")[0] + + def get_probe_port(self): + # return self.probe.parts.with_name('Probe Port')[0] + return self.get_docking_port('Probe Port') + + def get_bay_port(self): + #return self.mothership.with_name('Shuttle Bay Port')[0] + return self.get_docking_port('Shuttle Bay Port') + + def deliver_probe(self): + bay = self.mothership.cargo_bays[0] + bay.open = True + while bay.state != CargoBayState.open: + sleep(1) + + hinge = self.mothership.robotic_hinges[0] + hinge.target_extension = 90 + while hinge.current_angle < 90: + sleep(.1) + + bay_port = self.get_bay_port() + self.probe = bay_port.undock() + + return self.probe + + def recover_probe(self): + bay_port = self.mothership.parts.with_name('Shuttle Bay Port')[0] + if bay_port.state != DockingPortState.docked: + raise Exception("Probe is not parked in shuttle") + self.probe = None + + hinge = self.mothership.robotic_hinges[0] + hinge.target_extension = 0 + while hinge.current_angle > 0: + sleep(.1) + + bay = self.mothership.cargo_bays[0] + bay.open = False + + def get_part_with_free_seat(self): + crew_size = self.mothership.crew_count + if self.mothership.crew_capacity > 10: + crew_size = crew_size - 1 + if crew_size <= 2: + return self.first_cabin + elif crew_size <= 6: + return self.second_cabin + elif crew_size <= 10: + return self.third_cabin + else: + raise Exception('Vessel already has no free sit') + + +class ComsatKerbin(MissionControl): + mission_types = [MissionType.comsat] + + def __init__(self, vessel_name): + super().__init__() + self.vessel_name = vessel_name + self.vessel = get_vessel(vessel_name) + + def toggle_engines(self, value=True): + self.vessel.parts.engines[0].active = value \ No newline at end of file