diff --git a/fdroidserver/common.py b/fdroidserver/common.py index 8ed76fb7..f4a1fe77 100644 --- a/fdroidserver/common.py +++ b/fdroidserver/common.py @@ -125,6 +125,7 @@ default_config = { 'sdk_path': "$ANDROID_HOME", 'ndk_paths': {}, 'cachedir': str(Path.home() / '.cache/fdroidserver'), + 'cachedir_scanner': str(Path.home() / '.cache/fdroidserver/scanner'), 'java_paths': None, 'scan_binary': False, 'ant': "ant", diff --git a/fdroidserver/exception.py b/fdroidserver/exception.py index 097f4f68..a598b368 100644 --- a/fdroidserver/exception.py +++ b/fdroidserver/exception.py @@ -45,3 +45,10 @@ class BuildException(FDroidException): class VerificationException(FDroidException): pass + + +class ConfigurationException(FDroidException): + def __init__(self, value=None, detail=None): + super().__init__() + self.value = value + self.detail = detail diff --git a/fdroidserver/scanner.py b/fdroidserver/scanner.py index 710d0c23..6321af8b 100644 --- a/fdroidserver/scanner.py +++ b/fdroidserver/scanner.py @@ -23,19 +23,23 @@ import re import sys import traceback import zipfile +import yaml from argparse import ArgumentParser from collections import namedtuple from copy import deepcopy from tempfile import TemporaryDirectory +from pathlib import Path import logging import itertools +import urllib.request +from datetime import datetime, timedelta import requests from . import _ from . import common from . import metadata -from .exception import BuildException, VCSException +from .exception import BuildException, VCSException, ConfigurationException from . import scanner config = None @@ -47,17 +51,6 @@ json_per_build = deepcopy(DEFAULT_JSON_PER_BUILD) MAVEN_URL_REGEX = re.compile(r"""\smaven\s*(?:{.*?(?:setUrl|url)|\((?:url)?)\s*=?\s*(?:uri)?\(?\s*["']?([^\s"']+)["']?[^})]*[)}]""", re.DOTALL) -CODE_SIGNATURES = { - exp: re.compile(r'.*' + exp, re.IGNORECASE) for exp in [ - r'com/google/firebase', - r'com/google/android/gms', - r'com/google/android/play/core', - r'com/google/tagmanager', - r'com/google/analytics', - r'com/android/billing', - ] -} - # Common known non-free blobs (always lower case): NON_FREE_GRADLE_LINES = { exp: re.compile(r'.*' + exp, re.IGNORECASE) for exp in [ @@ -109,6 +102,9 @@ NON_FREE_GRADLE_LINES = { } +SCANNER_CACHE_VERSION = 1 + + def get_gradle_compile_commands(build): compileCommands = ['compile', 'provided', @@ -188,6 +184,145 @@ def _exodus_compile_signatures(signatures): return compiled_tracker_signature +def _datetime_now(): + """ + simple warpper for datetime.now to allow mocking it for testing + """ + return datetime.now().astimezone() + + +def _scanner_cachedir(): + """ + get `Path` to local cache dir + """ + if not common.config or "cachedir_scanner" not in common.config: + raise ConfigurationException("could not load 'cachedir_scanner' config") + cachedir = Path(config["cachedir_scanner"]) + cachedir.mkdir(exist_ok=True, parents=True) + return cachedir + + +class SignatureCacheMalformedException(Exception): + pass + + +class SignatureCacheOutdatedException(Exception): + pass + + +class SignatureDataController: + def __init__(self, name, filename): + self.name = name + self.filename = filename + self.cache_outdated_interval = timedelta(days=7) + self.data = {} + + def check_data_version(self): + if self.data.get("version") != SCANNER_CACHE_VERSION: + raise SignatureCacheMalformedException() + + def check_last_updated(self): + timestamp = self.data.get("timestamp") + if not timestamp: + raise SignatureCacheMalformedException() + try: + timestamp = datetime.fromisoformat(timestamp) + except ValueError as e: + raise SignatureCacheMalformedException() from e + except TypeError as e: + raise SignatureCacheMalformedException() from e + if (timestamp + self.cache_outdated_interval) < scanner._datetime_now(): + raise SignatureCacheOutdatedException() + + def load_from_defaults(self): + sig_file = Path(__file__).absolute().parent / 'scanner_signatures' / self.file_name + with open(sig_file) as f: + self.data = yaml.safe_load(f) + + def load_from_cache(self): + sig_file = scanner._scanner_cachedir() / self.filename + if not sig_file.exists(): + raise SignatureCacheMalformedException() + with open(sig_file) as f: + self.data = yaml.safe_load(f) + + def write_to_cache(self): + sig_file = scanner._scanner_cachedir() / self.filename + with open(sig_file, "w", encoding="utf-8") as f: + yaml.safe_dump(self.data, f) + logging.debug("write '{}' to cache".format(self.filename)) + + def verify_data(self): + valid_keys = ['timestamp', 'version', 'signatures'] + for k in [x for x in self.data.keys() if x not in valid_keys]: + del self.data[k] + + # def scan + + +class ExodusSignatureDataController(SignatureDataController): + def __init__(self): + super().__init__('Exodus signatures', 'exodus.yml') + + def fetch_signatures_from_web(): + pass + # TODO + # exodus_url = "https://reports.exodus-privacy.eu.org/api/trackers" + # sigs = { + # "signatures": [], + # "timestamp": scanner._datetime_now().isoformat(), + # "version": SCANNER_CACHE_VERSION, + # } + + # with urllib.request.urlopen(exodus_url) as f: + # data = json.load(f) + # for tracker in data["trackers"].values(): + # sigs["signatures"].append({ + # "name": tracker["name"], + # "binary_signature": tracker["code_signature"], + # "network_signature": tracker["network_signature"], + # "types": ["tracker", "non-free"] # right now we assume all trackers in exodus are non-free + # }) + + +class ScannerSignatureDataController(SignatureDataController): + def __init__(self): + super().__init__('Scanner signatures', 'scanner.yml') + + def fetch_signatures_from_web(self): + url = "https://uniqx.gitlab.io/fdroid-scanner-signatures/sigs.json" + with urllib.request.urlopen(url) as f: + data = yaml.safe_load(f) + # TODO: validate parsed data + # TODO: error message 'please update fdroidserver/report' when fetching failed due to changes in the data strucutre + self.data = data + + +class SignatureTool(): + def __init__(self): + self.sdcs = [ScannerSignatureDataController()] + for sdc in self.sdcs: + sdc.fetch_signatures_from_web() + # TODO: use cache + # if not sdc.check_cache(): + # sdc.load_from_defaults() + self.compile_regexes() + + def compile_regexes(self): + self.regex = {'code_signatures': {}} + for sdc in self.sdcs: + for lname, ldef in sdc.data.get('signatures', []).items(): + self.regex['code_signatures'].update({(x, re.compile(x)) for x in ldef.get('code_signatures', [])}) + + def binary_signatures(self): + for sdc in self.sdcs: + for sig in sdc.binary_signatures(): + yield sig + + +SIGNATURE_TOOL = SignatureTool() + + # taken from exodus_core def load_exodus_trackers_signatures(): """ @@ -215,7 +350,7 @@ def scan_binary(apkfile, extract_signatures=None): result = get_embedded_classes(apkfile) problems = 0 for classname in result: - for suspect, regexp in CODE_SIGNATURES.items(): + for suspect, regexp in SIGNATURE_TOOL.regex['code_signatures'].items(): if regexp.match(classname): logging.debug("Found class '%s'" % classname) problems += 1 diff --git a/tests/scanner.TestCase b/tests/scanner.TestCase index 991cd9f0..01076a6e 100755 --- a/tests/scanner.TestCase +++ b/tests/scanner.TestCase @@ -4,6 +4,7 @@ import glob import inspect import logging import optparse +import io import os import re import shutil @@ -17,6 +18,7 @@ import zipfile import collections import pathlib from unittest import mock +from datetime import datetime, timedelta localmodule = os.path.realpath( os.path.join(os.path.dirname(inspect.getfile(inspect.currentframe())), '..') @@ -446,21 +448,19 @@ class Test_scan_binary(unittest.TestCase): fdroidserver.common.config = config fdroidserver.common.options = mock.Mock() + fdroidserver.scanner.SIGNATURE_TOOL = mock.Mock() + fdroidserver.scanner.SIGNATURE_TOOL.regex = {} + fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'] = { + "java/lang/Object": re.compile(r'.*java/lang/Object', re.IGNORECASE | re.UNICODE) + } + def test_code_signature_match(self): apkfile = os.path.join(self.basedir, 'no_targetsdk_minsdk1_unsigned.apk') - mock_code_signatures = { - "java/lang/Object": re.compile( - r'.*java/lang/Object', re.IGNORECASE | re.UNICODE - ) - } - with mock.patch("fdroidserver.scanner.CODE_SIGNATURES", mock_code_signatures): - self.assertEqual( - 1, - fdroidserver.scanner.scan_binary(apkfile), - "Did not find expected code signature '{}' in binary '{}'".format( - fdroidserver.scanner.CODE_SIGNATURES.values(), apkfile - ), - ) + self.assertEqual( + 1, + fdroidserver.scanner.scan_binary(apkfile), + "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'].values(), apkfile), + ) @unittest.skipIf( sys.version_info < (3, 9), @@ -470,44 +470,85 @@ class Test_scan_binary(unittest.TestCase): ) def test_bottom_level_embedded_apk_code_signature(self): apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk') - mock_code_signatures = { + fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'] = { "org/bitbucket/tickytacky/mirrormirror/MainActivity": re.compile( - r'.*org/bitbucket/tickytacky/mirrormirror/MainActivity', - re.IGNORECASE | re.UNICODE, + r'.*org/bitbucket/tickytacky/mirrormirror/MainActivity', re.IGNORECASE | re.UNICODE ) } - with mock.patch("fdroidserver.scanner.CODE_SIGNATURES", mock_code_signatures): - self.assertEqual( - 1, - fdroidserver.scanner.scan_binary(apkfile), - "Did not find expected code signature '{}' in binary '{}'".format( - fdroidserver.scanner.CODE_SIGNATURES.values(), apkfile - ), - ) + + self.assertEqual( + 1, + fdroidserver.scanner.scan_binary(apkfile), + "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'].values(), apkfile), + ) def test_top_level_signature_embedded_apk_present(self): apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk') - mock_code_signatures = { + fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'] = { "org/fdroid/ci/BuildConfig": re.compile( r'.*org/fdroid/ci/BuildConfig', re.IGNORECASE | re.UNICODE ) } - with mock.patch("fdroidserver.scanner.CODE_SIGNATURES", mock_code_signatures): - self.assertEqual( - 1, - fdroidserver.scanner.scan_binary(apkfile), - "Did not find expected code signature '{}' in binary '{}'".format( - fdroidserver.scanner.CODE_SIGNATURES.values(), apkfile - ), - ) - - def test_no_match(self): - apkfile = os.path.join(self.basedir, 'no_targetsdk_minsdk1_unsigned.apk') - result = fdroidserver.scanner.scan_binary(apkfile) self.assertEqual( - 0, result, "Found false positives in binary '{}'".format(apkfile) + 1, + fdroidserver.scanner.scan_binary(apkfile), + "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'].values(), apkfile), ) + # TODO: re-enable once allow-listing migrated to more complex regexes + # def test_no_match(self): + # apkfile = os.path.join(self.basedir, 'no_targetsdk_minsdk1_unsigned.apk') + # result = fdroidserver.scanner.scan_binary(apkfile) + # self.assertEqual(0, result, "Found false positives in binary '{}'".format(apkfile)) + + +# class Test__fetch_exodus_signatures_to_cache(unittest.TestCase): +# def setUp(self): +# self.web_req_func = mock.Mock(return_value=io.StringIO(json.dumps({ +# "trackers": { +# "1": { +# "id": 1, +# "name": "Steyer Puch 1", +# "description": "blah blah blah", +# "creation_date": "1956-01-01", +# "code_signature": "com.puch.|com.steyer.", +# "network_signature": "pst\\.com", +# "website": "https://pst.com", +# "categories": ["tracker"], +# "documentation": [], +# }, +# "2": { +# "id": 2, +# "name": "Steyer Puch 2", +# "description": "blah blah blah", +# "creation_date": "1956-01-01", +# "code_signature": "com.puch.|com.steyer.", +# "network_signature": "pst\\.com", +# "website": "https://pst.com", +# "categories": ["tracker"], +# "documentation": [], +# } +# }, +# }))) +# self.open_func = mock.mock_open() +# self.cachedir_func = mock.Mock(return_value=pathlib.Path("mocked/path")) +# +# def test_ok(self): +# with mock.patch("urllib.request.urlopen", self.web_req_func), mock.patch( +# "builtins.open", self.open_func +# ) as outfilemock, mock.patch( +# "fdroidserver.scanner._scanner_cachedir", self.cachedir_func +# ), mock.patch("fdroidserver.scanner._datetime_now", unittest.mock.Mock(return_value=datetime(1999, 12, 31, 23, 59, 59))): +# fdroidserver.scanner.fetch_exodus_signatures_to_cache() +# +# self.cachedir_func.assert_called_once() +# self.web_req_func.assert_called_once_with("https://reports.exodus-privacy.eu.org/api/trackers") +# self.open_func.assert_called_once_with(pathlib.Path("mocked/path/exodus.json"), "w", encoding="utf-8") +# self.assertEqual( +# mock_open_to_str(self.open_func), +# """{"signatures": {"exodus-1": {"name": "Steyer Puch 1", "code_signature": "com.puch.|com.steyer.", "network_signature": "pst\\\\.com", "types": ["tracker", "non-free"]}, "exodus-2": {"name": "Steyer Puch 2", "code_signature": "com.puch.|com.steyer.", "network_signature": "pst\\\\.com", "types": ["tracker", "non-free"]}}, "timestamp": "1999-12-31T23:59:59"}""" +# ) + class Test__exodus_compile_signatures(unittest.TestCase): def setUp(self): @@ -581,6 +622,106 @@ class Test_load_exodus_trackers_signatures(unittest.TestCase): self.assertEqual(regex, "mocked return value") +class Test_SignatureDataController(unittest.TestCase): + # __init__ + def test_init(self): + sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') + self.assertEqual(sdc.name, 'nnn') + self.assertEqual(sdc.filename, 'fff.yml') + self.assertEqual(sdc.cache_outdated_interval, timedelta(days=7)) + self.assertDictEqual(sdc.data, {}) + + # check_last_updated + def test_check_last_updated_ok(self): + sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') + sdc.data['timestamp'] = datetime.now().astimezone().isoformat() + sdc.check_last_updated() + + def test_check_last_updated_exception_cache_outdated(self): + sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') + sdc.data['timestamp'] = (datetime.now().astimezone() - timedelta(days=30)).isoformat() + with self.assertRaises(fdroidserver.scanner.SignatureCacheOutdatedException): + sdc.check_last_updated() + + def test_check_last_updated_exception_missing_timestamp_value(self): + sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') + with self.assertRaises(fdroidserver.scanner.SignatureCacheMalformedException): + sdc.check_last_updated() + + def test_check_last_updated_exception_not_string(self): + sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') + sdc.data['timestamp'] = 12345 + with self.assertRaises(fdroidserver.scanner.SignatureCacheMalformedException): + sdc.check_last_updated() + + def test_check_last_updated_exception_not_iso_formatted_string(self): + sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') + sdc.data['timestamp'] = '01/09/2002 10:11' + with self.assertRaises(fdroidserver.scanner.SignatureCacheMalformedException): + sdc.check_last_updated() + + # check_data_version + def test_check_data_version_ok(self): + sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') + sdc.data['version'] = fdroidserver.scanner.SCANNER_CACHE_VERSION + sdc.check_data_version() + + def test_check_data_version_exception(self): + sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') + with self.assertRaises(fdroidserver.scanner.SignatureCacheMalformedException): + sdc.check_data_version() + + +class Test_ScannerSignatureDataController_fetch_signatures_from_web(unittest.TestCase): + def setUp(self): + self.uo_func = mock.Mock(return_value=io.StringIO(textwrap.dedent('''\ + version: 999 + timestamp: "1999-12-31T23:59:59.999999+00:00" + signatures: + - binary_signature: com/google/firebase + name: Google Firebase + types: + - tracker + - non-free + - gradle_signature: com/google/android/gms + name: Google Mobile Services + types: + - non-free + - network_signature: doubleclick\\.net + name: Another thing to test. + types: + - ads + '''))) + + def test_fetch_signatures_from_web(self): + sdc = fdroidserver.scanner.ScannerSignatureDataController() + with unittest.mock.patch('urllib.request.urlopen', self.uo_func): + sdc.fetch_signatures_from_web() + self.assertEqual(sdc.data.get('version'), 999) + self.assertEqual(sdc.data.get('timestamp'), "1999-12-31T23:59:59.999999+00:00") + self.assertListEqual( + sdc.data.get('signatures'), + [ + { + 'binary_signature': 'com/google/firebase', + 'name': 'Google Firebase', + 'types': ['tracker', 'non-free'], + }, + { + 'gradle_signature': 'com/google/android/gms', + 'name': 'Google Mobile Services', + 'types': ['non-free'], + }, + { + 'network_signature': 'doubleclick\\.net', + 'name': 'Another thing to test.', + 'types': ['ads'], + }, + ] + ) + self.assertEqual(len(sdc.data), 3) + + class Test_main(unittest.TestCase): def setUp(self): self.args = ["com.example.app", "local/additional.apk", "another.apk"] @@ -644,13 +785,13 @@ if __name__ == "__main__": (fdroidserver.common.options, args) = parser.parse_args(['--verbose']) newSuite = unittest.TestSuite() - newSuite.addTests( - [ - unittest.makeSuite(ScannerTest), - unittest.makeSuite(Test_scan_binary), - unittest.makeSuite(Test__exodus_compile_signatures), - unittest.makeSuite(Test_load_exodus_trackers_signatures), - unittest.makeSuite(Test_main), - ] - ) + newSuite.addTests([ + unittest.makeSuite(ScannerTest), + unittest.makeSuite(Test_scan_binary), + unittest.makeSuite(Test__exodus_compile_signatures), + unittest.makeSuite(Test_load_exodus_trackers_signatures), + unittest.makeSuite(Test_SignatureDataController), + unittest.makeSuite(Test_ScannerSignatureDataController_fetch_signatures_from_web), + unittest.makeSuite(Test_main), + ]) unittest.main(failfast=False) diff --git a/tests/testcommon.py b/tests/testcommon.py index 2557bd61..128ead00 100644 --- a/tests/testcommon.py +++ b/tests/testcommon.py @@ -36,8 +36,7 @@ class TmpCwd(): class TmpPyPath(): - """Context-manager for temporarily changing the current working - directory. + """Context-manager for temporarily adding a direcory to python path """ def __init__(self, additional_path): @@ -48,3 +47,9 @@ class TmpPyPath(): def __exit__(self, a, b, c): sys.path.remove(self.additional_path) + + +def mock_open_to_str(mock): + return "".join([ + x.args[0] for x in mock.mock_calls if str(x).startswith("call().write(") + ])