Compare commits

..

11 Commits

25 changed files with 1436 additions and 336 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.idea

8
.idea/.gitignore generated vendored
View File

@@ -1,8 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

1
.idea/.name generated
View File

@@ -1 +0,0 @@
main.py

View File

@@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

10
.idea/kttd.iml generated
View File

@@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

4
.idea/misc.xml generated
View File

@@ -1,4 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (rocket_autopilot)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated
View File

@@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/kttd.iml" filepath="$PROJECT_DIR$/.idea/kttd.iml" />
</modules>
</component>
</project>

View File

@@ -1,35 +0,0 @@
class Timeslot:
def __init__(self, ut_start, duration):
self.ut_start = ut_start
self.duration = duration
@property
def ut_end(self):
return self.ut_start + self.duration
@ut_end.setter
def ut_end(self, value):
self.duration = self.value - self.start
class Calendar:
def create_reservation(self, ut_start, duration, maneuver):
if not self.timeslot_is_free(ut_start, duration):
raise
pass
def timeslot_is_free(self, ut_start: int, duration: int) -> bool:
pass
def next_free_timeslot(self, from_ut, duration=None) -> int:
pass
def get_reservation(self, ut_at) -> Timeslot:
pass
def delete_reservation(self, ut_at, priority):
reservation = self.get_re(ut_at)
if priority <= reservation.priority:
raise

16
connector.py Normal file
View File

@@ -0,0 +1,16 @@
import krpc
class Connector:
connexion = None
@classmethod
def get_connexion(cls):
if cls.connexion is None:
cls.connexion = krpc.connect()
return cls.connexion
def get_connexion():
return Connector.get_connexion()

View File

@@ -1,15 +1,44 @@
from time import sleep
import numpy as np
from simple_pid import PID
import numpy as np
import krpc
from maneuvers.utils import get_required_rcs_thrust
conn = krpc.connect()
sc = conn.space_center
vessel = sc.active_vessel
docking_part = vessel.parts.root.children[0].children[10].children[0].children[0].children[0].children[0]
target = sc.target_vessel
target.velocity(vessel.reference_frame)
conn.drawing.add_direction(target.velocity(vessel.reference_frame), vessel.reference_frame)
get_required_rcs_thrust(vessel, target.velocity(vessel.reference_frame))
from maneuvers.utils import magnitude
kill_relative_velocity_rcs(vessel, target)
mj = conn.mech_jeb
sa = mj.smart_ass
reference_frame = sc.ReferenceFrame.create_relative(target.reference_frame, rotation=(1., 0., 0., 0.))
conn.drawing.add_direction((0, 1, 0), reference_frame)
from time import sleep
import krpc
from maneuvers.utils import magnitude
conn = krpc.connect()
sc = conn.space_center
vessel = sc.active_vessel
target = sc.target_vessel
mj = conn.mech_jeb
sa = mj.smart_ass
sa.autopilot_mode = conn.mech_jeb.SmartASSAutopilotMode.target_plus
sa.update(False)
sa.autopilot_mode = conn.mech_jeb.SmartASSAutopilotMode.target_minus
sa.update(False)
while True:
print(magnitude(vessel.angular_velocity(target.reference_frame)))
sleep(.1)
from mission_control import ShuttleKerbin
s = ShuttleKerbin("KKS Gagarin")
s.recover_probe()

33
lib.py Normal file
View File

@@ -0,0 +1,33 @@
from connector import get_connexion
def get_contract(title):
for c in get_connexion().space_center.contract_manager.active_contracts:
if c.title == title:
return c
raise LookupError('Contract "{}" not found'.format(title))
def get_vessel(name):
for v in get_connexion().space_center.vessels:
if v.name == name:
return v
raise LookupError("Vessel {} not found".format(name))
def get_rescuee_vessel(rescuee_name):
for v in get_connexion().space_center.vessels:
if rescuee_name in v.name:
return v
raise LookupError("Rescuee {} vessel not found".format(rescuee_name))
def get_body(name):
bodies = get_connexion().space_center.bodies
if name in bodies:
return bodies[name]
raise LookupError("Celestial body {} not found".format(name))

64
main.py
View File

@@ -1,16 +1,62 @@
# This is a sample Python script.
import sys
import signal
# Press Maj+F10 to execute it or replace it with your code.
# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
from mission_control import ShuttleKerbin, ComsatKerbin
from mission import RescueMission, ComSatNetworkMission
from maneuver_scheduler import ManeuverScheduler
from connector import get_connexion
def print_hi(name):
# Use a breakpoint in the code line below to debug your script.
print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint.
class Backlog:
missions = {
'Kerbin': [RescueMission('Rescue Rossby from orbit of Kerbin.'), ] # ComSatNetworkMission('Kerbin')]
}
kerbin_to_orbit = []
kerbin_to_ground = []
def signal_handler(signal, frame):
print("\nBye!")
sys.exit(0)
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
print_hi('PyCharm')
signal.signal(signal.SIGINT, signal_handler)
ships = []
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
ships.append(ShuttleKerbin('KKS Gagarin'))
ships.append(ComsatKerbin('KKR Shepard'))
conn = get_connexion()
# conn.space_center.GameMode
space_center = conn.space_center
for s in ships:
s.pick_missions(Backlog)
while True:
for s in ships:
s.pick_missions(Backlog)
space_center.rails_warp_factor = 7
with conn.stream(getattr, space_center, 'rails_warp_factor') as rails_warp_factor:
with rails_warp_factor.condition:
while rails_warp_factor() > 0:
rails_warp_factor.wait()
alarm = ManeuverScheduler.get_last_alarm()
vessel = alarm.vessel
current_ship = None
for s in ships:
if s.vessel.name == vessel.name:
current_ship = s
break
if current_ship is None:
raise LookupError("Current ship {} not found".format(vessel.name))
current_ship.execute_mission(alarm)
alarm.remove()
print("turn")

163
maneuver_scheduler.py Normal file
View File

@@ -0,0 +1,163 @@
from connector import get_connexion
import json
import math
class Timeslot:
def __init__(self, ut_start, duration):
self.ut_start = ut_start
self.duration = duration
@property
def ut_end(self):
return self.ut_start + self.duration
@ut_end.setter
def ut_end(self, value):
self.duration = value - self.start
class ManeuverScheduler:
# alarm_manager = get_connexion().space_center.alarm_manager
alarm_manager = get_connexion().kerbal_alarm_clock
node_offsets = 60.
default_duration = 5 * 60.
@classmethod
def get_last_alarm(cls):
return cls.get_ordered_alarms()[0]
@classmethod
def get_ordered_alarms(cls):
return sorted(cls.alarm_manager.alarms, key=lambda el: el.time)
@classmethod
def book_timeslot_for_node(cls, vessel, node, maneuver, alarm_start=None, duration=None):
time_required = (node.delta_v * vessel.mass) / vessel.available_thrust
if duration is None:
duration = math.floor(2 * cls.node_offsets + time_required)
description = {
'duration': duration,
'vessel_name': vessel.name
}
if alarm_start is None:
alarm_start = node.ut - (duration / 2 + cls.node_offsets)
if not cls.timeslot_is_free(alarm_start, duration):
raise Exception('Timeslot is occupied')
alarm = cls.alarm_manager.create_alarm(
cls.alarm_manager.AlarmType.maneuver,
"{}' Maneuver: {}".format(vessel.name, maneuver.name),
alarm_start
)
alarm.vessel = vessel
# alarm.margin = cls.node_offsets
alarm.notes = json.dumps(description)
alarm.action = cls.alarm_manager.AlarmAction.kill_warp_only
@classmethod
def book_timeslot(cls, ut, vessel, duration=None):
if duration is None:
duration = cls.default_duration
if not cls.timeslot_is_free(ut, duration):
raise
description = {
'duration': duration,
'vessel_name': vessel.name
}
alarm = cls.alarm_manager.create_alarm(
cls.alarm_manager.AlarmType.raw,
"{}' Timeslot".format(vessel.name),
ut
)
alarm.vessel = vessel
alarm.margin = cls.node_offsets
alarm.notes = json.dumps(description)
alarm.action = cls.alarm_manager.AlarmAction.kill_warp_only
@classmethod
def book_timeslot_for_soi(cls, vessel, maneuver, duration=None):
if duration is None:
duration = cls.default_duration
soi_change = vessel.orbit.time_to_soi_change
if math.isnan(soi_change):
raise
ut_start = get_connexion().space_center.ut + soi_change
if not cls.timeslot_is_free(ut_start, duration):
raise
notes = {
'duration': duration,
'vessel_name': vessel.name
}
alarm = cls.alarm_manager.create_alarm(
cls.alarm_manager.AlarmType.soi_change,
"{}' SOI".format(vessel.name),
ut_start
)
alarm.vessel = vessel
alarm.margin = cls.node_offsets
alarm.notes = json.dumps(notes)
alarm.action = cls.alarm_manager.AlarmAction.kill_warp_only
@classmethod
def timeslot_is_free(cls, ut_start: int, duration: int) -> bool:
ut_end = ut_start + duration
for a in cls.get_ordered_alarms():
try:
notes = json.loads(a.notes)
alarm_start = a.time
alarm_end = a.time + notes['duration']
if alarm_end < ut_start:
continue
elif alarm_start <= ut_start <= alarm_end:
return False
elif ut_start <= alarm_end <= ut_end:
return False
else:
return True
except json.JSONDecodeError:
continue
return True
@classmethod
def next_free_timeslot(cls, from_ut=None, duration=None) -> int:
if from_ut is None:
from_ut = get_connexion().space_center.ut
if duration is None:
duration = cls.default_duration
if cls.timeslot_is_free(from_ut, duration):
return from_ut
for a in cls.get_ordered_alarms():
try:
notes = json.loads(a.notes)
alarm_end = a.time + int(notes['duration'])
if cls.timeslot_is_free(alarm_end + 1, duration):
return alarm_end + 1
except json.JSONDecodeError:
continue
raise EOFError('Expected to find a free timeslot at the end alarm list')
@classmethod
def get_reservation(cls, ut_at) -> Timeslot:
pass
@classmethod
def delete_reservation(cls, ut_at, priority):
reservation = cls.get_reservation(ut_at)
if priority <= reservation.priority:
raise

View File

@@ -0,0 +1,74 @@
import math
from enum import Enum
from maneuver_scheduler import ManeuverScheduler
class ManeuverAlarmType(Enum):
ManeuverNode = 1,
SOI = 2
class Maneuver:
def __init__(self, conn, mission_control):
self.mission_control = mission_control
self.vessel = mission_control.vessel
self.conn = conn
def plan_next_maneuver(self):
pass
class NodeManeuver(Maneuver):
alarm_type = ManeuverAlarmType.ManeuverNode
def __init__(self, conn, mission_control):
super().__init__(conn, mission_control)
self.mech_jeb = conn.mech_jeb
self.node_executor = self.mech_jeb.node_executor
def execute(self) -> bool:
sc = self.conn.space_center
if sc.active_vessel.name != self.vessel.name:
sc.active_vessel = self.vessel
while self.vessel.control.nodes:
self._execute_node()
return self.plan_next_maneuver()
def _execute_node(self):
self.node_executor.execute_all_nodes()
with self.conn.stream(getattr, self.node_executor, "enabled") as enabled:
enabled.rate = 1
with enabled.condition:
while enabled():
enabled.wait()
def book_timeslot_for_node(self, node, maneuver, duration=None):
if node.time_to < 0:
node.remove()
planning_duration = 60
ut = ManeuverScheduler.next_free_timeslot(self.conn.space_center.ut + planning_duration, planning_duration)
ManeuverScheduler.book_timeslot(ut, self.vessel, duration=planning_duration)
time_required = (node.delta_v * self.vessel.mass) / self.vessel.available_thrust
duration = math.floor(2 * ManeuverScheduler.node_offsets + time_required)
alarm_start = node.ut - (duration / 2 + ManeuverScheduler.node_offsets)
if ManeuverScheduler.timeslot_is_free(alarm_start, duration):
ManeuverScheduler.book_timeslot_for_node(self.vessel, node, self, alarm_start=alarm_start, duration=duration)
else:
node.remove()
planning_duration = 60
ut = ManeuverScheduler.next_free_timeslot(alarm_start, planning_duration)
ManeuverScheduler.book_timeslot(ut, self.vessel, duration=planning_duration)
class MechJebManeuver(NodeManeuver):
def __init__(self, conn, mission_control):
super().__init__(conn, mission_control)
self.maneuver_planner = self.mech_jeb.maneuver_planner

View File

@@ -1,35 +1,53 @@
from krpc.services.spacecenter import SASMode
import numpy as np
from time import time, sleep
from utils import magnitude, kill_relative_velocity, correct_course
from .utils import magnitude, unitary, kill_relative_velocity, correct_course, get_safety_radius, point_toward_direction
from . import Maneuver
def unitary(vector):
return np.array(vector) / magnitude(vector)
class ApproachManeuver(Maneuver):
def __init__(self, conn, mission_control, reference_frame):
super().__init__(conn, mission_control)
self.reference_frame = reference_frame
def get_safety_radius(vessel):
bbox = vessel.bounding_box(vessel.reference_frame)
return max(magnitude(bbox[0]), magnitude(bbox[1]))
def start(self):
sc = self.conn.space_center
vessel = sc.active_vessel
target = sc.target_vessel
kill_relative_velocity(self.conn, vessel, self.reference_frame)
def point_toward_direction(vessel, direction, reference_frame):
ap = vessel.auto_pilot
ap.reference_frame = reference_frame
ap.target_direction = unitary(direction)
ap.target_roll = 0
ap.sas = False
ap.engage()
sleep(.1)
self.conn.drawing.add_direction((0, 1, 0), self.reference_frame)
while magnitude(vessel.angular_velocity(reference_frame)) > .1:
sleep(.1)
vessel.control.rcs = False
ap.disengage()
ap.sas_mode = SASMode.stability_assist
ap.sas = True
pv = vessel.position(self.reference_frame)
safety_radius = get_safety_radius(vessel) + get_safety_radius(target) + SAFETY_RADIUS_MARGIN
# if under and inside safety cylinder's circle
if pv[1] < safety_radius and pow(pv[0], 2) + pow(pv[2], 2) <= pow(safety_radius, 2):
print("We're under the target and inside the safety cylinder, getting out")
# get out of the cylinder
plane_move_vector = unitary(tuple((pv[0], pv[2]))) * (safety_radius - magnitude(tuple((pv[0], pv[2]))))
pv = vessel.position(self.reference_frame)
move_vector = np.array((plane_move_vector[0], 0, plane_move_vector[1]))
move_to_waypoint(self.conn, vessel, pv + move_vector, self.reference_frame)
print("We're outside of the safety cylinder, setting vertical distance")
pv = vessel.position(self.reference_frame)
move_to_waypoint(self.conn, vessel, (pv[0], safety_radius, pv[2]), self.reference_frame)
# should be above and outside => get inside
print("We're at the right vertical distance to the target, setting horizontal position")
move_to_waypoint(self.conn, vessel, (0, safety_radius, 0), self.reference_frame)
point_toward_direction(vessel, - np.array(vessel.position(self.reference_frame)), self.reference_frame)
return True
THROTTLE = .1
@@ -106,8 +124,6 @@ def move_to_waypoint(conn, vessel, waypoint, reference_frame):
thrust(vessel, -velocity, reference_frame)
print("Ship decelerated")
#do positition correction
sa.autopilot_mode = mj.SmartASSAutopilotMode.off
sa.update(True)
@@ -118,6 +134,7 @@ def move_to_waypoint(conn, vessel, waypoint, reference_frame):
SAFETY_RADIUS_MARGIN = 10
# DEPRECATED
def maneuver_to_approach(conn, reference_frame):
print("Handling approach")
sc = conn.space_center
@@ -158,6 +175,7 @@ def maneuver_to_approach(conn, reference_frame):
TARGET_VELOCITY = 2
# DEPRECATED
def move_with_vector(conn, vessel, vector, reference_frame):
mj = conn.mech_jeb
sa = mj.smart_ass

101
maneuvers/approach_rcs.py Normal file
View File

@@ -0,0 +1,101 @@
import numpy as np
from time import time, sleep
from .utils import magnitude, unitary, kill_relative_velocity_rcs, correct_course, get_safety_radius,\
point_toward_direction
from . import Maneuver
class ApproachRCSManeuver(Maneuver):
SAFETY_RADIUS_MARGIN = 10
def __init__(self, conn, mission_control, reference_frame):
super().__init__(conn, mission_control)
self.reference_frame = reference_frame
def start(self):
sc = self.conn.space_center
vessel = sc.active_vessel
target = sc.target_vessel
kill_relative_velocity_rcs(vessel, target)
self.conn.drawing.add_direction((0, 1, 0), self.reference_frame)
vessel.control.rcs = False
pv = vessel.position(self.reference_frame)
safety_radius = get_safety_radius(vessel) + get_safety_radius(target) + self.SAFETY_RADIUS_MARGIN
# if under and inside safety cylinder's circle
if pv[1] < safety_radius and pow(pv[0], 2) + pow(pv[2], 2) <= pow(safety_radius, 2):
print("We're under the target and inside the safety cylinder, getting out")
# get out of the cylinder
planar_move_vector = unitary((pv[0], pv[2])) * (safety_radius - magnitude((pv[0], pv[2])))
spacial_move_vector = np.array((planar_move_vector[0], 0, planar_move_vector[1]))
pv = vessel.position(self.reference_frame)
move_to_waypoint(self.conn, vessel, pv + spacial_move_vector, self.reference_frame)
print("We're outside of the safety cylinder, setting vertical distance")
pv = vessel.position(self.reference_frame)
move_to_waypoint(self.conn, vessel, (pv[0], safety_radius, pv[2]), self.reference_frame)
# should be above and outside => get inside
print("We're at the right vertical distance to the target, setting horizontal position")
move_to_waypoint(self.conn, vessel, (0, safety_radius, 0), self.reference_frame)
# point_toward_direction(vessel, - np.array(vessel.position(self.reference_frame)), self.reference_frame)
point_toward_direction(vessel, (0, -1, 0), self.reference_frame)
return True
def move_to_waypoint(conn, vessel, waypoint, reference_frame):
target = conn.space_center.target_vessel
kill_relative_velocity_rcs(vessel, target)
conn.drawing.add_line(vessel.position(reference_frame), waypoint, reference_frame)
waypoint = np.array(waypoint)
start_position = np.array(vessel.position(reference_frame))
vector = waypoint - start_position
distance = magnitude(vector)
direction = unitary(vector)
acceleration_distance = distance / 4
point_toward_direction(vessel, direction, reference_frame)
print("Starting acceleration")
remaining_distance = distance
vessel.control.rcs = True
vessel.control.forward = 1
while remaining_distance > 3 * acceleration_distance:
sleep(.1)
remaining_distance = magnitude(waypoint - vessel.position(reference_frame))
vessel.control.forward = 0
print("Target velocity achieved")
remaining_distance = distance - magnitude(start_position - vessel.position(reference_frame))
while remaining_distance > acceleration_distance:
sleep(.1)
correct_course(conn, vessel, waypoint, reference_frame)
remaining_distance = distance - magnitude(start_position - vessel.position(reference_frame))
print(remaining_distance)
print("Starting deceleration")
remaining_distance = distance - magnitude(start_position - vessel.position(reference_frame))
vessel.control.forward = -1
while remaining_distance > 0:
sleep(.1)
remaining_distance = distance - magnitude(start_position - vessel.position(reference_frame))
vessel.control.forward = 0
kill_relative_velocity_rcs(vessel, target)
print("Ship decelerated")
print("destination position: {}".format(waypoint))
print("end position: {}".format(np.array(vessel.position(reference_frame))))

95
maneuvers/comsat.py Normal file
View File

@@ -0,0 +1,95 @@
import math
from maneuver_scheduler import ManeuverScheduler
from . import MechJebManeuver
class ComsatManeuver(MechJebManeuver):
def __init__(self, conn, mission_control, target_body):
super().__init__(conn, mission_control)
self.target_body = target_body
body = self.target_body
if body.satellites:
lowest_sat = min(body.satellites, key=lambda sat: sat.orbit.periapsis)
max_orbit = lowest_sat.orbit.periapsis_altitude - lowest_sat.sphere_of_influence
else:
max_orbit = body.sphere_of_influence
self.target_altitude = max_orbit - ((5 / 100) * max_orbit)
def start(self):
self.plan_next_maneuver()
def plan_next_maneuver(self):
sc = self.conn.space_center
vessel = sc.active_vessel
if vessel.orbit.body.name != self.target_body.name:
raise NotImplementedError
if not math.isclose(vessel.orbit.apoapsis_altitude, self.target_altitude, rel_tol=.01):
SetOrbitApoapsis(self.conn, self.mission_control, self.target_body).prepare_maneuver()
elif not math.isclose(vessel.orbit.eccentricity, 0, abs_tol=.001) or self.vessel.control.current_stage > 1:
CircularizeOrbitAndDeliver(self.conn, self.mission_control, self.target_body).prepare_maneuver()
elif self.vessel.control.current_stage <= 1:
return True
return False
class SetOrbitApoapsis(ComsatManeuver):
name = "Set target orbit's apoapsis"
def prepare_maneuver(self):
oa = self.maneuver_planner.operation_apoapsis
oa.new_apoapsis = self.target_altitude
oa.time_selector.time_reference = self.mech_jeb.TimeReference.periapsis
nodes = oa.make_nodes()
node = nodes[0]
ManeuverScheduler.book_timeslot_for_node(self.vessel, node, self)
class CircularizeOrbitAndDeliver(ComsatManeuver):
name = "Circularize orbit and deliver comsat"
def prepare_maneuver(self):
oc = self.maneuver_planner.operation_circularize
oc.time_selector.time_reference = self.mech_jeb.TimeReference.apoapsis
nodes = oc.make_nodes()
node = nodes[0]
ManeuverScheduler.book_timeslot_for_node(self.vessel, node, self)
def execute(self) -> bool:
sc = self.conn.space_center
if sc.active_vessel.name != self.vessel.name:
sc.active_vessel = self.vessel
while self.vessel.control.nodes:
self._execute_node()
current_stage = self.vessel.control.current_stage
if current_stage > 1:
relay = self.vessel.control.activate_next_stage()
sc.active_vessel = relay[0]
sc.active_vessel.name = self.vessel.name + " " + current_stage
sc.active_vessel.control.solar_panels = True
sc.active_vessel = self.vessel
oro = self.maneuver_planner.operation_resonant_orbit
oro.resonance_numerator = 2
oro.resonance_denominator = 3
oro.time_selector.lead_time = 10
oro.time_selector.time_reference = self.mech_jeb.TimeReference.x_from_now
oro.make_nodes()
while self.vessel.control.nodes:
self._execute_node()
return self.plan_next_maneuver()
else:
return True

View File

@@ -1,156 +1,36 @@
from time import sleep
from utils import kill_relative_velocity, correct_course, magnitude
from .utils import kill_relative_velocity, kill_relative_velocity_rcs, correct_course, magnitude
from . import Maneuver
def set_attitude_and_roll(conn, vessel, reference_frame):
fl = vessel.flight(reference_frame)
vessel.control.rcs = False
class DockingManeuver(Maneuver):
def __init__(self, conn, mission_control, docking_port, target_docking_port):
super().__init__(conn, mission_control)
self.mech_jeb = conn.mech_jeb
self.docking_port = docking_port
self.target_docking_port = target_docking_port
ap = vessel.auto_pilot
ap.reference_frame = reference_frame
ap.target_direction = (0, -1, 0)
ap.target_roll = 0
ap.sas = False
ap.engage()
ap.wait()
ap.disengage()
def start(self):
vessel = self.conn.space_center.active_vessel
mj = conn.mech_jeb
sa = mj.smart_ass
self.conn.space_center.active_vessel = self.mission_control.vessel
vessel.parts.controlling = self.docking_port.part
self.conn.space_center.target_docking_port = self.target_docking_port
sa.autopilot_mode = mj.SmartASSAutopilotMode.target_plus
sa.update(False)
da = self.mech_jeb.docking_autopilot
da.speed_limit = 10
da.roll = 0
da.force_roll = True
da.enabled = True
while magnitude(vessel.angular_velocity(reference_frame)) > .1:
sleep(.1)
sleep(1)
print("Ship pointing to dock")
with self.conn.stream(getattr, da, "enabled") as enabled:
enabled.rate = 1
with enabled.condition:
while enabled():
enabled.wait()
def rcs_push(vessel, axis, duration):
vessel.control.rcs = True
if "x" in axis:
vessel.control.up = axis["x"]
elif "y" in axis:
vessel.control.forward = axis["y"]
elif "z" in axis:
vessel.control.right = axis["z"]
sleep(duration)
if "x" in axis:
vessel.control.up = 0
elif "y" in axis:
vessel.control.forward = 0
elif "z" in axis:
vessel.control.right = 0
vessel.control.rcs = False
def kill_rcs_velocity(vessel, reference_frame):
print("Killing RCS velocity")
velo = vessel.velocity(reference_frame)
vessel.control.rcs = True
while any(abs(component) > .05 for component in velo) > .05:
if abs(velo[0]) > .05:
sign = -velo[0] / abs(velo[0])
if abs(velo[0]) > .1:
vessel.control.up = 1 * sign
elif abs(velo[0]) > .05:
vessel.control.up = .1 * sign
else:
vessel.control.up = 0
if abs(velo[1]) > .05:
sign = -velo[1] / abs(velo[1])
if abs(velo[1]) > .1:
vessel.control.forward = 1 * sign
elif abs(velo[1]) > .05:
vessel.control.forward = .1 * sign
else:
vessel.control.forward = 0
if abs(velo[2]) > .05:
sign = velo[2] / abs(velo[2])
if abs(velo[2]) > .1:
vessel.control.right = 1 * sign
elif abs(velo[2]) > .05:
vessel.control.right = .1 * sign
else:
vessel.control.right = 0
sleep(.1)
velo = vessel.velocity(reference_frame)
vessel.control.rcs = False
print("RCS velocity killed")
def align_horizontally(conn, vessel, reference_frame):
conn.drawing.add_direction((1, 0, 0), vessel.reference_frame)
while abs(vessel.position(reference_frame)[0]) > .1 \
or abs(vessel.position(reference_frame)[2]) > .1:
# determine power requirements of each
sign_x = 1 if vessel.position(reference_frame)[0] > 0 else -1
if abs(vessel.position(reference_frame)[0]) > 1:
power_x = 1
elif abs(vessel.position(reference_frame)[0]) > .1:
power_x = .1
else:
power_x = 0
sign_x = 0
sign_z = 1 if vessel.position(reference_frame)[2] > 0 else -1
if abs(vessel.position(reference_frame)[2]) > 1:
power_z = 1
elif abs(vessel.position(reference_frame)[2]) > .1:
power_z = .1
else:
power_z = 0
sign_z = 0
axis = {}
if power_x > 0:
axis["x"] = -sign_x * power_x
if power_z > 0:
axis["z"] = sign_z * power_z
rcs_push(vessel, axis, 1)
while (sign_x > 0 and vessel.position(reference_frame)[0] > .1
or sign_x < 0 and vessel.position(reference_frame)[0] < -.1
or sign_x == 0) \
and (sign_z > 0 and vessel.position(reference_frame)[2] > .1
or sign_z < 0 and vessel.position(reference_frame)[2] < -.1
or sign_z == 0):
print(vessel.position(reference_frame))
sleep(.1)
kill_rcs_velocity(vessel, reference_frame)
print("Vertical alignment done!")
def dock_ship(conn, vessel, docking_part, reference_frame):
conn.drawing.add_direction((0, 1, 0), reference_frame)
conn.drawing.add_direction((1, 0, 0), reference_frame)
vessel.parts.controlling = docking_part
kill_relative_velocity(conn, vessel, reference_frame)
set_attitude_and_roll(conn, vessel, reference_frame)
align_horizontally(conn, vessel, reference_frame)
print("Starting docking procedure")
vessel.control.set_action_group(0, True)
rcs_push(vessel, {"y": 1}, .5)
vessel.control.rcs = True
try:
while vessel.position(reference_frame)[1] > 0:
print(vessel.position(reference_frame)[1])
correct_course(conn, vessel, (0, 0, 0), reference_frame)
sleep(1)
except ValueError as e:
vessel = conn.space_center.active_vessel
finally:
vessel.control.rcs = False
return True

36
maneuvers/grapple.py Normal file
View File

@@ -0,0 +1,36 @@
from time import sleep
from . import Maneuver
from .utils import magnitude, correct_course_to_target, rcs_push, point_toward_target
class GrappleManeuver(Maneuver):
def __init__(self, conn, mission_control, target):
super().__init__(conn, mission_control)
self.target = target
def start(self):
vessel = self.mission_control.vessel
vessel.parts.controlling = self.mission_control.get_grappling()
point_toward_target(self.conn, vessel, self.target)
self.mission_control.toggle_grappling(True)
rcs_push(vessel, {"y": 1}, .75)
vessel.control.rcs = True
try:
while magnitude(vessel.position(self.target.reference_frame)) > 0:
print(magnitude(vessel.position(self.target.reference_frame)))
sa = self.conn.mech_jeb.smart_ass
sa.autopilot_mode = self.conn.mech_jeb.SmartASSAutopilotMode.relative_plus
sa.update(False)
correct_course_to_target(vessel, self.target)
except ValueError:
vessel = self.conn.space_center.active_vessel
self.mission_control.vessel = vessel
finally:
vessel.control.rcs = False
return True

View File

@@ -1,103 +1,87 @@
import numpy as np
from utils import execute_node, magnitude
from .utils import magnitude
from maneuver_scheduler import ManeuverScheduler
from . import MechJebManeuver
def align_orbit_planes(conn):
print("Aligning planes")
mj = conn.mech_jeb
mp = mj.maneuver_planner
oi = mp.operation_inclination
oi.time_selector.time_reference = mj.TimeReference.eq_nearest_ad
nodes = oi.make_nodes()
class RendezvousManeuver(MechJebManeuver):
def start(self):
self.plan_next_maneuver()
# kac = conn.kerbal_alarm_clock
# kac.create_alarm(
# kac.AlarmType.maneuver,
# "{}'s Orbital transfer".format(v.name),
# nodes[0].ut
# )
def plan_next_maneuver(self):
sc = self.conn.space_center
vessel = sc.active_vessel
target = sc.target_vessel
execute_node(conn)
if vessel.orbit.distance_at_closest_approach(target.orbit) > 1000:
if vessel.orbit.relative_inclination(target.orbit) > 0.0001:
AlignOrbitPlaneWithTarget(self.conn, self.mission_control).prepare_maneuver()
print("Planes aligned!")
elif vessel.orbit.distance_at_closest_approach(target.orbit) > 10000:
InterceptTargetOrbit(self.conn, self.mission_control).prepare_maneuver()
else:
TuneClosestApproach(self.conn, self.mission_control).prepare_maneuver()
return False
elif vessel.orbit.distance_at_closest_approach(target.orbit) <= 1000 < magnitude(
np.array(vessel.position(vessel.reference_frame)) - np.array(target.position(vessel.reference_frame))):
MatchVelocityWithTarget(self.conn, self.mission_control).prepare_maneuver()
return False
else:
return True
def intercepting_target_orbit(conn):
print("Intercepting target orbit")
sc = conn.space_center
v = sc.active_vessel
mj = conn.mech_jeb
mp = mj.maneuver_planner
ot = mp.operation_transfer
ot.time_selector.time_reference = mj.TimeReference.computed
nodes = ot.make_nodes()
nodes[0].ut = nodes[0].ut + 0.1
class AlignOrbitPlaneWithTarget(RendezvousManeuver):
name = "Align orbit plane with target's"
# kac = conn.kerbal_alarm_clock
# kac.create_alarm(
# kac.AlarmType.maneuver,
# "{}'s Orbital transfer".format(v.name),
# nodes[0].ut
# )
def prepare_maneuver(self):
oi = self.maneuver_planner.operation_inclination
oi.time_selector.time_reference = self.mech_jeb.TimeReference.eq_nearest_ad
nodes = oi.make_nodes()
execute_node(conn)
node = nodes[0]
print("Target orbit intercepted!")
ManeuverScheduler.book_timeslot_for_node(self.vessel, node, self)
def tune_closest_approach(conn):
print("Tuning closest approach")
sc = conn.space_center
v = sc.active_vessel
mj = conn.mech_jeb
mp = mj.maneuver_planner
occ = mp.operation_course_correction
nodes = occ.make_nodes()
class InterceptTargetOrbit(RendezvousManeuver):
name = "Intercept target's orbit"
# kac = conn.kerbal_alarm_clock
# kac.create_alarm(
# kac.AlarmType.maneuver,
# "{}'s Orbital transfer".format(v.name),
# nodes[0].ut
# )
def prepare_maneuver(self):
ot = self.maneuver_planner.operation_transfer
ot.time_selector.time_reference = self.mech_jeb.TimeReference.computed
nodes = ot.make_nodes()
execute_node(conn)
node = nodes[0]
node.ut = node.ut + 1
print("Closest approach tuned!")
ManeuverScheduler.book_timeslot_for_node(self.vessel, node, self)
def match_velocities(conn):
print("Matching velocities")
sc = conn.space_center
v = sc.active_vessel
mj = conn.mech_jeb
mp = mj.maneuver_planner
okrv = mp.operation_kill_rel_vel
nodes = okrv.make_nodes()
class TuneClosestApproach(RendezvousManeuver):
name = "Tune closest approach with target"
# kac = conn.kerbal_alarm_clock
# kac.create_alarm(
# kac.AlarmType.maneuver,
# "{}'s Orbital transfer".format(v.name),
# nodes[0].ut
# )
def prepare_maneuver(self):
occ = self.maneuver_planner.operation_course_correction
nodes = occ.make_nodes()
execute_node(conn)
node = nodes[0]
print("Velocities matched!")
ManeuverScheduler.book_timeslot_for_node(self.vessel, node, self)
def maneuver_to_rendezvous(conn, vessel, target):
if vessel.orbit.distance_at_closest_approach(target.orbit) > 1000:
if vessel.orbit.relative_inclination(target.orbit) > 0.0001:
align_orbit_planes(conn)
class MatchVelocityWithTarget(RendezvousManeuver):
name = "Math velocity with target's"
duration = 10 * 60
if vessel.orbit.distance_at_closest_approach(target.orbit) > 10000:
intercepting_target_orbit(conn)
def prepare_maneuver(self):
okrv = self.maneuver_planner.operation_kill_rel_vel
nodes = okrv.make_nodes()
tune_closest_approach(conn)
node = nodes[0]
if vessel.orbit.distance_at_closest_approach(target.orbit) <= 1000 < magnitude(
np.array(vessel.position(vessel.reference_frame)) - np.array(target.position(vessel.reference_frame))):
match_velocities(conn)
ManeuverScheduler.book_timeslot_for_node(self.vessel, node, self, duration=self.duration)

View File

@@ -1,23 +1,26 @@
from time import time, sleep
import math
import numpy as np
from krpc.services.spacecenter import SASMode
def execute_node(conn):
ne = conn.mech_jeb.node_executor
ne.execute_all_nodes()
from connector import get_connexion
with conn.stream(getattr, ne, "enabled") as enabled:
enabled.rate = 1
with enabled.condition:
while enabled():
enabled.wait()
def magnitude(vector):
return np.linalg.norm(vector)
def node_thrust_time(vessel, node):
return (node.delta_v * vessel.mass) / vessel.available_thrust
def unitary(vector):
return np.array(vector) / magnitude(vector)
THROTTLE = .1
@@ -66,3 +69,208 @@ def correct_course(conn, vessel, waypoint, reference_frame):
vessel.control.up = -.1
else:
vessel.control.up = 0
def correct_course_to_target(vessel, target):
target_position = target.position(vessel.reference_frame)
angle_x = math.atan(target_position[0]/target_position[1])
if math.isclose(angle_x, 0, abs_tol=0.05):
vessel.control.right = 0
elif angle_x < 0:
vessel.control.right = -.1
elif angle_x > 0:
vessel.control.right = .1
angle_z = math.atan(target_position[2]/target_position[1])
if math.isclose(angle_z, 0, abs_tol=0.05):
vessel.control.up = 0
elif angle_z < 0:
vessel.control.up = .1
elif angle_z > 0:
vessel.control.up = -.1
def kill_relative_velocity_rcs(vessel, target):
print("Killing RCS velocity")
vessel.control.sas = True
vessel.control.rcs = True
velocity = target.velocity(vessel.reference_frame)
while any(abs(component) >= .1 for component in velocity):
thrust = get_required_rcs_thrust(vessel, velocity)
vessel.control.right = thrust[0] if abs(velocity[0]) >= .1 else 0
vessel.control.forward = thrust[1] if abs(velocity[1]) >= .1 else 0
vessel.control.up = - thrust[2] if abs(velocity[2]) >= .1 else 0
print(target.velocity(vessel.reference_frame))
print((thrust[0], thrust[1], thrust[2]))
print((vessel.control.right, vessel.control.forward, vessel.control.up))
sleep(.1)
velocity = target.velocity(vessel.reference_frame)
continue
if abs(velocity[0]) > .05:
sign = velocity[0] / abs(velocity[0])
if abs(velocity[0]) > 1:
vessel.control.right = 1 * sign
elif abs(velocity[0]) > .1:
vessel.control.right = .1 * sign
else:
vessel.control.right = 0
if abs(velocity[1]) > .05:
sign = velocity[1] / abs(velocity[1])
if abs(velocity[1]) > 1:
vessel.control.forward = 1 * sign
elif abs(velocity[1]) > .1:
vessel.control.forward = .1 * sign
else:
vessel.control.forward = 0
if abs(velocity[2]) > .05:
sign = - velocity[2] / abs(velocity[2])
if abs(velocity[2]) > 1:
vessel.control.up = 1 * sign
elif abs(velocity[2]) > .1:
vessel.control.up = .1 * sign
else:
vessel.control.up = 0
sleep(.1)
velocity = target.velocity(vessel.reference_frame)
vessel.control.rcs = False
vessel.control.sas = False
print("RCS velocity killed")
def get_required_rcs_thrust(vessel, delta_v, polling=.1):
acceleration = np.array(vessel.available_rcs_force) / vessel.mass
thrust = [0, 0, 0]
for i in range(3):
if delta_v[i] >= 0:
thrust[i] = max(min(delta_v[i] / acceleration[0][i]*polling, 1), .051)
else:
thrust[i] = min(max(-delta_v[i] / acceleration[1][i]*polling, -1), -.051)
return thrust
def get_safety_radius(vessel):
bbox = vessel.bounding_box(vessel.reference_frame)
return max(magnitude(bbox[0]), magnitude(bbox[1]))
def point_toward_direction(vessel, direction, reference_frame):
ap = vessel.auto_pilot
ap.reference_frame = reference_frame
ap.target_direction = unitary(direction)
ap.target_roll = 0
ap.rcs = False
ap.sas = False
ap.engage()
sleep(1)
ap.wait()
ap.disengage()
ap.sas_mode = SASMode.stability_assist
ap.sas = True
def point_toward_target(conn, vessel, target, force_roll=False):
sa = conn.mech_jeb.smart_ass
sa.autopilot_mode = conn.mech_jeb.SmartASSAutopilotMode.target_plus
sa.force_roll = force_roll
sa.update(False)
while magnitude(vessel.angular_velocity(target.reference_frame)) > .002:
sleep(.1)
def rcs_push(vessel, axis, duration):
vessel.control.rcs = True
if "x" in axis:
vessel.control.up = axis["x"]
elif "y" in axis:
vessel.control.forward = axis["y"]
elif "z" in axis:
vessel.control.right = axis["z"]
sleep(duration)
if "x" in axis:
vessel.control.up = 0
elif "y" in axis:
vessel.control.forward = 0
elif "z" in axis:
vessel.control.right = 0
vessel.control.rcs = False
def set_attitude_and_roll(conn, vessel, reference_frame):
vessel.control.rcs = False
ap = vessel.auto_pilot
ap.reference_frame = reference_frame
ap.target_direction = (0, -1, 0)
ap.target_roll = 0
ap.sas = False
ap.engage()
ap.wait()
ap.disengage()
mj = conn.mech_jeb
sa = mj.smart_ass
sa.autopilot_mode = mj.SmartASSAutopilotMode.target_plus
sa.update(False)
while magnitude(vessel.angular_velocity(reference_frame)) > .1:
sleep(.1)
print("Ship pointing to dock")
def align_horizontally(conn, vessel, reference_frame):
conn.drawing.add_direction((1, 0, 0), vessel.reference_frame)
target = conn.space_center.target_vessel
while abs(vessel.position(reference_frame)[0]) > .1 \
or abs(vessel.position(reference_frame)[2]) > .1:
# determine power requirements of each
sign_x = 1 if vessel.position(reference_frame)[0] > 0 else -1
if abs(vessel.position(reference_frame)[0]) > 1:
power_x = 1
elif abs(vessel.position(reference_frame)[0]) > .1:
power_x = .1
else:
power_x = 0
sign_x = 0
sign_z = 1 if vessel.position(reference_frame)[2] > 0 else -1
if abs(vessel.position(reference_frame)[2]) > 1:
power_z = 1
elif abs(vessel.position(reference_frame)[2]) > .1:
power_z = .1
else:
power_z = 0
sign_z = 0
axis = {}
if power_x > 0:
axis["x"] = -sign_x * power_x
if power_z > 0:
axis["z"] = sign_z * power_z
rcs_push(vessel, axis, 1)
while (sign_x > 0 and vessel.position(reference_frame)[0] > .1
or sign_x < 0 and vessel.position(reference_frame)[0] < -.1
or sign_x == 0) \
and (sign_z > 0 and vessel.position(reference_frame)[2] > .1
or sign_z < 0 and vessel.position(reference_frame)[2] < -.1
or sign_z == 0):
print(vessel.position(reference_frame))
sleep(.1)
kill_relative_velocity_rcs(vessel, target)
print("Vertical alignment done!")

49
map.py Normal file
View File

@@ -0,0 +1,49 @@
import rustworkx as rx
graph_dict = {
'kerbol': {
'moho': {},
'eve': {'gilly': {}},
'kerbin': {'mun': {}, 'minmus': {}},
'duna': {'ike': {}},
'dres': {},
'jool': {'laythe': {}, 'vall': {}, 'tylo': {}, 'bop': {}, 'pol': {}},
'eeloo': {},
}
}
def create_graph_node(graph_map, name, graph_dict):
ground_name = "{}_ground".format(name)
ground = graph_map.add_node(ground_name)
orbit_name = "{}_orbit".format(name)
orbit = graph_map.add_node(orbit_name)
graph_map.add_edge(orbit, ground, 1)
for body_name, body_dict in graph_dict.items():
child_orbit = create_graph_node(graph_map, body_name, body_dict)
graph_map.add_edge(orbit, child_orbit, 1)
return orbit
class Map:
instance = None
@classmethod
def get_map(cls):
if cls.instance is None:
cls.instance = Map()
return cls.instance
def __init__(self):
self.graph = rx.PyGraph()
self.index_dict = {}
create_graph_node(self.graph, 'kerbol', graph_dict['kerbol'])
self.reverse_index = self.graph.nodes()
self.index_dict = {value: key for key, value in enumerate(self.reverse_index)}
def get_shortest_path(self, body_a, body_b):
result = rx.dijkstra_shortest_paths(self.graph, self.index_dict[body_a], self.index_dict[body_b])
return [self.reverse_index[body_index] for body_index in result[self.index_dict[body_b]]]

120
mission/__init__.py Normal file
View File

@@ -0,0 +1,120 @@
from time import sleep
from enum import Enum
from maneuvers.grapple import GrappleManeuver
from maneuvers.rendezvous import RendezvousManeuver
from maneuvers.approach_rcs import ApproachRCSManeuver
from maneuvers.docking import DockingManeuver
from maneuvers.comsat import ComsatManeuver
from lib import get_contract, get_rescuee_vessel, get_body, get_connexion
class MissionType(Enum):
transport_kerbal = 1
orbit_rescue = 2
comsat = 3
transport_cargo = 4
class MissionStatus(Enum):
Backlog = 1
Assigned = 2
InProgress = 3
Done = 4
class MissionReport:
mission_status = None
new_vessel_id = None
new_missions = []
def __int__(self):
pass
class Mission:
pass
class TransportMission(Mission):
type = MissionType.transport_kerbal
def __init__(self, kerbal_name, destination):
self.kerbal_name = kerbal_name
self.destination = destination
class RescueMission(Mission):
type = MissionType.orbit_rescue
def __init__(self, contract_title):
self.contract_title = contract_title
self.contract = get_contract(contract_title)
rescuee_first_name = contract_title.split()[1]
self.rescuee_vessel = get_rescuee_vessel(rescuee_first_name)
self.rescuee_vessel_name = self.rescuee_vessel.name
self.rescuee_name = self.rescuee_vessel.crew[0].name
def execute(self, mission_control):
conn = get_connexion()
sc = conn.space_center
if sc.active_vessel.name != mission_control.vessel.name:
sc.active_vessel = mission_control.vessel
if sc.target_vessel is None or sc.target_vessel.name != self.rescuee_vessel.name:
sc.target_vessel = self.rescuee_vessel
rendezvous_done = RendezvousManeuver(conn, mission_control).execute()
if rendezvous_done:
sc.active_vessel = mission_control.deliver_probe(self.rescuee_vessel)
sc.target_vessel = self.rescuee_vessel
sc.active_vessel.parts.controlling = mission_control.get_grappling()
target_grapple_done = GrappleManeuver(conn, mission_control, sc.target_vessel).start()
if target_grapple_done:
mothership_docking_done = DockingManeuver(conn,
mission_control,
mission_control.get_probe_port(),
mission_control.get_bay_port()).start()
if mothership_docking_done:
mission_control.probe = None
kerbal = mission_control.get_kerbal(self.rescuee_name)
sc.transfer_crew(kerbal, mission_control.get_part_with_free_seat())
mission_control.toggle_grappling(False)
mission_control.recover_probe()
# Destroy Capsule
report = MissionReport()
report.mission_status = MissionStatus.Done
report.new_missions.append(
TransportMission(self.rescuee_name, 'kerbin_orbit')
)
return report
class ComSatNetworkMission:
type = MissionType.comsat
def __init__(self, body_name):
self.body = get_body(body_name)
def execute(self, mission_control):
conn = get_connexion()
sc = conn.space_center
if sc.active_vessel.name != mission_control.vessel.name:
sc.active_vessel = mission_control.vessel
mission_control.toggle_engines(True)
comsat_done = ComsatManeuver(conn, mission_control, self.body).execute()
if comsat_done:
report = MissionReport()
report.mission_status = MissionStatus.Done
return report

121
mission/station_build.py Normal file
View File

@@ -0,0 +1,121 @@
from maneuvers.docking import DockingManeuver
from . import Mission
class StationBuildMission(Mission):
def __init__(self, conn):
self.conn = conn
def execute(self, mission_control):
transfer_ship = mission_control.seperate_transfer_ship()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.solar_panel_port())
separate_solar_panels()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_port("Station Core Stern Joint"))
transfer_ship = mission_control.seperate_transfer_ship()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_tank_holder_port())
separate_tank_holder()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_port("Station Core Bow Joint"))
transfer_ship = mission_control.seperate_transfer_ship()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_dock_left_port())
separate_left_dock()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_port("Station Core Port Joint"))
transfer_ship = mission_control.seperate_transfer_ship()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_port("Station Dock Left 1 Nadir Port"))
# Ship part package to LKO
# if destination not Kerbin:
# Transfer part package to Right orbit
# if station core not in package:
# Rendez-vous with station (if station core not in package)
# separate package
# for each part:
# Determine connection ports and angle
# Start docking
class Station:
pass
class StationPart:
joints = []
ports = []
class StationCore(StationPart):
joints = [
'Station Core Bow Joint',
'Station Core Starboard Joint',
'Station Core Stern Joint',
'Station Core Port Joint'
]
class StationSolarPanels(StationPart):
def __init__(self, number):
self.joints = [
f'Station Solar Panels {number} Bow Joint',
f'Station Solar Panels {number} Port Joint',
]
class StationTankHolder(StationPart):
joints = [
'Station Tank Holder Stern Joint',
]
ports = [
'Station Tank Holder Fuel Port 1',
'Station Tank Holder Fuel Port 2',
'Station Tank Holder Oxygen Port 1',
'Station Tank Holder Oxygen Port 2',
'Station Tank Holder RCS Port 1',
'Station Tank Holder RCS Port 2',
]
class StationTank(StationPart):
def __init__(self, resource, number):
self.ports = [
f'Station Tank {resource} {number} Bow Port'
f'Station Tank {resource} {number} Stern Port'
]
class StationDockLeft(StationPart):
def __init__(self, number):
self.joints = [
f'Station Dock Left {number} Starboard Joint',
f'Station Dock Left {number} Port Joint',
]
self.ports = [
f'Station Dock Left {number} Zenith Port',
f'Station Dock Left {number} Bow Port',
f'Station Dock Left {number} Nadir Port'
]
class StationDockRight(StationPart):
def __init__(self, number):
self.joints = [
f'Station Dock Right {number} Starboard Joint',
f'Station Dock Right {number} Port Joint',
]
self.ports = [
f'Station Dock Right {number} Zenith Port',
f'Station Dock Right {number} Bow Port',
f'Station Dock Right {number} Nadir Port'
]
class StationResourceConverter(StationPart):
def __int__(self, number):
self.joints = [
f'Station Resource Converter {number} Bow Joint'
]
self.ports = [
f'Station Resource Converter {number} Stern Port'
]

198
mission_control/__init__.py Normal file
View File

@@ -0,0 +1,198 @@
from time import sleep
import numpy as np
from krpc.services.spacecenter import CargoBayState, DockingPortState
from maneuvers.utils import point_toward_direction, point_toward_target
from maneuver_scheduler import ManeuverScheduler
from mission import MissionStatus, MissionType
from lib import get_connexion, get_vessel
class MissionControl:
mission_types = []
vessel = None
def __init__(self):
self.current_mission = None
self.current_orbit_missions = []
def pick_missions(self, backlog):
current_body_name = self.vessel.orbit.body.name
for i, mission in enumerate(backlog.missions[current_body_name]):
if mission.type in self.mission_types:
if self.current_mission is None:
self.plan_mission(mission)
else:
self.current_orbit_missions.append(mission)
del (backlog.missions[current_body_name][i])
def plan_mission(self, mission):
self.current_mission = mission
planning_duration = 60
ut = ManeuverScheduler.next_free_timeslot(
from_ut=get_connexion().space_center.ut + planning_duration,
duration=planning_duration)
ManeuverScheduler.book_timeslot(ut, self.vessel, duration=planning_duration)
def execute_mission(self, alarm):
if self.current_mission is None:
if self.current_orbit_missions:
self.current_mission = self.current_orbit_missions.pop(0)
else:
return
report = self.current_mission.execute(self)
if report and report.mission_status == MissionStatus.Done:
self.current_mission = None
def get_kerbal(self, name):
for k in self.vessel.crew:
if k.name == name:
return k
raise LookupError('Kerbal {} not found onboard vessel {}'.format(name, self.vessel.name))
def get_docking_port(self, name):
for m in self.vessel.parts.modules_with_name('ModuleDockingNodeNamed'):
if m.get_field_by_id('portName') == name:
return m.part.docking_port
raise LookupError('Docking port {} not found on vessel {}'.format(name, self.vessel.name))
class ShuttleKerbin(MissionControl):
mission_types = [MissionType.orbit_rescue, MissionType.transport]
PISTON_MAX_EXTENSION = 2.4
def __init__(self, vessel_name):
super().__init__()
self.vessel_name = vessel_name
self.mothership = get_vessel(vessel_name)
self.probe = None
for e in self.vessel.parts.engines:
e.active = True
self.current_orbit_missions = []
self.first_cabin = self.mothership.parts.with_name('mk2Cockpit.Inline')[0]
self.second_cabin = self.mothership.parts.with_name('mk2CrewCabin')[0]
self.third_cabin = self.mothership.parts.with_name('mk2CrewCabin')[1]
self.docking_ports = self.mothership.parts.docking_ports
@property
def vessel(self):
if self.probe is None:
return self.mothership
else:
return self.probe
@vessel.setter
def vessel(self, value):
if self.probe is None:
self.mothership = value
else:
self.probe = value
def toggle_grappling(self, value=True):
arm = self.get_grappling()
if arm.modules[1].has_event('Arm') and value \
or arm.modules[1].has_event('Disarm') and not value:
arm.modules[1].set_action_by_id('ToggleAction', True)
def get_grappling(self):
if self.probe is None:
return self.vessel.parts.with_name('GrapplingDevice')[0]
return self.probe.parts.with_name('GrapplingDevice')[0]
def get_probe_port(self):
return self.get_docking_port('Tug Probe Port')
def get_bay_port(self):
if self.probe is not None:
return self.get_mothership_docking_port('Shuttle Bay Port')
return self.get_docking_port('Shuttle Bay Port')
def get_mothership_docking_port(self, name):
for m in self.mothership.parts.modules_with_name('ModuleDockingNodeNamed'):
if m.get_field_by_id('portName') == name:
return m.part.docking_port
raise LookupError('Docking port {} not found on vessel {}'.format(name, self.mothership.name))
def get_probe_docking_port(self, name):
for m in self.probe.parts.modules_with_name('ModuleDockingNodeNamed'):
if m.get_field_by_id('portName') == name:
return m.part.docking_port
raise LookupError('Docking port {} not found on vessel {}'.format(name, self.mothership.name))
def deliver_probe(self, target=None):
bay = self.mothership.parts.cargo_bays[0]
bay.open = True
with get_connexion().stream(getattr, bay, 'state') as state:
with state.condition:
while not state() == CargoBayState.open:
state.wait()
hinge = self.mothership.parts.robotic_hinges[0]
hinge.target_angle = 90
with get_connexion().stream(getattr, hinge, 'current_angle') as current_angle:
with current_angle.condition:
while current_angle() < hinge.target_angle - 1:
current_angle.wait()
if target is not None:
self.vessel.parts.controlling = self.get_bay_port().part
point_toward_target(get_connexion(), self.vessel, target)
bay_port = self.get_bay_port()
self.probe = bay_port.undock()
return self.probe
def recover_probe(self):
bay_port = self.get_bay_port()
if bay_port.state != DockingPortState.docked:
raise Exception("Probe is not parked in shuttle")
self.probe = None
hinge = self.mothership.parts.robotic_hinges[0]
hinge.target_angle = 0
with get_connexion().stream(getattr, hinge, 'current_angle') as current_angle:
with current_angle.condition:
while current_angle() > hinge.target_angle:
current_angle.wait()
bay = self.mothership.parts.cargo_bays[0]
bay.open = False
def get_part_with_free_seat(self):
crew_size = self.mothership.crew_count
if self.mothership.crew_capacity > 10:
crew_size = crew_size - 1
if crew_size <= 2:
return self.first_cabin
elif crew_size <= 6:
return self.second_cabin
elif crew_size <= 10:
return self.third_cabin
else:
raise Exception('Vessel already has no free sit')
class ComsatKerbin(MissionControl):
mission_types = [MissionType.comsat]
def __init__(self, vessel_name):
super().__init__()
self.vessel_name = vessel_name
self.vessel = get_vessel(vessel_name)
def toggle_engines(self, value=True):
self.vessel.parts.engines[0].active = value