1
0
mirror of https://github.com/searxng/searxng.git synced 2024-11-04 20:30:11 +01:00
searxng/searx/utils.py

384 lines
11 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
import os
import sys
2015-01-11 13:26:40 +01:00
import re
import json
2015-01-11 13:26:40 +01:00
from imp import load_source
from numbers import Number
from os.path import splitext, join
from io import open
2014-04-25 01:46:40 +02:00
from random import choice
from html.parser import HTMLParser
from urllib.parse import urljoin, urlparse, unquote
from lxml import html
from lxml.etree import XPath, _ElementStringResult, _ElementUnicodeResult
from babel.core import get_global
2014-04-25 01:46:40 +02:00
from searx import settings
2014-11-18 11:37:42 +01:00
from searx.version import VERSION_STRING
2016-09-06 16:43:48 +02:00
from searx.languages import language_codes
2015-01-11 13:26:40 +01:00
from searx import logger
2014-11-18 11:37:42 +01:00
2015-01-11 13:26:40 +01:00
# Module-specific child of the application logger.
logger = logger.getChild('utils')

# Tags whose text content is skipped by HTMLTextExtractor.
blocked_tags = ('script',
                'style')

# Patterns for the two escape forms handled by ecma_unescape():
# "%uXXXX" (four hex digits) and "%XX" (two hex digits).
ecma_unescape4_re = re.compile(r'%u([0-9a-fA-F]{4})', re.UNICODE)
ecma_unescape2_re = re.compile(r'%([0-9a-fA-F]{2})', re.UNICODE)

# User agent template data used by gen_useragent().  The file is read inside
# a ``with`` block so the handle is closed immediately instead of being left
# for the garbage collector.
with open(os.path.dirname(os.path.realpath(__file__))
          + "/data/useragents.json", 'r', encoding='utf-8') as _useragents_file:
    useragents = json.load(_useragents_file)

# Compiled XPath objects keyed by their expression string (see get_xpath()).
xpath_cache = dict()
# Language -> locale mappings keyed by str(lang_list) (see _get_lang_to_lc_dict()).
lang_to_lc_cache = dict()
2014-10-17 12:34:51 +02:00
def searx_useragent():
    """Return the user agent string identifying this searx instance.

    The string is built from the searx version and the optional
    ``useragent_suffix`` entry of the ``outgoing`` settings section.
    """
    suffix = settings['outgoing'].get('useragent_suffix', '')
    template = 'searx/{searx_version} {suffix}'
    return template.format(searx_version=VERSION_STRING, suffix=suffix)
2014-10-19 12:41:04 +02:00
def gen_useragent(os=None):
    """Build a user agent string from the templates in ``useragents``.

    :param os: operating system fragment to insert; when falsy, one is
        picked at random from ``useragents['os']``.
    :return: the formatted user agent as ``str``; the version fragment is
        always picked at random from ``useragents['versions']``.
    """
    os_fragment = os or choice(useragents['os'])
    version_fragment = choice(useragents['versions'])
    user_agent = useragents['ua'].format(os=os_fragment, version=version_fragment)
    return str(user_agent)
class HTMLTextExtractorException(Exception):
    """Raised by HTMLTextExtractor when a closing tag does not match the open one."""
2013-11-08 23:44:26 +01:00
class HTMLTextExtractor(HTMLParser):
    """HTML parser that collects the text content of a document.

    Text found while the innermost open tag is listed in ``blocked_tags``
    is dropped.  Feed the markup with ``feed()`` and read the collected
    text with ``get_text()``.
    """

    def __init__(self):
        HTMLParser.__init__(self)
        # collected text fragments, joined by get_text()
        self.result = []
        # stack of currently open tag names
        self.tags = []

    def handle_starttag(self, tag, attrs):
        self.tags.append(tag)

    def handle_endtag(self, tag):
        """Pop the matching open tag.

        :raises HTMLTextExtractorException: when ``tag`` is not the
            innermost open tag.
        """
        if not self.tags:
            return

        if tag != self.tags[-1]:
            raise HTMLTextExtractorException()

        self.tags.pop()

    def is_valid_tag(self):
        """Return True when text at the current position should be kept."""
        return not self.tags or self.tags[-1] not in blocked_tags

    def handle_data(self, d):
        if not self.is_valid_tag():
            return
        self.result.append(d)

    def handle_charref(self, number):
        """Append the character for a numeric reference (``&#65;`` / ``&#x41;``)."""
        if not self.is_valid_tag():
            return
        if number[0] in ('x', 'X'):
            codepoint = int(number[1:], 16)
        else:
            codepoint = int(number)
        self.result.append(chr(codepoint))

    def handle_entityref(self, name):
        """Append the character for a named reference such as ``&amp;``.

        Known entity names are decoded to their character; unknown names
        are kept as the literal ``&name;`` text so nothing is silently
        turned into a bare word.

        Note: per the html.parser documentation, with the default
        ``convert_charrefs=True`` the parser decodes references itself and
        delivers them through ``handle_data``, so this handler only runs
        when it is called directly or that option is disabled.
        """
        if not self.is_valid_tag():
            return
        # Imported with a ``from`` statement on purpose: the module-level
        # name ``html`` refers to lxml's html module in this file.
        from html.entities import name2codepoint

        codepoint = name2codepoint.get(name)
        if codepoint is None:
            self.result.append('&' + name + ';')
        else:
            self.result.append(chr(codepoint))

    def get_text(self):
        """Return everything collected so far, stripped of outer whitespace."""
        return ''.join(self.result).strip()
2013-11-08 23:44:26 +01:00
2013-11-08 23:44:26 +01:00
def html_to_text(html):
    """Strip the markup from an HTML fragment and return its text.

    Whitespace (including newlines) is collapsed to single spaces before
    parsing.  If the extractor reports invalid HTML, the problem is logged
    at debug level and the text collected up to that point is returned.
    """
    html = ' '.join(html.replace('\n', ' ').split())
    extractor = HTMLTextExtractor()
    try:
        extractor.feed(html)
    except HTMLTextExtractorException:
        logger.debug("HTMLTextExtractor: invalid HTML\n%s", html)
    return extractor.get_text()
2013-11-15 18:55:18 +01:00
def extract_text(xpath_results):
    """Return the text held by an XPath result.

    * list: the text of every item is extracted recursively, concatenated,
      and the outer whitespace of the whole is stripped
    * lxml string result: returned as a plain string
    * anything else is treated as an element: its text is serialised
      (without the tail) and runs of whitespace are collapsed to one space
    """
    result_type = type(xpath_results)

    if result_type == list:
        pieces = [extract_text(item) for item in xpath_results]
        return ''.join(pieces).strip()

    if result_type in (_ElementStringResult, _ElementUnicodeResult):
        # already a string
        return ''.join(xpath_results)

    # an element
    serialised = html.tostring(
        xpath_results, encoding='unicode', method='text', with_tail=False
    )
    flattened = serialised.strip().replace('\n', ' ')
    return ' '.join(flattened.split())
def extract_url(xpath_results, search_url):
    """Extract a URL from an XPath result and make it absolute.

    :param xpath_results: XPath result holding the URL text.
    :param search_url: URL of the search engine, used to resolve
        protocol-relative and relative URLs.
    :return: the normalized absolute URL.
    :raises Exception: when ``xpath_results`` is an empty list, or when
        normalize_url() cannot parse the result.
    """
    if xpath_results == []:
        raise Exception('Empty url resultset')

    candidate = extract_text(xpath_results)

    if candidate.startswith('//'):
        # protocol-relative URL (//example.com/): borrow the engine's scheme
        scheme = urlparse(search_url).scheme or 'http'
        candidate = '{0}:{1}'.format(scheme, candidate)
    elif candidate.startswith('/'):
        # path relative to the search engine
        candidate = urljoin(search_url, candidate)

    # relative URLs that match neither case above
    if '://' not in candidate:
        candidate = urljoin(search_url, candidate)

    return normalize_url(candidate)
def normalize_url(url):
    """Normalize an absolute URL.

    * a URL without a path gets a trailing ``/``
    * a ``search.yahoo.com`` redirect URL (path starting with ``/r`` and
      containing ``/**``) is replaced by the percent-decoded target that
      follows the ``/**`` marker

    :param url: absolute URL to normalize.
    :return: the normalized URL as ``str``.
    :raises Exception: when the URL has no network location.
    """
    parsed_url = urlparse(url)

    if not parsed_url.netloc:
        raise Exception('Cannot parse url')

    # add a / at the end of the url if there is no path
    if not parsed_url.path:
        url += '/'

    # FIXME : hack for yahoo
    if parsed_url.hostname == 'search.yahoo.com' \
            and parsed_url.path.startswith('/r'):
        path = parsed_url.path
        mark = path.find('/**')
        if mark != -1:
            # unquote() already returns str on Python 3; the former
            # trailing .decode() raised AttributeError.
            return unquote(path[mark + 3:])

    return url
def dict_subset(d, properties):
    """Return a new dict with the entries of ``d`` whose key is in ``properties``.

    Keys listed in ``properties`` but missing from ``d`` are skipped.
    """
    return {key: d[key] for key in properties if key in d}
2015-01-29 19:44:52 +01:00
def list_get(a_list, index, default=None):
    """Return ``a_list[index]``, or ``default`` when ``index`` is past the end."""
    return a_list[index] if index < len(a_list) else default
# Byte factors per unit suffix, kept exactly as the previous if/elif chain
# had them.
# NOTE(review): the factors look swapped with respect to the SI/IEC
# convention (TB/GB/MB/KB use powers of 1024, TiB/GiB/MiB/KiB powers of
# 1000).  Left untouched because callers may rely on the current numbers
# -- confirm before changing.
_TORRENT_SIZE_FACTORS = {
    'TB': 1024 * 1024 * 1024 * 1024,
    'GB': 1024 * 1024 * 1024,
    'MB': 1024 * 1024,
    'KB': 1024,
    'TiB': 1000 * 1000 * 1000 * 1000,
    'GiB': 1000 * 1000 * 1000,
    'MiB': 1000 * 1000,
    'KiB': 1000,
}


def get_torrent_size(filesize, filesize_multiplier):
    """Convert a size and its unit suffix to a number of bytes.

    :param filesize: numeric value, or a string holding one.
    :param filesize_multiplier: unit suffix such as ``'MB'`` or ``'GiB'``.
    :return: the size in bytes as ``int``; the plain ``float`` value when
        the suffix is not recognised; ``None`` when ``filesize`` cannot be
        converted to a number.
    """
    # Only strings can be known suffixes; this also keeps unhashable
    # values away from the dict lookup.
    if isinstance(filesize_multiplier, str):
        factor = _TORRENT_SIZE_FACTORS.get(filesize_multiplier)
    else:
        factor = None

    # Catch only conversion failures (bad text, None, inf/nan) instead of
    # a bare ``except`` that would also swallow KeyboardInterrupt/SystemExit.
    try:
        size = float(filesize)
        if factor is None:
            return size
        return int(size * factor)
    except (TypeError, ValueError, OverflowError):
        return None
2016-09-06 16:43:48 +02:00
2016-10-11 19:31:42 +02:00
def convert_str_to_int(number_str):
    """Return ``int(number_str)`` when the string consists of digits only, else 0."""
    return int(number_str) if number_str.isdigit() else 0
def int_or_zero(num):
    """Convert a value to an integer, or return 0 if it is not a number.

    When ``num`` is a list, its first item is converted; an empty list
    gives 0.
    """
    if not isinstance(num, list):
        return convert_str_to_int(num)
    if not len(num) >= 1:
        return 0
    return convert_str_to_int(num[0])
2016-09-06 16:43:48 +02:00
def is_valid_lang(lang):
    """Look up a language in ``language_codes``.

    :param lang: two-letter abbreviation or language name, as ``str`` or
        ``bytes``; the comparison is case-insensitive.
    :return: ``(True, <two-letter code>, <lower-cased entry[3]>)`` for the
        first matching entry, otherwise ``False``.
    """
    if isinstance(lang, bytes):
        lang = lang.decode()

    # the length check decides the lookup mode before lower-casing
    is_abbr = len(lang) == 2
    lang = lang.lower()

    for entry in language_codes:
        if is_abbr:
            # abbreviations are compared with the first two characters of the code
            matched = entry[0][:2] == lang
        else:
            # anything else is compared with fields 1 and 3 of the entry
            matched = entry[1].lower() == lang or entry[3].lower() == lang
        if matched:
            return (True, entry[0][:2], entry[3].lower())

    return False
def _get_lang_to_lc_dict(lang_list):
    """Map each language in ``lang_list`` to its first locale code.

    The language is the part of a code before the first ``-``.  Results are
    memoised in ``lang_to_lc_cache`` under ``str(lang_list)``.
    """
    cache_key = str(lang_list)
    mapping = lang_to_lc_cache.get(cache_key)
    if mapping is not None:
        return mapping

    mapping = {}
    for locale in lang_list:
        language = locale.split('-')[0]
        # keep the first locale seen for each language
        if language not in mapping:
            mapping[language] = locale
    lang_to_lc_cache[cache_key] = mapping
    return mapping
def _match_language(lang_code, lang_list=[], custom_aliases={}):
    """Auxiliary function to match ``lang_code`` against ``lang_list``.

    Tries, in order: the code itself (after alias substitution), the code
    built from the ``likely_subtags`` entry for it, and finally any code in
    ``lang_list`` that shares the language part.  Returns ``None`` when
    nothing matches.  The default arguments are only read, never mutated.
    """
    # replace language code with a custom alias if necessary
    lang_code = custom_aliases.get(lang_code, lang_code)

    if lang_code in lang_list:
        return lang_code

    # try to get the most likely country for this language
    likely = get_global('likely_subtags').get(lang_code)
    if likely:
        parts = likely.split('_')
        candidate = parts[0] + '-' + parts[-1]
        candidate = custom_aliases.get(candidate, candidate)
        if candidate in lang_list:
            return candidate

    # try to get any supported country for this language
    return _get_lang_to_lc_dict(lang_list).get(lang_code, None)
def match_language(locale_code, lang_list=[], custom_aliases={}, fallback='en-US'):
    """Get the language code from ``lang_list`` that best matches ``locale_code``.

    Successive attempts: the locale as given, the locale with an
    equivalent country code, an equivalent language code, and the bare
    language without a country.  ``fallback`` is returned when every
    attempt fails.
    """
    # try to get language from given locale_code
    match = _match_language(locale_code, lang_list, custom_aliases)
    if match:
        return match

    parts = locale_code.split('-')
    lang_code = parts[0]

    # try to get language using an equivalent country code
    if len(parts) > 1:
        country_alias = get_global('territory_aliases').get(parts[-1])
        if country_alias:
            match = _match_language(lang_code + '-' + country_alias[0], lang_list, custom_aliases)
            if match:
                return match

    # try to get language using an equivalent language code
    language_alias = get_global('language_aliases').get(lang_code)
    if language_alias:
        match = _match_language(language_alias, lang_list, custom_aliases)
        if match:
            return match

    # try to get language from given language without giving the country
    if lang_code != locale_code:
        match = _match_language(lang_code, lang_list, custom_aliases)
        if match:
            return match

    return fallback
def load_module(filename, module_dir):
    """Load (or reload) a Python source file as a module.

    The module name is ``filename`` without its extension.  Any module
    already registered under that name in ``sys.modules`` is discarded
    first, so the file is always executed afresh.

    Implemented with ``importlib`` rather than ``imp.load_source``: the
    ``imp`` module is deprecated and was removed in Python 3.12.

    :param filename: file name of the module, e.g. ``'engine.py'``.
    :param module_dir: directory that contains ``filename``.
    :return: the loaded module, with an extra ``name`` attribute set to
        the module name.
    """
    import importlib.machinery
    import importlib.util

    modname = splitext(filename)[0]
    sys.modules.pop(modname, None)
    filepath = join(module_dir, filename)

    # An explicit source loader keeps load_source()'s behaviour of treating
    # the file as Python source whatever its extension is.
    loader = importlib.machinery.SourceFileLoader(modname, filepath)
    spec = importlib.util.spec_from_file_location(modname, filepath, loader=loader)
    module = importlib.util.module_from_spec(spec)

    # Register before executing, and unregister again if execution fails,
    # so a broken file never leaves a half-initialised module behind.
    sys.modules[modname] = module
    try:
        loader.exec_module(module)
    except BaseException:
        sys.modules.pop(modname, None)
        raise

    module.name = modname
    return module
2017-07-20 15:44:02 +02:00
def to_string(obj):
    """Return ``obj`` as a string.

    ``str`` instances are returned unchanged; everything else goes through
    ``str()``.  Calling ``str()`` instead of ``obj.__str__()`` also works
    when ``obj`` is a class, where the direct method call raised
    ``TypeError``.
    """
    if isinstance(obj, str):
        return obj
    return str(obj)
def ecma_unescape(s):
    """
    python implementation of the unescape javascript function

    https://www.ecma-international.org/ecma-262/6.0/#sec-unescape-string
    https://developer.mozilla.org/fr/docs/Web/JavaScript/Reference/Objets_globaux/unescape
    """
    def _hex_to_char(match):
        # the first group holds the hexadecimal code point
        return chr(int(match.group(1), 16))

    # four-digit escapes first: "%u5409" becomes "吉"
    unescaped = ecma_unescape4_re.sub(_hex_to_char, s)
    # then two-digit escapes: "%20" becomes " ", "%F3" becomes "ó"
    return ecma_unescape2_re.sub(_hex_to_char, unescaped)
def get_engine_from_settings(name):
    """Return engine configuration from settings.yml of a given engine name"""
    engines = settings['engines'] if 'engines' in settings else []

    for engine in engines:
        # entries without a name can never match
        if 'name' in engine and name == engine['name']:
            return engine

    return {}
def get_xpath(xpath_str):
    """Return the compiled XPath for ``xpath_str``, compiling it on first use.

    Compiled expressions are kept in ``xpath_cache``.
    """
    compiled = xpath_cache.get(xpath_str)
    if compiled is not None:
        return compiled

    compiled = XPath(xpath_str)
    xpath_cache[xpath_str] = compiled
    return compiled
def eval_xpath(element, xpath_str):
    """Evaluate the XPath expression ``xpath_str`` on ``element``.

    The compiled expression is obtained through get_xpath().
    """
    return get_xpath(xpath_str)(element)