2021-03-28 21:31:14 +02:00
|
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
# lint: pylint
|
|
|
|
"""MySQL database (offline)
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
# The import error is deliberately ignored: the mysql.connector package is not
# installed by default, so the admin has to install it manually to use this engine.
|
2022-07-28 13:02:56 +02:00
|
|
|
import mysql.connector # pyright: ignore # pylint: disable=import-error
|
2021-03-28 21:31:14 +02:00
|
|
|
|
|
|
|
# --- engine behaviour --------------------------------------------------------
engine_type = 'offline'
# NOTE(review): presumably read by the engine loader to enable page navigation;
# this module itself only consumes params['pageno'] in search().
paging = True
# Template name stored under the 'template' key of every returned row.
result_template = 'key-value.html'
# Rows per page; also the multiplier used to compute the OFFSET of a page.
limit = 10
# Base statement to which search() appends the LIMIT/OFFSET clause.
query_str = ""

# --- connection parameters handed to mysql.connector.connect() in init() -----
host = "127.0.0.1"
port = 3306
database = ""
username = ""
password = ""
auth_plugin = 'caching_sha2_password'

# Module-wide connection handle; stays None until init() assigns it.
_connection = None
|
|
|
|
|
2021-12-27 09:26:22 +01:00
|
|
|
|
2021-03-28 21:31:14 +02:00
|
|
|
def init(engine_settings):
    """Validate the configured query and open the MySQL connection.

    Args:
        engine_settings: Mapping of engine options. Only ``query_str`` is read
            here; the connection parameters are taken from this module's
            globals (``database``, ``username``, ``password``, ``host``,
            ``port`` and ``auth_plugin``).

    Raises:
        ValueError: If ``query_str`` is missing, is not a string or is blank,
            or if the statement does not start with the ``SELECT`` keyword
            followed by something to select.
    """
    global _connection  # pylint: disable=global-statement

    if 'query_str' not in engine_settings:
        raise ValueError('query_str cannot be empty')

    configured_query = engine_settings['query_str']
    # A key that is present but None/blank is as unusable as a missing one;
    # report it with the same message instead of crashing on `.lower()` or
    # complaining about a non-SELECT statement.
    if not isinstance(configured_query, str) or not configured_query.strip():
        raise ValueError('query_str cannot be empty')

    # Compare the first whitespace-delimited token so that leading whitespace,
    # or a newline/tab right after the keyword (e.g. a multi-line settings
    # value), does not reject a legitimate SELECT. A bare "select" with nothing
    # after it is still refused.
    tokens = configured_query.split(None, 1)
    if len(tokens) < 2 or tokens[0].lower() != 'select':
        raise ValueError('only SELECT query is supported')

    _connection = mysql.connector.connect(
        database=database,
        user=username,
        password=password,
        host=host,
        port=port,
        auth_plugin=auth_plugin,
    )
|
|
|
|
|
2021-12-27 09:26:22 +01:00
|
|
|
|
2021-03-28 21:31:14 +02:00
|
|
|
def search(query, params):
    """Execute the configured statement for the requested page.

    The user's ``query`` is bound to the ``query`` named parameter of the
    statement, and a ``LIMIT``/``OFFSET`` clause derived from the module-level
    ``limit`` and ``params['pageno']`` is appended to ``query_str``.

    Args:
        query: Search terms supplied by the user.
        params: Request parameters; ``params['pageno']`` is the page number
            used to compute the offset as ``(pageno - 1) * limit``.

    Returns:
        The list produced by ``_fetch_results`` for the executed cursor.
    """
    page_offset = (params['pageno'] - 1) * limit
    paged_statement = query_str + f' LIMIT {limit} OFFSET {page_offset}'
    bindings = {'query': query}

    with _connection.cursor() as cursor:
        cursor.execute(paged_statement, bindings)
        return _fetch_results(cursor)
|
|
|
|
|
2021-12-27 09:26:22 +01:00
|
|
|
|
2021-03-28 21:31:14 +02:00
|
|
|
def _fetch_results(cur):
    """Turn every row yielded by *cur* into a result dictionary.

    Each row becomes a mapping of column name (from ``cur.column_names``) to
    the ``str()`` of the corresponding value, with the ``'template'`` key set
    to the module-level ``result_template``.

    Args:
        cur: An iterable cursor exposing ``column_names``.

    Returns:
        A list with one dictionary per row, in iteration order.
    """
    return [
        {
            **{column: str(value) for column, value in zip(cur.column_names, row)},
            # Assigned last so it overrides a column that happens to be named
            # 'template', as the original assignment did.
            'template': result_template,
        }
        for row in cur
    ]
|