#!/usr/bin/env python import os import subprocess from pathlib import Path from typing import Annotated from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError import configparser from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import HTTPBasic, HTTPBasicCredentials from mw40v import Mw40V app = FastAPI() security = HTTPBasic() USB_DHCPD_FILENAME = '/etc/dhcpcd-backup.conf' #USB_DHCPD_FILENAME = './dhcpcd-backup.conf' AIR_INTERFACE_NAME = 'usb0' METRIC_ON = 50 METRIC_OFF = 300 class Config: config = None @classmethod def get(cls): if cls.config is None: cls.config = configparser.ConfigParser() cls.config.read('./config.ini') return cls.config class AirModem: airModem = None @classmethod def get(cls): if cls.airModem is None: config = Config.get() cls.airModem = Mw40V( config['MW40V']['username'], config['MW40V']['password'], config['MW40V']['encrypt_key'], config['MW40V']['verification_key'], ) return cls.airModem def get_username(credentials: Annotated[HTTPBasicCredentials, Depends(security)]): config = Config.get() if credentials.username != config['credentials']['username']: raise_unauthorized() try: PasswordHasher().verify(config['credentials']['password_hash'], credentials.password) except VerifyMismatchError: raise_unauthorized() return credentials.username def raise_unauthorized(): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Basic"}, ) @app.get('/status') async def get_status(username: Annotated[str, Depends(get_username)]): return {"status": "usb" if is_usb_on() else "eth"} @app.post('/status/{status}') async def set_status(username: Annotated[str, Depends(get_username)], status): if status == 'usb' and not is_usb_on(): switch_status(True) elif status == 'eth' and is_usb_on(): switch_status(False) return {'status': 'usb' if is_usb_on() else 'eth'} def is_usb_on(): with open(USB_DHCPD_FILENAME, 'r') as dhcpd_file: right_interface = False for line in dhcpd_file: if line.startswith('interface'): right_interface = line.strip().split(' ')[-1] == AIR_INTERFACE_NAME if right_interface and 'metric' in line: return line.strip().split(' ')[-1] == str(METRIC_ON) def switch_status(turn_on): if turn_on: update_usb_modem(turn_on) update_dhcp_conf(turn_on) else: update_dhcp_conf(turn_on) update_usb_modem(turn_on) subprocess.run('service dhcpcd restart && service networking restart') def update_dhcp_conf(turn_on): with open(USB_DHCPD_FILENAME, 'w') as dhcpd_file: dhcpd_file.write("interface {}\n".format(AIR_INTERFACE_NAME)) dhcpd_file.write("\tmetric {}\n".format(METRIC_ON if turn_on else METRIC_OFF)) def update_usb_modem(turn_on): if turn_on: AirModem.get().connect() else: AirModem.get().disconnect() def initialize_app(): script_dir = Path(__file__).parent.absolute() credentials_file_path = str(script_dir) + '/config.ini' if not os.path.isfile(credentials_file_path): config = configparser.ConfigParser() username = input("Enter a username for the API") password = input("Enter a password for the API") config['credentials'] = { 'username': username, 'password_hash': PasswordHasher().hash(password) } username_mw40v = input("Enter the username for MW40V") password_mw40v = input("Enter the password for MW40V") encrypt_key = input("Enter the encrypt_key for MW40V") verification_key = input("Enter the verification_key for MW40V") config['MW40V'] = { 'username': username_mw40v, 'password': password_mw40v, 'encrypt_key': encrypt_key, 'verification_key': verification_key } with open(credentials_file_path, 'w') as configfile: config.write(configfile) if __name__ == "__main__": initialize_app() import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)