198 lines
6.8 KiB
Python
198 lines
6.8 KiB
Python
from time import sleep
|
|
|
|
import numpy as np
|
|
|
|
from krpc.services.spacecenter import CargoBayState, DockingPortState
|
|
from maneuvers.utils import point_toward_direction, point_toward_target
|
|
|
|
from maneuver_scheduler import ManeuverScheduler
|
|
|
|
from mission import MissionStatus, MissionType
|
|
|
|
from lib import get_connexion, get_vessel
|
|
|
|
|
|
class MissionControl:
|
|
mission_types = []
|
|
vessel = None
|
|
|
|
def __init__(self):
|
|
self.current_mission = None
|
|
self.current_orbit_missions = []
|
|
|
|
def pick_missions(self, backlog):
|
|
current_body_name = self.vessel.orbit.body.name
|
|
for i, mission in enumerate(backlog.missions[current_body_name]):
|
|
if mission.type in self.mission_types:
|
|
if self.current_mission is None:
|
|
self.plan_mission(mission)
|
|
else:
|
|
self.current_orbit_missions.append(mission)
|
|
del (backlog.missions[current_body_name][i])
|
|
|
|
def plan_mission(self, mission):
|
|
self.current_mission = mission
|
|
planning_duration = 60
|
|
ut = ManeuverScheduler.next_free_timeslot(
|
|
from_ut=get_connexion().space_center.ut + planning_duration,
|
|
duration=planning_duration)
|
|
ManeuverScheduler.book_timeslot(ut, self.vessel, duration=planning_duration)
|
|
|
|
def execute_mission(self, alarm):
|
|
if self.current_mission is None:
|
|
if self.current_orbit_missions:
|
|
self.current_mission = self.current_orbit_missions.pop(0)
|
|
else:
|
|
return
|
|
report = self.current_mission.execute(self)
|
|
if report and report.mission_status == MissionStatus.Done:
|
|
self.current_mission = None
|
|
|
|
def get_kerbal(self, name):
|
|
for k in self.vessel.crew:
|
|
if k.name == name:
|
|
return k
|
|
|
|
raise LookupError('Kerbal {} not found onboard vessel {}'.format(name, self.vessel.name))
|
|
|
|
def get_docking_port(self, name):
|
|
for m in self.vessel.parts.modules_with_name('ModuleDockingNodeNamed'):
|
|
if m.get_field_by_id('portName') == name:
|
|
return m.part.docking_port
|
|
|
|
raise LookupError('Docking port {} not found on vessel {}'.format(name, self.vessel.name))
|
|
|
|
|
|
class ShuttleKerbin(MissionControl):
|
|
mission_types = [MissionType.orbit_rescue, MissionType.transport]
|
|
PISTON_MAX_EXTENSION = 2.4
|
|
|
|
def __init__(self, vessel_name):
|
|
super().__init__()
|
|
self.vessel_name = vessel_name
|
|
self.mothership = get_vessel(vessel_name)
|
|
self.probe = None
|
|
for e in self.vessel.parts.engines:
|
|
e.active = True
|
|
|
|
self.current_orbit_missions = []
|
|
self.first_cabin = self.mothership.parts.with_name('mk2Cockpit.Inline')[0]
|
|
self.second_cabin = self.mothership.parts.with_name('mk2CrewCabin')[0]
|
|
self.third_cabin = self.mothership.parts.with_name('mk2CrewCabin')[1]
|
|
|
|
self.docking_ports = self.mothership.parts.docking_ports
|
|
|
|
@property
|
|
def vessel(self):
|
|
if self.probe is None:
|
|
return self.mothership
|
|
else:
|
|
return self.probe
|
|
|
|
@vessel.setter
|
|
def vessel(self, value):
|
|
if self.probe is None:
|
|
self.mothership = value
|
|
else:
|
|
self.probe = value
|
|
|
|
def toggle_grappling(self, value=True):
|
|
arm = self.get_grappling()
|
|
if arm.modules[1].has_event('Arm') and value \
|
|
or arm.modules[1].has_event('Disarm') and not value:
|
|
arm.modules[1].set_action_by_id('ToggleAction', True)
|
|
|
|
def get_grappling(self):
|
|
if self.probe is None:
|
|
return self.vessel.parts.with_name('GrapplingDevice')[0]
|
|
|
|
return self.probe.parts.with_name('GrapplingDevice')[0]
|
|
|
|
def get_probe_port(self):
|
|
return self.get_docking_port('Tug Probe Port')
|
|
|
|
def get_bay_port(self):
|
|
if self.probe is not None:
|
|
return self.get_mothership_docking_port('Shuttle Bay Port')
|
|
return self.get_docking_port('Shuttle Bay Port')
|
|
|
|
def get_mothership_docking_port(self, name):
|
|
for m in self.mothership.parts.modules_with_name('ModuleDockingNodeNamed'):
|
|
if m.get_field_by_id('portName') == name:
|
|
return m.part.docking_port
|
|
|
|
raise LookupError('Docking port {} not found on vessel {}'.format(name, self.mothership.name))
|
|
|
|
def get_probe_docking_port(self, name):
|
|
for m in self.probe.parts.modules_with_name('ModuleDockingNodeNamed'):
|
|
if m.get_field_by_id('portName') == name:
|
|
return m.part.docking_port
|
|
|
|
raise LookupError('Docking port {} not found on vessel {}'.format(name, self.mothership.name))
|
|
|
|
def deliver_probe(self, target=None):
|
|
bay = self.mothership.parts.cargo_bays[0]
|
|
bay.open = True
|
|
|
|
with get_connexion().stream(getattr, bay, 'state') as state:
|
|
with state.condition:
|
|
while not state() == CargoBayState.open:
|
|
state.wait()
|
|
|
|
hinge = self.mothership.parts.robotic_hinges[0]
|
|
hinge.target_angle = 90
|
|
|
|
with get_connexion().stream(getattr, hinge, 'current_angle') as current_angle:
|
|
with current_angle.condition:
|
|
while current_angle() < hinge.target_angle - 1:
|
|
current_angle.wait()
|
|
|
|
if target is not None:
|
|
self.vessel.parts.controlling = self.get_bay_port().part
|
|
point_toward_target(get_connexion(), self.vessel, target)
|
|
|
|
bay_port = self.get_bay_port()
|
|
self.probe = bay_port.undock()
|
|
|
|
return self.probe
|
|
|
|
def recover_probe(self):
|
|
bay_port = self.get_bay_port()
|
|
if bay_port.state != DockingPortState.docked:
|
|
raise Exception("Probe is not parked in shuttle")
|
|
self.probe = None
|
|
|
|
hinge = self.mothership.parts.robotic_hinges[0]
|
|
hinge.target_angle = 0
|
|
with get_connexion().stream(getattr, hinge, 'current_angle') as current_angle:
|
|
with current_angle.condition:
|
|
while current_angle() > hinge.target_angle:
|
|
current_angle.wait()
|
|
|
|
bay = self.mothership.parts.cargo_bays[0]
|
|
bay.open = False
|
|
|
|
def get_part_with_free_seat(self):
|
|
crew_size = self.mothership.crew_count
|
|
if self.mothership.crew_capacity > 10:
|
|
crew_size = crew_size - 1
|
|
if crew_size <= 2:
|
|
return self.first_cabin
|
|
elif crew_size <= 6:
|
|
return self.second_cabin
|
|
elif crew_size <= 10:
|
|
return self.third_cabin
|
|
else:
|
|
raise Exception('Vessel already has no free sit')
|
|
|
|
|
|
class ComsatKerbin(MissionControl):
|
|
mission_types = [MissionType.comsat]
|
|
|
|
def __init__(self, vessel_name):
|
|
super().__init__()
|
|
self.vessel_name = vessel_name
|
|
self.vessel = get_vessel(vessel_name)
|
|
|
|
def toggle_engines(self, value=True):
|
|
self.vessel.parts.engines[0].active = value |