Merge branch 'unified-scanner-signatures' into 'master'

cached scanner signatures

Closes #1008

See merge request fdroid/fdroidserver!1198
This commit is contained in:
Hans-Christoph Steiner 2022-10-06 14:09:49 +00:00
commit 7accb96b9e
7 changed files with 833 additions and 259 deletions

View File

@ -987,7 +987,7 @@ def main():
if not options.appid and not options.all:
parser.error("option %s: If you really want to build all the apps, use --all" % "all")
config = common.read_config(options)
config = common.read_config(opts=options)
if config['build_server_always']:
options.server = True

View File

@ -76,7 +76,7 @@ from fdroidserver.exception import FDroidException, VCSException, NoSubmodulesEx
BuildException, VerificationException, MetaDataException
from .asynchronousfilereader import AsynchronousFileReader
from . import apksigcopier
from . import apksigcopier, common
# The path to this fdroidserver distribution
@ -130,7 +130,6 @@ default_config = {
'ant': "ant",
'mvn3': "mvn",
'gradle': os.path.join(FDROID_PATH, 'gradlew-fdroid'),
'gradle_version_dir': str(Path.home() / '.cache/fdroidserver/gradle'),
'sync_from_local_copy_dir': False,
'allow_disabled_algorithms': False,
'per_app_repos': False,
@ -319,6 +318,31 @@ def fill_config_defaults(thisconfig):
ndk_paths[ndkdict['release']] = ndk_paths.pop(k)
break
if 'cachedir_scanner' not in thisconfig:
thisconfig['cachedir_scanner'] = str(Path(thisconfig['cachedir']) / 'scanner')
if 'gradle_version_dir' not in thisconfig:
thisconfig['gradle_version_dir'] = str(Path(thisconfig['cachedir']) / 'gradle')
def get_config(opts=None):
"""Get config instace. This function takes care of initializing config data before returning it."""
global config, options
if config is not None:
return config
config = {}
common.fill_config_defaults(config)
common.read_config(opts=opts)
# make sure these values are available in common.py even if they didn't
# declare global in a scope
common.config = config
if opts is not None:
common.options = opts
return config
def regsub_file(pattern, repl, path):
with open(path, 'rb') as f:

View File

@ -45,3 +45,10 @@ class BuildException(FDroidException):
class VerificationException(FDroidException):
pass
class ConfigurationException(FDroidException):
def __init__(self, value=None, detail=None):
super().__init__()
self.value = value
self.detail = detail

View File

@ -16,29 +16,28 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import imghdr
import json
import os
import re
import sys
import traceback
import json
import imghdr
import logging
import zipfile
import itertools
import traceback
import urllib.request
from argparse import ArgumentParser
from collections import namedtuple
from copy import deepcopy
from tempfile import TemporaryDirectory
import logging
import itertools
import requests
from pathlib import Path
from datetime import datetime, timedelta
from . import _
from . import common
from . import metadata
from .exception import BuildException, VCSException
from .exception import BuildException, VCSException, ConfigurationException
from . import scanner
config = None
options = None
DEFAULT_JSON_PER_BUILD = {'errors': [], 'warnings': [], 'infos': []} # type: ignore
@ -47,66 +46,8 @@ json_per_build = deepcopy(DEFAULT_JSON_PER_BUILD)
MAVEN_URL_REGEX = re.compile(r"""\smaven\s*(?:{.*?(?:setUrl|url)|\((?:url)?)\s*=?\s*(?:uri)?\(?\s*["']?([^\s"']+)["']?[^})]*[)}]""",
re.DOTALL)
CODE_SIGNATURES = {
exp: re.compile(r'.*' + exp, re.IGNORECASE) for exp in [
r'com/google/firebase',
r'com/google/android/gms',
r'com/google/android/play/core',
r'com/google/tagmanager',
r'com/google/analytics',
r'com/android/billing',
]
}
# Common known non-free blobs (always lower case):
NON_FREE_GRADLE_LINES = {
exp: re.compile(r'.*' + exp, re.IGNORECASE) for exp in [
r'flurryagent',
r'paypal.*mpl',
r'admob.*sdk.*android',
r'google.*ad.*view',
r'google.*admob',
r'google.*play.*services',
r'com.google.android.play:core.*',
r'com.google.android.play:app-update',
r'com.google.android.libraries.places:places',
r'com.google.mlkit',
r'com.android.billingclient',
r'androidx.work:work-gcm',
r'crittercism',
r'heyzap',
r'jpct.*ae',
r'youtube.*android.*player.*api',
r'bugsense',
r'crashlytics',
r'ouya.*sdk',
r'libspen23',
r'firebase',
r'''["']com.facebook.android['":]''',
r'cloudrail',
r'com.tencent.bugly',
r'appcenter-push',
r'com.github.junrar:junrar',
r'androidx.navigation:navigation-dynamic-features',
r'xyz.belvi.mobilevision:barcodescanner',
r'org.jetbrains.kotlinx:kotlinx-coroutines-play-services',
r'me.pushy:sdk',
r'io.github.sinaweibosdk',
r'com.umeng.umsdk',
r'com.google.android.exoplayer:extension-cast',
r'io.objectbox:objectbox-gradle-plugin',
r'com.evernote:android-job',
r'com.yayandroid:LocationManager',
r'com.onesignal:OneSignal',
r'com.cloudinary:cloudinary-android',
r'com.google.android.exoplayer:extension-cronet',
r'com.anjlab.android.iab.v3:library',
r'com.github.penn5:donations',
r'com.mapbox',
r'com.yandex.android',
r'com.hypertrack',
]
}
SCANNER_CACHE_VERSION = 1
def get_gradle_compile_commands(build):
@ -171,78 +112,272 @@ def get_embedded_classes(apkfile, depth=0):
return classes
# taken from exodus_core
def _exodus_compile_signatures(signatures):
"""
Compiles the regex associated to each signature, in order to speed up the trackers detection.
:return: A compiled list of signatures.
"""
compiled_tracker_signature = []
try:
compiled_tracker_signature = [
re.compile(track.code_signature) for track in signatures
]
except TypeError:
print("signatures is not iterable")
return compiled_tracker_signature
def _datetime_now():
"""Get datetime.now(), using this funciton allows mocking it for testing."""
return datetime.utcnow()
# taken from exodus_core
def load_exodus_trackers_signatures():
"""
Load trackers signatures from the official Exodus database.
def _scanner_cachedir():
"""Get `Path` to fdroidserver cache dir."""
cfg = common.get_config()
if not cfg:
raise ConfigurationException('config not initialized')
if "cachedir_scanner" not in cfg:
raise ConfigurationException("could not load 'cachedir_scanner' from config")
cachedir = Path(cfg["cachedir_scanner"])
cachedir.mkdir(exist_ok=True, parents=True)
return cachedir
:return: a dictionary containing signatures.
"""
signatures = []
exodus_url = "https://reports.exodus-privacy.eu.org/api/trackers"
r = requests.get(exodus_url, timeout=300)
data = r.json()
for e in data['trackers']:
signatures.append(
namedtuple('tracker', data['trackers'][e].keys())(
*data['trackers'][e].values()
)
class SignatureDataMalformedException(Exception):
pass
class SignatureDataOutdatedException(Exception):
pass
class SignatureDataCacheMissException(Exception):
pass
class SignatureDataVersionMismatchException(Exception):
pass
class SignatureDataController:
def __init__(self, name, filename, url):
self.name = name
self.filename = filename
self.url = url
# by default we assume cache is valid indefinitely
self.cache_duration = timedelta(days=999999)
self.data = {}
def check_data_version(self):
if self.data.get("version") != SCANNER_CACHE_VERSION:
raise SignatureDataVersionMismatchException()
def check_last_updated(self):
"""
Check if the last_updated value is ok and raise an exception if expired or inaccessible.
:raises SignatureDataMalformedException: when timestamp value is
inaccessible or not parse-able
:raises SignatureDataOutdatedException: when timestamp is older then
`self.cache_duration`
"""
last_updated = self.data.get("last_updated", None)
if last_updated:
try:
last_updated = datetime.fromtimestamp(last_updated)
except ValueError as e:
raise SignatureDataMalformedException() from e
except TypeError as e:
raise SignatureDataMalformedException() from e
delta = (last_updated + self.cache_duration) - scanner._datetime_now()
if delta > timedelta(seconds=0):
logging.debug(_('next {name} cache update due in {time}').format(
name=self.filename, time=delta
))
else:
raise SignatureDataOutdatedException()
def fetch(self):
try:
self.fetch_signatures_from_web()
self.write_to_cache()
except Exception as e:
raise Exception(_("downloading scanner signatures from '{}' failed").format(self.url)) from e
def load(self):
try:
try:
self.load_from_cache()
self.verify_data()
self.check_last_updated()
except SignatureDataCacheMissException:
self.load_from_defaults()
except SignatureDataOutdatedException:
self.fetch_signatures_from_web()
self.write_to_cache()
except (SignatureDataMalformedException, SignatureDataVersionMismatchException) as e:
logging.critical(_("scanner cache is malformed! You can clear it with: '{clear}'").format(
clear='rm -r {}'.format(common.get_config()['cachedir_scanner'])
))
raise e
def load_from_defaults(self):
sig_file = (Path(__file__).parent / 'data' / 'scanner' / self.filename).resolve()
with open(sig_file) as f:
self.set_data(json.load(f))
def load_from_cache(self):
sig_file = scanner._scanner_cachedir() / self.filename
if not sig_file.exists():
raise SignatureDataCacheMissException()
with open(sig_file) as f:
self.set_data(json.load(f))
def write_to_cache(self):
sig_file = scanner._scanner_cachedir() / self.filename
with open(sig_file, "w", encoding="utf-8") as f:
json.dump(self.data, f, indent=2)
logging.debug("write '{}' to cache".format(self.filename))
def verify_data(self):
"""
Clean and validate `self.data`.
Right now this function does just a basic key sanitation.
"""
self.check_data_version()
valid_keys = ['timestamp', 'last_updated', 'version', 'signatures', 'cache_duration']
for k in list(self.data.keys()):
if k not in valid_keys:
del self.data[k]
def set_data(self, new_data):
self.data = new_data
if 'cache_duration' in new_data:
self.cache_duration = timedelta(seconds=new_data['cache_duration'])
def fetch_signatures_from_web(self):
if not self.url.startswith("https://"):
raise Exception(_("can't open non-https url: '{};".format(self.url)))
logging.debug(_("downloading '{}'").format(self.url))
with urllib.request.urlopen(self.url) as f: # nosec B310 scheme filtered above
self.set_data(json.load(f))
self.data['last_updated'] = scanner._datetime_now().timestamp()
class ExodusSignatureDataController(SignatureDataController):
def __init__(self):
super().__init__('Exodus signatures', 'exodus.yml', 'https://reports.exodus-privacy.eu.org/api/trackers')
self.cache_duration = timedelta(days=1) # refresh exodus cache after one day
def fetch_signatures_from_web(self):
logging.debug(_("downloading '{}'").format(self.url))
data = {
"signatures": {},
"timestamp": scanner._datetime_now().timestamp(),
"last_updated": scanner._datetime_now().timestamp(),
"version": SCANNER_CACHE_VERSION,
}
if not self.url.startswith("https://"):
raise Exception(_("can't open non-https url: '{};".format(self.url)))
with urllib.request.urlopen(self.url) as f: # nosec B310 scheme filtered above
d = json.load(f)
for tracker in d["trackers"].values():
if tracker.get('code_signature'):
data["signatures"][tracker["name"]] = {
"name": tracker["name"],
"warn_code_signatures": [tracker["code_signature"]],
# exodus also provides network signatures, unused atm.
# "network_signatures": [tracker["network_signature"]],
"AntiFeatures": ["Tracking"],
"license": "NonFree" # We assume all trackers in exodus
# are non-free, although free
# trackers like piwik, acra,
# etc. might be listed by exodus
# too.
}
self.set_data(data)
class SUSSDataController(SignatureDataController):
def __init__(self):
super().__init__(
'SUSS',
'suss.json',
'https://fdroid.gitlab.io/fdroid-suss/suss.json'
)
logging.debug('{} trackers signatures loaded'.format(len(signatures)))
return signatures, scanner._exodus_compile_signatures(signatures)
def load_from_defaults(self):
self.set_data(json.loads(SUSS_DEFAULT))
def scan_binary(apkfile, extract_signatures=None):
class ScannerTool():
def __init__(self):
self.sdcs = [
SUSSDataController(),
]
# we could add support for loading additional signature source
# definitions from config.yml here
self.load()
self.compile_regexes()
def load(self):
for sdc in self.sdcs:
sdc.load()
def compile_regexes(self):
self.regexs = {
'err_code_signatures': {},
'err_gradle_signatures': {},
'warn_code_signatures': {},
'warn_gradle_signatures': {},
}
for sdc in self.sdcs:
for signame, sigdef in sdc.data.get('signatures', {}).items():
for sig in sigdef.get('code_signatures', []):
self.regexs['err_code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
for sig in sigdef.get('gradle_signatures', []):
self.regexs['err_gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
for sig in sigdef.get('warn_code_signatures', []):
self.regexs['warn_code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
for sig in sigdef.get('warn_gradle_signatures', []):
self.regexs['warn_gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
def refresh(self):
for sdc in self.sdcs:
sdc.fetch_signatures_from_web()
sdc.write_to_cache()
def add(self, new_controller: SignatureDataController):
self.sdcs.append(new_controller)
self.compile_regexes()
# TODO: change this from singleton instance to dependency injection
# use `_get_tool()` instead of accessing this directly
_SCANNER_TOOL = None
def _get_tool():
"""
Lazy loading function for getting a ScannerTool instance.
ScannerTool initialization need to access `common.config` values. Those are only available after initialization through `common.read_config()`. So this factory assumes config was called at an erlier point in time.
"""
if not scanner._SCANNER_TOOL:
scanner._SCANNER_TOOL = ScannerTool()
return scanner._SCANNER_TOOL
def scan_binary(apkfile):
"""Scan output of dexdump for known non-free classes."""
logging.info(_('Scanning APK with dexdump for known non-free classes.'))
result = get_embedded_classes(apkfile)
problems = 0
problems, warnings = 0, 0
for classname in result:
for suspect, regexp in CODE_SIGNATURES.items():
for suspect, regexp in _get_tool().regexs['warn_code_signatures'].items():
if regexp.match(classname):
logging.debug("Found class '%s'" % classname)
logging.debug("Warning: found class '%s'" % classname)
warnings += 1
for suspect, regexp in _get_tool().regexs['err_code_signatures'].items():
if regexp.match(classname):
logging.debug("Problem: found class '%s'" % classname)
problems += 1
if extract_signatures:
def _detect_tracker(sig, tracker, class_list):
for clazz in class_list:
if sig.search(clazz):
logging.debug("Found tracker, class {} matching {}".format(clazz, tracker.code_signature))
return tracker
return None
results = []
args = [(extract_signatures[1][index], tracker, result)
for (index, tracker) in enumerate(extract_signatures[0]) if
len(tracker.code_signature) > 3]
for res in itertools.starmap(_detect_tracker, args):
if res:
results.append(res)
trackers = [t for t in results if t is not None]
problems += len(trackers)
if warnings:
logging.warning(_("Found {count} warnings in {filename}").format(count=warnings, filename=apkfile))
if problems:
logging.critical("Found problems in %s" % apkfile)
logging.critical(_("Found {count} problems in {filename}").format(count=problems, filename=apkfile))
return problems
@ -255,20 +390,9 @@ def scan_source(build_dir, build=metadata.Build()):
"""
count = 0
allowlisted = [
'firebase-jobdispatcher', # https://github.com/firebase/firebase-jobdispatcher-android/blob/master/LICENSE
'com.firebaseui', # https://github.com/firebase/FirebaseUI-Android/blob/master/LICENSE
'geofire-android', # https://github.com/firebase/geofire-java/blob/master/LICENSE
'com.yandex.android:authsdk', # https://github.com/yandexmobile/yandex-login-sdk-android/blob/master/LICENSE.txt
'com.hypertrack:hyperlog', # https://github.com/hypertrack/hyperlog-android#license
]
def is_allowlisted(s):
return any(al in s for al in allowlisted)
def suspects_found(s):
for n, r in NON_FREE_GRADLE_LINES.items():
if r.match(s) and not is_allowlisted(s):
for n, r in _get_tool().regexs['err_gradle_signatures'].items():
if r.match(s):
yield n
allowed_repos = [re.compile(r'^https://' + re.escape(repo) + r'/*') for repo in [
@ -540,7 +664,7 @@ def scan_source(build_dir, build=metadata.Build()):
def main():
global config, options, json_per_build
global options, json_per_build
# Parse command line...
parser = ArgumentParser(
@ -557,6 +681,8 @@ def main():
help=_("Force scan of disabled apps and builds."))
parser.add_argument("--json", action="store_true", default=False,
help=_("Output JSON to stdout."))
parser.add_argument("--refresh", "-r", action="store_true", default=False,
help=_("fetch the latest version of signatures from the web"))
metadata.add_metadata_arguments(parser)
options = parser.parse_args()
metadata.warnings_action = options.W
@ -568,21 +694,28 @@ def main():
else:
logging.getLogger().setLevel(logging.ERROR)
config = common.read_config(options)
# initialize/load configuration values
common.get_config(opts=options)
if options.refresh:
scanner._get_tool().refresh()
if options.exodus:
c = ExodusSignatureDataController()
if options.refresh:
c.fetch_signatures_from_web()
else:
c.fetch()
scanner._get_tool().add(c)
probcount = 0
exodus = []
if options.exodus:
exodus = load_exodus_trackers_signatures()
appids = []
for apk in options.appid:
if os.path.isfile(apk):
count = scanner.scan_binary(apk, exodus)
count = scanner.scan_binary(apk)
if count > 0:
logging.warning(
_('Scanner found {count} problems in {apk}:').format(
_('Scanner found {count} problems in {apk}').format(
count=count, apk=apk
)
)
@ -683,3 +816,299 @@ def main():
if __name__ == "__main__":
main()
SUSS_DEFAULT = '''{
"cache_duration": 86400,
"signatures": {
"admob": {
"gradle_signatures": [
"admob.*sdk.*android"
],
"license": "NonFree"
},
"androidx": {
"gradle_signatures": [
"androidx.navigation:navigation-dynamic-features",
"androidx.work:work-gcm"
],
"license": "NonFree"
},
"appcenter-push": {
"gradle_signatures": [
"appcenter-push"
],
"license": "NonFree"
},
"bugsense": {
"gradle_signatures": [
"bugsense"
],
"license": "NonFree"
},
"cloudrail": {
"gradle_signatures": [
"cloudrail"
],
"license": "NonFree"
},
"com.android.billing": {
"code_signatures": [
"com/android/billing"
],
"license": "NonFree"
},
"com.android.billingclient": {
"gradle_signatures": [
"com.android.billingclient"
],
"license": "NonFree"
},
"com.anjlab.android.iab.v3": {
"gradle_signatures": [
"com.anjlab.android.iab.v3:library"
],
"license": "NonFree"
},
"com.cloudinary": {
"gradle_signatures": [
"com.cloudinary:cloudinary-android"
],
"license": "NonFree"
},
"com.evernote": {
"gradle_signatures": [
"com.evernote:android-job"
],
"license": "NonFree"
},
"com.facebook": {
"gradle_signatures": [
"[\\"']com.facebook.android['\\":]"
],
"license": "NonFree"
},
"com.github.junrar": {
"gradle_signatures": [
"com.github.junrar:junrar"
],
"license": "NonFree"
},
"com.github.penn5": {
"gradle_signatures": [
"com.github.penn5:donations"
],
"license": "NonFree"
},
"com.google.analytics": {
"code_signatures": [
"com/google/analytics"
],
"license": "NonFree"
},
"com.google.android.exoplayer": {
"gradle_signatures": [
"com.google.android.exoplayer:extension-cast",
"com.google.android.exoplayer:extension-cronet"
],
"license": "NonFree"
},
"com.google.android.gms": {
"code_signatures": [
"com/google/android/gms"
],
"license": "NonFree"
},
"com.google.android.libraries.places": {
"gradle_signatures": [
"com.google.android.libraries.places:places"
],
"license": "NonFree"
},
"com.google.android.play": {
"gradle_signatures": [
"com.google.android.play:app-update",
"com.google.android.play:core.*"
],
"license": "NonFree"
},
"com.google.android.play.core": {
"code_signatures": [
"com/google/android/play/core"
],
"license": "NonFree"
},
"com.google.firebase": {
"code_signatures": [
"com/google/firebase"
],
"license": "NonFree"
},
"com.google.mlkit": {
"gradle_signatures": [
"com.google.mlkit"
],
"license": "NonFree"
},
"com.google.tagmanager": {
"code_signatures": [
"com/google/tagmanager"
],
"license": "NonFree"
},
"com.hypertrack": {
"gradle_signatures": [
"com\\\\.hypertrack(?!:hyperlog)"
],
"license": "NonFree"
},
"com.mapbox": {
"MaintainerNotes": "com.mapbox.mapboxsdk:mapbox-sdk-services seems to be fully under this license:\\nhttps://github.com/mapbox/mapbox-java/blob/main/LICENSE\\n",
"gradle_signatures": [
"com\\\\.mapbox(?!\\\\.mapboxsdk:mapbox-sdk-services)"
],
"license": "NonFree"
},
"com.onesignal": {
"gradle_signatures": [
"com.onesignal:OneSignal"
],
"license": "NonFree"
},
"com.tencent.bugly": {
"gradle_signatures": [
"com.tencent.bugly"
],
"license": "NonFree"
},
"com.umeng.umsdk": {
"gradle_signatures": [
"com.umeng.umsdk"
],
"license": "NonFree"
},
"com.yandex.android": {
"gradle_signatures": [
"com\\\\.yandex\\\\.android(?!:authsdk)"
],
"license": "NonFree"
},
"com.yayandroid": {
"gradle_signatures": [
"com.yayandroid:LocationManager"
],
"license": "NonFree"
},
"crashlytics": {
"gradle_signatures": [
"crashlytics"
],
"license": "NonFree"
},
"crittercism": {
"gradle_signatures": [
"crittercism"
],
"license": "NonFree"
},
"firebase": {
"gradle_signatures": [
"com(\\\\.google)?\\\\.firebase[.:](?!firebase-jobdispatcher|geofire-java)"
],
"license": "NonFree"
},
"flurryagent": {
"gradle_signatures": [
"flurryagent"
],
"license": "NonFree"
},
"google-ad": {
"gradle_signatures": [
"google.*ad.*view"
],
"license": "NonFree"
},
"google.admob": {
"gradle_signatures": [
"google.*admob"
],
"license": "NonFree"
},
"google.play.services": {
"gradle_signatures": [
"google.*play.*services"
],
"license": "NonFree"
},
"heyzap": {
"gradle_signatures": [
"heyzap"
],
"license": "NonFree"
},
"io.github.sinaweibosdk": {
"gradle_signatures": [
"io.github.sinaweibosdk"
],
"license": "NonFree"
},
"io.objectbox": {
"gradle_signatures": [
"io.objectbox:objectbox-gradle-plugin"
],
"license": "NonFree"
},
"jpct": {
"gradle_signatures": [
"jpct.*ae"
],
"license": "NonFree"
},
"libspen23": {
"gradle_signatures": [
"libspen23"
],
"license": "NonFree"
},
"me.pushy": {
"gradle_signatures": [
"me.pushy:sdk"
],
"license": "NonFree"
},
"org.jetbrains.kotlinx": {
"gradle_signatures": [
"org.jetbrains.kotlinx:kotlinx-coroutines-play-services"
],
"license": "NonFree"
},
"ouya": {
"gradle_signatures": [
"ouya.*sdk"
],
"license": "NonFree"
},
"paypal": {
"gradle_signatures": [
"paypal.*mpl"
],
"license": "NonFree"
},
"xyz.belvi.mobilevision": {
"gradle_signatures": [
"xyz.belvi.mobilevision:barcodescanner"
],
"license": "NonFree"
},
"youtube": {
"gradle_signatures": [
"youtube.*android.*player.*api"
],
"license": "NonFree"
}
},
"timestamp": 1664480104.875586,
"version": 1,
"last_updated": 1664480104.875586
}'''

View File

@ -402,7 +402,7 @@ class BuildTest(unittest.TestCase):
os.chdir(testdir)
os.mkdir("build")
config = dict()
config = fdroidserver.common.get_config()
config['sdk_path'] = os.getenv('ANDROID_HOME')
config['ndk_paths'] = {'r10d': os.getenv('ANDROID_NDK_HOME')}
fdroidserver.common.config = config

View File

@ -17,6 +17,7 @@ import zipfile
import collections
import pathlib
from unittest import mock
from datetime import datetime, timedelta
localmodule = os.path.realpath(
os.path.join(os.path.dirname(inspect.getfile(inspect.currentframe())), '..')
@ -29,7 +30,7 @@ import fdroidserver.build
import fdroidserver.common
import fdroidserver.metadata
import fdroidserver.scanner
from testcommon import TmpCwd
from testcommon import TmpCwd, mock_open_to_str
class ScannerTest(unittest.TestCase):
@ -53,8 +54,8 @@ class ScannerTest(unittest.TestCase):
'com.integreight.onesheeld': 11,
'com.jens.automation2': 2,
'firebase-suspect': 1,
'org.mozilla.rocket': 3,
'org.tasks': 3,
'org.mozilla.rocket': 1,
'org.tasks': 2,
'realm': 1,
'se.manyver': 2,
}
@ -446,21 +447,27 @@ class Test_scan_binary(unittest.TestCase):
fdroidserver.common.config = config
fdroidserver.common.options = mock.Mock()
def test_code_signature_match(self):
apkfile = os.path.join(self.basedir, 'no_targetsdk_minsdk1_unsigned.apk')
mock_code_signatures = {
fdroidserver.scanner._SCANNER_TOOL = mock.Mock()
fdroidserver.scanner._SCANNER_TOOL.regexs = {}
fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'] = {
"java/lang/Object": re.compile(
r'.*java/lang/Object', re.IGNORECASE | re.UNICODE
)
}
with mock.patch("fdroidserver.scanner.CODE_SIGNATURES", mock_code_signatures):
self.assertEqual(
1,
fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(
fdroidserver.scanner.CODE_SIGNATURES.values(), apkfile
),
)
fdroidserver.scanner._SCANNER_TOOL.regexs['warn_code_signatures'] = {}
def test_code_signature_match(self):
apkfile = os.path.join(self.basedir, 'no_targetsdk_minsdk1_unsigned.apk')
self.assertEqual(
1,
fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(
fdroidserver.scanner._SCANNER_TOOL.regexs[
'err_code_signatures'
].values(),
apkfile,
),
)
@unittest.skipIf(
sys.version_info < (3, 9),
@ -470,115 +477,213 @@ class Test_scan_binary(unittest.TestCase):
)
def test_bottom_level_embedded_apk_code_signature(self):
apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk')
mock_code_signatures = {
fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'] = {
"org/bitbucket/tickytacky/mirrormirror/MainActivity": re.compile(
r'.*org/bitbucket/tickytacky/mirrormirror/MainActivity',
re.IGNORECASE | re.UNICODE,
)
}
with mock.patch("fdroidserver.scanner.CODE_SIGNATURES", mock_code_signatures):
self.assertEqual(
1,
fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(
fdroidserver.scanner.CODE_SIGNATURES.values(), apkfile
),
)
self.assertEqual(
1,
fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(
fdroidserver.scanner._SCANNER_TOOL.regexs[
'err_code_signatures'
].values(),
apkfile,
),
)
def test_top_level_signature_embedded_apk_present(self):
apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk')
mock_code_signatures = {
fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'] = {
"org/fdroid/ci/BuildConfig": re.compile(
r'.*org/fdroid/ci/BuildConfig', re.IGNORECASE | re.UNICODE
)
}
with mock.patch("fdroidserver.scanner.CODE_SIGNATURES", mock_code_signatures):
self.assertEqual(
1,
fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(
fdroidserver.scanner.CODE_SIGNATURES.values(), apkfile
),
)
def test_no_match(self):
apkfile = os.path.join(self.basedir, 'no_targetsdk_minsdk1_unsigned.apk')
result = fdroidserver.scanner.scan_binary(apkfile)
self.assertEqual(
0, result, "Found false positives in binary '{}'".format(apkfile)
1,
fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(
fdroidserver.scanner._SCANNER_TOOL.regexs[
'err_code_signatures'
].values(),
apkfile,
),
)
class Test__exodus_compile_signatures(unittest.TestCase):
def setUp(self):
self.m1 = mock.Mock()
self.m1.code_signature = r"^random\sregex$"
self.m2 = mock.Mock()
self.m2.code_signature = r"^another.+regex$"
self.mock_sigs = [self.m1, self.m2]
def test_ok(self):
result = fdroidserver.scanner._exodus_compile_signatures(self.mock_sigs)
self.assertListEqual(
result,
[
re.compile(self.m1.code_signature),
re.compile(self.m2.code_signature),
],
class Test_SignatureDataController(unittest.TestCase):
# __init__
def test_init(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
self.assertEqual(sdc.name, 'nnn')
self.assertEqual(sdc.filename, 'fff.yml')
self.assertEqual(sdc.cache_duration, timedelta(999999))
self.assertDictEqual(sdc.data, {})
def test_not_iterable(self):
result = fdroidserver.scanner._exodus_compile_signatures(123)
self.assertListEqual(result, [])
class Test_load_exodus_trackers_signatures(unittest.TestCase):
def setUp(self):
self.requests_ret = mock.Mock()
self.requests_ret.json = mock.Mock(
return_value={
"trackers": {
"1": {
"id": 1,
"name": "Steyer Puch 1",
"description": "blah blah blah",
"creation_date": "1956-01-01",
"code_signature": "com.puch.|com.steyer.",
"network_signature": "pst\\.com",
"website": "https://pst.com",
"categories": ["tracker"],
"documentation": [],
},
"2": {
"id": 2,
"name": "Steyer Puch 2",
"description": "blah blah blah",
"creation_date": "1956-01-01",
"code_signature": "com.puch.|com.steyer.",
"network_signature": "pst\\.com",
"website": "https://pst.com",
"categories": ["tracker"],
"documentation": [],
},
},
}
# check_last_updated
def test_check_last_updated_ok(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
self.requests_func = mock.Mock(return_value=self.requests_ret)
self.compilesig_func = mock.Mock(return_value="mocked return value")
sdc.data['last_updated'] = datetime.utcnow().timestamp()
sdc.check_last_updated()
def test_ok(self):
with mock.patch("requests.get", self.requests_func), mock.patch(
"fdroidserver.scanner._exodus_compile_signatures", self.compilesig_func
def test_check_last_updated_exception_cache_outdated(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
sdc.cache_duration = timedelta(days=7)
sdc.data['last_updated'] = (datetime.utcnow() - timedelta(days=30)).timestamp()
with self.assertRaises(fdroidserver.scanner.SignatureDataOutdatedException):
sdc.check_last_updated()
def test_check_last_updated_exception_not_string(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
sdc.data['last_updated'] = 'sepp'
with self.assertRaises(fdroidserver.scanner.SignatureDataMalformedException):
sdc.check_last_updated()
def test_check_last_updated_exception_not_iso_formatted_string(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
sdc.data['last_updated'] = '01/09/2002 10:11'
with self.assertRaises(fdroidserver.scanner.SignatureDataMalformedException):
sdc.check_last_updated()
def test_check_last_updated_no_exception_missing_when_last_updated_not_set(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
sdc.check_last_updated()
# check_data_version
def test_check_data_version_ok(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
sdc.data['version'] = fdroidserver.scanner.SCANNER_CACHE_VERSION
sdc.check_data_version()
def test_check_data_version_exception(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
with self.assertRaises(
fdroidserver.scanner.SignatureDataVersionMismatchException
):
sigs, regex = fdroidserver.scanner.load_exodus_trackers_signatures()
self.requests_func.assert_called_once_with(
"https://reports.exodus-privacy.eu.org/api/trackers", timeout=300
)
self.assertEqual(len(sigs), 2)
self.assertListEqual([1, 2], sorted([x.id for x in sigs]))
sdc.check_data_version()
self.compilesig_func.assert_called_once_with(sigs)
self.assertEqual(regex, "mocked return value")
def test_load_ok(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
func_lfc = mock.Mock()
func_vd = mock.Mock()
func_clu = mock.Mock()
with mock.patch(
'fdroidserver.scanner.SignatureDataController.load_from_cache',
func_lfc,
), mock.patch(
'fdroidserver.scanner.SignatureDataController.verify_data',
func_vd,
), mock.patch(
'fdroidserver.scanner.SignatureDataController.check_last_updated',
func_clu,
):
sdc.load()
func_lfc.assert_called_once_with()
func_vd.assert_called_once_with()
func_clu.assert_called_once_with()
def test_load_initial_cache_miss(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
func_lfc = mock.Mock(
side_effect=fdroidserver.scanner.SignatureDataCacheMissException
)
func_lfd = mock.Mock()
with mock.patch(
'fdroidserver.scanner.SignatureDataController.load_from_cache',
func_lfc,
), mock.patch(
'fdroidserver.scanner.SignatureDataController.load_from_defaults',
func_lfd,
):
sdc.load()
func_lfc.assert_called_once_with()
func_lfd.assert_called_once_with()
def test_load_cache_auto_refresh(self):
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
func_lfc = mock.Mock()
func_vd = mock.Mock()
func_clu = mock.Mock(
side_effect=fdroidserver.scanner.SignatureDataOutdatedException()
)
func_fsfw = mock.Mock()
func_wtc = mock.Mock()
with mock.patch(
'fdroidserver.scanner.SignatureDataController.load_from_cache',
func_lfc,
), mock.patch(
'fdroidserver.scanner.SignatureDataController.verify_data',
func_vd,
), mock.patch(
'fdroidserver.scanner.SignatureDataController.check_last_updated',
func_clu,
), mock.patch(
'fdroidserver.scanner.SignatureDataController.fetch_signatures_from_web',
func_fsfw,
), mock.patch(
'fdroidserver.scanner.SignatureDataController.write_to_cache',
func_wtc,
):
sdc.load()
func_lfc.assert_called_once_with()
func_vd.assert_called_once_with()
func_clu.assert_called_once_with()
func_fsfw.assert_called_once_with()
func_wtc.assert_called_once_with()
@unittest.skipIf(
sys.version_info < (3, 9, 0),
"mock_open doesn't allow easy access to written data in older python versions",
)
def test_write_to_cache(self):
open_func = mock.mock_open()
sdc = fdroidserver.scanner.SignatureDataController(
'nnn', 'fff.yml', 'https://example.com/test.json'
)
sdc.data = {"mocked": "data"}
with mock.patch("builtins.open", open_func), mock.patch(
"fdroidserver.scanner._scanner_cachedir",
return_value=pathlib.Path('.'),
):
sdc.write_to_cache()
open_func.assert_called_with(pathlib.Path('fff.yml'), 'w', encoding="utf-8")
self.assertEqual(mock_open_to_str(open_func), """{\n "mocked": "data"\n}""")
class Test_ScannerTool(unittest.TestCase):
def test_load(self):
st = mock.Mock()
st.sdcs = [mock.Mock(), mock.Mock()]
fdroidserver.scanner.ScannerTool.load(st)
st.sdcs[0].load.assert_called_once_with()
st.sdcs[1].load.assert_called_once_with()
class Test_main(unittest.TestCase):
@ -627,7 +732,7 @@ class Test_main(unittest.TestCase):
self.exit_func.assert_not_called()
self.read_app_args_func.assert_not_called()
self.scan_binary_func.assert_called_once_with('local.application.apk', [])
self.scan_binary_func.assert_called_once_with('local.application.apk')
if __name__ == "__main__":
@ -648,8 +753,7 @@ if __name__ == "__main__":
[
unittest.makeSuite(ScannerTest),
unittest.makeSuite(Test_scan_binary),
unittest.makeSuite(Test__exodus_compile_signatures),
unittest.makeSuite(Test_load_exodus_trackers_signatures),
unittest.makeSuite(Test_SignatureDataController),
unittest.makeSuite(Test_main),
]
)

View File

@ -36,8 +36,7 @@ class TmpCwd():
class TmpPyPath():
"""Context-manager for temporarily changing the current working
directory.
"""Context-manager for temporarily adding a direcory to python path
"""
def __init__(self, additional_path):
@ -48,3 +47,14 @@ class TmpPyPath():
def __exit__(self, a, b, c):
sys.path.remove(self.additional_path)
def mock_open_to_str(mock):
"""
helper function for accessing all data written into a
unittest.mock.mock_open() instance as a string.
"""
return "".join([
x.args[0] for x in mock.mock_calls if str(x).startswith("call().write(")
])