From c1cfe978511c800cdbdbebee2306139524dc715b Mon Sep 17 00:00:00 2001 From: Alexandre Flament Date: Sun, 23 Jul 2017 11:56:57 +0200 Subject: [PATCH] [enh] timeout and total HTTP time are managed by searx.poolrequests --- searx/poolrequests.py | 51 +++++++++++++++++++++++++++++-- searx/search.py | 70 +++++++++++++++++++------------------------ 2 files changed, 79 insertions(+), 42 deletions(-) diff --git a/searx/poolrequests.py b/searx/poolrequests.py index 628fd5dff..933e0ad93 100644 --- a/searx/poolrequests.py +++ b/searx/poolrequests.py @@ -1,8 +1,9 @@ import requests from itertools import cycle -from threading import RLock +from threading import RLock, local from searx import settings +from time import time class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter): @@ -41,6 +42,7 @@ class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter): block=self._pool_block, **self._conn_params) +threadLocal = local() connect = settings['outgoing'].get('pool_connections', 100) # Magic number kept from previous code maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE) # Picked from constructor if settings['outgoing'].get('source_ips'): @@ -72,12 +74,57 @@ class SessionSinglePool(requests.Session): super(SessionSinglePool, self).close() +def set_timeout_for_thread(timeout, start_time=None): + threadLocal.timeout = timeout + threadLocal.start_time = start_time + + +def reset_time_for_thread(): + threadLocal.total_time = 0 + + +def get_time_for_thread(): + return threadLocal.total_time + + def request(method, url, **kwargs): - """same as requests/requests/api.py request(...) except it use SessionSinglePool and force proxies""" + """same as requests/requests/api.py request(...)""" + time_before_request = time() + + # session start session = SessionSinglePool() + + # proxies kwargs['proxies'] = settings['outgoing'].get('proxies') or None + + # timeout + if 'timeout' in kwargs: + timeout = kwargs['timeout'] + else: + timeout = getattr(threadLocal, 'timeout', None) + if timeout is not None: + kwargs['timeout'] = timeout + + # do request response = session.request(method=method, url=url, **kwargs) + + time_after_request = time() + + # is there a timeout for this engine ? + if timeout is not None: + timeout_overhead = 0.2 # seconds + # start_time = when the user request started + start_time = getattr(threadLocal, 'start_time', time_before_request) + search_duration = time_after_request - start_time + if search_duration > timeout + timeout_overhead: + raise requests.exceptions.Timeout(response=response) + + # session end session.close() + + # + threadLocal.total_time += time_after_request - time_before_request + return response diff --git a/searx/search.py b/searx/search.py index 22aea3661..71d20724e 100644 --- a/searx/search.py +++ b/searx/search.py @@ -47,16 +47,12 @@ logger = logger.getChild('search') number_of_searches = 0 -def send_http_request(engine, request_params, start_time, timeout_limit): - # for page_load_time stats - time_before_request = time() - +def send_http_request(engine, request_params): # create dictionary which contain all # informations about the request request_args = dict( headers=request_params['headers'], cookies=request_params['cookies'], - timeout=timeout_limit, verify=request_params['verify'] ) @@ -68,29 +64,10 @@ def send_http_request(engine, request_params, start_time, timeout_limit): request_args['data'] = request_params['data'] # send the request - response = req(request_params['url'], **request_args) - - # is there a timeout (no parsing in this case) - timeout_overhead = 0.2 # seconds - time_after_request = time() - search_duration = time_after_request - start_time - if search_duration > timeout_limit + timeout_overhead: - raise requests.exceptions.Timeout(response=response) - - with threading.RLock(): - # no error : reset the suspend variables - engine.continuous_errors = 0 - engine.suspend_end_time = 0 - # update stats with current page-load-time - # only the HTTP request - engine.stats['page_load_time'] += time_after_request - time_before_request - engine.stats['page_load_count'] += 1 - - # everything is ok : return the response - return response + return req(request_params['url'], **request_args) -def search_one_request(engine, query, request_params, start_time, timeout_limit): +def search_one_request(engine, query, request_params): # update request parameters dependent on # search-engine (contained in engines folder) engine.request(query, request_params) @@ -103,7 +80,7 @@ def search_one_request(engine, query, request_params, start_time, timeout_limit) return [] # send request - response = send_http_request(engine, request_params, start_time, timeout_limit) + response = send_http_request(engine, request_params) # parse the response response.search_params = request_params @@ -111,11 +88,20 @@ def search_one_request(engine, query, request_params, start_time, timeout_limit) def search_one_request_safe(engine_name, query, request_params, result_container, start_time, timeout_limit): + # set timeout for all HTTP requests + requests_lib.set_timeout_for_thread(timeout_limit, start_time=start_time) + # reset the HTTP total time + requests_lib.reset_time_for_thread() + + # engine = engines[engine_name] + # suppose everything will be alright + requests_exception = False + try: # send requests and parse the results - search_results = search_one_request(engine, query, request_params, start_time, timeout_limit) + search_results = search_one_request(engine, query, request_params) # add results result_container.extend(engine_name, search_results) @@ -124,14 +110,15 @@ def search_one_request_safe(engine_name, query, request_params, result_container with threading.RLock(): engine.stats['engine_time'] += time() - start_time engine.stats['engine_time_count'] += 1 - - return True + # update stats with the total HTTP time + engine.stats['page_load_time'] += requests_lib.get_time_for_thread() + engine.stats['page_load_count'] += 1 except Exception as e: - engine.stats['errors'] += 1 - search_duration = time() - start_time - requests_exception = False + + with threading.RLock(): + engine.stats['errors'] += 1 if (issubclass(e.__class__, requests.exceptions.Timeout)): result_container.add_unresponsive_engine((engine_name, gettext('timeout'))) @@ -152,14 +139,17 @@ def search_one_request_safe(engine_name, query, request_params, result_container # others errors logger.exception('engine {0} : exception : {1}'.format(engine_name, e)) - # update continuous_errors / suspend_end_time + # suspend or not the engine if there are HTTP errors + with threading.RLock(): if requests_exception: - with threading.RLock(): - engine.continuous_errors += 1 - engine.suspend_end_time = time() + min(60, engine.continuous_errors) - - # - return False + # update continuous_errors / suspend_end_time + engine.continuous_errors += 1 + engine.suspend_end_time = time() + min(60, engine.continuous_errors) + else: + # no HTTP error (perhaps an engine error) + # anyway, reset the suspend variables + engine.continuous_errors = 0 + engine.suspend_end_time = 0 def search_multiple_requests(requests, result_container, start_time, timeout_limit):