from connector import get_connexion
import json
import math


class Timeslot:
    """A span of universal time described by a start and a duration.

    Attributes:
        ut_start: Start of the slot.
        duration: Length of the slot, in the same unit as ``ut_start``.
    """

    def __init__(self, ut_start, duration):
        self.ut_start = ut_start
        self.duration = duration

    @property
    def ut_end(self):
        """End of the slot, derived as ``ut_start + duration``."""
        return self.ut_start + self.duration

    @ut_end.setter
    def ut_end(self, value):
        # Moving the end keeps the start fixed and adjusts the duration.
        # (The attribute is ``ut_start``; the previous ``self.start`` did not
        # exist and made every assignment raise AttributeError.)
        self.duration = value - self.ut_start


class ManeuverScheduler:
    """Books non-overlapping time slots for vessels using alarms.

    Every reservation is stored as an alarm whose ``notes`` field holds a JSON
    object with the keys ``duration`` and ``vessel_name``. Alarms whose notes
    do not follow that format are ignored when looking for conflicts.
    """

    # alarm_manager = get_connexion().space_center.alarm_manager
    alarm_manager = get_connexion().kerbal_alarm_clock
    # Padding applied around a burn and used as the alarm margin
    # (presumably seconds -- TODO confirm).
    node_offsets = 60.
    # Slot length used when the caller does not provide one.
    default_duration = 5 * 60.

    @classmethod
    def get_last_alarm(cls):
        """Return the first alarm of the time-ordered alarm list.

        Raises:
            IndexError: If there is no alarm at all.
        """
        # NOTE(review): the list is sorted by ascending time, so index 0 is
        # the EARLIEST alarm, which does not match the method name. Behaviour
        # is kept as-is; confirm with callers whether ``[-1]`` was intended.
        return cls.get_ordered_alarms()[0]

    @classmethod
    def get_ordered_alarms(cls):
        """Return all alarms of the alarm manager sorted by ascending time."""
        return sorted(cls.alarm_manager.alarms, key=lambda el: el.time)

    @classmethod
    def book_timeslot_for_node(cls, vessel, node, maneuver,
                               alarm_start=None, duration=None):
        """Reserve a slot around a maneuver node and create its alarm.

        Args:
            vessel: Vessel performing the maneuver.
            node: Maneuver node; ``delta_v`` and ``ut`` are read from it.
            maneuver: Object whose ``name`` is used in the alarm title.
            alarm_start: Start of the slot. Defaults to the node time minus
                half the duration and minus ``node_offsets``.
            duration: Length of the slot. Defaults to the estimated burn time
                (``delta_v * mass / available_thrust``) plus twice
                ``node_offsets``, rounded down.

        Raises:
            RuntimeError: If the slot overlaps an existing reservation.
            ZeroDivisionError: If ``duration`` is omitted and the vessel has
                no available thrust.
        """
        if duration is None:
            # Only estimate the burn when it is actually needed, so that an
            # explicit duration also works for a vessel without thrust.
            time_required = (node.delta_v * vessel.mass) / vessel.available_thrust
            duration = math.floor(2 * cls.node_offsets + time_required)

        # arg_dict = {
        #     'title': "{}' Maneuver: {}".format(vessel.name, maneuver.name),
        #     'description': json.dumps(description),
        #     'offset': cls.node_offsets
        # }
        # cls.alarm_manager.add_maneuver_node_alarm(
        #     vessel,
        #     vessel.control.nodes[0],
        #     **arg_dict)

        if alarm_start is None:
            alarm_start = node.ut - (duration / 2 + cls.node_offsets)

        cls._require_free(alarm_start, duration)

        # No margin is set for maneuver alarms (it was disabled in the
        # original code: ``# alarm.margin = cls.node_offsets``).
        cls._create_reservation_alarm(
            cls.alarm_manager.AlarmType.maneuver,
            "{}' Maneuver: {}".format(vessel.name, maneuver.name),
            alarm_start,
            vessel,
            duration,
        )

    @classmethod
    def book_timeslot(cls, ut, vessel, duration=None):
        """Reserve a raw slot starting at ``ut`` for ``vessel``.

        Args:
            ut: Start of the slot.
            vessel: Vessel the slot is reserved for.
            duration: Length of the slot; defaults to ``default_duration``.

        Raises:
            RuntimeError: If the slot overlaps an existing reservation.
        """
        if duration is None:
            duration = cls.default_duration

        cls._require_free(ut, duration)

        cls._create_reservation_alarm(
            cls.alarm_manager.AlarmType.raw,
            "{}' Timeslot".format(vessel.name),
            ut,
            vessel,
            duration,
            margin=cls.node_offsets,
        )

    @classmethod
    def book_timeslot_for_soi(cls, vessel, maneuver, duration=None):
        """Reserve a slot starting at the vessel's next SOI change.

        Args:
            vessel: Vessel whose ``orbit.time_to_soi_change`` is used.
            maneuver: Unused; kept for interface compatibility.
            duration: Length of the slot; defaults to ``default_duration``.

        Raises:
            RuntimeError: If ``time_to_soi_change`` is NaN or the slot
                overlaps an existing reservation.
        """
        if duration is None:
            duration = cls.default_duration

        soi_change = vessel.orbit.time_to_soi_change
        if math.isnan(soi_change):
            raise RuntimeError(
                "vessel {!r} has no upcoming SOI change".format(vessel.name))

        ut_start = get_connexion().space_center.ut + soi_change
        cls._require_free(ut_start, duration)

        cls._create_reservation_alarm(
            cls.alarm_manager.AlarmType.soi_change,
            "{}' SOI".format(vessel.name),
            ut_start,
            vessel,
            duration,
            margin=cls.node_offsets,
        )

    @classmethod
    def timeslot_is_free(cls, ut_start: int, duration: int) -> bool:
        """Tell whether ``[ut_start, ut_start + duration]`` is unreserved.

        Bounds are inclusive: a slot that merely touches an existing
        reservation counts as occupied.
        """
        return cls._window_is_free(
            cls._reservation_windows(), ut_start, ut_start + duration)

    @classmethod
    def next_free_timeslot(cls, from_ut=None, duration=None) -> int:
        """Return the earliest free slot start at or after ``from_ut``.

        Args:
            from_ut: Earliest acceptable start; defaults to the current
                ``space_center.ut``.
            duration: Length of the slot; defaults to ``default_duration``.

        Raises:
            EOFError: If no free slot is found after any reservation.
        """
        if from_ut is None:
            from_ut = get_connexion().space_center.ut
        if duration is None:
            duration = cls.default_duration

        # Read the alarm list once and test every candidate against it.
        windows = cls._reservation_windows()
        if cls._window_is_free(windows, from_ut, from_ut + duration):
            return from_ut

        # Candidates are the instants right after each reservation ends,
        # tried from the earliest to the latest.
        for candidate in sorted(end + 1 for _, end in windows):
            if candidate < from_ut:
                # Never propose a slot before the requested lower bound.
                continue
            if cls._window_is_free(windows, candidate, candidate + duration):
                return candidate

        raise EOFError('Excepted to find a free timeslot at the end alarm list')

    @classmethod
    def get_reservation(cls, ut_at) -> Timeslot:
        """Look up the reservation covering ``ut_at``.

        Not implemented yet: always returns ``None``.
        """
        return None

    @classmethod
    def delete_reservation(cls, ut_at, priority):
        """Check whether ``priority`` allows removing the reservation at ``ut_at``.

        Raises:
            RuntimeError: If ``priority`` is not strictly greater than the
                priority of the reservation.
        """
        # NOTE(review): ``get_reservation`` is still a stub returning None and
        # ``Timeslot`` has no ``priority`` attribute, so the comparison below
        # currently fails with AttributeError. Nothing is deleted yet either.
        reservation = cls.get_reservation(ut_at)
        if priority <= reservation.priority:
            raise RuntimeError(
                "priority {} is too low to delete the reservation at UT {}"
                .format(priority, ut_at))

    @classmethod
    def _require_free(cls, ut_start, duration):
        """Raise RuntimeError unless the requested slot is free."""
        if not cls.timeslot_is_free(ut_start, duration):
            raise RuntimeError(
                "timeslot starting at UT {} (duration {}) is already booked"
                .format(ut_start, duration))

    @classmethod
    def _create_reservation_alarm(cls, alarm_type, title, ut, vessel,
                                  duration, margin=None):
        """Create an alarm and store the reservation metadata in its notes."""
        description = {
            'duration': duration,
            'vessel_name': vessel.name
        }
        alarm = cls.alarm_manager.create_alarm(alarm_type, title, ut)
        alarm.vessel = vessel
        if margin is not None:
            alarm.margin = margin
        alarm.notes = json.dumps(description)
        alarm.action = cls.alarm_manager.AlarmAction.kill_warp_only
        return alarm

    @staticmethod
    def _reservation_window(alarm):
        """Return ``(start, end)`` of an alarm's reservation, or None.

        None is returned when the notes are not a JSON object carrying a
        numeric ``duration`` (for instance an alarm created by hand).
        """
        try:
            duration = json.loads(alarm.notes)['duration']
        except (TypeError, ValueError, KeyError):
            # TypeError: notes missing or JSON value not subscriptable by key;
            # ValueError: invalid JSON (JSONDecodeError is a subclass);
            # KeyError: JSON object without a 'duration' entry.
            return None
        if not isinstance(duration, (int, float)):
            return None
        start = alarm.time
        return start, start + duration

    @classmethod
    def _reservation_windows(cls):
        """Return the ``(start, end)`` windows of all reservations, by time."""
        windows = []
        for alarm in cls.get_ordered_alarms():
            window = cls._reservation_window(alarm)
            if window is not None:
                windows.append(window)
        return windows

    @staticmethod
    def _window_is_free(windows, ut_start, ut_end):
        """Tell whether ``[ut_start, ut_end]`` overlaps none of ``windows``."""
        # Two inclusive intervals overlap when each starts before the other
        # ends; this also covers a reservation that begins inside the
        # requested slot and ends after it.
        return not any(
            start <= ut_end and end >= ut_start for start, end in windows)