From ccce1f1248ce4baa103ae5575c5a2d01db66beb8 Mon Sep 17 00:00:00 2001 From: ewandor Date: Thu, 19 Oct 2023 20:19:10 +0200 Subject: [PATCH] Adding mw40v APi client --- README.md | 15 ++++++++- main.py | 66 ++++++++++++++++++++++++++++++------ mw40v.py | 87 ++++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + 4 files changed, 157 insertions(+), 12 deletions(-) create mode 100644 mw40v.py diff --git a/README.md b/README.md index c8b13c2..2bd8389 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,15 @@ -# network-roaming +# Network Roaming +Api that allows to switch a router from landline to 4G and back + +## Alcatel MW40V configuration + +### Find the encryption key +in default.html: + +`````` + +### Find the verification key +in js/sdk.js: + +```headers['_TclRequestVerificationKey'] = "here_is_the_verification_keys";``` \ No newline at end of file diff --git a/main.py b/main.py index 5e20ee5..afe4992 100644 --- a/main.py +++ b/main.py @@ -9,6 +9,8 @@ import configparser from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import HTTPBasic, HTTPBasicCredentials +from mw40v import Mw40V + app = FastAPI() security = HTTPBasic() @@ -20,9 +22,38 @@ ON_FILENAME = 'dhcpcd-usb-on.conf' OFF_FILENAME = 'dhcpcd-usb-off.conf' +class Config: + config = None + + @classmethod + def get(cls): + if cls.config is None: + cls.config = configparser.ConfigParser() + cls.config.read('./config.ini') + + return cls.config + + +class AirModem: + airModem = None + + @classmethod + def get(cls): + if cls.airModem is None: + config = Config.get() + + cls.airModem = Mw40V( + config['MW40V']['username'], + config['MW40V']['password'], + config['MW40V']['encrypt_key'], + config['MW40V']['verification_key'], + ) + + return cls.airModem + + def get_username(credentials: Annotated[HTTPBasicCredentials, Depends(security)]): - config = configparser.ConfigParser() - config.read('./config.ini') + config = Config.get() if credentials.username != config['credentials']['username']: raise_unauthorized() @@ -65,13 +96,10 @@ def is_usb_on(): def switch_status(turn_on): if turn_on: - # call usb modem api to turn it on update_usb_modem(turn_on) - - update_dhcp_conf(turn_on) - - if not turn_on: - # call usb modem api to turn it off + update_dhcp_conf(turn_on) + else: + update_dhcp_conf(turn_on) update_usb_modem(turn_on) @@ -82,21 +110,37 @@ def update_dhcp_conf(turn_on): def update_usb_modem(turn_on): - pass + if turn_on: + AirModem.get().connect() + else: + AirModem.get().disconnect() def initialize_app(): script_dir = Path(__file__).parent.absolute() credentials_file_path = str(script_dir) + '/config.ini' if not os.path.isfile(credentials_file_path): - username = input("Enter a username") - password = input("Enter a password") config = configparser.ConfigParser() + + username = input("Enter a username for the API") + password = input("Enter a password for the API") config['credentials'] = { 'username': username, 'password_hash': PasswordHasher().hash(password) } + + username = input("Enter the username for MW40V") + password = input("Enter the password for MW40V") + encrypt_key = input("Enter the encrypt_key for MW40V") + verification_key = input("Enter the verification_key for MW40V") + config['MW40V'] = { + 'username': username, + 'password': PasswordHasher().hash(password), + 'encrypt_key': encrypt_key, + 'verification_key': verification_key + } + with open(credentials_file_path, 'w') as configfile: config.write(configfile) diff --git a/mw40v.py b/mw40v.py new file mode 100644 index 0000000..e558a7f --- /dev/null +++ b/mw40v.py @@ -0,0 +1,87 @@ + +import requests +import json + + +class Mw40V: + def __init__(self, username, password, encrypt_key, verification_key): + self.username = username + self.password = password + self.encrypt_key = encrypt_key + self.verification_key = verification_key + self.login_token = None + + def request(self, method, params, id): + body = { + "jsonrpc": "2.0", + "method": method, + "params": params, + "id": id + } + + r = requests.post( + f"http://bbox.nomad/jrd/webapi?api={method}", + data=json.dumps(body), + headers=self.headers(method) + ) + + return r.json()['result'] + + def headers(self, method): + return { + 'Referer': 'http://bbox.nomad/index.html', + '_TclRequestVerificationKey': self.verification_key, + '_TclRequestVerificationToken': self.encrypt(str(self.login_token)) or "null" + } + + def login(self, user, password): + result = self.request( + "Login", + { + "UserName": self.encrypt(user), + "Password": self.encrypt(password) + }, + "1.1" + ) + + if 'token' not in result: + raise Exception("Error while login to nomad bbox") + + self.login_token = result['token'] + + def connect(self): + self.login(self.username, self.password) + result = self.request( + "Connect", + None, + "3.2" + ) + + return result + + def disconnect(self): + self.login(self.username, self.password) + result = self.request( + "DisConnect", + None, + "3.3" + ) + + return result + + def encrypt(self, subject): + if not str: + return "" + + subject = str(subject) + result_list = [] + for i, char_i in enumerate(subject): + num_char_i = ord(char_i) + x = (ord(self.encrypt_key[i % len(self.encrypt_key)]) & 0xf0) \ + | ((num_char_i & 0xf) ^ (ord(self.encrypt_key[i % len(self.encrypt_key)]) & 0xf)) + y = (ord(self.encrypt_key[i % len(self.encrypt_key)]) & 0xf0) \ + | ((num_char_i >> 4) ^ (ord(self.encrypt_key[i % len(self.encrypt_key)]) & 0xf)) + result_list.append(chr(x)) + result_list.append(chr(y)) + + return ''.join(result_list) diff --git a/requirements.txt b/requirements.txt index 561ed6d..50399f6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ fastapi uvicorn argon2-cffi +requests