#!/usr/bin/env python
"""HTTP API that switches the uplink between the USB modem and Ethernet.

The service exposes two HTTP Basic authenticated endpoints:

* ``GET /status`` reports ``"usb"`` or ``"eth"``.
* ``POST /status/{status}`` switches to ``usb`` or ``eth``.

The current state is derived from the ``metric`` line of the
``AIR_INTERFACE_NAME`` section in ``USB_DHCPD_FILENAME``. Switching rewrites
that line and connects/disconnects the MW40V modem.

Run the file directly to create ``config.ini`` interactively (if missing) and
start the server on port 8000.
"""
import configparser
import fileinput
import getpass
import os
import secrets
from pathlib import Path
from typing import Annotated, NoReturn

from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

from mw40v import Mw40V

app = FastAPI()
security = HTTPBasic()

USB_DHCPD_FILENAME = '/etc/dhcpcd-backup.conf'
# For local testing, point USB_DHCPD_FILENAME at a copy instead,
# e.g. './dhcpcd-backup.conf'.
AIR_INTERFACE_NAME = 'usb0'
# Metric written for the USB interface when it is switched on / off.
# NOTE(review): presumably the lower metric makes usb0 the preferred route.
METRIC_ON = 50
METRIC_OFF = 300

# Location where initialize_app() creates the configuration file.
CONFIG_FILE_PATH = Path(__file__).parent.absolute() / 'config.ini'


class Config:
    """Lazily loaded, process-wide ``ConfigParser`` instance."""

    # Cached parser; populated on the first call to get().
    config = None

    @classmethod
    def get(cls):
        """Return the shared configuration, reading it on first use.

        The file next to this script (the one ``initialize_app`` writes) is
        read first; ``./config.ini`` in the current working directory is read
        afterwards, so it keeps taking precedence when it exists.
        """
        if cls.config is None:
            parser = configparser.ConfigParser()
            # Missing files are skipped silently by ConfigParser.read().
            parser.read([CONFIG_FILE_PATH, './config.ini'])
            cls.config = parser
        return cls.config


class AirModem:
    """Lazily created, process-wide ``Mw40V`` client."""

    # Cached client; populated on the first call to get().
    airModem = None

    @classmethod
    def get(cls):
        """Return the shared ``Mw40V`` client built from the ``[MW40V]`` config section."""
        if cls.airModem is None:
            modem_cfg = Config.get()['MW40V']
            cls.airModem = Mw40V(
                modem_cfg['username'],
                modem_cfg['password'],
                modem_cfg['encrypt_key'],
                modem_cfg['verification_key'],
            )
        return cls.airModem


def get_username(credentials: Annotated[HTTPBasicCredentials, Depends(security)]) -> str:
    """FastAPI dependency: validate HTTP Basic credentials.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 when the username or the password does not match
            the ``[credentials]`` section of the configuration.
    """
    expected = Config.get()['credentials']

    # Constant-time comparison so response time does not reveal how much of
    # the username matched.
    username_ok = secrets.compare_digest(
        credentials.username.encode('utf-8'),
        expected['username'].encode('utf-8'),
    )

    # Always run the (slow) hash verification, even for a wrong username, so
    # a valid username cannot be detected from the response time.
    try:
        PasswordHasher().verify(expected['password_hash'], credentials.password)
    except VerifyMismatchError:
        password_ok = False
    else:
        password_ok = True

    if not (username_ok and password_ok):
        raise_unauthorized()
    return credentials.username


def raise_unauthorized() -> NoReturn:
    """Raise the 401 response that asks the client for Basic credentials."""
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Basic"},
    )


# NOTE(review): both handlers are ``async`` but perform blocking file I/O (and
# modem calls that may block), so requests are handled one at a time.
@app.get('/status')
async def get_status(username: Annotated[str, Depends(get_username)]):
    """Report which uplink is active: ``{"status": "usb"}`` or ``{"status": "eth"}``."""
    return {"status": "usb" if is_usb_on() else "eth"}


@app.post('/status/{status}')
async def set_status(username: Annotated[str, Depends(get_username)], status: str):
    """Switch to the requested uplink and report the resulting state.

    ``status`` is the path parameter (it shadows ``fastapi.status`` inside this
    function). Only ``'usb'`` and ``'eth'`` trigger a switch, and only when the
    current state differs; any other value leaves the state untouched.
    """
    usb_active = is_usb_on()
    if status == 'usb' and not usb_active:
        switch_status(True)
    elif status == 'eth' and usb_active:
        switch_status(False)
    # Re-read the file so the response reflects what is actually configured.
    return {'status': 'usb' if is_usb_on() else 'eth'}


def _last_token(line: str) -> str:
    """Return the last whitespace-separated token of *line*.

    Callers only pass lines known to contain non-blank text.
    """
    return line.split()[-1]


def is_usb_on() -> bool:
    """Return True when the USB interface's metric equals ``METRIC_ON``.

    Scans ``USB_DHCPD_FILENAME`` for the ``interface AIR_INTERFACE_NAME``
    section and inspects the first line in it that contains ``metric``.
    Returns False when no such line exists.
    """
    in_air_section = False
    with open(USB_DHCPD_FILENAME, 'r') as dhcpd_file:
        for line in dhcpd_file:
            if line.startswith('interface'):
                # Every 'interface' line opens a new section.
                in_air_section = _last_token(line) == AIR_INTERFACE_NAME
            if in_air_section and 'metric' in line:
                return _last_token(line) == str(METRIC_ON)
    return False


def switch_status(turn_on) -> None:
    """Turn the USB uplink on or off.

    Turning on connects the modem before the metric is updated; turning off
    updates the metric before the modem is disconnected.
    """
    if turn_on:
        update_usb_modem(turn_on)
        update_dhcp_conf(turn_on)
    else:
        update_dhcp_conf(turn_on)
        update_usb_modem(turn_on)


def update_dhcp_conf(turn_on) -> None:
    """Rewrite the USB interface's metric line in ``USB_DHCPD_FILENAME`` in place.

    Every line containing ``metric`` inside the ``AIR_INTERFACE_NAME`` section
    is replaced by ``\\tmetric <value>``; all other lines are copied unchanged.
    """
    metric = METRIC_ON if turn_on else METRIC_OFF
    in_air_section = False
    # With inplace=True, stdout is redirected into the file being rewritten;
    # the context manager restores it and closes the file even on error.
    with fileinput.input(USB_DHCPD_FILENAME, inplace=True) as conf_lines:
        for line in conf_lines:
            if line.startswith('interface'):
                in_air_section = _last_token(line) == AIR_INTERFACE_NAME
            if in_air_section and 'metric' in line:
                print(f"\tmetric {metric}")
            else:
                print(line, end='')


def update_usb_modem(turn_on) -> None:
    """Connect the modem when *turn_on* is truthy, otherwise disconnect it."""
    modem = AirModem.get()
    if turn_on:
        modem.connect()
    else:
        modem.disconnect()


def _escape_config_value(value: str) -> str:
    """Escape '%' so ConfigParser's interpolation stores and reads it literally."""
    return value.replace('%', '%%')


def initialize_app() -> None:
    """Interactively create ``config.ini`` next to this script if it is missing.

    Prompts for the API credentials (the password is stored as an Argon2 hash)
    and for the MW40V modem settings. Does nothing when the file already exists.
    """
    if CONFIG_FILE_PATH.is_file():
        return

    config = configparser.ConfigParser()

    username = input("Enter a username for the API: ")
    # getpass keeps the password from being echoed to the terminal.
    password = getpass.getpass("Enter a password for the API: ")
    config['credentials'] = {
        'username': _escape_config_value(username),
        'password_hash': _escape_config_value(PasswordHasher().hash(password)),
    }

    username_mw40v = input("Enter the username for MW40V: ")
    password_mw40v = getpass.getpass("Enter the password for MW40V: ")
    encrypt_key = input("Enter the encrypt_key for MW40V: ")
    verification_key = input("Enter the verification_key for MW40V: ")
    config['MW40V'] = {
        'username': _escape_config_value(username_mw40v),
        'password': _escape_config_value(password_mw40v),
        'encrypt_key': _escape_config_value(encrypt_key),
        'verification_key': _escape_config_value(verification_key),
    }

    # The file holds the modem password in clear text, so create it readable
    # and writable by the owner only.
    fd = os.open(CONFIG_FILE_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as configfile:
        config.write(configfile)


if __name__ == "__main__":
    initialize_app()
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)