diff --git a/src/dashboard.py b/src/dashboard.py index 76ce3fa..f63a14c 100644 --- a/src/dashboard.py +++ b/src/dashboard.py @@ -32,11 +32,13 @@ from util import regex_match, check_DNS, check_Allowed_IPs, check_remote_endpoin # Dashboard Version DASHBOARD_VERSION = 'v3.0' +# WireGuard configuration path +WG_CONF_PATH = None # Dashboard Config Name configuration_path = os.getenv('CONFIGURATION_PATH', '.') -db_path = os.path.join(configuration_path, 'db') -if not os.path.isdir(db_path): - os.mkdir(db_path) +DB_PATH = os.path.join(configuration_path, 'db') +if not os.path.isdir(DB_PATH): + os.mkdir(DB_PATH) DASHBOARD_CONF = os.path.join(configuration_path, 'wg-dashboard.ini') # Upgrade Required UPDATE = None @@ -53,37 +55,61 @@ def connect_db(): return sqlite3.connect(os.path.join(configuration_path, 'db', 'wgdashboard.db')) +# TODO use class and object oriented programming + # Read / Write Dashboard Config File def get_dashboard_conf(): + """Dashboard Configuration Related + + :return: A config parser object + :rtype: configparser.ConfigParser """ - Dashboard Configuration Related - """ + config = configparser.ConfigParser(strict=False) config.read(DASHBOARD_CONF) return config def set_dashboard_conf(config): + """Configuration writer + + :param config: A config parser object + :type config: configparser.ConfigParser + """ + with open(DASHBOARD_CONF, "w", encoding='utf-8') as conf_object: config.write(conf_object) # Get all keys from a configuration def get_conf_peer_key(config_name): + """Get the peers keys of wireguard interface. + + :param config_name: Name of WG interface + :type config_name: str + :return: Return list of peers keys or text if configuration not running + :rtype: list, str """ - Configuration Related - """ + try: - peer_key = subprocess.run(f"wg show {config_name} peers", - check=True, shell=True, capture_output=True).stdout - peer_key = peer_key.decode("UTF-8").split() - return peer_key + peers_keys = subprocess.run(f"wg show {config_name} peers", + check=True, shell=True, capture_output=True).stdout + peers_keys = peers_keys.decode("UTF-8").split() + return peers_keys except subprocess.CalledProcessError: return config_name + " is not running." # Get numbers of connected peer of a configuration def get_conf_running_peer_number(config_name): + """Get number of running peers on wireguard interface. + + :param config_name: Name of WG interface + :type config_name: str + :return: Number of running peers, or test if configuration not running + :rtype: int, str + """ + running = 0 # Get latest handshakes try: @@ -103,9 +129,18 @@ def get_conf_running_peer_number(config_name): return running +# TODO use modules for working with ini(configparser or wireguard) # Read [Interface] section from configuration file def read_conf_file_interface(config_name): - conf_location = wg_conf_path + "/" + config_name + ".conf" + """Get interface settings. + + :param config_name: Name of WG interface + :type config_name: str + :return: Dictionary with interface settings + :rtype: dict + """ + + conf_location = WG_CONF_PATH + "/" + config_name + ".conf" with open(conf_location, 'r', encoding='utf-8') as file_object: file = file_object.read().split("\n") data = {} @@ -119,10 +154,19 @@ def read_conf_file_interface(config_name): return data +# TODO use modules for working with ini(configparser or wireguard) # Read the whole configuration file def read_conf_file(config_name): + """Get configurations from file of wireguard interface. + + :param config_name: Name of WG interface + :type config_name: str + :return: Dictionary with interface and peers settings + :rtype: dict + """ + # Read Configuration File Start - conf_location = wg_conf_path + "/" + config_name + ".conf" + conf_location = WG_CONF_PATH + "/" + config_name + ".conf" f = open(conf_location, 'r') file = f.read().split("\n") conf_peer_data = { @@ -316,8 +360,15 @@ def get_all_peers_data(config_name): # Search for peers def get_peers(config_name, search, sort_t): - """ - Frontend Related Functions + """Get all peers. + + :param config_name: Name of WG interface + :type config_name: str + :param search: Search string + :type search: str + :param sort_t: TODO + :type sort_t: str + :return: TODO """ tic = time.perf_counter() col = g.cur.execute("PRAGMA table_info(" + config_name + ")").fetchall() @@ -342,9 +393,17 @@ def get_peers(config_name, search, sort_t): # Get configuration public key def get_conf_pub_key(config_name): + """Get public key for configuration. + + :param config_name: Name of WG interface + :type config_name: str + :return: Return public key or empty string + :rtype: str + """ + try: conf = configparser.ConfigParser(strict=False) - conf.read(wg_conf_path + "/" + config_name + ".conf") + conf.read(WG_CONF_PATH + "/" + config_name + ".conf") pri = conf.get("Interface", "PrivateKey") pub = subprocess.run(f"echo '{pri}' | wg pubkey", check=True, shell=True, capture_output=True).stdout conf.clear() @@ -355,8 +414,16 @@ def get_conf_pub_key(config_name): # Get configuration listen port def get_conf_listen_port(config_name): + """Get listen port number. + + :param config_name: Name of WG interface + :type config_name: str + :return: Return number of port or empty string + :rtype: str + """ + conf = configparser.ConfigParser(strict=False) - conf.read(wg_conf_path + "/" + config_name + ".conf") + conf.read(WG_CONF_PATH + "/" + config_name + ".conf") port = "" try: port = conf.get("Interface", "ListenPort") @@ -394,8 +461,14 @@ def get_conf_status(config_name): # Get all configuration as a list def get_conf_list(): + """Get all wireguard interfaces with status. + + :return: Return a list of dicts with interfaces and its statuses + :rtype: list + """ + conf = [] - for i in os.listdir(wg_conf_path): + for i in os.listdir(WG_CONF_PATH): if regex_match("^(.{1,}).(conf)$", i): i = i.replace('.conf', '') create_table = f""" @@ -434,6 +507,14 @@ def gen_private_key(): # Generate public key def gen_public_key(private_key): + """Generate the public key. + + :param private_key: Pricate key + :type private_key: str + :return: Return dict with public key or error message + :rtype: dict + """ + with open('private_key.txt', 'w', encoding='utf-8') as file_object: file_object.write(private_key) try: @@ -450,6 +531,18 @@ def gen_public_key(private_key): # Check if private key and public key match def f_check_key_match(private_key, public_key, config_name): + """TODO + + :param private_key: Private key + :type private_key: str + :param public_key: Public key + :type public_key: str + :param config_name: Name of WG interface + :type config_name: str + :return: Return dictionary with status + :rtype: dict + """ + result = gen_public_key(private_key) if result['status'] == 'failed': return result @@ -556,6 +649,12 @@ Sign In / Sign Out # Sign In @app.route('/signin', methods=['GET']) def signin(): + """Sign in request. + + :return: TODO + :rtype: TODO + """ + message = "" if "message" in session: message = session['message'] @@ -566,6 +665,12 @@ def signin(): # Sign Out @app.route('/signout', methods=['GET']) def signout(): + """Sign out request. + + :return: TODO + :rtype: TODO + """ + if "username" in session: session.pop("username") message = "Sign out successfully!" @@ -575,6 +680,12 @@ def signout(): # Authentication @app.route('/auth', methods=['POST']) def auth(): + """Authentication request. + + :return: TODO + :rtype: TODO + """ + config = get_dashboard_conf() password = hashlib.sha256(request.form['password'].encode()) if password.hexdigest() == config["Account"]["password"] \ @@ -590,8 +701,10 @@ def auth(): @app.route('/', methods=['GET']) def index(): - """ - Index Page Related + """Index Page Related. + + :return: TODO + :rtype: TODO """ return render_template('index.html', conf=get_conf_list()) @@ -599,8 +712,10 @@ def index(): # Setting Page @app.route('/settings', methods=['GET']) def settings(): - """ - Setting Page Related + """Setting Page Related. + + :return: TODO + :rtype: TODO """ message = "" status = "" @@ -624,6 +739,12 @@ def settings(): # Update account username @app.route('/update_acct', methods=['POST']) def update_acct(): + """Change account user name. + + :return: TODO + :rtype: TODO + """ + if len(request.form['username']) == 0: session['message'] = "Username cannot be empty." session['message_status'] = "danger" @@ -647,6 +768,12 @@ def update_acct(): # Update peer default settting @app.route('/update_peer_default_config', methods=['POST']) def update_peer_default_config(): + """Change default configurations for peers. + + :return: TODO + :rtype: TODO + """ + config = get_dashboard_conf() if len(request.form['peer_endpoint_allowed_ip']) == 0 or \ len(request.form['peer_global_DNS']) == 0 or \ @@ -712,6 +839,12 @@ def update_peer_default_config(): # Update dashboard password @app.route('/update_pwd', methods=['POST']) def update_pwd(): + """Change account password. + + :return: TODO + :rtype: TODO + """ + config = get_dashboard_conf() if hashlib.sha256(request.form['currentpass'].encode()).hexdigest() == config.get("Account", "password"): if hashlib.sha256(request.form['newpass'].encode()).hexdigest() == hashlib.sha256( @@ -743,6 +876,9 @@ def update_pwd(): # Update dashboard IP and port @app.route('/update_app_ip_port', methods=['POST']) def update_app_ip_port(): + """Change port number of dashboard. + """ + config = get_dashboard_conf() config.set("Server", "app_ip", request.form['app_ip']) config.set("Server", "app_port", request.form['app_port']) @@ -754,6 +890,9 @@ def update_app_ip_port(): # Update WireGuard configuration file path @app.route('/update_wg_conf_path', methods=['POST']) def update_wg_conf_path(): + """Change path to dashboard configuration. + """ + config = get_dashboard_conf() config.set("Server", "wg_conf_path", request.form['wg_conf_path']) set_dashboard_conf(config) @@ -766,9 +905,9 @@ def update_wg_conf_path(): # Update configuration sorting @app.route('/update_dashboard_sort', methods=['POST']) def update_dashbaord_sort(): + """Configuration Page Related """ - Configuration Page Related - """ + config = get_dashboard_conf() data = request.get_json() sort_tag = ['name', 'status', 'allowed_ip'] @@ -784,6 +923,12 @@ def update_dashbaord_sort(): # Update configuration refresh interval @app.route('/update_dashboard_refresh_interval', methods=['POST']) def update_dashboard_refresh_interval(): + """Change the refresh time. + + :return: Return text with result + :rtype: str + """ + preset_interval = ["5000", "10000", "30000", "60000"] if request.form["interval"] in preset_interval: config = get_dashboard_conf() @@ -798,6 +943,14 @@ def update_dashboard_refresh_interval(): # Configuration Page @app.route('/configuration/', methods=['GET']) def configuration(config_name): + """Show wireguard interface view. + + :param config_name: Name of WG interface + :type config_name: str + :return: TODO + :rtype: TODO + """ + config = get_dashboard_conf() conf_data = { "name": config_name, @@ -830,6 +983,14 @@ def configuration(config_name): # Get configuration details @app.route('/get_config/', methods=['GET']) def get_conf(config_name): + """Get configuration setting of wireguard interface. + + :param config_name: Name of WG interface + :type config_name: str + :return: TODO + :rtype: TODO + """ + config_interface = read_conf_file_interface(config_name) search = request.args.get('search') if len(search) == 0: @@ -865,9 +1026,18 @@ def get_conf(config_name): return jsonify(conf_data) + # Turn on / off a configuration @app.route('/switch/', methods=['GET']) def switch(config_name): + """On/off the wireguard interface. + + :param config_name: Name of WG interface + :type config_name: str + :return: TODO + :rtype: TODO + """ + if "username" not in session: print("User not logged in") return redirect(url_for("signin")) @@ -1006,6 +1176,14 @@ def add_peer(config_name): # Remove peer @app.route('/remove_peer/', methods=['POST']) def remove_peer(config_name): + """Remove peer. + + :param config_name: Name of WG interface + :type config_name: str + :return: Return result of action or recommendations + :rtype: str + """ + if get_conf_status(config_name) == "stopped": return "Your need to turn on " + config_name + " first." data = request.get_json() @@ -1037,6 +1215,14 @@ def remove_peer(config_name): # Save peer settings @app.route('/save_peer_setting/', methods=['POST']) def save_peer_setting(config_name): + """Save peer configuration. + + :param config_name: Name of WG interface + :type config_name: str + :return: Return status of action and text with recommendations + :rtype: TODO + """ + data = request.get_json() id = data['id'] name = data['name'] @@ -1066,16 +1252,16 @@ def save_peer_setting(config_name): tmp_psk = open("tmp_edit_psk.txt", "w+") tmp_psk.write(preshared_key) tmp_psk.close() - change_psk = subprocess.check_output(f"wg set {config_name} peer {id} preshared-key tmp_edit_psk.txt", - shell=True, stderr=subprocess.STDOUT) + change_psk = subprocess.run(f"wg set {config_name} peer {id} preshared-key tmp_edit_psk.txt", + shell=True, check=True, stderr=subprocess.STDOUT) if change_psk.decode("UTF-8") != "": return jsonify({"status": "failed", "msg": change_psk.decode("UTF-8")}) if allowed_ip == "": allowed_ip = '""' allowed_ip = allowed_ip.replace(" ", "") - change_ip = subprocess.check_output(f"wg set {config_name} peer {id} allowed-ips {allowed_ip}", - shell=True, stderr=subprocess.STDOUT) - subprocess.check_output(f'wg-quick save {config_name}', shell=True, stderr=subprocess.STDOUT) + change_ip = subprocess.run(f"wg set {config_name} peer {id} allowed-ips {allowed_ip}", + shell=True, check=True, stderr=subprocess.STDOUT) + subprocess.run(f'wg-quick save {config_name}', shell=True, check=True, stderr=subprocess.STDOUT) if change_ip.decode("UTF-8") != "": return jsonify({"status": "failed", "msg": change_ip.decode("UTF-8")}) sql = "UPDATE " + config_name + " SET name = ?, private_key = ?, DNS = ?, endpoint_allowed_ip = ?, mtu = ?, keepalive = ?, preshared_key = ? WHERE id = ?" @@ -1091,6 +1277,14 @@ def save_peer_setting(config_name): # Get peer settings @app.route('/get_peer_data/', methods=['POST']) def get_peer_name(config_name): + """Get peer settings. + + :param config_name: Name of WG interface + :type config_name: str + :return: Return settings of peer + :rtype: TODO + """ + data = request.get_json() peer_id = data['id'] result = g.cur.execute( @@ -1111,12 +1305,24 @@ def available_ips(config_name): # Generate a private key @app.route('/generate_peer', methods=['GET']) def generate_peer(): + """Generate the private key for peer. + + :return: Return dict with private, public and preshared keys + :rtype: TODO + """ + return jsonify(gen_private_key()) # Generate a public key from a private key @app.route('/generate_public_key', methods=['POST']) def generate_public_key(): + """Generate the public key. + + :return: Return dict with public key or error message + :rtype: TODO + """ + data = request.get_json() private_key = data['private_key'] return jsonify(gen_public_key(private_key)) @@ -1125,6 +1331,14 @@ def generate_public_key(): # Check if both key match @app.route('/check_key_match/', methods=['POST']) def check_key_match(config_name): + """TODO + + :param config_name: Name of WG interface + :type config_name: str + :return: Return dictionary with status + :rtype: TODO + """ + data = request.get_json() private_key = data['private_key'] public_key = data['public_key'] @@ -1261,6 +1475,14 @@ def download(config_name): # Switch peer display mode @app.route('/switch_display_mode/', methods=['GET']) def switch_display_mode(mode): + """Change display view style. + + :param mode: Mode name + :type mode: str + :return: Return text with result + :rtype: str + """ + if mode in ['list', 'grid']: config = get_dashboard_conf() config.set("Peers", "peer_display_mode", mode) @@ -1278,6 +1500,12 @@ Dashboard Tools Related # Get all IP for ping @app.route('/get_ping_ip', methods=['POST']) def get_ping_ip(): + """Get ips for network testing. + + :return: TODO + :rtype: TODO + """ + config = request.form['config'] peers = g.cur.execute("SELECT id, name, allowed_ip, endpoint FROM " + config).fetchall() html = "" @@ -1298,6 +1526,12 @@ def get_ping_ip(): # Ping IP @app.route('/ping_ip', methods=['POST']) def ping_ip(): + """Execute ping command. + + :return: Return text with result + :rtype: str + """ + try: result = ping('' + request.form['ip'] + '', count=int(request.form['count']), privileged=True, source=None) returnjson = { @@ -1320,6 +1554,12 @@ def ping_ip(): # Traceroute IP @app.route('/traceroute_ip', methods=['POST']) def traceroute_ip(): + """Execute ping traceroute command. + + :return: Return text with result + :rtype: str + """ + try: result = traceroute('' + request.form['ip'] + '', first_hop=1, max_hops=30, count=1, fast=True) returnjson = [] @@ -1341,6 +1581,9 @@ Dashboard Initialization def init_dashboard(): + """Create dashboard default configuration. + """ + # Set Default INI File if not os.path.isfile(DASHBOARD_CONF): open(DASHBOARD_CONF, "w+").close() @@ -1390,8 +1633,10 @@ def init_dashboard(): def check_update(): - """ - Dashboard check update + """Dashboard check update + + :return: Retunt text with result + :rtype: str """ config = get_dashboard_conf() data = urllib.request.urlopen("https://api.github.com/repos/donaldzou/WGDashboard/releases").read() @@ -1414,6 +1659,6 @@ if __name__ == "__main__": configuration_settings = get_dashboard_conf() app_ip = configuration_settings.get("Server", "app_ip") app_port = int(configuration_settings.get("Server", "app_port")) - wg_conf_path = configuration_settings.get("Server", "wg_conf_path") + WG_CONF_PATH = configuration_settings.get("Server", "wg_conf_path") configuration_settings.clear() app.run(host=app_ip, debug=False, port=app_port)