Compare commits

..

1 Commits

Author SHA1 Message Date
0c95326a94 Implementing comsat missions 2023-08-29 17:22:16 +02:00
16 changed files with 337 additions and 924 deletions

1
.gitignore vendored
View File

@@ -1 +0,0 @@
.idea

View File

@@ -1,44 +1,15 @@
from time import sleep
from simple_pid import PID
import numpy as np
from simple_pid import PID
import krpc
from maneuvers.utils import get_required_rcs_thrust
conn = krpc.connect()
sc = conn.space_center
vessel = sc.active_vessel
docking_part = vessel.parts.root.children[0].children[10].children[0].children[0].children[0].children[0]
target = sc.target_vessel
target.velocity(vessel.reference_frame)
conn.drawing.add_direction(target.velocity(vessel.reference_frame), vessel.reference_frame)
get_required_rcs_thrust(vessel, target.velocity(vessel.reference_frame))
from maneuvers.utils import magnitude
kill_relative_velocity_rcs(vessel, target)
mj = conn.mech_jeb
sa = mj.smart_ass
reference_frame = sc.ReferenceFrame.create_relative(target.reference_frame, rotation=(1., 0., 0., 0.))
conn.drawing.add_direction((0, 1, 0), reference_frame)
from time import sleep
import krpc
from maneuvers.utils import magnitude
conn = krpc.connect()
sc = conn.space_center
vessel = sc.active_vessel
target = sc.target_vessel
mj = conn.mech_jeb
sa = mj.smart_ass
sa.autopilot_mode = conn.mech_jeb.SmartASSAutopilotMode.target_plus
sa.update(False)
sa.autopilot_mode = conn.mech_jeb.SmartASSAutopilotMode.target_minus
sa.update(False)
while True:
print(magnitude(vessel.angular_velocity(target.reference_frame)))
sleep(.1)
from mission_control import ShuttleKerbin
s = ShuttleKerbin("KKS Gagarin")
s.recover_probe()

10
lib.py
View File

@@ -22,12 +22,4 @@ def get_rescuee_vessel(rescuee_name):
if rescuee_name in v.name:
return v
raise LookupError("Rescuee {} vessel not found".format(rescuee_name))
def get_body(name):
bodies = get_connexion().space_center.bodies
if name in bodies:
return bodies[name]
raise LookupError("Celestial body {} not found".format(name))
raise LookupError("Rescuee {} vessel not found".format(rescuee_name))

118
main.py
View File

@@ -1,18 +1,117 @@
import sys
import signal
from mission_control import ShuttleKerbin, ComsatKerbin
from mission import RescueMission, ComSatNetworkMission
from enum import Enum
from maneuver_scheduler import ManeuverScheduler
from maneuvers.rendezvous import RendezvousManeuver
from maneuvers.approach import ApproachManeuver
from maneuvers.docking import DockingManeuver
from connector import get_connexion
from lib import get_contract, get_vessel, get_rescuee_vessel
class MissionStatus(Enum):
Backlog = 1
Assigned = 2
InProgress = 3
Done = 4
class MissionReport:
mission_status = None
new_vessel_id = None
new_missions = []
def __int__(self):
pass
class Mission:
pass
class TransportMission(Mission):
def __init__(self, kerbal_name, destination):
self.kerbal_name = kerbal_name
self.destination = destination
class RescueMission(Mission):
def __init__(self, contract_title):
self.contract_title = contract_title
self.contract = get_contract(contract_title)
rescuee_first_name = contract_title.split()[1]
self.rescuee_vessel = get_rescuee_vessel(rescuee_first_name)
self.rescuee_vessel_name = self.rescuee_vessel.name
self.rescuee_name = self.rescuee_vessel.crew[0].name
def execute(self, vessel):
conn = get_connexion()
sc = conn.space_center
if sc.active_vessel.name != vessel.name:
sc.active_vessel = vessel
if sc.target_vessel is None or sc.target_vessel.name != self.rescuee_vessel.name:
sc.target_vessel = self.rescuee_vessel
rendezvous_done = RendezvousManeuver(conn, vessel).execute()
if rendezvous_done:
reference_frame = sc.ReferenceFrame.create_relative(
self.rescuee_vessel.reference_frame,
rotation=(1., 0., 0., 0.)
)
approach_done = ApproachManeuver(conn, vessel, reference_frame).start()
if approach_done:
docking_part = vessel.parts.root.children[0].children[10].children[0].children[0].children[0].children[
0]
docking_done = DockingManeuver(conn, vessel, docking_part, reference_frame).start()
# Find new creat member name
# sc.transfer_crew(self.rescuee_name, target_part)
# Release Grapple
# Destroy Capsule
MissionReport.mission_status = MissionStatus.Done
MissionReport.new_missions.append(
TransportMission(self.rescuee_name, 'kerbin_orbit')
)
class ShuttleKerbin:
def __init__(self, vessel_name):
self.vessel_name = vessel_name
self.vessel = get_vessel(vessel_name)
self.current_orbit_missions = []
self.current_mission = None
def plan_mission(self, mission):
self.current_mission = mission
planning_duration = (5*60)
ut = ManeuverScheduler.next_free_timeslot(duration=planning_duration)
ManeuverScheduler.book_timeslot(ut, self.vessel)
def pick_missions(self, backlog):
for i, mission in enumerate(backlog.kerbin_orbit_rescue):
if self.current_mission is None:
self.plan_mission(mission)
else:
self.current_orbit_missions.append(mission)
del(backlog.kerbin_orbit_rescue[i])
def execute_mission(self, alarm):
if self.current_mission is None:
if self.current_orbit_missions:
self.current_mission = self.current_orbit_missions.pop(0)
else:
return
self.current_mission.execute(self.vessel)
class Backlog:
missions = {
'Kerbin': [RescueMission('Rescue Rossby from orbit of Kerbin.'), ] # ComSatNetworkMission('Kerbin')]
}
kerbin_orbit_rescue = []
kerbin_to_orbit = []
kerbin_to_ground = []
@@ -26,8 +125,10 @@ if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler)
ships = []
m = RescueMission('Rescue Rossby from orbit of Kerbin.')
Backlog.kerbin_orbit_rescue.append(m)
ships.append(ShuttleKerbin('KKS Gagarin'))
ships.append(ComsatKerbin('KKR Shepard'))
conn = get_connexion()
# conn.space_center.GameMode
@@ -60,3 +161,6 @@ if __name__ == '__main__':
current_ship.execute_mission(alarm)
alarm.remove()
print("turn")

View File

@@ -2,6 +2,8 @@ from connector import get_connexion
import json
import math
from maneuvers import ManeuverAlarmType
class Timeslot:
def __init__(self, ut_start, duration):
@@ -33,21 +35,30 @@ class ManeuverScheduler:
return sorted(cls.alarm_manager.alarms, key=lambda el: el.time)
@classmethod
def book_timeslot_for_node(cls, vessel, node, maneuver, alarm_start=None, duration=None):
def book_timeslot_for_node(cls, vessel, node, maneuver, duration=None):
time_required = (node.delta_v * vessel.mass) / vessel.available_thrust
if duration is None:
duration = math.floor(2 * cls.node_offsets + time_required)
if not cls.timeslot_is_free(node.ut, duration):
raise
description = {
'duration': duration,
'vessel_name': vessel.name
}
if alarm_start is None:
alarm_start = node.ut - (duration / 2 + cls.node_offsets)
if not cls.timeslot_is_free(alarm_start, duration):
raise Exception('Timeslot is occupied')
# arg_dict = {
# 'title': "{}' Maneuver: {}".format(vessel.name, maneuver.name),
# 'description': json.dumps(description),
# 'offset': cls.node_offsets
# }
# cls.alarm_manager.add_maneuver_node_alarm(
# vessel,
# vessel.control.nodes[0],
# **arg_dict)
alarm_start = node.ut - (duration / 2 + cls.node_offsets)
alarm = cls.alarm_manager.create_alarm(
cls.alarm_manager.AlarmType.maneuver,
"{}' Maneuver: {}".format(vessel.name, maneuver.name),
@@ -82,6 +93,7 @@ class ManeuverScheduler:
alarm.action = cls.alarm_manager.AlarmAction.kill_warp_only
@classmethod
def book_timeslot_for_soi(cls, vessel, maneuver, duration=None):
if duration is None:
@@ -95,7 +107,7 @@ class ManeuverScheduler:
if not cls.timeslot_is_free(ut_start, duration):
raise
notes = {
note = {
'duration': duration,
'vessel_name': vessel.name
}
@@ -107,7 +119,7 @@ class ManeuverScheduler:
)
alarm.vessel = vessel
alarm.margin = cls.node_offsets
alarm.notes = json.dumps(notes)
alarm.notes = json.dumps(note)
alarm.action = cls.alarm_manager.AlarmAction.kill_warp_only
@classmethod
@@ -115,9 +127,9 @@ class ManeuverScheduler:
ut_end = ut_start + duration
for a in cls.get_ordered_alarms():
try:
notes = json.loads(a.notes)
note = json.loads(a.note)
alarm_start = a.time
alarm_end = a.time + notes['duration']
alarm_end = a.time + note['duration']
if alarm_end < ut_start:
continue
elif alarm_start <= ut_start <= alarm_end:
@@ -143,14 +155,12 @@ class ManeuverScheduler:
for a in cls.get_ordered_alarms():
try:
notes = json.loads(a.notes)
alarm_end = a.time + int(notes['duration'])
if cls.timeslot_is_free(alarm_end + 1, duration):
return alarm_end + 1
except json.JSONDecodeError:
continue
note = json.loads(a.note)
alarm_end = a.time + int(note['duration'])
if cls.timeslot_is_free(alarm_end, duration):
return alarm_end
raise EOFError('Expected to find a free timeslot at the end alarm list')
raise EOFError('Excepted to find a free timeslot at the end alarm list')
@classmethod
def get_reservation(cls, ut_at) -> Timeslot:

View File

@@ -1,9 +1,5 @@
import math
from enum import Enum
from maneuver_scheduler import ManeuverScheduler
class ManeuverAlarmType(Enum):
ManeuverNode = 1,
@@ -11,9 +7,8 @@ class ManeuverAlarmType(Enum):
class Maneuver:
def __init__(self, conn, mission_control):
self.mission_control = mission_control
self.vessel = mission_control.vessel
def __init__(self, conn, vessel):
self.vessel = vessel
self.conn = conn
def plan_next_maneuver(self):
@@ -23,8 +18,8 @@ class Maneuver:
class NodeManeuver(Maneuver):
alarm_type = ManeuverAlarmType.ManeuverNode
def __init__(self, conn, mission_control):
super().__init__(conn, mission_control)
def __init__(self, conn, vessel):
super().__init__(conn, vessel)
self.mech_jeb = conn.mech_jeb
self.node_executor = self.mech_jeb.node_executor
@@ -47,28 +42,9 @@ class NodeManeuver(Maneuver):
while enabled():
enabled.wait()
def book_timeslot_for_node(self, node, maneuver, duration=None):
if node.time_to < 0:
node.remove()
planning_duration = 60
ut = ManeuverScheduler.next_free_timeslot(self.conn.space_center.ut + planning_duration, planning_duration)
ManeuverScheduler.book_timeslot(ut, self.vessel, duration=planning_duration)
time_required = (node.delta_v * self.vessel.mass) / self.vessel.available_thrust
duration = math.floor(2 * ManeuverScheduler.node_offsets + time_required)
alarm_start = node.ut - (duration / 2 + ManeuverScheduler.node_offsets)
if ManeuverScheduler.timeslot_is_free(alarm_start, duration):
ManeuverScheduler.book_timeslot_for_node(self.vessel, node, self, alarm_start=alarm_start, duration=duration)
else:
node.remove()
planning_duration = 60
ut = ManeuverScheduler.next_free_timeslot(alarm_start, planning_duration)
ManeuverScheduler.book_timeslot(ut, self.vessel, duration=planning_duration)
class MechJebManeuver(NodeManeuver):
def __init__(self, conn, mission_control):
super().__init__(conn, mission_control)
def __init__(self, conn, vessel):
super().__init__(conn, vessel)
self.maneuver_planner = self.mech_jeb.maneuver_planner

View File

@@ -1,15 +1,17 @@
from krpc.services.spacecenter import SASMode
import numpy as np
from time import time, sleep
from .utils import magnitude, unitary, kill_relative_velocity, correct_course, get_safety_radius, point_toward_direction
from .utils import magnitude, unitary, kill_relative_velocity, correct_course
from . import Maneuver
class ApproachManeuver(Maneuver):
def __init__(self, conn, mission_control, reference_frame):
super().__init__(conn, mission_control)
def __init__(self, conn, vessel_id, reference_frame):
super().__init__(conn, vessel_id)
self.reference_frame = reference_frame
def start(self):
@@ -50,6 +52,28 @@ class ApproachManeuver(Maneuver):
return True
def get_safety_radius(vessel):
bbox = vessel.bounding_box(vessel.reference_frame)
return max(magnitude(bbox[0]), magnitude(bbox[1]))
def point_toward_direction(vessel, direction, reference_frame):
ap = vessel.auto_pilot
ap.reference_frame = reference_frame
ap.target_direction = unitary(direction)
ap.target_roll = 0
ap.sas = False
ap.engage()
sleep(.1)
while magnitude(vessel.angular_velocity(reference_frame)) > .1:
sleep(.1)
ap.disengage()
ap.sas_mode = SASMode.stability_assist
ap.sas = True
THROTTLE = .1
VELOCITY_TOLERANCE = .1

View File

@@ -1,101 +0,0 @@
import numpy as np
from time import time, sleep
from .utils import magnitude, unitary, kill_relative_velocity_rcs, correct_course, get_safety_radius,\
point_toward_direction
from . import Maneuver
class ApproachRCSManeuver(Maneuver):
SAFETY_RADIUS_MARGIN = 10
def __init__(self, conn, mission_control, reference_frame):
super().__init__(conn, mission_control)
self.reference_frame = reference_frame
def start(self):
sc = self.conn.space_center
vessel = sc.active_vessel
target = sc.target_vessel
kill_relative_velocity_rcs(vessel, target)
self.conn.drawing.add_direction((0, 1, 0), self.reference_frame)
vessel.control.rcs = False
pv = vessel.position(self.reference_frame)
safety_radius = get_safety_radius(vessel) + get_safety_radius(target) + self.SAFETY_RADIUS_MARGIN
# if under and inside safety cylinder's circle
if pv[1] < safety_radius and pow(pv[0], 2) + pow(pv[2], 2) <= pow(safety_radius, 2):
print("We're under the target and inside the safety cylinder, getting out")
# get out of the cylinder
planar_move_vector = unitary((pv[0], pv[2])) * (safety_radius - magnitude((pv[0], pv[2])))
spacial_move_vector = np.array((planar_move_vector[0], 0, planar_move_vector[1]))
pv = vessel.position(self.reference_frame)
move_to_waypoint(self.conn, vessel, pv + spacial_move_vector, self.reference_frame)
print("We're outside of the safety cylinder, setting vertical distance")
pv = vessel.position(self.reference_frame)
move_to_waypoint(self.conn, vessel, (pv[0], safety_radius, pv[2]), self.reference_frame)
# should be above and outside => get inside
print("We're at the right vertical distance to the target, setting horizontal position")
move_to_waypoint(self.conn, vessel, (0, safety_radius, 0), self.reference_frame)
# point_toward_direction(vessel, - np.array(vessel.position(self.reference_frame)), self.reference_frame)
point_toward_direction(vessel, (0, -1, 0), self.reference_frame)
return True
def move_to_waypoint(conn, vessel, waypoint, reference_frame):
target = conn.space_center.target_vessel
kill_relative_velocity_rcs(vessel, target)
conn.drawing.add_line(vessel.position(reference_frame), waypoint, reference_frame)
waypoint = np.array(waypoint)
start_position = np.array(vessel.position(reference_frame))
vector = waypoint - start_position
distance = magnitude(vector)
direction = unitary(vector)
acceleration_distance = distance / 4
point_toward_direction(vessel, direction, reference_frame)
print("Starting acceleration")
remaining_distance = distance
vessel.control.rcs = True
vessel.control.forward = 1
while remaining_distance > 3 * acceleration_distance:
sleep(.1)
remaining_distance = magnitude(waypoint - vessel.position(reference_frame))
vessel.control.forward = 0
print("Target velocity achieved")
remaining_distance = distance - magnitude(start_position - vessel.position(reference_frame))
while remaining_distance > acceleration_distance:
sleep(.1)
correct_course(conn, vessel, waypoint, reference_frame)
remaining_distance = distance - magnitude(start_position - vessel.position(reference_frame))
print(remaining_distance)
print("Starting deceleration")
remaining_distance = distance - magnitude(start_position - vessel.position(reference_frame))
vessel.control.forward = -1
while remaining_distance > 0:
sleep(.1)
remaining_distance = distance - magnitude(start_position - vessel.position(reference_frame))
vessel.control.forward = 0
kill_relative_velocity_rcs(vessel, target)
print("Ship decelerated")
print("destination position: {}".format(waypoint))
print("end position: {}".format(np.array(vessel.position(reference_frame))))

View File

@@ -6,14 +6,14 @@ from . import MechJebManeuver
class ComsatManeuver(MechJebManeuver):
def __init__(self, conn, mission_control, target_body):
super().__init__(conn, mission_control)
def __init__(self, conn, vessel, target_body):
super().__init__(conn, vessel)
self.target_body = target_body
body = self.target_body
if body.satellites:
lowest_sat = min(body.satellites, key=lambda sat: sat.orbit.periapsis)
max_orbit = lowest_sat.orbit.periapsis_altitude - lowest_sat.sphere_of_influence
max_orbit = lowest_sat.orbit.periapsis - lowest_sat.sphere_of_influence
else:
max_orbit = body.sphere_of_influence
@@ -29,11 +29,11 @@ class ComsatManeuver(MechJebManeuver):
if vessel.orbit.body.name != self.target_body.name:
raise NotImplementedError
if not math.isclose(vessel.orbit.apoapsis_altitude, self.target_altitude, rel_tol=.01):
SetOrbitApoapsis(self.conn, self.mission_control, self.target_body).prepare_maneuver()
elif not math.isclose(vessel.orbit.eccentricity, 0, abs_tol=.001) or self.vessel.control.current_stage > 1:
CircularizeOrbitAndDeliver(self.conn, self.mission_control, self.target_body).prepare_maneuver()
elif self.vessel.control.current_stage <= 1:
if not math.isclose(vessel.orbit.apoapsis, self.target_altitude, rel_tol=.01):
SetOrbitApoapsis(self.conn, vessel, self.target_altitude).prepare_maneuver()
elif not math.isclose(vessel.orbit.eccentricity, 0, rel_tol=.01):
CircularizeOrbitAndDeliver(self.conn, vessel, self.target_altitude).prepare_maneuver()
elif self.vessel.control.current_stage == 0:
return True
return False
@@ -73,13 +73,8 @@ class CircularizeOrbitAndDeliver(ComsatManeuver):
while self.vessel.control.nodes:
self._execute_node()
current_stage = self.vessel.control.current_stage
if current_stage > 1:
relay = self.vessel.control.activate_next_stage()
sc.active_vessel = relay[0]
sc.active_vessel.name = self.vessel.name + " " + current_stage
sc.active_vessel.control.solar_panels = True
sc.active_vessel = self.vessel
if self.vessel.control.current_stage > 0:
self.vessel.control.activate_next_stage()
oro = self.maneuver_planner.operation_resonant_orbit
oro.resonance_numerator = 2

View File

@@ -1,36 +1,165 @@
from time import sleep
from .utils import kill_relative_velocity, kill_relative_velocity_rcs, correct_course, magnitude
from .utils import kill_relative_velocity, correct_course, magnitude
from . import Maneuver
class DockingManeuver(Maneuver):
def __init__(self, conn, mission_control, docking_port, target_docking_port):
super().__init__(conn, mission_control)
self.mech_jeb = conn.mech_jeb
self.docking_port = docking_port
self.target_docking_port = target_docking_port
def __init__(self, conn, vessel_id, docking_part, reference_frame):
super().__init__(conn, vessel_id)
self.docking_part = docking_part
self.reference_frame = reference_frame
def start(self):
vessel = self.conn.space_center.active_vessel
self.conn.drawing.add_direction((0, 1, 0), self.reference_frame)
self.conn.drawing.add_direction((1, 0, 0), self.reference_frame)
vessel.parts.controlling = self.docking_part
self.conn.space_center.active_vessel = self.mission_control.vessel
vessel.parts.controlling = self.docking_port.part
self.conn.space_center.target_docking_port = self.target_docking_port
kill_relative_velocity(self.conn, vessel, self.reference_frame)
set_attitude_and_roll(self.conn, vessel, self.reference_frame)
align_horizontally(self.conn, vessel, self.reference_frame)
da = self.mech_jeb.docking_autopilot
da.speed_limit = 10
da.roll = 0
da.force_roll = True
da.enabled = True
print("Starting docking procedure")
vessel.control.set_action_group(0, True)
rcs_push(vessel, {"y": 1}, .5)
sleep(1)
with self.conn.stream(getattr, da, "enabled") as enabled:
enabled.rate = 1
with enabled.condition:
while enabled():
enabled.wait()
vessel.control.rcs = True
try:
while vessel.position(self.reference_frame)[1] > 0:
print(vessel.position(self.reference_frame)[1])
correct_course(self.conn, vessel, (0, 0, 0), self.reference_frame)
sleep(1)
except ValueError as e:
vessel = self.conn.space_center.active_vessel
finally:
vessel.control.rcs = False
return True
def set_attitude_and_roll(conn, vessel, reference_frame):
fl = vessel.flight(reference_frame)
vessel.control.rcs = False
ap = vessel.auto_pilot
ap.reference_frame = reference_frame
ap.target_direction = (0, -1, 0)
ap.target_roll = 0
ap.sas = False
ap.engage()
ap.wait()
ap.disengage()
mj = conn.mech_jeb
sa = mj.smart_ass
sa.autopilot_mode = mj.SmartASSAutopilotMode.target_plus
sa.update(False)
while magnitude(vessel.angular_velocity(reference_frame)) > .1:
sleep(.1)
print("Ship pointing to dock")
def rcs_push(vessel, axis, duration):
vessel.control.rcs = True
if "x" in axis:
vessel.control.up = axis["x"]
elif "y" in axis:
vessel.control.forward = axis["y"]
elif "z" in axis:
vessel.control.right = axis["z"]
sleep(duration)
if "x" in axis:
vessel.control.up = 0
elif "y" in axis:
vessel.control.forward = 0
elif "z" in axis:
vessel.control.right = 0
vessel.control.rcs = False
def kill_rcs_velocity(vessel, reference_frame):
print("Killing RCS velocity")
velo = vessel.velocity(reference_frame)
vessel.control.rcs = True
while any(abs(component) > .05 for component in velo) > .05:
if abs(velo[0]) > .05:
sign = -velo[0] / abs(velo[0])
if abs(velo[0]) > .1:
vessel.control.up = 1 * sign
elif abs(velo[0]) > .05:
vessel.control.up = .1 * sign
else:
vessel.control.up = 0
if abs(velo[1]) > .05:
sign = -velo[1] / abs(velo[1])
if abs(velo[1]) > .1:
vessel.control.forward = 1 * sign
elif abs(velo[1]) > .05:
vessel.control.forward = .1 * sign
else:
vessel.control.forward = 0
if abs(velo[2]) > .05:
sign = velo[2] / abs(velo[2])
if abs(velo[2]) > .1:
vessel.control.right = 1 * sign
elif abs(velo[2]) > .05:
vessel.control.right = .1 * sign
else:
vessel.control.right = 0
sleep(.1)
velo = vessel.velocity(reference_frame)
vessel.control.rcs = False
print("RCS velocity killed")
def align_horizontally(conn, vessel, reference_frame):
conn.drawing.add_direction((1, 0, 0), vessel.reference_frame)
while abs(vessel.position(reference_frame)[0]) > .1 \
or abs(vessel.position(reference_frame)[2]) > .1:
# determine power requirements of each
sign_x = 1 if vessel.position(reference_frame)[0] > 0 else -1
if abs(vessel.position(reference_frame)[0]) > 1:
power_x = 1
elif abs(vessel.position(reference_frame)[0]) > .1:
power_x = .1
else:
power_x = 0
sign_x = 0
sign_z = 1 if vessel.position(reference_frame)[2] > 0 else -1
if abs(vessel.position(reference_frame)[2]) > 1:
power_z = 1
elif abs(vessel.position(reference_frame)[2]) > .1:
power_z = .1
else:
power_z = 0
sign_z = 0
axis = {}
if power_x > 0:
axis["x"] = -sign_x * power_x
if power_z > 0:
axis["z"] = sign_z * power_z
rcs_push(vessel, axis, 1)
while (sign_x > 0 and vessel.position(reference_frame)[0] > .1
or sign_x < 0 and vessel.position(reference_frame)[0] < -.1
or sign_x == 0) \
and (sign_z > 0 and vessel.position(reference_frame)[2] > .1
or sign_z < 0 and vessel.position(reference_frame)[2] < -.1
or sign_z == 0):
print(vessel.position(reference_frame))
sleep(.1)
kill_rcs_velocity(vessel, reference_frame)
print("Vertical alignment done!")

View File

@@ -1,36 +0,0 @@
from time import sleep
from . import Maneuver
from .utils import magnitude, correct_course_to_target, rcs_push, point_toward_target
class GrappleManeuver(Maneuver):
def __init__(self, conn, mission_control, target):
super().__init__(conn, mission_control)
self.target = target
def start(self):
vessel = self.mission_control.vessel
vessel.parts.controlling = self.mission_control.get_grappling()
point_toward_target(self.conn, vessel, self.target)
self.mission_control.toggle_grappling(True)
rcs_push(vessel, {"y": 1}, .75)
vessel.control.rcs = True
try:
while magnitude(vessel.position(self.target.reference_frame)) > 0:
print(magnitude(vessel.position(self.target.reference_frame)))
sa = self.conn.mech_jeb.smart_ass
sa.autopilot_mode = self.conn.mech_jeb.SmartASSAutopilotMode.relative_plus
sa.update(False)
correct_course_to_target(vessel, self.target)
except ValueError:
vessel = self.conn.space_center.active_vessel
self.mission_control.vessel = vessel
finally:
vessel.control.rcs = False
return True

View File

@@ -18,18 +18,18 @@ class RendezvousManeuver(MechJebManeuver):
if vessel.orbit.distance_at_closest_approach(target.orbit) > 1000:
if vessel.orbit.relative_inclination(target.orbit) > 0.0001:
AlignOrbitPlaneWithTarget(self.conn, self.mission_control).prepare_maneuver()
AlignOrbitPlaneWithTarget(self.conn, vessel).prepare_maneuver()
elif vessel.orbit.distance_at_closest_approach(target.orbit) > 10000:
InterceptTargetOrbit(self.conn, self.mission_control).prepare_maneuver()
InterceptTargetOrbit(self.conn, vessel).prepare_maneuver()
else:
TuneClosestApproach(self.conn, self.mission_control).prepare_maneuver()
TuneClosestApproach(self.conn, vessel).prepare_maneuver()
return False
elif vessel.orbit.distance_at_closest_approach(target.orbit) <= 1000 < magnitude(
np.array(vessel.position(vessel.reference_frame)) - np.array(target.position(vessel.reference_frame))):
MatchVelocityWithTarget(self.conn, self.mission_control).prepare_maneuver()
MatchVelocityWithTarget(self.conn, vessel).prepare_maneuver()
return False
else:
return True

View File

@@ -1,13 +1,7 @@
from time import time, sleep
import math
import numpy as np
from krpc.services.spacecenter import SASMode
from connector import get_connexion
def magnitude(vector):
return np.linalg.norm(vector)
@@ -69,208 +63,3 @@ def correct_course(conn, vessel, waypoint, reference_frame):
vessel.control.up = -.1
else:
vessel.control.up = 0
def correct_course_to_target(vessel, target):
target_position = target.position(vessel.reference_frame)
angle_x = math.atan(target_position[0]/target_position[1])
if math.isclose(angle_x, 0, abs_tol=0.05):
vessel.control.right = 0
elif angle_x < 0:
vessel.control.right = -.1
elif angle_x > 0:
vessel.control.right = .1
angle_z = math.atan(target_position[2]/target_position[1])
if math.isclose(angle_z, 0, abs_tol=0.05):
vessel.control.up = 0
elif angle_z < 0:
vessel.control.up = .1
elif angle_z > 0:
vessel.control.up = -.1
def kill_relative_velocity_rcs(vessel, target):
print("Killing RCS velocity")
vessel.control.sas = True
vessel.control.rcs = True
velocity = target.velocity(vessel.reference_frame)
while any(abs(component) >= .1 for component in velocity):
thrust = get_required_rcs_thrust(vessel, velocity)
vessel.control.right = thrust[0] if abs(velocity[0]) >= .1 else 0
vessel.control.forward = thrust[1] if abs(velocity[1]) >= .1 else 0
vessel.control.up = - thrust[2] if abs(velocity[2]) >= .1 else 0
print(target.velocity(vessel.reference_frame))
print((thrust[0], thrust[1], thrust[2]))
print((vessel.control.right, vessel.control.forward, vessel.control.up))
sleep(.1)
velocity = target.velocity(vessel.reference_frame)
continue
if abs(velocity[0]) > .05:
sign = velocity[0] / abs(velocity[0])
if abs(velocity[0]) > 1:
vessel.control.right = 1 * sign
elif abs(velocity[0]) > .1:
vessel.control.right = .1 * sign
else:
vessel.control.right = 0
if abs(velocity[1]) > .05:
sign = velocity[1] / abs(velocity[1])
if abs(velocity[1]) > 1:
vessel.control.forward = 1 * sign
elif abs(velocity[1]) > .1:
vessel.control.forward = .1 * sign
else:
vessel.control.forward = 0
if abs(velocity[2]) > .05:
sign = - velocity[2] / abs(velocity[2])
if abs(velocity[2]) > 1:
vessel.control.up = 1 * sign
elif abs(velocity[2]) > .1:
vessel.control.up = .1 * sign
else:
vessel.control.up = 0
sleep(.1)
velocity = target.velocity(vessel.reference_frame)
vessel.control.rcs = False
vessel.control.sas = False
print("RCS velocity killed")
def get_required_rcs_thrust(vessel, delta_v, polling=.1):
acceleration = np.array(vessel.available_rcs_force) / vessel.mass
thrust = [0, 0, 0]
for i in range(3):
if delta_v[i] >= 0:
thrust[i] = max(min(delta_v[i] / acceleration[0][i]*polling, 1), .051)
else:
thrust[i] = min(max(-delta_v[i] / acceleration[1][i]*polling, -1), -.051)
return thrust
def get_safety_radius(vessel):
bbox = vessel.bounding_box(vessel.reference_frame)
return max(magnitude(bbox[0]), magnitude(bbox[1]))
def point_toward_direction(vessel, direction, reference_frame):
ap = vessel.auto_pilot
ap.reference_frame = reference_frame
ap.target_direction = unitary(direction)
ap.target_roll = 0
ap.rcs = False
ap.sas = False
ap.engage()
sleep(1)
ap.wait()
ap.disengage()
ap.sas_mode = SASMode.stability_assist
ap.sas = True
def point_toward_target(conn, vessel, target, force_roll=False):
sa = conn.mech_jeb.smart_ass
sa.autopilot_mode = conn.mech_jeb.SmartASSAutopilotMode.target_plus
sa.force_roll = force_roll
sa.update(False)
while magnitude(vessel.angular_velocity(target.reference_frame)) > .002:
sleep(.1)
def rcs_push(vessel, axis, duration):
vessel.control.rcs = True
if "x" in axis:
vessel.control.up = axis["x"]
elif "y" in axis:
vessel.control.forward = axis["y"]
elif "z" in axis:
vessel.control.right = axis["z"]
sleep(duration)
if "x" in axis:
vessel.control.up = 0
elif "y" in axis:
vessel.control.forward = 0
elif "z" in axis:
vessel.control.right = 0
vessel.control.rcs = False
def set_attitude_and_roll(conn, vessel, reference_frame):
vessel.control.rcs = False
ap = vessel.auto_pilot
ap.reference_frame = reference_frame
ap.target_direction = (0, -1, 0)
ap.target_roll = 0
ap.sas = False
ap.engage()
ap.wait()
ap.disengage()
mj = conn.mech_jeb
sa = mj.smart_ass
sa.autopilot_mode = mj.SmartASSAutopilotMode.target_plus
sa.update(False)
while magnitude(vessel.angular_velocity(reference_frame)) > .1:
sleep(.1)
print("Ship pointing to dock")
def align_horizontally(conn, vessel, reference_frame):
conn.drawing.add_direction((1, 0, 0), vessel.reference_frame)
target = conn.space_center.target_vessel
while abs(vessel.position(reference_frame)[0]) > .1 \
or abs(vessel.position(reference_frame)[2]) > .1:
# determine power requirements of each
sign_x = 1 if vessel.position(reference_frame)[0] > 0 else -1
if abs(vessel.position(reference_frame)[0]) > 1:
power_x = 1
elif abs(vessel.position(reference_frame)[0]) > .1:
power_x = .1
else:
power_x = 0
sign_x = 0
sign_z = 1 if vessel.position(reference_frame)[2] > 0 else -1
if abs(vessel.position(reference_frame)[2]) > 1:
power_z = 1
elif abs(vessel.position(reference_frame)[2]) > .1:
power_z = .1
else:
power_z = 0
sign_z = 0
axis = {}
if power_x > 0:
axis["x"] = -sign_x * power_x
if power_z > 0:
axis["z"] = sign_z * power_z
rcs_push(vessel, axis, 1)
while (sign_x > 0 and vessel.position(reference_frame)[0] > .1
or sign_x < 0 and vessel.position(reference_frame)[0] < -.1
or sign_x == 0) \
and (sign_z > 0 and vessel.position(reference_frame)[2] > .1
or sign_z < 0 and vessel.position(reference_frame)[2] < -.1
or sign_z == 0):
print(vessel.position(reference_frame))
sleep(.1)
kill_relative_velocity_rcs(vessel, target)
print("Vertical alignment done!")

View File

@@ -1,120 +0,0 @@
from time import sleep
from enum import Enum
from maneuvers.grapple import GrappleManeuver
from maneuvers.rendezvous import RendezvousManeuver
from maneuvers.approach_rcs import ApproachRCSManeuver
from maneuvers.docking import DockingManeuver
from maneuvers.comsat import ComsatManeuver
from lib import get_contract, get_rescuee_vessel, get_body, get_connexion
class MissionType(Enum):
transport_kerbal = 1
orbit_rescue = 2
comsat = 3
transport_cargo = 4
class MissionStatus(Enum):
Backlog = 1
Assigned = 2
InProgress = 3
Done = 4
class MissionReport:
mission_status = None
new_vessel_id = None
new_missions = []
def __int__(self):
pass
class Mission:
pass
class TransportMission(Mission):
type = MissionType.transport_kerbal
def __init__(self, kerbal_name, destination):
self.kerbal_name = kerbal_name
self.destination = destination
class RescueMission(Mission):
type = MissionType.orbit_rescue
def __init__(self, contract_title):
self.contract_title = contract_title
self.contract = get_contract(contract_title)
rescuee_first_name = contract_title.split()[1]
self.rescuee_vessel = get_rescuee_vessel(rescuee_first_name)
self.rescuee_vessel_name = self.rescuee_vessel.name
self.rescuee_name = self.rescuee_vessel.crew[0].name
def execute(self, mission_control):
conn = get_connexion()
sc = conn.space_center
if sc.active_vessel.name != mission_control.vessel.name:
sc.active_vessel = mission_control.vessel
if sc.target_vessel is None or sc.target_vessel.name != self.rescuee_vessel.name:
sc.target_vessel = self.rescuee_vessel
rendezvous_done = RendezvousManeuver(conn, mission_control).execute()
if rendezvous_done:
sc.active_vessel = mission_control.deliver_probe(self.rescuee_vessel)
sc.target_vessel = self.rescuee_vessel
sc.active_vessel.parts.controlling = mission_control.get_grappling()
target_grapple_done = GrappleManeuver(conn, mission_control, sc.target_vessel).start()
if target_grapple_done:
mothership_docking_done = DockingManeuver(conn,
mission_control,
mission_control.get_probe_port(),
mission_control.get_bay_port()).start()
if mothership_docking_done:
mission_control.probe = None
kerbal = mission_control.get_kerbal(self.rescuee_name)
sc.transfer_crew(kerbal, mission_control.get_part_with_free_seat())
mission_control.toggle_grappling(False)
mission_control.recover_probe()
# Destroy Capsule
report = MissionReport()
report.mission_status = MissionStatus.Done
report.new_missions.append(
TransportMission(self.rescuee_name, 'kerbin_orbit')
)
return report
class ComSatNetworkMission:
type = MissionType.comsat
def __init__(self, body_name):
self.body = get_body(body_name)
def execute(self, mission_control):
conn = get_connexion()
sc = conn.space_center
if sc.active_vessel.name != mission_control.vessel.name:
sc.active_vessel = mission_control.vessel
mission_control.toggle_engines(True)
comsat_done = ComsatManeuver(conn, mission_control, self.body).execute()
if comsat_done:
report = MissionReport()
report.mission_status = MissionStatus.Done
return report

View File

@@ -1,121 +0,0 @@
from maneuvers.docking import DockingManeuver
from . import Mission
class StationBuildMission(Mission):
def __init__(self, conn):
self.conn = conn
def execute(self, mission_control):
transfer_ship = mission_control.seperate_transfer_ship()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.solar_panel_port())
separate_solar_panels()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_port("Station Core Stern Joint"))
transfer_ship = mission_control.seperate_transfer_ship()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_tank_holder_port())
separate_tank_holder()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_port("Station Core Bow Joint"))
transfer_ship = mission_control.seperate_transfer_ship()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_dock_left_port())
separate_left_dock()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_port("Station Core Port Joint"))
transfer_ship = mission_control.seperate_transfer_ship()
DockingManeuver(self.conn, mission_control, transfer_ship.get_front_port(), mission_control.get_port("Station Dock Left 1 Nadir Port"))
# Ship part package to LKO
# if destination not Kerbin:
# Transfer part package to Right orbit
# if station core not in package:
# Rendez-vous with station (if station core not in package)
# separate package
# for each part:
# Determine connection ports and angle
# Start docking
class Station:
pass
class StationPart:
joints = []
ports = []
class StationCore(StationPart):
joints = [
'Station Core Bow Joint',
'Station Core Starboard Joint',
'Station Core Stern Joint',
'Station Core Port Joint'
]
class StationSolarPanels(StationPart):
def __init__(self, number):
self.joints = [
f'Station Solar Panels {number} Bow Joint',
f'Station Solar Panels {number} Port Joint',
]
class StationTankHolder(StationPart):
joints = [
'Station Tank Holder Stern Joint',
]
ports = [
'Station Tank Holder Fuel Port 1',
'Station Tank Holder Fuel Port 2',
'Station Tank Holder Oxygen Port 1',
'Station Tank Holder Oxygen Port 2',
'Station Tank Holder RCS Port 1',
'Station Tank Holder RCS Port 2',
]
class StationTank(StationPart):
def __init__(self, resource, number):
self.ports = [
f'Station Tank {resource} {number} Bow Port'
f'Station Tank {resource} {number} Stern Port'
]
class StationDockLeft(StationPart):
def __init__(self, number):
self.joints = [
f'Station Dock Left {number} Starboard Joint',
f'Station Dock Left {number} Port Joint',
]
self.ports = [
f'Station Dock Left {number} Zenith Port',
f'Station Dock Left {number} Bow Port',
f'Station Dock Left {number} Nadir Port'
]
class StationDockRight(StationPart):
def __init__(self, number):
self.joints = [
f'Station Dock Right {number} Starboard Joint',
f'Station Dock Right {number} Port Joint',
]
self.ports = [
f'Station Dock Right {number} Zenith Port',
f'Station Dock Right {number} Bow Port',
f'Station Dock Right {number} Nadir Port'
]
class StationResourceConverter(StationPart):
def __int__(self, number):
self.joints = [
f'Station Resource Converter {number} Bow Joint'
]
self.ports = [
f'Station Resource Converter {number} Stern Port'
]

View File

@@ -1,198 +0,0 @@
from time import sleep
import numpy as np
from krpc.services.spacecenter import CargoBayState, DockingPortState
from maneuvers.utils import point_toward_direction, point_toward_target
from maneuver_scheduler import ManeuverScheduler
from mission import MissionStatus, MissionType
from lib import get_connexion, get_vessel
class MissionControl:
mission_types = []
vessel = None
def __init__(self):
self.current_mission = None
self.current_orbit_missions = []
def pick_missions(self, backlog):
current_body_name = self.vessel.orbit.body.name
for i, mission in enumerate(backlog.missions[current_body_name]):
if mission.type in self.mission_types:
if self.current_mission is None:
self.plan_mission(mission)
else:
self.current_orbit_missions.append(mission)
del (backlog.missions[current_body_name][i])
def plan_mission(self, mission):
self.current_mission = mission
planning_duration = 60
ut = ManeuverScheduler.next_free_timeslot(
from_ut=get_connexion().space_center.ut + planning_duration,
duration=planning_duration)
ManeuverScheduler.book_timeslot(ut, self.vessel, duration=planning_duration)
def execute_mission(self, alarm):
if self.current_mission is None:
if self.current_orbit_missions:
self.current_mission = self.current_orbit_missions.pop(0)
else:
return
report = self.current_mission.execute(self)
if report and report.mission_status == MissionStatus.Done:
self.current_mission = None
def get_kerbal(self, name):
for k in self.vessel.crew:
if k.name == name:
return k
raise LookupError('Kerbal {} not found onboard vessel {}'.format(name, self.vessel.name))
def get_docking_port(self, name):
for m in self.vessel.parts.modules_with_name('ModuleDockingNodeNamed'):
if m.get_field_by_id('portName') == name:
return m.part.docking_port
raise LookupError('Docking port {} not found on vessel {}'.format(name, self.vessel.name))
class ShuttleKerbin(MissionControl):
mission_types = [MissionType.orbit_rescue, MissionType.transport]
PISTON_MAX_EXTENSION = 2.4
def __init__(self, vessel_name):
super().__init__()
self.vessel_name = vessel_name
self.mothership = get_vessel(vessel_name)
self.probe = None
for e in self.vessel.parts.engines:
e.active = True
self.current_orbit_missions = []
self.first_cabin = self.mothership.parts.with_name('mk2Cockpit.Inline')[0]
self.second_cabin = self.mothership.parts.with_name('mk2CrewCabin')[0]
self.third_cabin = self.mothership.parts.with_name('mk2CrewCabin')[1]
self.docking_ports = self.mothership.parts.docking_ports
@property
def vessel(self):
if self.probe is None:
return self.mothership
else:
return self.probe
@vessel.setter
def vessel(self, value):
if self.probe is None:
self.mothership = value
else:
self.probe = value
def toggle_grappling(self, value=True):
arm = self.get_grappling()
if arm.modules[1].has_event('Arm') and value \
or arm.modules[1].has_event('Disarm') and not value:
arm.modules[1].set_action_by_id('ToggleAction', True)
def get_grappling(self):
if self.probe is None:
return self.vessel.parts.with_name('GrapplingDevice')[0]
return self.probe.parts.with_name('GrapplingDevice')[0]
def get_probe_port(self):
return self.get_docking_port('Tug Probe Port')
def get_bay_port(self):
if self.probe is not None:
return self.get_mothership_docking_port('Shuttle Bay Port')
return self.get_docking_port('Shuttle Bay Port')
def get_mothership_docking_port(self, name):
for m in self.mothership.parts.modules_with_name('ModuleDockingNodeNamed'):
if m.get_field_by_id('portName') == name:
return m.part.docking_port
raise LookupError('Docking port {} not found on vessel {}'.format(name, self.mothership.name))
def get_probe_docking_port(self, name):
for m in self.probe.parts.modules_with_name('ModuleDockingNodeNamed'):
if m.get_field_by_id('portName') == name:
return m.part.docking_port
raise LookupError('Docking port {} not found on vessel {}'.format(name, self.mothership.name))
def deliver_probe(self, target=None):
bay = self.mothership.parts.cargo_bays[0]
bay.open = True
with get_connexion().stream(getattr, bay, 'state') as state:
with state.condition:
while not state() == CargoBayState.open:
state.wait()
hinge = self.mothership.parts.robotic_hinges[0]
hinge.target_angle = 90
with get_connexion().stream(getattr, hinge, 'current_angle') as current_angle:
with current_angle.condition:
while current_angle() < hinge.target_angle - 1:
current_angle.wait()
if target is not None:
self.vessel.parts.controlling = self.get_bay_port().part
point_toward_target(get_connexion(), self.vessel, target)
bay_port = self.get_bay_port()
self.probe = bay_port.undock()
return self.probe
def recover_probe(self):
bay_port = self.get_bay_port()
if bay_port.state != DockingPortState.docked:
raise Exception("Probe is not parked in shuttle")
self.probe = None
hinge = self.mothership.parts.robotic_hinges[0]
hinge.target_angle = 0
with get_connexion().stream(getattr, hinge, 'current_angle') as current_angle:
with current_angle.condition:
while current_angle() > hinge.target_angle:
current_angle.wait()
bay = self.mothership.parts.cargo_bays[0]
bay.open = False
def get_part_with_free_seat(self):
crew_size = self.mothership.crew_count
if self.mothership.crew_capacity > 10:
crew_size = crew_size - 1
if crew_size <= 2:
return self.first_cabin
elif crew_size <= 6:
return self.second_cabin
elif crew_size <= 10:
return self.third_cabin
else:
raise Exception('Vessel already has no free sit')
class ComsatKerbin(MissionControl):
mission_types = [MissionType.comsat]
def __init__(self, vessel_name):
super().__init__()
self.vessel_name = vessel_name
self.vessel = get_vessel(vessel_name)
def toggle_engines(self, value=True):
self.vessel.parts.engines[0].active = value