165 lines
4.4 KiB
Python
Executable File
165 lines
4.4 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import os
|
|
import subprocess
|
|
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import VerifyMismatchError
|
|
import configparser
|
|
|
|
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
|
|
|
from services.mw40v import Mw40V
|
|
|
|
# ASGI application object and the HTTP Basic scheme used as an auth dependency.
app = FastAPI()
security = HTTPBasic()

# File that is_usb_on() reads and update_dhcp_conf() rewrites.
USB_DHCPD_FILENAME = '/etc/dhcpcd-backup.conf'
# Alternative relative path, left disabled (presumably for local testing).
#USB_DHCPD_FILENAME = './dhcpcd-backup.conf'

# Interface name written to, and looked up in, the file above.
AIR_INTERFACE_NAME = 'usb0'

# Metric values written for the "on" and "off" states respectively.
METRIC_ON = 50
METRIC_OFF = 300
|
|
|
|
|
|
class Config:
    """Lazily loaded, process-wide ``config.ini`` parser."""

    # Cached parser; stays ``None`` until get() is first called.
    config = None

    @classmethod
    def get(cls):
        """Return the shared ``ConfigParser``, loading it on first use.

        ``config.ini`` is looked up next to this module, which is where
        ``initialize_app`` writes it, and then in the current working
        directory. ``ConfigParser.read`` skips files that do not exist and
        lets later files override earlier ones, so a working-directory file
        still takes precedence exactly as before, while a server started
        from another directory now finds the file created at setup time.

        Returns:
            The cached ``configparser.ConfigParser`` instance.
        """
        if cls.config is None:
            parser = configparser.ConfigParser()
            module_dir = Path(__file__).parent.absolute()
            parser.read([module_dir / 'config.ini', './config.ini'])
            # Publish only after a successful read so a failure is not cached.
            cls.config = parser

        return cls.config
|
|
|
|
|
|
class AirModem:
    """Holder for the single shared ``Mw40V`` client."""

    # Cached client; stays ``None`` until get() is first called.
    airModem = None

    @classmethod
    def get(cls):
        """Return the shared ``Mw40V`` instance, building it on first call.

        The four constructor arguments are taken, in order, from the
        ``username``, ``password``, ``encrypt_key`` and ``verification_key``
        entries of the ``MW40V`` section of ``Config.get()``.
        """
        if cls.airModem is not None:
            return cls.airModem

        modem_settings = Config.get()['MW40V']
        cls.airModem = Mw40V(
            modem_settings['username'],
            modem_settings['password'],
            modem_settings['encrypt_key'],
            modem_settings['verification_key'],
        )
        return cls.airModem
|
|
|
|
|
|
def get_username(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
    """Authenticate an HTTP Basic request against the ``credentials`` config section.

    Args:
        credentials: Username/password pair extracted by the ``security``
            dependency.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 (via ``raise_unauthorized``) when the username
            or the password does not match.
    """
    import secrets

    stored = Config.get()['credentials']

    # Constant-time comparison; bytes are used because compare_digest only
    # accepts ASCII when given str.
    username_ok = secrets.compare_digest(
        credentials.username.encode('utf-8'),
        stored['username'].encode('utf-8'),
    )

    # Always run the hash verification, even for a wrong username, so the
    # response time does not reveal which usernames exist.
    try:
        PasswordHasher().verify(stored['password_hash'], credentials.password)
        password_ok = True
    except VerifyMismatchError:
        password_ok = False

    if not (username_ok and password_ok):
        raise_unauthorized()

    return credentials.username
|
|
|
|
|
|
def raise_unauthorized():
    """Abort the request with a 401 carrying a Basic authentication challenge.

    Raises:
        HTTPException: always.
    """
    challenge_headers = {"WWW-Authenticate": "Basic"}
    failure = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers=challenge_headers,
    )
    raise failure
|
|
|
|
|
|
@app.get('/status')
async def get_status(username: Annotated[str, Depends(get_username)]):
    """Report the current mode as ``{"status": "usb"}`` or ``{"status": "eth"}``.

    ``username`` is only present to force authentication via ``get_username``.
    """
    if is_usb_on():
        current = "usb"
    else:
        current = "eth"
    return {"status": current}
|
|
|
|
|
|
@app.post('/status/{status}')
async def set_status(username: Annotated[str, Depends(get_username)], status):
    """Switch to the requested mode if needed and report the resulting mode.

    Args:
        username: Present only to force authentication via ``get_username``.
        status: Requested mode from the URL path, ``'usb'`` or ``'eth'``.

    Returns:
        ``{'status': 'usb'}`` or ``{'status': 'eth'}``, re-read after any
        switch has been performed.

    Raises:
        HTTPException: 400 when ``status`` is neither ``'usb'`` nor ``'eth'``.
            Previously such requests were silently ignored with a 200.
    """
    # The ``status`` parameter shadows the ``fastapi.status`` module inside
    # this function, so the numeric code is spelled out.
    if status not in ('usb', 'eth'):
        raise HTTPException(
            status_code=400,
            detail="Unknown status {!r}; expected 'usb' or 'eth'".format(status),
        )

    wants_usb = status == 'usb'
    # Only switch when the requested mode differs from the current one.
    if wants_usb != bool(is_usb_on()):
        switch_status(wants_usb)

    return {'status': 'usb' if is_usb_on() else 'eth'}
|
|
|
|
|
|
def is_usb_on():
    """Tell whether ``USB_DHCPD_FILENAME`` gives the air interface the "on" metric.

    The file is scanned line by line. An ``interface <name>`` line opens a
    section; the first ``metric <value>`` line found inside the section for
    ``AIR_INTERFACE_NAME`` decides the result.

    Returns:
        True when that metric equals ``METRIC_ON``. False otherwise,
        including when the file is missing or has no such directive.
    """
    try:
        dhcpd_file = open(USB_DHCPD_FILENAME, 'r')
    except FileNotFoundError:
        # update_dhcp_conf() creates the file; until then the mode is "off".
        return False

    with dhcpd_file:
        in_air_section = False
        for line in dhcpd_file:
            # Split on any whitespace so tabs and repeated spaces both work.
            tokens = line.split()
            if not tokens or tokens[0].startswith('#'):
                continue

            if tokens[0] == 'interface':
                in_air_section = len(tokens) > 1 and tokens[-1] == AIR_INTERFACE_NAME
            elif in_air_section and tokens[0] == 'metric':
                return tokens[-1] == str(METRIC_ON)

    return False
|
|
|
|
|
|
def switch_status(turn_on):
    """Apply the requested mode, then restart the network services.

    When turning on, the modem is updated before the dhcp file; when turning
    off, the dhcp file is updated before the modem. Afterwards the ``dhcpcd``
    and ``networking`` services are restarted through ``sudo service``.

    Args:
        turn_on: Truthy to switch on, falsy to switch off.
    """
    enabled = bool(turn_on)
    if enabled:
        ordered_steps = (update_usb_modem, update_dhcp_conf)
    else:
        ordered_steps = (update_dhcp_conf, update_usb_modem)

    for apply_step in ordered_steps:
        apply_step(enabled)

    for service_name in ('dhcpcd', 'networking'):
        subprocess.run(['/usr/bin/sudo', '/usr/sbin/service', service_name, 'restart'])
|
|
|
|
|
|
def update_dhcp_conf(turn_on):
    """Overwrite ``USB_DHCPD_FILENAME`` with the interface stanza for the given mode.

    The file ends up with exactly two lines: ``interface <AIR_INTERFACE_NAME>``
    and a tab-indented ``metric`` line holding ``METRIC_ON`` or ``METRIC_OFF``.

    Args:
        turn_on: Truthy to write ``METRIC_ON``, falsy to write ``METRIC_OFF``.
    """
    with open(USB_DHCPD_FILENAME, 'w') as dhcpd_file:
        metric_value = METRIC_ON if turn_on else METRIC_OFF
        stanza_lines = (
            "interface {}\n".format(AIR_INTERFACE_NAME),
            "\tmetric {}\n".format(metric_value),
        )
        dhcpd_file.write("".join(stanza_lines))
|
|
|
|
|
|
def update_usb_modem(turn_on):
    """Call ``connect()`` or ``disconnect()`` on the shared modem client.

    Args:
        turn_on: Truthy to connect, falsy to disconnect.
    """
    modem = AirModem.get()
    action = modem.connect if turn_on else modem.disconnect
    action()
|
|
|
|
|
|
def initialize_app():
    """Interactively create ``config.ini`` next to this script if it is missing.

    Prompts for the API credentials and the MW40V settings, then writes a
    ``credentials`` section (username plus Argon2 password hash) and an
    ``MW40V`` section. Does nothing when the file already exists.

    Changes from the previous version:
      * passwords are read with ``getpass`` so they are not echoed;
      * prompts end with ``": "`` so typed input is separated from the text;
      * ``%`` is escaped as ``%%`` so values containing it no longer make
        ``ConfigParser`` raise, and read back unchanged;
      * the file is created with mode ``0o600`` because it stores secrets.
    """
    import getpass

    config_path = Path(__file__).parent.absolute() / 'config.ini'
    if config_path.is_file():
        return

    def _escape(value):
        # ConfigParser's default interpolation treats '%' as special.
        return value.replace('%', '%%')

    config = configparser.ConfigParser()

    username = input("Enter a username for the API: ")
    password = getpass.getpass("Enter a password for the API: ")
    config['credentials'] = {
        'username': _escape(username),
        'password_hash': _escape(PasswordHasher().hash(password)),
    }

    username_mw40v = input("Enter the username for MW40V: ")
    password_mw40v = getpass.getpass("Enter the password for MW40V: ")
    # The keys stay echoed so a pasted value can be checked by eye.
    encrypt_key = input("Enter the encrypt_key for MW40V: ")
    verification_key = input("Enter the verification_key for MW40V: ")
    config['MW40V'] = {
        'username': _escape(username_mw40v),
        'password': _escape(password_mw40v),
        'encrypt_key': _escape(encrypt_key),
        'verification_key': _escape(verification_key),
    }

    # Create the file owner-readable only; it holds the modem password and keys.
    fd = os.open(config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as configfile:
        config.write(configfile)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the first-time setup before serving.
    initialize_app()

    # Imported here so uvicorn is only required when run as a script.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)
|