Files
network-roaming/main.py

165 lines
4.4 KiB
Python
Executable File

#!/usr/bin/env python
import os
import subprocess
from pathlib import Path
from typing import Annotated
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
import configparser
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from mw40v import Mw40V
app = FastAPI()
security = HTTPBasic()
USB_DHCPD_FILENAME = '/etc/dhcpcd-backup.conf'
#USB_DHCPD_FILENAME = './dhcpcd-backup.conf'
AIR_INTERFACE_NAME = 'usb0'
METRIC_ON = 50
METRIC_OFF = 300
class Config:
config = None
@classmethod
def get(cls):
if cls.config is None:
cls.config = configparser.ConfigParser()
cls.config.read('./config.ini')
return cls.config
class AirModem:
airModem = None
@classmethod
def get(cls):
if cls.airModem is None:
config = Config.get()
cls.airModem = Mw40V(
config['MW40V']['username'],
config['MW40V']['password'],
config['MW40V']['encrypt_key'],
config['MW40V']['verification_key'],
)
return cls.airModem
def get_username(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
config = Config.get()
if credentials.username != config['credentials']['username']:
raise_unauthorized()
try:
PasswordHasher().verify(config['credentials']['password_hash'], credentials.password)
except VerifyMismatchError:
raise_unauthorized()
return credentials.username
def raise_unauthorized():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Basic"},
)
@app.get('/status')
async def get_status(username: Annotated[str, Depends(get_username)]):
return {"status": "usb" if is_usb_on() else "eth"}
@app.post('/status/{status}')
async def set_status(username: Annotated[str, Depends(get_username)], status):
if status == 'usb' and not is_usb_on():
switch_status(True)
elif status == 'eth' and is_usb_on():
switch_status(False)
return {'status': 'usb' if is_usb_on() else 'eth'}
def is_usb_on():
with open(USB_DHCPD_FILENAME, 'r') as dhcpd_file:
right_interface = False
for line in dhcpd_file:
if line.startswith('interface'):
right_interface = line.strip().split(' ')[-1] == AIR_INTERFACE_NAME
if right_interface and 'metric' in line:
return line.strip().split(' ')[-1] == str(METRIC_ON)
def switch_status(turn_on):
if turn_on:
update_usb_modem(turn_on)
update_dhcp_conf(turn_on)
else:
update_dhcp_conf(turn_on)
update_usb_modem(turn_on)
subprocess.run(['/usr/bin/sudo', '/usr/sbin/service', 'dhcpcd', 'restart'])
subprocess.run(['/usr/bin/sudo', '/usr/sbin/service', 'networking', 'restart'])
def update_dhcp_conf(turn_on):
with open(USB_DHCPD_FILENAME, 'w') as dhcpd_file:
dhcpd_file.write("interface {}\n".format(AIR_INTERFACE_NAME))
dhcpd_file.write("\tmetric {}\n".format(METRIC_ON if turn_on else METRIC_OFF))
def update_usb_modem(turn_on):
if turn_on:
AirModem.get().connect()
else:
AirModem.get().disconnect()
def initialize_app():
script_dir = Path(__file__).parent.absolute()
credentials_file_path = str(script_dir) + '/config.ini'
if not os.path.isfile(credentials_file_path):
config = configparser.ConfigParser()
username = input("Enter a username for the API")
password = input("Enter a password for the API")
config['credentials'] = {
'username': username,
'password_hash': PasswordHasher().hash(password)
}
username_mw40v = input("Enter the username for MW40V")
password_mw40v = input("Enter the password for MW40V")
encrypt_key = input("Enter the encrypt_key for MW40V")
verification_key = input("Enter the verification_key for MW40V")
config['MW40V'] = {
'username': username_mw40v,
'password': password_mw40v,
'encrypt_key': encrypt_key,
'verification_key': verification_key
}
with open(credentials_file_path, 'w') as configfile:
config.write(configfile)
if __name__ == "__main__":
initialize_app()
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)