1
0
mirror of https://github.com/searxng/searxng.git synced 2024-11-05 04:40:11 +01:00

Merge pull request #2593 from dalf/update-autocomplete

Update autocomplete
This commit is contained in:
Alexandre Flament 2021-03-04 10:51:09 +01:00 committed by GitHub
commit aac37f288f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 497 additions and 222 deletions

View File

@ -32,7 +32,7 @@ def ask(query):
results = [] results = []
query_parts = list(filter(None, query.query.split())) query_parts = list(filter(None, query.query.split()))
if query_parts[0] not in answerers_by_keywords: if not query_parts or query_parts[0] not in answerers_by_keywords:
return results return results
for answerer in answerers_by_keywords[query_parts[0]]: for answerer in answerers_by_keywords[query_parts[0]]:

View File

@ -20,97 +20,20 @@ from lxml import etree
from json import loads from json import loads
from urllib.parse import urlencode from urllib.parse import urlencode
from requests import RequestException
from searx import settings from searx import settings
from searx.languages import language_codes
from searx.engines import (
categories, engines, engine_shortcuts
)
from searx.poolrequests import get as http_get from searx.poolrequests import get as http_get
from searx.exceptions import SearxEngineResponseException
def get(*args, **kwargs): def get(*args, **kwargs):
if 'timeout' not in kwargs: if 'timeout' not in kwargs:
kwargs['timeout'] = settings['outgoing']['request_timeout'] kwargs['timeout'] = settings['outgoing']['request_timeout']
kwargs['raise_for_httperror'] = True
return http_get(*args, **kwargs) return http_get(*args, **kwargs)
def searx_bang(full_query):
'''check if the searchQuery contain a bang, and create fitting autocompleter results'''
# check if there is a query which can be parsed
if len(full_query.getQuery()) == 0:
return []
results = []
# check if current query stats with !bang
first_char = full_query.getQuery()[0]
if first_char == '!' or first_char == '?':
if len(full_query.getQuery()) == 1:
# show some example queries
# TODO, check if engine is not avaliable
results.append(first_char + "images")
results.append(first_char + "wikipedia")
results.append(first_char + "osm")
else:
engine_query = full_query.getQuery()[1:]
# check if query starts with categorie name
for categorie in categories:
if categorie.startswith(engine_query):
results.append(first_char + '{categorie}'.format(categorie=categorie))
# check if query starts with engine name
for engine in engines:
if engine.startswith(engine_query.replace('_', ' ')):
results.append(first_char + '{engine}'.format(engine=engine.replace(' ', '_')))
# check if query starts with engine shortcut
for engine_shortcut in engine_shortcuts:
if engine_shortcut.startswith(engine_query):
results.append(first_char + '{engine_shortcut}'.format(engine_shortcut=engine_shortcut))
# check if current query stats with :bang
elif first_char == ':':
if len(full_query.getQuery()) == 1:
# show some example queries
results.append(":en")
results.append(":en_us")
results.append(":english")
results.append(":united_kingdom")
else:
engine_query = full_query.getQuery()[1:]
for lc in language_codes:
lang_id, lang_name, country, english_name = map(str.lower, lc)
# check if query starts with language-id
if lang_id.startswith(engine_query):
if len(engine_query) <= 2:
results.append(':{lang_id}'.format(lang_id=lang_id.split('-')[0]))
else:
results.append(':{lang_id}'.format(lang_id=lang_id))
# check if query starts with language name
if lang_name.startswith(engine_query) or english_name.startswith(engine_query):
results.append(':{lang_name}'.format(lang_name=lang_name))
# check if query starts with country
if country.startswith(engine_query.replace('_', ' ')):
results.append(':{country}'.format(country=country.replace(' ', '_')))
# remove duplicates
result_set = set(results)
# remove results which are already contained in the query
for query_part in full_query.query_parts:
if query_part in result_set:
result_set.remove(query_part)
# convert result_set back to list
return list(result_set)
def dbpedia(query, lang): def dbpedia(query, lang):
# dbpedia autocompleter, no HTTPS # dbpedia autocompleter, no HTTPS
autocomplete_url = 'https://lookup.dbpedia.org/api/search.asmx/KeywordSearch?' autocomplete_url = 'https://lookup.dbpedia.org/api/search.asmx/KeywordSearch?'
@ -204,3 +127,14 @@ backends = {'dbpedia': dbpedia,
'qwant': qwant, 'qwant': qwant,
'wikipedia': wikipedia 'wikipedia': wikipedia
} }
def search_autocomplete(backend_name, query, lang):
backend = backends.get(backend_name)
if backend is None:
return []
try:
return backend(query, lang)
except (RequestException, SearxEngineResponseException):
return []

View File

@ -1,85 +1,88 @@
#!/usr/bin/env python # SPDX-License-Identifier: AGPL-3.0-or-later
'''
searx is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
searx is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with searx. If not, see < http://www.gnu.org/licenses/ >.
(C) 2014 by Thomas Pointhuber, <thomas.pointhuber@gmx.at>
'''
from abc import abstractmethod, ABC
import re import re
from searx.languages import language_codes from searx.languages import language_codes
from searx.engines import categories, engines, engine_shortcuts from searx.engines import categories, engines, engine_shortcuts
from searx.external_bang import get_bang_definition_and_autocomplete
from searx.search import EngineRef from searx.search import EngineRef
from searx.webutils import VALID_LANGUAGE_CODE from searx.webutils import VALID_LANGUAGE_CODE
class RawTextQuery: class QueryPartParser(ABC):
"""parse raw text query (the value from the html input)"""
def __init__(self, query, disabled_engines): __slots__ = "raw_text_query", "enable_autocomplete"
assert isinstance(query, str)
self.query = query
self.disabled_engines = []
if disabled_engines: @staticmethod
self.disabled_engines = disabled_engines @abstractmethod
def check(raw_value):
"""Check if raw_value can be parsed"""
self.query_parts = [] def __init__(self, raw_text_query, enable_autocomplete):
self.user_query_parts = [] self.raw_text_query = raw_text_query
self.enginerefs = [] self.enable_autocomplete = enable_autocomplete
self.languages = []
self.timeout_limit = None
self.external_bang = None
self.specific = False
self._parse_query()
# parse query, if tags are set, which @abstractmethod
# change the search engine or search-language def __call__(self, raw_value):
def _parse_query(self): """Try to parse raw_value: set the self.raw_text_query properties
self.query_parts = []
# split query, including whitespaces return True if raw_value has been parsed
raw_query_parts = re.split(r'(\s+)', self.query)
for query_part in raw_query_parts: self.raw_text_query.autocomplete_list is also modified
searx_query_part = False if self.enable_autocomplete is True
"""
# part does only contain spaces, skip def _add_autocomplete(self, value):
if query_part.isspace()\ if value not in self.raw_text_query.autocomplete_list:
or query_part == '': self.raw_text_query.autocomplete_list.append(value)
continue
# this force the timeout
if query_part[0] == '<': class TimeoutParser(QueryPartParser):
try:
raw_timeout_limit = int(query_part[1:]) @staticmethod
def check(raw_value):
return raw_value[0] == '<'
def __call__(self, raw_value):
value = raw_value[1:]
found = self._parse(value) if len(value) > 0 else False
if self.enable_autocomplete and not value:
self._autocomplete()
return found
def _parse(self, value):
if not value.isdigit():
return False
raw_timeout_limit = int(value)
if raw_timeout_limit < 100: if raw_timeout_limit < 100:
# below 100, the unit is the second ( <3 = 3 seconds timeout ) # below 100, the unit is the second ( <3 = 3 seconds timeout )
self.timeout_limit = float(raw_timeout_limit) self.raw_text_query.timeout_limit = float(raw_timeout_limit)
else: else:
# 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout ) # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout )
self.timeout_limit = raw_timeout_limit / 1000.0 self.raw_text_query.timeout_limit = raw_timeout_limit / 1000.0
searx_query_part = True return True
except ValueError:
# error not reported to the user
pass
# this force a language def _autocomplete(self):
if query_part[0] == ':' and len(query_part) > 1: for suggestion in ['<3', '<850']:
lang = query_part[1:].lower().replace('_', '-') self._add_autocomplete(suggestion)
class LanguageParser(QueryPartParser):
@staticmethod
def check(raw_value):
return raw_value[0] == ':'
def __call__(self, raw_value):
value = raw_value[1:].lower().replace('_', '-')
found = self._parse(value) if len(value) > 0 else False
if self.enable_autocomplete and not found:
self._autocomplete(value)
return found
def _parse(self, value):
found = False
# check if any language-code is equal with # check if any language-code is equal with
# declared language-codes # declared language-codes
for lc in language_codes: for lc in language_codes:
@ -87,76 +90,241 @@ class RawTextQuery:
# if correct language-code is found # if correct language-code is found
# set it as new search-language # set it as new search-language
if (lang == lang_id
or lang == lang_name if (value == lang_id
or lang == english_name or value == lang_name
or lang.replace('-', ' ') == country)\ or value == english_name
and lang not in self.languages: or value.replace('-', ' ') == country)\
searx_query_part = True and value not in self.raw_text_query.languages:
found = True
lang_parts = lang_id.split('-') lang_parts = lang_id.split('-')
if len(lang_parts) == 2: if len(lang_parts) == 2:
self.languages.append(lang_parts[0] + '-' + lang_parts[1].upper()) self.raw_text_query.languages.append(lang_parts[0] + '-' + lang_parts[1].upper())
else: else:
self.languages.append(lang_id) self.raw_text_query.languages.append(lang_id)
# to ensure best match (first match is not necessarily the best one) # to ensure best match (first match is not necessarily the best one)
if lang == lang_id: if value == lang_id:
break break
# user may set a valid, yet not selectable language # user may set a valid, yet not selectable language
if VALID_LANGUAGE_CODE.match(lang): if VALID_LANGUAGE_CODE.match(value):
lang_parts = lang.split('-') lang_parts = value.split('-')
if len(lang_parts) > 1: if len(lang_parts) > 1:
lang = lang_parts[0].lower() + '-' + lang_parts[1].upper() value = lang_parts[0].lower() + '-' + lang_parts[1].upper()
if lang not in self.languages: if value not in self.raw_text_query.languages:
self.languages.append(lang) self.raw_text_query.languages.append(value)
searx_query_part = True found = True
# external bang return found
if query_part[0:2] == "!!":
self.external_bang = query_part[2:]
searx_query_part = True
continue
# this force a engine or category
if query_part[0] == '!' or query_part[0] == '?':
prefix = query_part[1:].replace('-', ' ').replace('_', ' ')
def _autocomplete(self, value):
if not value:
# show some example queries
for lang in [":en", ":en_us", ":english", ":united_kingdom"]:
self.raw_text_query.autocomplete_list.append(lang)
return
for lc in language_codes:
lang_id, lang_name, country, english_name = map(str.lower, lc)
# check if query starts with language-id
if lang_id.startswith(value):
if len(value) <= 2:
self._add_autocomplete(':' + lang_id.split('-')[0])
else:
self._add_autocomplete(':' + lang_id)
# check if query starts with language name
if lang_name.startswith(value) or english_name.startswith(value):
self._add_autocomplete(':' + lang_name)
# check if query starts with country
# here "new_zealand" is "new-zealand" (see __call__)
if country.startswith(value.replace('-', ' ')):
self._add_autocomplete(':' + country.replace(' ', '_'))
class ExternalBangParser(QueryPartParser):
@staticmethod
def check(raw_value):
return raw_value.startswith('!!')
def __call__(self, raw_value):
value = raw_value[2:]
found, bang_ac_list = self._parse(value) if len(value) > 0 else (False, [])
if self.enable_autocomplete:
self._autocomplete(bang_ac_list)
return found
def _parse(self, value):
found = False
bang_definition, bang_ac_list = get_bang_definition_and_autocomplete(value)
if bang_definition is not None:
self.raw_text_query.external_bang = value
found = True
return found, bang_ac_list
def _autocomplete(self, bang_ac_list):
if not bang_ac_list:
bang_ac_list = ['g', 'ddg', 'bing']
for external_bang in bang_ac_list:
self._add_autocomplete('!!' + external_bang)
class BangParser(QueryPartParser):
@staticmethod
def check(raw_value):
return raw_value[0] == '!' or raw_value[0] == '?'
def __call__(self, raw_value):
value = raw_value[1:].replace('-', ' ').replace('_', ' ')
found = self._parse(value) if len(value) > 0 else False
if found and raw_value[0] == '!':
self.raw_text_query.specific = True
if self.enable_autocomplete:
self._autocomplete(raw_value[0], value)
return found
def _parse(self, value):
# check if prefix is equal with engine shortcut # check if prefix is equal with engine shortcut
if prefix in engine_shortcuts: if value in engine_shortcuts:
searx_query_part = True value = engine_shortcuts[value]
engine_name = engine_shortcuts[prefix]
if engine_name in engines:
self.enginerefs.append(EngineRef(engine_name, 'none'))
# check if prefix is equal with engine name # check if prefix is equal with engine name
elif prefix in engines: if value in engines:
searx_query_part = True self.raw_text_query.enginerefs.append(EngineRef(value, 'none'))
self.enginerefs.append(EngineRef(prefix, 'none')) return True
# check if prefix is equal with categorie name # check if prefix is equal with categorie name
elif prefix in categories: if value in categories:
# using all engines for that search, which # using all engines for that search, which
# are declared under that categorie name # are declared under that categorie name
searx_query_part = True self.raw_text_query.enginerefs.extend(EngineRef(engine.name, value)
self.enginerefs.extend(EngineRef(engine.name, prefix) for engine in categories[value]
for engine in categories[prefix] if (engine.name, value) not in self.raw_text_query.disabled_engines)
if (engine.name, prefix) not in self.disabled_engines) return True
if query_part[0] == '!': return False
self.specific = True
def _autocomplete(self, first_char, value):
if not value:
# show some example queries
for suggestion in ['images', 'wikipedia', 'osm']:
if suggestion not in self.raw_text_query.disabled_engines or suggestion in categories:
self._add_autocomplete(first_char + suggestion)
return
# check if query starts with categorie name
for category in categories:
if category.startswith(value):
self._add_autocomplete(first_char + category)
# check if query starts with engine name
for engine in engines:
if engine.startswith(value):
self._add_autocomplete(first_char + engine.replace(' ', '_'))
# check if query starts with engine shortcut
for engine_shortcut in engine_shortcuts:
if engine_shortcut.startswith(value):
self._add_autocomplete(first_char + engine_shortcut)
class RawTextQuery:
"""parse raw text query (the value from the html input)"""
PARSER_CLASSES = [
TimeoutParser, # this force the timeout
LanguageParser, # this force a language
ExternalBangParser, # external bang (must be before BangParser)
BangParser # this force a engine or category
]
def __init__(self, query, disabled_engines):
assert isinstance(query, str)
# input parameters
self.query = query
self.disabled_engines = disabled_engines if disabled_engines else []
# parsed values
self.enginerefs = []
self.languages = []
self.timeout_limit = None
self.external_bang = None
self.specific = False
self.autocomplete_list = []
# internal properties
self.query_parts = [] # use self.getFullQuery()
self.user_query_parts = [] # use self.getQuery()
self.autocomplete_location = None
self._parse_query()
def _parse_query(self):
"""
parse self.query, if tags are set, which
change the search engine or search-language
"""
# split query, including whitespaces
raw_query_parts = re.split(r'(\s+)', self.query)
last_index_location = None
autocomplete_index = len(raw_query_parts) - 1
for i, query_part in enumerate(raw_query_parts):
# part does only contain spaces, skip
if query_part.isspace()\
or query_part == '':
continue
# parse special commands
special_part = False
for parser_class in RawTextQuery.PARSER_CLASSES:
if parser_class.check(query_part):
special_part = parser_class(self, i == autocomplete_index)(query_part)
break
# append query part to query_part list # append query part to query_part list
if searx_query_part: qlist = self.query_parts if special_part else self.user_query_parts
self.query_parts.append(query_part) qlist.append(query_part)
else: last_index_location = (qlist, len(qlist) - 1)
self.user_query_parts.append(query_part)
self.autocomplete_location = last_index_location
def get_autocomplete_full_query(self, text):
qlist, position = self.autocomplete_location
qlist[position] = text
return self.getFullQuery()
def changeQuery(self, query): def changeQuery(self, query):
self.user_query_parts = query.strip().split() self.user_query_parts = query.strip().split()
self.query = self.getFullQuery()
self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
self.autocomplete_list = []
return self return self
def getQuery(self): def getQuery(self):
return ' '.join(self.user_query_parts) return ' '.join(self.user_query_parts)
def getFullQuery(self): def getFullQuery(self):
# get full querry including whitespaces """
get full querry including whitespaces
"""
return '{0} {1}'.format(' '.join(self.query_parts), self.getQuery()).strip() return '{0} {1}'.format(' '.join(self.query_parts), self.getQuery()).strip()
def __str__(self):
return self.getFullQuery()
def __repr__(self):
return f"<{self.__class__.__name__} " \
+ f"query={self.query!r} " \
+ f"disabled_engines={self.disabled_engines!r}\n " \
+ f"languages={self.languages!r} " \
+ f"timeout_limit={self.timeout_limit!r} "\
+ f"external_bang={self.external_bang!r} " \
+ f"specific={self.specific!r} " \
+ f"enginerefs={self.enginerefs!r}\n " \
+ f"autocomplete_list={self.autocomplete_list!r}\n " \
+ f"query_parts={self.query_parts!r}\n " \
+ f"user_query_parts={self.user_query_parts!r} >"

View File

@ -74,12 +74,13 @@ from searx.languages import language_codes as languages
from searx.search import SearchWithPlugins, initialize as search_initialize from searx.search import SearchWithPlugins, initialize as search_initialize
from searx.search.checker import get_result as checker_get_result from searx.search.checker import get_result as checker_get_result
from searx.query import RawTextQuery from searx.query import RawTextQuery
from searx.autocomplete import searx_bang, backends as autocomplete_backends from searx.autocomplete import search_autocomplete, backends as autocomplete_backends
from searx.plugins import plugins from searx.plugins import plugins
from searx.plugins.oa_doi_rewrite import get_doi_resolver from searx.plugins.oa_doi_rewrite import get_doi_resolver
from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES
from searx.answerers import answerers from searx.answerers import answerers
from searx.poolrequests import get_global_proxies from searx.poolrequests import get_global_proxies
from searx.answerers import ask
from searx.metrology.error_recorder import errors_per_engines from searx.metrology.error_recorder import errors_per_engines
# serve pages with HTTP/1.1 # serve pages with HTTP/1.1
@ -763,27 +764,18 @@ def about():
def autocompleter(): def autocompleter():
"""Return autocompleter results""" """Return autocompleter results"""
# run autocompleter
results = []
# set blocked engines # set blocked engines
disabled_engines = request.preferences.engines.get_disabled() disabled_engines = request.preferences.engines.get_disabled()
# parse query # parse query
raw_text_query = RawTextQuery(request.form.get('q', ''), disabled_engines) raw_text_query = RawTextQuery(request.form.get('q', ''), disabled_engines)
# check if search query is set
if not raw_text_query.getQuery():
return '', 400
# run autocompleter
completer = autocomplete_backends.get(request.preferences.get_value('autocomplete'))
# parse searx specific autocompleter results like !bang
raw_results = searx_bang(raw_text_query)
# normal autocompletion results only appear if no inner results returned # normal autocompletion results only appear if no inner results returned
# and there is a query part besides the engine and language bangs # and there is a query part
if len(raw_results) == 0 and completer and (len(raw_text_query.query_parts) > 1 or if len(raw_text_query.autocomplete_list) == 0 and len(raw_text_query.getQuery()) > 0:
(len(raw_text_query.languages) == 0 and
not raw_text_query.specific)):
# get language from cookie # get language from cookie
language = request.preferences.get_value('language') language = request.preferences.get_value('language')
if not language or language == 'all': if not language or language == 'all':
@ -791,15 +783,18 @@ def autocompleter():
else: else:
language = language.split('-')[0] language = language.split('-')[0]
# run autocompletion # run autocompletion
raw_results.extend(completer(raw_text_query.getQuery(), language)) raw_results = search_autocomplete(request.preferences.get_value('autocomplete'),
raw_text_query.getQuery(), language)
# parse results (write :language and !engine back to result string)
results = []
for result in raw_results: for result in raw_results:
raw_text_query.changeQuery(result) results.append(raw_text_query.changeQuery(result).getFullQuery())
# add parsed result if len(raw_text_query.autocomplete_list) > 0:
results.append(raw_text_query.getFullQuery()) for autocomplete_text in raw_text_query.autocomplete_list:
results.append(raw_text_query.get_autocomplete_full_query(autocomplete_text))
for answers in ask(raw_text_query):
for answer in answers:
results.append(str(answer['answer']))
# return autocompleter results # return autocompleter results
if request.headers.get('X-Requested-With') == 'XMLHttpRequest': if request.headers.get('X-Requested-With') == 'XMLHttpRequest':

View File

@ -1,7 +1,20 @@
from searx.search import initialize
from searx.query import RawTextQuery from searx.query import RawTextQuery
from searx.testing import SearxTestCase from searx.testing import SearxTestCase
TEST_ENGINES = [
{
'name': 'dummy engine',
'engine': 'dummy',
'categories': 'general',
'shortcut': 'du',
'timeout': 3.0,
'tokens': [],
},
]
class TestQuery(SearxTestCase): class TestQuery(SearxTestCase):
def test_simple_query(self): def test_simple_query(self):
@ -14,6 +27,37 @@ class TestQuery(SearxTestCase):
self.assertEqual(len(query.languages), 0) self.assertEqual(len(query.languages), 0)
self.assertFalse(query.specific) self.assertFalse(query.specific)
def test_multiple_spaces_query(self):
query_text = '\tthe query'
query = RawTextQuery(query_text, [])
self.assertEqual(query.getFullQuery(), 'the query')
self.assertEqual(len(query.query_parts), 0)
self.assertEqual(len(query.user_query_parts), 2)
self.assertEqual(len(query.languages), 0)
self.assertFalse(query.specific)
def test_str_method(self):
query_text = '<7 the query'
query = RawTextQuery(query_text, [])
self.assertEqual(str(query), '<7 the query')
def test_repr_method(self):
query_text = '<8 the query'
query = RawTextQuery(query_text, [])
r = repr(query)
self.assertTrue(r.startswith(f"<RawTextQuery query='{query_text}' "))
def test_change_query(self):
query_text = '<8 the query'
query = RawTextQuery(query_text, [])
another_query = query.changeQuery('another text')
self.assertEqual(query, another_query)
self.assertEqual(query.getFullQuery(), '<8 another text')
class TestLanguageParser(SearxTestCase):
def test_language_code(self): def test_language_code(self):
language = 'es-ES' language = 'es-ES'
query_text = 'the query' query_text = 'the query'
@ -68,6 +112,30 @@ class TestQuery(SearxTestCase):
self.assertEqual(len(query.languages), 0) self.assertEqual(len(query.languages), 0)
self.assertFalse(query.specific) self.assertFalse(query.specific)
def test_autocomplete_empty(self):
query_text = 'the query :'
query = RawTextQuery(query_text, [])
self.assertEqual(query.autocomplete_list, [":en", ":en_us", ":english", ":united_kingdom"])
def test_autocomplete(self):
query = RawTextQuery(':englis', [])
self.assertEqual(query.autocomplete_list, [":english"])
query = RawTextQuery(':deutschla', [])
self.assertEqual(query.autocomplete_list, [":deutschland"])
query = RawTextQuery(':new_zea', [])
self.assertEqual(query.autocomplete_list, [":new_zealand"])
query = RawTextQuery(':hu-H', [])
self.assertEqual(query.autocomplete_list, [":hu-hu"])
query = RawTextQuery(':v', [])
self.assertEqual(query.autocomplete_list, [":vi", ":tiếng việt"])
class TestTimeoutParser(SearxTestCase):
def test_timeout_below100(self): def test_timeout_below100(self):
query_text = '<3 the query' query_text = '<3 the query'
query = RawTextQuery(query_text, []) query = RawTextQuery(query_text, [])
@ -105,3 +173,113 @@ class TestQuery(SearxTestCase):
self.assertEqual(query.getQuery(), query_text) self.assertEqual(query.getQuery(), query_text)
self.assertEqual(query.timeout_limit, None) self.assertEqual(query.timeout_limit, None)
self.assertFalse(query.specific) self.assertFalse(query.specific)
def test_timeout_autocomplete(self):
# invalid number: it is not bang but it is part of the query
query_text = 'the query <'
query = RawTextQuery(query_text, [])
self.assertEqual(query.getFullQuery(), query_text)
self.assertEqual(len(query.query_parts), 0)
self.assertEqual(query.getQuery(), query_text)
self.assertEqual(query.timeout_limit, None)
self.assertFalse(query.specific)
self.assertEqual(query.autocomplete_list, ['<3', '<850'])
class TestExternalBangParser(SearxTestCase):
def test_external_bang(self):
query_text = '!!ddg the query'
query = RawTextQuery(query_text, [])
self.assertEqual(query.getFullQuery(), query_text)
self.assertEqual(len(query.query_parts), 1)
self.assertFalse(query.specific)
def test_external_bang_not_found(self):
query_text = '!!notfoundbang the query'
query = RawTextQuery(query_text, [])
self.assertEqual(query.getFullQuery(), query_text)
self.assertEqual(query.external_bang, None)
self.assertFalse(query.specific)
def test_external_bang_autocomplete(self):
query_text = 'the query !!dd'
query = RawTextQuery(query_text, [])
self.assertEqual(query.getFullQuery(), '!!dd the query')
self.assertEqual(len(query.query_parts), 1)
self.assertFalse(query.specific)
self.assertGreater(len(query.autocomplete_list), 0)
a = query.autocomplete_list[0]
self.assertEqual(query.get_autocomplete_full_query(a), a + ' the query')
def test_external_bang_autocomplete_empty(self):
query_text = 'the query !!'
query = RawTextQuery(query_text, [])
self.assertEqual(query.getFullQuery(), 'the query !!')
self.assertEqual(len(query.query_parts), 0)
self.assertFalse(query.specific)
self.assertGreater(len(query.autocomplete_list), 2)
a = query.autocomplete_list[0]
self.assertEqual(query.get_autocomplete_full_query(a), 'the query ' + a)
class TestBang(SearxTestCase):
SPECIFIC_BANGS = ['!dummy_engine', '!du', '!general']
NOT_SPECIFIC_BANGS = ['?dummy_engine', '?du', '?general']
THE_QUERY = 'the query'
def test_bang(self):
initialize(TEST_ENGINES)
for bang in TestBang.SPECIFIC_BANGS + TestBang.NOT_SPECIFIC_BANGS:
with self.subTest(msg="Check bang", bang=bang):
query_text = TestBang.THE_QUERY + ' ' + bang
query = RawTextQuery(query_text, [])
self.assertEqual(query.getFullQuery(), bang + ' ' + TestBang.THE_QUERY)
self.assertEqual(query.query_parts, [bang])
self.assertEqual(query.user_query_parts, TestBang.THE_QUERY.split(' '))
def test_specific(self):
for bang in TestBang.SPECIFIC_BANGS:
with self.subTest(msg="Check bang is specific", bang=bang):
query_text = TestBang.THE_QUERY + ' ' + bang
query = RawTextQuery(query_text, [])
self.assertTrue(query.specific)
def test_not_specific(self):
for bang in TestBang.NOT_SPECIFIC_BANGS:
with self.subTest(msg="Check bang is not specific", bang=bang):
query_text = TestBang.THE_QUERY + ' ' + bang
query = RawTextQuery(query_text, [])
self.assertFalse(query.specific)
def test_bang_not_found(self):
initialize(TEST_ENGINES)
query = RawTextQuery('the query !bang_not_found', [])
self.assertEqual(query.getFullQuery(), 'the query !bang_not_found')
def test_bang_autocomplete(self):
initialize(TEST_ENGINES)
query = RawTextQuery('the query !dum', [])
self.assertEqual(query.autocomplete_list, ['!dummy_engine'])
query = RawTextQuery('!dum the query', [])
self.assertEqual(query.autocomplete_list, [])
self.assertEqual(query.getQuery(), '!dum the query')
def test_bang_autocomplete_empty(self):
initialize()
query = RawTextQuery('the query !', [])
self.assertEqual(query.autocomplete_list, ['!images', '!wikipedia', '!osm'])
query = RawTextQuery('the query ?', ['osm'])
self.assertEqual(query.autocomplete_list, ['?images', '?wikipedia'])