from time import sleep import numpy as np from krpc.services.spacecenter import CargoBayState, DockingPortState from maneuvers.utils import point_toward_direction, point_toward_target from maneuver_scheduler import ManeuverScheduler from mission import MissionStatus, MissionType from lib import get_connexion, get_vessel class MissionControl: mission_types = [] vessel = None def __init__(self): self.current_mission = None self.current_orbit_missions = [] def pick_missions(self, backlog): current_body_name = self.vessel.orbit.body.name for i, mission in enumerate(backlog.missions[current_body_name]): if mission.type in self.mission_types: if self.current_mission is None: self.plan_mission(mission) else: self.current_orbit_missions.append(mission) del (backlog.missions[current_body_name][i]) def plan_mission(self, mission): self.current_mission = mission planning_duration = 60 ut = ManeuverScheduler.next_free_timeslot( from_ut=get_connexion().space_center.ut + planning_duration, duration=planning_duration) ManeuverScheduler.book_timeslot(ut, self.vessel, duration=planning_duration) def execute_mission(self, alarm): if self.current_mission is None: if self.current_orbit_missions: self.current_mission = self.current_orbit_missions.pop(0) else: return report = self.current_mission.execute(self) if report and report.mission_status == MissionStatus.Done: self.current_mission = None def get_kerbal(self, name): for k in self.vessel.crew: if k.name == name: return k raise LookupError('Kerbal {} not found onboard vessel {}'.format(name, self.vessel.name)) def get_docking_port(self, name): for m in self.vessel.parts.modules_with_name('ModuleDockingNodeNamed'): if m.get_field_by_id('portName') == name: return m.part.docking_port raise LookupError('Docking port {} not found on vessel {}'.format(name, self.vessel.name)) class ShuttleKerbin(MissionControl): mission_types = [MissionType.orbit_rescue, MissionType.transport] PISTON_MAX_EXTENSION = 2.4 def __init__(self, vessel_name): super().__init__() self.vessel_name = vessel_name self.mothership = get_vessel(vessel_name) self.probe = None for e in self.vessel.parts.engines: e.active = True self.current_orbit_missions = [] self.first_cabin = self.mothership.parts.with_name('mk2Cockpit.Inline')[0] self.second_cabin = self.mothership.parts.with_name('mk2CrewCabin')[0] self.third_cabin = self.mothership.parts.with_name('mk2CrewCabin')[1] self.docking_ports = self.mothership.parts.docking_ports @property def vessel(self): if self.probe is None: return self.mothership else: return self.probe @vessel.setter def vessel(self, value): if self.probe is None: self.mothership = value else: self.probe = value def toggle_grappling(self, value=True): arm = self.get_grappling() if arm.modules[1].has_event('Arm') and value \ or arm.modules[1].has_event('Disarm') and not value: arm.modules[1].set_action_by_id('ToggleAction', True) def get_grappling(self): if self.probe is None: return self.vessel.parts.with_name('GrapplingDevice')[0] return self.probe.parts.with_name('GrapplingDevice')[0] def get_probe_port(self): return self.get_docking_port('Tug Probe Port') def get_bay_port(self): if self.probe is not None: return self.get_mothership_docking_port('Shuttle Bay Port') return self.get_docking_port('Shuttle Bay Port') def get_mothership_docking_port(self, name): for m in self.mothership.parts.modules_with_name('ModuleDockingNodeNamed'): if m.get_field_by_id('portName') == name: return m.part.docking_port raise LookupError('Docking port {} not found on vessel {}'.format(name, self.mothership.name)) def get_probe_docking_port(self, name): for m in self.probe.parts.modules_with_name('ModuleDockingNodeNamed'): if m.get_field_by_id('portName') == name: return m.part.docking_port raise LookupError('Docking port {} not found on vessel {}'.format(name, self.mothership.name)) def deliver_probe(self, target=None): bay = self.mothership.parts.cargo_bays[0] bay.open = True with get_connexion().stream(getattr, bay, 'state') as state: with state.condition: while not state() == CargoBayState.open: state.wait() hinge = self.mothership.parts.robotic_hinges[0] hinge.target_angle = 90 with get_connexion().stream(getattr, hinge, 'current_angle') as current_angle: with current_angle.condition: while current_angle() < hinge.target_angle - 1: current_angle.wait() if target is not None: self.vessel.parts.controlling = self.get_bay_port().part point_toward_target(get_connexion(), self.vessel, target) bay_port = self.get_bay_port() self.probe = bay_port.undock() return self.probe def recover_probe(self): bay_port = self.get_bay_port() if bay_port.state != DockingPortState.docked: raise Exception("Probe is not parked in shuttle") self.probe = None hinge = self.mothership.parts.robotic_hinges[0] hinge.target_angle = 0 with get_connexion().stream(getattr, hinge, 'current_angle') as current_angle: with current_angle.condition: while current_angle() > hinge.target_angle: current_angle.wait() bay = self.mothership.parts.cargo_bays[0] bay.open = False def get_part_with_free_seat(self): crew_size = self.mothership.crew_count if self.mothership.crew_capacity > 10: crew_size = crew_size - 1 if crew_size <= 2: return self.first_cabin elif crew_size <= 6: return self.second_cabin elif crew_size <= 10: return self.third_cabin else: raise Exception('Vessel already has no free sit') class ComsatKerbin(MissionControl): mission_types = [MissionType.comsat] def __init__(self, vessel_name): super().__init__() self.vessel_name = vessel_name self.vessel = get_vessel(vessel_name) def toggle_engines(self, value=True): self.vessel.parts.engines[0].active = value