diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index daa95686..1ee61b26 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -249,6 +249,7 @@ black: fdroidserver/readmeta.py fdroidserver/signindex.py fdroidserver/tail.py + fdroidserver/verify.py setup.py tests/build.TestCase tests/deploy.TestCase @@ -263,6 +264,7 @@ black: tests/ndk-release-checksums.py tests/rewritemeta.TestCase tests/signindex.TestCase + tests/verify.TestCase fedora_latest: diff --git a/fdroidserver/verify.py b/fdroidserver/verify.py index f117feaa..e12f72ea 100644 --- a/fdroidserver/verify.py +++ b/fdroidserver/verify.py @@ -34,39 +34,6 @@ options = None config = None -class hashabledict(OrderedDict): - def __key(self): - return tuple((k, self[k]) for k in sorted(self)) - - def __hash__(self): - try: - return hash(self.__key()) - except TypeError as e: - print(self.__key()) - raise e - - def __eq__(self, other): - return self.__key() == other.__key() - - def __lt__(self, other): - return self.__key() < other.__key() - - def __qt__(self, other): - return self.__key() > other.__key() - - -class Decoder(json.JSONDecoder): - def __init__(self, **kwargs): - json.JSONDecoder.__init__(self, **kwargs) - self.parse_array = self.JSONArray - # Use the python implemenation of the scanner - self.scan_once = json.scanner.py_make_scanner(self) - - def JSONArray(self, s_and_end, scan_once, **kwargs): - values, end = json.decoder.JSONArray(s_and_end, scan_once, **kwargs) - return set(values), end - - def _add_diffoscope_info(d): """Add diffoscope setup metadata to provided dict under 'diffoscope' key. @@ -76,23 +43,25 @@ def _add_diffoscope_info(d): """ try: import diffoscope - d['diffoscope'] = hashabledict() + + d['diffoscope'] = dict() d['diffoscope']['VERSION'] = diffoscope.VERSION from diffoscope.comparators import ComparatorManager + ComparatorManager().reload() from diffoscope.tools import tool_check_installed, tool_required + external_tools = sorted(tool_required.all) external_tools = [ - tool - for tool in external_tools - if not tool_check_installed(tool) + tool for tool in external_tools if not tool_check_installed(tool) ] - d['diffoscope']['External-Tools-Required'] = tuple(external_tools) + d['diffoscope']['External-Tools-Required'] = external_tools from diffoscope.tools import OS_NAMES, get_current_os from diffoscope.external_tools import EXTERNAL_TOOLS + current_os = get_current_os() os_list = [current_os] if (current_os in OS_NAMES) else iter(OS_NAMES) for os_ in os_list: @@ -102,10 +71,12 @@ def _add_diffoscope_info(d): tools.add(EXTERNAL_TOOLS[x][os_]) except KeyError: pass - d['diffoscope']['Available-in-{}-packages'.format(OS_NAMES[os_])] = tuple(sorted(tools)) + tools = sorted(tools) + d['diffoscope']['Available-in-{}-packages'.format(OS_NAMES[os_])] = tools - from diffoscope.tools import python_module_missing - d['diffoscope']['Missing-Python-Modules'] = tuple(sorted(python_module_missing.modules)) + from diffoscope.tools import python_module_missing as pmm + + d['diffoscope']['Missing-Python-Modules'] = sorted(pmm.modules) except ImportError: pass @@ -118,6 +89,9 @@ def write_json_report(url, remote_apk, unsigned_apk, compare_result): ensure that there is only one report per file, even when run repeatedly. + The output is run through JSON to normalize things like tuples vs + lists. + """ jsonfile = unsigned_apk + '.json' if os.path.exists(jsonfile): @@ -125,22 +99,25 @@ def write_json_report(url, remote_apk, unsigned_apk, compare_result): data = json.load(fp, object_pairs_hook=OrderedDict) else: data = OrderedDict() - output = hashabledict() + output = dict() _add_diffoscope_info(output) output['url'] = url for key, filename in (('local', unsigned_apk), ('remote', remote_apk)): - d = hashabledict() + d = dict() output[key] = d d['file'] = filename d['sha256'] = common.sha256sum(filename) d['timestamp'] = os.stat(filename).st_ctime - d['packageName'], d['versionCode'], d['versionName'] = common.get_apk_id(filename) + d['packageName'], d['versionCode'], d['versionName'] = common.get_apk_id( + filename + ) if compare_result: output['verified'] = False output['result'] = compare_result else: output['verified'] = True - data[str(output['local']['timestamp'])] = output # str makes better dict keys than float + # str makes better dict keys than float + data[str(output['local']['timestamp'])] = output with open(jsonfile, 'w') as fp: json.dump(data, fp, sort_keys=True) @@ -148,14 +125,22 @@ def write_json_report(url, remote_apk, unsigned_apk, compare_result): jsonfile = 'unsigned/verified.json' if os.path.exists(jsonfile): with open(jsonfile) as fp: - data = json.load(fp, cls=Decoder, object_pairs_hook=hashabledict) + data = json.load(fp) else: data = OrderedDict() data['packages'] = OrderedDict() packageName = output['local']['packageName'] + if packageName not in data['packages']: - data['packages'][packageName] = set() - data['packages'][packageName].add(output) + data['packages'][packageName] = [] + found = False + output_dump = json.dumps(output, sort_keys=True) + for p in data['packages'][packageName]: + if output_dump == json.dumps(p, sort_keys=True): + found = True + break + if not found: + data['packages'][packageName].insert(0, json.loads(output_dump)) with open(jsonfile, 'w') as fp: json.dump(data, fp, cls=common.Encoder, sort_keys=True) @@ -165,13 +150,27 @@ def main(): global options, config # Parse command line... - parser = ArgumentParser(usage="%(prog)s [options] [APPID[:VERCODE] [APPID[:VERCODE] ...]]") + parser = ArgumentParser( + usage="%(prog)s [options] [APPID[:VERCODE] [APPID[:VERCODE] ...]]" + ) common.setup_global_opts(parser) - parser.add_argument("appid", nargs='*', help=_("application ID with optional versionCode in the form APPID[:VERCODE]")) - parser.add_argument("--reuse-remote-apk", action="store_true", default=False, - help=_("Verify against locally cached copy rather than redownloading.")) - parser.add_argument("--output-json", action="store_true", default=False, - help=_("Output JSON report to file named after APK.")) + parser.add_argument( + "appid", + nargs='*', + help=_("application ID with optional versionCode in the form APPID[:VERCODE]"), + ) + parser.add_argument( + "--reuse-remote-apk", + action="store_true", + default=False, + help=_("Verify against locally cached copy rather than redownloading."), + ) + parser.add_argument( + "--output-json", + action="store_true", + default=False, + help=_("Output JSON report to file named after APK."), + ) options = parser.parse_args() config = common.read_config(options) @@ -218,10 +217,15 @@ def main(): net.download_file(url, dldir=tmp_dir) except requests.exceptions.HTTPError: try: - net.download_file(url.replace('/repo', '/archive'), dldir=tmp_dir) + net.download_file( + url.replace('/repo', '/archive'), dldir=tmp_dir + ) except requests.exceptions.HTTPError as e: - raise FDroidException(_('Downloading {url} failed. {error}') - .format(url=url, error=e)) from e + raise FDroidException( + _('Downloading {url} failed. {error}').format( + url=url, error=e + ) + ) from e unsigned_apk = os.path.join(unsigned_dir, apkfilename) compare_result = common.verify_apks(remote_apk, unsigned_apk, tmp_dir) diff --git a/tests/verify.TestCase b/tests/verify.TestCase new file mode 100755 index 00000000..c62ea95d --- /dev/null +++ b/tests/verify.TestCase @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 + +import inspect +import json +import logging +import optparse +import os +import shutil +import sys +import tempfile +import unittest + +from pathlib import Path +from unittest.mock import patch + +localmodule = os.path.realpath( + os.path.join(os.path.dirname(inspect.getfile(inspect.currentframe())), '..') +) +print('localmodule: ' + localmodule) +if localmodule not in sys.path: + sys.path.insert(0, localmodule) + +from fdroidserver import common, verify + + +TEST_APP_ENTRY = { + "1539780240.3885746": { + "local": { + "file": "unsigned/com.politedroid_6.apk", + "packageName": "com.politedroid", + "sha256": "70c2f776a2bac38a58a7d521f96ee0414c6f0fb1de973c3ca8b10862a009247d", + "timestamp": 1234567.8900000, + "versionCode": "6", + "versionName": "1.5", + }, + "remote": { + "file": "tmp/com.politedroid_6.apk", + "packageName": "com.politedroid", + "sha256": "70c2f776a2bac38a58a7d521f96ee0414c6f0fb1de973c3ca8b10862a009247d", + "timestamp": 1234567.8900000, + "versionCode": "6", + "versionName": "1.5", + }, + "url": "https://f-droid.org/repo/com.politedroid_6.apk", + "verified": True, + } +} + + +class VerifyTest(unittest.TestCase): + + basedir = Path(__file__).resolve().parent + + def setUp(self): + logging.basicConfig(level=logging.DEBUG) + self.tempdir = tempfile.TemporaryDirectory() + os.chdir(self.tempdir.name) + self.repodir = Path('repo') + self.repodir.mkdir() + + def tearDown(self): + self.tempdir.cleanup() + + @patch('fdroidserver.common.sha256sum') + def test_write_json_report(self, sha256sum): + sha256sum.return_value = ( + '70c2f776a2bac38a58a7d521f96ee0414c6f0fb1de973c3ca8b10862a009247d' + ) + os.mkdir('tmp') + os.mkdir('unsigned') + verified_json = Path('unsigned/verified.json') + packageName = 'com.politedroid' + apk_name = packageName + '_6.apk' + remote_apk = 'tmp/' + apk_name + unsigned_apk = 'unsigned/' + apk_name + # TODO common.use apk_strip_v1_signatures() on unsigned_apk + shutil.copy(str(self.basedir / 'repo' / apk_name), remote_apk) + shutil.copy(str(self.basedir / 'repo' / apk_name), unsigned_apk) + url = TEST_APP_ENTRY['1539780240.3885746']['url'] + + self.assertFalse(verified_json.exists()) + verify.write_json_report(url, remote_apk, unsigned_apk, {}) + self.assertTrue(verified_json.exists()) + # smoke check status JSON + with verified_json.open() as fp: + firstpass = json.load(fp) + + verify.write_json_report(url, remote_apk, unsigned_apk, {}) + with verified_json.open() as fp: + secondpass = json.load(fp) + + self.assertEqual(firstpass, secondpass) + + +if __name__ == "__main__": + os.chdir(os.path.dirname(__file__)) + + parser = optparse.OptionParser() + parser.add_option( + "-v", + "--verbose", + action="store_true", + default=False, + help="Spew out even more information than normal", + ) + (common.options, args) = parser.parse_args(['--verbose']) + + newSuite = unittest.TestSuite() + newSuite.addTest(unittest.makeSuite(VerifyTest)) + unittest.main(failfast=False)