153 lines
3.9 KiB
Python
153 lines
3.9 KiB
Python
import os
|
|
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import VerifyMismatchError
|
|
import configparser
|
|
|
|
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
|
|
|
from mw40v import Mw40V
|
|
|
|
# ASGI application object; the route decorators further down register
# their handlers on it.
app = FastAPI()

# HTTP Basic scheme, used as a dependency to pull the username/password
# pair out of each incoming request.
security = HTTPBasic()
|
|
|
|
# Directory that holds the dhcpcd configuration files. The trailing slash
# matters: paths are built by plain string concatenation.
# For local experiments this can be pointed at './' instead.
PATH = '/etc/'

# Name of the symlink that is re-pointed when the link mode changes ...
USB_DHCPD_FILENAME = 'dhcpcd-usb.conf'
# ... and the two files it may point at.
ON_FILENAME = 'dhcpcd-usb-on.conf'
OFF_FILENAME = 'dhcpcd-usb-off.conf'
|
|
|
|
|
|
class Config:
    """Lazily loaded, process-wide view of ``config.ini``.

    The file is looked up next to this script (the same location
    ``initialize_app`` writes it to) rather than in the current working
    directory, so the service finds its settings regardless of where it
    is started from.
    """

    # Cached parser; populated on the first call to get().
    config = None

    # Resolved once, at class creation, so a later chdir cannot change
    # which file is read.
    _CONFIG_PATH = Path(__file__).resolve().parent / 'config.ini'

    @classmethod
    def get(cls):
        """Return the shared ``ConfigParser``, reading the file on first use.

        A missing file is not an error here: ``ConfigParser.read`` skips
        files it cannot open, so the returned parser is simply empty.
        """
        if cls.config is None:
            parser = configparser.ConfigParser()
            parser.read(cls._CONFIG_PATH)
            # Cache only after a successful read, so a parse error does
            # not leave a half-initialised parser behind.
            cls.config = parser

        return cls.config
|
|
|
|
|
|
class AirModem:
    """Holder for a single, lazily constructed ``Mw40V`` client."""

    # Cached client instance; created on the first call to get().
    airModem = None

    @classmethod
    def get(cls):
        """Return the shared ``Mw40V`` client, building it on first use.

        Constructor arguments are taken from the ``[MW40V]`` section of
        the application configuration.
        """
        if cls.airModem is not None:
            return cls.airModem

        modem_settings = Config.get()['MW40V']
        cls.airModem = Mw40V(
            modem_settings['username'],
            modem_settings['password'],
            modem_settings['encrypt_key'],
            modem_settings['verification_key'],
        )
        return cls.airModem
|
|
|
|
|
|
def get_username(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
    """Authenticate an HTTP Basic request and return its username.

    The supplied username is compared against ``[credentials] username``
    and the password is verified against ``[credentials] password_hash``.

    Raises:
        HTTPException: 401 (via ``raise_unauthorized``) when either the
            username or the password does not match.
    """
    # Local import so this function does not depend on a new file-level import.
    import secrets

    expected = Config.get()['credentials']

    # Constant-time comparison; bytes are used because compare_digest
    # only accepts str arguments when they are pure ASCII.
    username_ok = secrets.compare_digest(
        credentials.username.encode('utf-8'),
        expected['username'].encode('utf-8'),
    )

    # Always verify the password, even when the username is already known
    # to be wrong, so response time does not reveal valid usernames.
    try:
        PasswordHasher().verify(expected['password_hash'], credentials.password)
        password_ok = True
    except VerifyMismatchError:
        password_ok = False

    if not (username_ok and password_ok):
        raise_unauthorized()

    return credentials.username
|
|
|
|
|
|
def raise_unauthorized():
    """Abort the request with a 401 response carrying a Basic-auth challenge.

    Raises:
        HTTPException: always.
    """
    challenge = {"WWW-Authenticate": "Basic"}
    error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers=challenge,
    )
    raise error
|
|
|
|
|
|
@app.get('/status')
async def get_status(username: Annotated[str, Depends(get_username)]):
    """Report the active link: ``"usb"`` or ``"eth"``.

    ``username`` is unused in the body; declaring it makes the route
    depend on ``get_username``, i.e. require authentication.
    """
    current = "eth"
    if is_usb_on():
        current = "usb"
    return {"status": current}
|
|
|
|
|
|
@app.post('/status/{status}')
async def set_status(username: Annotated[str, Depends(get_username)], status: str):
    """Switch the active link to ``usb`` or ``eth`` and report the result.

    Nothing is changed when the requested mode is already active.
    ``username`` is unused in the body; declaring it makes the route
    depend on ``get_username``, i.e. require authentication.

    Raises:
        HTTPException: 400 when ``status`` is neither ``usb`` nor ``eth``.
    """
    # The ``status`` path parameter shadows fastapi's ``status`` module
    # inside this function, hence the numeric status code below.
    if status not in ('usb', 'eth'):
        raise HTTPException(
            status_code=400,
            detail="status must be 'usb' or 'eth'",
        )

    want_usb = status == 'usb'
    if want_usb != is_usb_on():
        switch_status(want_usb)

    # Re-read the state instead of echoing the request, so the response
    # reflects what is actually configured.
    return {'status': 'usb' if is_usb_on() else 'eth'}
|
|
|
|
|
|
def is_usb_on():
    """Return True when the dhcpcd USB config link resolves to the "on" file."""
    link_path = PATH + USB_DHCPD_FILENAME
    resolved = os.path.realpath(link_path)
    return os.path.basename(resolved) == ON_FILENAME
|
|
|
|
|
|
def switch_status(turn_on):
    """Apply a link-mode change to both the modem and the dhcpcd config.

    When turning USB on, the modem is handled first and the dhcpcd
    config second; when turning it off, the order is reversed.
    """
    steps = (update_usb_modem, update_dhcp_conf)
    if not turn_on:
        steps = steps[::-1]

    for step in steps:
        step(turn_on)
|
|
|
|
|
|
def update_dhcp_conf(turn_on):
    """Point the dhcpcd USB config symlink at the "on" or "off" file.

    The new link is created under a temporary name and then renamed over
    the live one. This replaces the link in one step, works when the
    link does not exist yet, and leaves the old link in place if
    creating the new one fails.

    Args:
        turn_on: True to select ``ON_FILENAME``, False for ``OFF_FILENAME``.
    """
    source_file = ON_FILENAME if turn_on else OFF_FILENAME
    link_path = PATH + USB_DHCPD_FILENAME
    staging_path = link_path + '.tmp'

    # Clear a leftover staging link from an earlier, interrupted run.
    try:
        os.remove(staging_path)
    except FileNotFoundError:
        pass

    os.symlink(PATH + source_file, staging_path)
    # Renaming replaces the link itself; it does not follow it.
    os.replace(staging_path, link_path)
|
|
|
|
|
|
def update_usb_modem(turn_on):
    """Connect the shared modem client when ``turn_on`` is truthy, else disconnect it."""
    modem = AirModem.get()
    if turn_on:
        modem.connect()
        return
    modem.disconnect()
|
|
|
|
|
|
def initialize_app():
    """Create ``config.ini`` next to this script on first run.

    Does nothing when the file already exists. Otherwise it prompts
    interactively for the API credentials and the MW40V settings and
    writes them to the file. The API password is stored as an Argon2
    hash. Passwords are read without echo, and the file is created
    readable and writable by the owner only because it holds secrets.
    """
    # Local import so this function does not depend on a new file-level import.
    import getpass

    config_path = Path(__file__).parent.absolute() / 'config.ini'
    if config_path.is_file():
        return

    hasher = PasswordHasher()
    config = configparser.ConfigParser()

    api_username = input("Enter a username for the API")
    api_password = getpass.getpass("Enter a password for the API")
    config['credentials'] = {
        'username': api_username,
        'password_hash': hasher.hash(api_password),
    }

    modem_username = input("Enter the username for MW40V")
    modem_password = getpass.getpass("Enter the password for MW40V")
    encrypt_key = input("Enter the encrypt_key for MW40V")
    verification_key = input("Enter the verification_key for MW40V")
    config['MW40V'] = {
        'username': modem_username,
        # NOTE(review): this stores an Argon2 hash, which cannot be turned
        # back into the password. AirModem passes the stored value straight
        # to Mw40V(); if that client needs the real password to log in,
        # this must store the plain value instead -- confirm against Mw40V.
        'password': hasher.hash(modem_password),
        'encrypt_key': encrypt_key,
        'verification_key': verification_key,
    }

    # Create the file with mode 0600 from the start, rather than
    # tightening permissions after the secrets have been written.
    fd = os.open(config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as configfile:
        config.write(configfile)
|
|
|
|
|
|
if __name__ == "__main__":
    # Make sure the configuration file exists before serving requests.
    initialize_app()

    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)
|