1
0
mirror of https://github.com/donaldzou/WGDashboard.git synced 2024-11-16 12:30:12 +01:00

Merge branch 'main' into Migrate-to-SQLite

This commit is contained in:
Donald Zou 2022-01-12 21:41:41 -05:00 committed by GitHub
commit e5737ebd9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -32,11 +32,13 @@ from util import regex_match, check_DNS, check_Allowed_IPs, check_remote_endpoin
# Dashboard Version
DASHBOARD_VERSION = 'v3.0'
# WireGuard configuration path
WG_CONF_PATH = None
# Dashboard Config Name
configuration_path = os.getenv('CONFIGURATION_PATH', '.')
db_path = os.path.join(configuration_path, 'db')
if not os.path.isdir(db_path):
os.mkdir(db_path)
DB_PATH = os.path.join(configuration_path, 'db')
if not os.path.isdir(DB_PATH):
os.mkdir(DB_PATH)
DASHBOARD_CONF = os.path.join(configuration_path, 'wg-dashboard.ini')
# Upgrade Required
UPDATE = None
@ -53,37 +55,61 @@ def connect_db():
return sqlite3.connect(os.path.join(configuration_path, 'db', 'wgdashboard.db'))
# TODO use class and object oriented programming
# Read / Write Dashboard Config File
def get_dashboard_conf():
"""Dashboard Configuration Related
:return: A config parser object
:rtype: configparser.ConfigParser
"""
Dashboard Configuration Related
"""
config = configparser.ConfigParser(strict=False)
config.read(DASHBOARD_CONF)
return config
def set_dashboard_conf(config):
"""Configuration writer
:param config: A config parser object
:type config: configparser.ConfigParser
"""
with open(DASHBOARD_CONF, "w", encoding='utf-8') as conf_object:
config.write(conf_object)
# Get all keys from a configuration
def get_conf_peer_key(config_name):
"""Get the peers keys of wireguard interface.
:param config_name: Name of WG interface
:type config_name: str
:return: Return list of peers keys or text if configuration not running
:rtype: list, str
"""
Configuration Related
"""
try:
peer_key = subprocess.run(f"wg show {config_name} peers",
check=True, shell=True, capture_output=True).stdout
peer_key = peer_key.decode("UTF-8").split()
return peer_key
peers_keys = subprocess.run(f"wg show {config_name} peers",
check=True, shell=True, capture_output=True).stdout
peers_keys = peers_keys.decode("UTF-8").split()
return peers_keys
except subprocess.CalledProcessError:
return config_name + " is not running."
# Get numbers of connected peer of a configuration
def get_conf_running_peer_number(config_name):
"""Get number of running peers on wireguard interface.
:param config_name: Name of WG interface
:type config_name: str
:return: Number of running peers, or test if configuration not running
:rtype: int, str
"""
running = 0
# Get latest handshakes
try:
@ -103,9 +129,18 @@ def get_conf_running_peer_number(config_name):
return running
# TODO use modules for working with ini(configparser or wireguard)
# Read [Interface] section from configuration file
def read_conf_file_interface(config_name):
conf_location = wg_conf_path + "/" + config_name + ".conf"
"""Get interface settings.
:param config_name: Name of WG interface
:type config_name: str
:return: Dictionary with interface settings
:rtype: dict
"""
conf_location = WG_CONF_PATH + "/" + config_name + ".conf"
with open(conf_location, 'r', encoding='utf-8') as file_object:
file = file_object.read().split("\n")
data = {}
@ -119,10 +154,19 @@ def read_conf_file_interface(config_name):
return data
# TODO use modules for working with ini(configparser or wireguard)
# Read the whole configuration file
def read_conf_file(config_name):
"""Get configurations from file of wireguard interface.
:param config_name: Name of WG interface
:type config_name: str
:return: Dictionary with interface and peers settings
:rtype: dict
"""
# Read Configuration File Start
conf_location = wg_conf_path + "/" + config_name + ".conf"
conf_location = WG_CONF_PATH + "/" + config_name + ".conf"
f = open(conf_location, 'r')
file = f.read().split("\n")
conf_peer_data = {
@ -316,8 +360,15 @@ def get_all_peers_data(config_name):
# Search for peers
def get_peers(config_name, search, sort_t):
"""
Frontend Related Functions
"""Get all peers.
:param config_name: Name of WG interface
:type config_name: str
:param search: Search string
:type search: str
:param sort_t: TODO
:type sort_t: str
:return: TODO
"""
tic = time.perf_counter()
col = g.cur.execute("PRAGMA table_info(" + config_name + ")").fetchall()
@ -342,9 +393,17 @@ def get_peers(config_name, search, sort_t):
# Get configuration public key
def get_conf_pub_key(config_name):
"""Get public key for configuration.
:param config_name: Name of WG interface
:type config_name: str
:return: Return public key or empty string
:rtype: str
"""
try:
conf = configparser.ConfigParser(strict=False)
conf.read(wg_conf_path + "/" + config_name + ".conf")
conf.read(WG_CONF_PATH + "/" + config_name + ".conf")
pri = conf.get("Interface", "PrivateKey")
pub = subprocess.run(f"echo '{pri}' | wg pubkey", check=True, shell=True, capture_output=True).stdout
conf.clear()
@ -355,8 +414,16 @@ def get_conf_pub_key(config_name):
# Get configuration listen port
def get_conf_listen_port(config_name):
"""Get listen port number.
:param config_name: Name of WG interface
:type config_name: str
:return: Return number of port or empty string
:rtype: str
"""
conf = configparser.ConfigParser(strict=False)
conf.read(wg_conf_path + "/" + config_name + ".conf")
conf.read(WG_CONF_PATH + "/" + config_name + ".conf")
port = ""
try:
port = conf.get("Interface", "ListenPort")
@ -394,8 +461,14 @@ def get_conf_status(config_name):
# Get all configuration as a list
def get_conf_list():
"""Get all wireguard interfaces with status.
:return: Return a list of dicts with interfaces and its statuses
:rtype: list
"""
conf = []
for i in os.listdir(wg_conf_path):
for i in os.listdir(WG_CONF_PATH):
if regex_match("^(.{1,}).(conf)$", i):
i = i.replace('.conf', '')
create_table = f"""
@ -434,6 +507,14 @@ def gen_private_key():
# Generate public key
def gen_public_key(private_key):
"""Generate the public key.
:param private_key: Pricate key
:type private_key: str
:return: Return dict with public key or error message
:rtype: dict
"""
with open('private_key.txt', 'w', encoding='utf-8') as file_object:
file_object.write(private_key)
try:
@ -450,6 +531,18 @@ def gen_public_key(private_key):
# Check if private key and public key match
def f_check_key_match(private_key, public_key, config_name):
"""TODO
:param private_key: Private key
:type private_key: str
:param public_key: Public key
:type public_key: str
:param config_name: Name of WG interface
:type config_name: str
:return: Return dictionary with status
:rtype: dict
"""
result = gen_public_key(private_key)
if result['status'] == 'failed':
return result
@ -556,6 +649,12 @@ Sign In / Sign Out
# Sign In
@app.route('/signin', methods=['GET'])
def signin():
"""Sign in request.
:return: TODO
:rtype: TODO
"""
message = ""
if "message" in session:
message = session['message']
@ -566,6 +665,12 @@ def signin():
# Sign Out
@app.route('/signout', methods=['GET'])
def signout():
"""Sign out request.
:return: TODO
:rtype: TODO
"""
if "username" in session:
session.pop("username")
message = "Sign out successfully!"
@ -575,6 +680,12 @@ def signout():
# Authentication
@app.route('/auth', methods=['POST'])
def auth():
"""Authentication request.
:return: TODO
:rtype: TODO
"""
config = get_dashboard_conf()
password = hashlib.sha256(request.form['password'].encode())
if password.hexdigest() == config["Account"]["password"] \
@ -590,8 +701,10 @@ def auth():
@app.route('/', methods=['GET'])
def index():
"""
Index Page Related
"""Index Page Related.
:return: TODO
:rtype: TODO
"""
return render_template('index.html', conf=get_conf_list())
@ -599,8 +712,10 @@ def index():
# Setting Page
@app.route('/settings', methods=['GET'])
def settings():
"""
Setting Page Related
"""Setting Page Related.
:return: TODO
:rtype: TODO
"""
message = ""
status = ""
@ -624,6 +739,12 @@ def settings():
# Update account username
@app.route('/update_acct', methods=['POST'])
def update_acct():
"""Change account user name.
:return: TODO
:rtype: TODO
"""
if len(request.form['username']) == 0:
session['message'] = "Username cannot be empty."
session['message_status'] = "danger"
@ -647,6 +768,12 @@ def update_acct():
# Update peer default settting
@app.route('/update_peer_default_config', methods=['POST'])
def update_peer_default_config():
"""Change default configurations for peers.
:return: TODO
:rtype: TODO
"""
config = get_dashboard_conf()
if len(request.form['peer_endpoint_allowed_ip']) == 0 or \
len(request.form['peer_global_DNS']) == 0 or \
@ -712,6 +839,12 @@ def update_peer_default_config():
# Update dashboard password
@app.route('/update_pwd', methods=['POST'])
def update_pwd():
"""Change account password.
:return: TODO
:rtype: TODO
"""
config = get_dashboard_conf()
if hashlib.sha256(request.form['currentpass'].encode()).hexdigest() == config.get("Account", "password"):
if hashlib.sha256(request.form['newpass'].encode()).hexdigest() == hashlib.sha256(
@ -743,6 +876,9 @@ def update_pwd():
# Update dashboard IP and port
@app.route('/update_app_ip_port', methods=['POST'])
def update_app_ip_port():
"""Change port number of dashboard.
"""
config = get_dashboard_conf()
config.set("Server", "app_ip", request.form['app_ip'])
config.set("Server", "app_port", request.form['app_port'])
@ -754,6 +890,9 @@ def update_app_ip_port():
# Update WireGuard configuration file path
@app.route('/update_wg_conf_path', methods=['POST'])
def update_wg_conf_path():
"""Change path to dashboard configuration.
"""
config = get_dashboard_conf()
config.set("Server", "wg_conf_path", request.form['wg_conf_path'])
set_dashboard_conf(config)
@ -766,9 +905,9 @@ def update_wg_conf_path():
# Update configuration sorting
@app.route('/update_dashboard_sort', methods=['POST'])
def update_dashbaord_sort():
"""Configuration Page Related
"""
Configuration Page Related
"""
config = get_dashboard_conf()
data = request.get_json()
sort_tag = ['name', 'status', 'allowed_ip']
@ -784,6 +923,12 @@ def update_dashbaord_sort():
# Update configuration refresh interval
@app.route('/update_dashboard_refresh_interval', methods=['POST'])
def update_dashboard_refresh_interval():
"""Change the refresh time.
:return: Return text with result
:rtype: str
"""
preset_interval = ["5000", "10000", "30000", "60000"]
if request.form["interval"] in preset_interval:
config = get_dashboard_conf()
@ -798,6 +943,14 @@ def update_dashboard_refresh_interval():
# Configuration Page
@app.route('/configuration/<config_name>', methods=['GET'])
def configuration(config_name):
"""Show wireguard interface view.
:param config_name: Name of WG interface
:type config_name: str
:return: TODO
:rtype: TODO
"""
config = get_dashboard_conf()
conf_data = {
"name": config_name,
@ -830,6 +983,14 @@ def configuration(config_name):
# Get configuration details
@app.route('/get_config/<config_name>', methods=['GET'])
def get_conf(config_name):
"""Get configuration setting of wireguard interface.
:param config_name: Name of WG interface
:type config_name: str
:return: TODO
:rtype: TODO
"""
config_interface = read_conf_file_interface(config_name)
search = request.args.get('search')
if len(search) == 0:
@ -865,9 +1026,18 @@ def get_conf(config_name):
return jsonify(conf_data)
# Turn on / off a configuration
@app.route('/switch/<config_name>', methods=['GET'])
def switch(config_name):
"""On/off the wireguard interface.
:param config_name: Name of WG interface
:type config_name: str
:return: TODO
:rtype: TODO
"""
if "username" not in session:
print("User not logged in")
return redirect(url_for("signin"))
@ -1006,6 +1176,14 @@ def add_peer(config_name):
# Remove peer
@app.route('/remove_peer/<config_name>', methods=['POST'])
def remove_peer(config_name):
"""Remove peer.
:param config_name: Name of WG interface
:type config_name: str
:return: Return result of action or recommendations
:rtype: str
"""
if get_conf_status(config_name) == "stopped":
return "Your need to turn on " + config_name + " first."
data = request.get_json()
@ -1037,6 +1215,14 @@ def remove_peer(config_name):
# Save peer settings
@app.route('/save_peer_setting/<config_name>', methods=['POST'])
def save_peer_setting(config_name):
"""Save peer configuration.
:param config_name: Name of WG interface
:type config_name: str
:return: Return status of action and text with recommendations
:rtype: TODO
"""
data = request.get_json()
id = data['id']
name = data['name']
@ -1066,16 +1252,16 @@ def save_peer_setting(config_name):
tmp_psk = open("tmp_edit_psk.txt", "w+")
tmp_psk.write(preshared_key)
tmp_psk.close()
change_psk = subprocess.check_output(f"wg set {config_name} peer {id} preshared-key tmp_edit_psk.txt",
shell=True, stderr=subprocess.STDOUT)
change_psk = subprocess.run(f"wg set {config_name} peer {id} preshared-key tmp_edit_psk.txt",
shell=True, check=True, stderr=subprocess.STDOUT)
if change_psk.decode("UTF-8") != "":
return jsonify({"status": "failed", "msg": change_psk.decode("UTF-8")})
if allowed_ip == "":
allowed_ip = '""'
allowed_ip = allowed_ip.replace(" ", "")
change_ip = subprocess.check_output(f"wg set {config_name} peer {id} allowed-ips {allowed_ip}",
shell=True, stderr=subprocess.STDOUT)
subprocess.check_output(f'wg-quick save {config_name}', shell=True, stderr=subprocess.STDOUT)
change_ip = subprocess.run(f"wg set {config_name} peer {id} allowed-ips {allowed_ip}",
shell=True, check=True, stderr=subprocess.STDOUT)
subprocess.run(f'wg-quick save {config_name}', shell=True, check=True, stderr=subprocess.STDOUT)
if change_ip.decode("UTF-8") != "":
return jsonify({"status": "failed", "msg": change_ip.decode("UTF-8")})
sql = "UPDATE " + config_name + " SET name = ?, private_key = ?, DNS = ?, endpoint_allowed_ip = ?, mtu = ?, keepalive = ?, preshared_key = ? WHERE id = ?"
@ -1091,6 +1277,14 @@ def save_peer_setting(config_name):
# Get peer settings
@app.route('/get_peer_data/<config_name>', methods=['POST'])
def get_peer_name(config_name):
"""Get peer settings.
:param config_name: Name of WG interface
:type config_name: str
:return: Return settings of peer
:rtype: TODO
"""
data = request.get_json()
peer_id = data['id']
result = g.cur.execute(
@ -1111,12 +1305,24 @@ def available_ips(config_name):
# Generate a private key
@app.route('/generate_peer', methods=['GET'])
def generate_peer():
"""Generate the private key for peer.
:return: Return dict with private, public and preshared keys
:rtype: TODO
"""
return jsonify(gen_private_key())
# Generate a public key from a private key
@app.route('/generate_public_key', methods=['POST'])
def generate_public_key():
"""Generate the public key.
:return: Return dict with public key or error message
:rtype: TODO
"""
data = request.get_json()
private_key = data['private_key']
return jsonify(gen_public_key(private_key))
@ -1125,6 +1331,14 @@ def generate_public_key():
# Check if both key match
@app.route('/check_key_match/<config_name>', methods=['POST'])
def check_key_match(config_name):
"""TODO
:param config_name: Name of WG interface
:type config_name: str
:return: Return dictionary with status
:rtype: TODO
"""
data = request.get_json()
private_key = data['private_key']
public_key = data['public_key']
@ -1261,6 +1475,14 @@ def download(config_name):
# Switch peer display mode
@app.route('/switch_display_mode/<mode>', methods=['GET'])
def switch_display_mode(mode):
"""Change display view style.
:param mode: Mode name
:type mode: str
:return: Return text with result
:rtype: str
"""
if mode in ['list', 'grid']:
config = get_dashboard_conf()
config.set("Peers", "peer_display_mode", mode)
@ -1278,6 +1500,12 @@ Dashboard Tools Related
# Get all IP for ping
@app.route('/get_ping_ip', methods=['POST'])
def get_ping_ip():
"""Get ips for network testing.
:return: TODO
:rtype: TODO
"""
config = request.form['config']
peers = g.cur.execute("SELECT id, name, allowed_ip, endpoint FROM " + config).fetchall()
html = ""
@ -1298,6 +1526,12 @@ def get_ping_ip():
# Ping IP
@app.route('/ping_ip', methods=['POST'])
def ping_ip():
"""Execute ping command.
:return: Return text with result
:rtype: str
"""
try:
result = ping('' + request.form['ip'] + '', count=int(request.form['count']), privileged=True, source=None)
returnjson = {
@ -1320,6 +1554,12 @@ def ping_ip():
# Traceroute IP
@app.route('/traceroute_ip', methods=['POST'])
def traceroute_ip():
"""Execute ping traceroute command.
:return: Return text with result
:rtype: str
"""
try:
result = traceroute('' + request.form['ip'] + '', first_hop=1, max_hops=30, count=1, fast=True)
returnjson = []
@ -1341,6 +1581,9 @@ Dashboard Initialization
def init_dashboard():
"""Create dashboard default configuration.
"""
# Set Default INI File
if not os.path.isfile(DASHBOARD_CONF):
open(DASHBOARD_CONF, "w+").close()
@ -1390,8 +1633,10 @@ def init_dashboard():
def check_update():
"""
Dashboard check update
"""Dashboard check update
:return: Retunt text with result
:rtype: str
"""
config = get_dashboard_conf()
data = urllib.request.urlopen("https://api.github.com/repos/donaldzou/WGDashboard/releases").read()
@ -1414,6 +1659,6 @@ if __name__ == "__main__":
configuration_settings = get_dashboard_conf()
app_ip = configuration_settings.get("Server", "app_ip")
app_port = int(configuration_settings.get("Server", "app_port"))
wg_conf_path = configuration_settings.get("Server", "wg_conf_path")
WG_CONF_PATH = configuration_settings.get("Server", "wg_conf_path")
configuration_settings.clear()
app.run(host=app_ip, debug=False, port=app_port)