Adding a detector to know if internet is down or not
This commit is contained in:
164
router.py
Executable file
164
router.py
Executable file
@@ -0,0 +1,164 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
from argon2 import PasswordHasher
|
||||
from argon2.exceptions import VerifyMismatchError
|
||||
import configparser
|
||||
|
||||
from fastapi import FastAPI, Depends, HTTPException, status
|
||||
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||||
|
||||
from mw40v import Mw40V
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
security = HTTPBasic()
|
||||
|
||||
USB_DHCPD_FILENAME = '/etc/dhcpcd-backup.conf'
|
||||
#USB_DHCPD_FILENAME = './dhcpcd-backup.conf'
|
||||
AIR_INTERFACE_NAME = 'usb0'
|
||||
METRIC_ON = 50
|
||||
METRIC_OFF = 300
|
||||
|
||||
|
||||
class Config:
|
||||
config = None
|
||||
|
||||
@classmethod
|
||||
def get(cls):
|
||||
if cls.config is None:
|
||||
cls.config = configparser.ConfigParser()
|
||||
cls.config.read('./config.ini')
|
||||
|
||||
return cls.config
|
||||
|
||||
|
||||
class AirModem:
|
||||
airModem = None
|
||||
|
||||
@classmethod
|
||||
def get(cls):
|
||||
if cls.airModem is None:
|
||||
config = Config.get()
|
||||
|
||||
cls.airModem = Mw40V(
|
||||
config['MW40V']['username'],
|
||||
config['MW40V']['password'],
|
||||
config['MW40V']['encrypt_key'],
|
||||
config['MW40V']['verification_key'],
|
||||
)
|
||||
|
||||
return cls.airModem
|
||||
|
||||
|
||||
def get_username(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
|
||||
config = Config.get()
|
||||
|
||||
if credentials.username != config['credentials']['username']:
|
||||
raise_unauthorized()
|
||||
|
||||
try:
|
||||
PasswordHasher().verify(config['credentials']['password_hash'], credentials.password)
|
||||
except VerifyMismatchError:
|
||||
raise_unauthorized()
|
||||
|
||||
return credentials.username
|
||||
|
||||
|
||||
def raise_unauthorized():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Incorrect username or password",
|
||||
headers={"WWW-Authenticate": "Basic"},
|
||||
)
|
||||
|
||||
|
||||
@app.get('/status')
|
||||
async def get_status(username: Annotated[str, Depends(get_username)]):
|
||||
return {"status": "usb" if is_usb_on() else "eth"}
|
||||
|
||||
|
||||
@app.post('/status/{status}')
|
||||
async def set_status(username: Annotated[str, Depends(get_username)], status):
|
||||
if status == 'usb' and not is_usb_on():
|
||||
switch_status(True)
|
||||
elif status == 'eth' and is_usb_on():
|
||||
switch_status(False)
|
||||
|
||||
return {'status': 'usb' if is_usb_on() else 'eth'}
|
||||
|
||||
|
||||
def is_usb_on():
|
||||
with open(USB_DHCPD_FILENAME, 'r') as dhcpd_file:
|
||||
right_interface = False
|
||||
for line in dhcpd_file:
|
||||
if line.startswith('interface'):
|
||||
right_interface = line.strip().split(' ')[-1] == AIR_INTERFACE_NAME
|
||||
|
||||
if right_interface and 'metric' in line:
|
||||
return line.strip().split(' ')[-1] == str(METRIC_ON)
|
||||
|
||||
|
||||
def switch_status(turn_on):
|
||||
if turn_on:
|
||||
update_usb_modem(turn_on)
|
||||
update_dhcp_conf(turn_on)
|
||||
else:
|
||||
update_dhcp_conf(turn_on)
|
||||
update_usb_modem(turn_on)
|
||||
|
||||
subprocess.run(['/usr/bin/sudo', '/usr/sbin/service', 'dhcpcd', 'restart'])
|
||||
subprocess.run(['/usr/bin/sudo', '/usr/sbin/service', 'networking', 'restart'])
|
||||
|
||||
|
||||
def update_dhcp_conf(turn_on):
|
||||
with open(USB_DHCPD_FILENAME, 'w') as dhcpd_file:
|
||||
dhcpd_file.write("interface {}\n".format(AIR_INTERFACE_NAME))
|
||||
dhcpd_file.write("\tmetric {}\n".format(METRIC_ON if turn_on else METRIC_OFF))
|
||||
|
||||
|
||||
def update_usb_modem(turn_on):
|
||||
if turn_on:
|
||||
AirModem.get().connect()
|
||||
else:
|
||||
AirModem.get().disconnect()
|
||||
|
||||
|
||||
def initialize_app():
|
||||
script_dir = Path(__file__).parent.absolute()
|
||||
credentials_file_path = str(script_dir) + '/config.ini'
|
||||
if not os.path.isfile(credentials_file_path):
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
|
||||
username = input("Enter a username for the API")
|
||||
password = input("Enter a password for the API")
|
||||
config['credentials'] = {
|
||||
'username': username,
|
||||
'password_hash': PasswordHasher().hash(password)
|
||||
}
|
||||
|
||||
username_mw40v = input("Enter the username for MW40V")
|
||||
password_mw40v = input("Enter the password for MW40V")
|
||||
encrypt_key = input("Enter the encrypt_key for MW40V")
|
||||
verification_key = input("Enter the verification_key for MW40V")
|
||||
config['MW40V'] = {
|
||||
'username': username_mw40v,
|
||||
'password': password_mw40v,
|
||||
'encrypt_key': encrypt_key,
|
||||
'verification_key': verification_key
|
||||
}
|
||||
|
||||
with open(credentials_file_path, 'w') as configfile:
|
||||
config.write(configfile)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
initialize_app()
|
||||
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
Reference in New Issue
Block a user