1
0
mirror of https://gitlab.com/fdroid/fdroidserver.git synced 2024-09-21 04:10:37 +02:00

scanner.py: format

This commit is contained in:
linsui 2024-08-07 13:00:26 +08:00 committed by Hans-Christoph Steiner
parent 3782eddc4d
commit c3644464ff

View File

@ -16,29 +16,26 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import imghdr
import itertools
import json
import logging
import os
import re
import sys
import json
import imghdr
import logging
import zipfile
import itertools
import traceback
import urllib.parse
import urllib.request
import zipfile
from argparse import ArgumentParser
from tempfile import TemporaryDirectory
from pathlib import Path
from datetime import datetime, timedelta
from dataclasses import dataclass, field, fields
from datetime import datetime, timedelta
from enum import IntEnum
from pathlib import Path
from tempfile import TemporaryDirectory
from . import _
from . import common
from . import metadata
from .exception import BuildException, VCSException, ConfigurationException
from . import scanner
from . import _, common, metadata, scanner
from .exception import BuildException, ConfigurationException, VCSException
@dataclass
@ -48,8 +45,10 @@ class MessageStore:
errors: list = field(default_factory=list)
MAVEN_URL_REGEX = re.compile(r"""\smaven\s*(?:{.*?(?:setUrl|url)|\(\s*(?:url)?)\s*=?\s*(?:uri|URI|Uri\.create)?\(?\s*["']?([^\s"']+)["']?[^})]*[)}]""",
re.DOTALL)
MAVEN_URL_REGEX = re.compile(
r"""\smaven\s*(?:{.*?(?:setUrl|url)|\(\s*(?:url)?)\s*=?\s*(?:uri|URI|Uri\.create)?\(?\s*["']?([^\s"']+)["']?[^})]*[)}]""",
re.DOTALL,
)
SCANNER_CACHE_VERSION = 1
@ -60,7 +59,8 @@ class ExitCode(IntEnum):
def get_gradle_compile_commands(build):
compileCommands = ['api',
compileCommands = [
'api',
'apk',
'classpath',
'compile',
@ -68,13 +68,16 @@ def get_gradle_compile_commands(build):
'id',
'implementation',
'provided',
'runtimeOnly']
'runtimeOnly',
]
buildTypes = ['', 'release']
flavors = ['']
if build.gradle and build.gradle != ['yes']:
flavors += build.gradle
commands = [''.join(c) for c in itertools.product(flavors, buildTypes, compileCommands)]
commands = [
''.join(c) for c in itertools.product(flavors, buildTypes, compileCommands)
]
return [re.compile(r'\s*' + c, re.IGNORECASE) for c in commands]
@ -115,7 +118,9 @@ def get_embedded_classes(apkfile, depth=0):
["dexdump", '{}/{}'.format(tmp_dir, info.filename)],
output=False,
)
classes = classes.union(set(re.findall(r'[A-Z]+((?:\w+\/)+\w+)', run.output)))
classes = classes.union(
set(re.findall(r'[A-Z]+((?:\w+\/)+\w+)', run.output))
)
except zipfile.BadZipFile as ex:
return {_('Problem with ZIP file: %s, error %s') % (apkfile, ex)}
@ -191,9 +196,11 @@ class SignatureDataController:
raise SignatureDataMalformedException() from e
delta = (last_updated + self.cache_duration) - scanner._datetime_now()
if delta > timedelta(seconds=0):
logging.debug(_('next {name} cache update due in {time}').format(
logging.debug(
_('next {name} cache update due in {time}').format(
name=self.filename, time=delta
))
)
)
else:
raise SignatureDataOutdatedException()
@ -202,7 +209,9 @@ class SignatureDataController:
self.fetch_signatures_from_web()
self.write_to_cache()
except Exception as e:
raise Exception(_("downloading scanner signatures from '{}' failed").format(self.url)) from e
raise Exception(
_("downloading scanner signatures from '{}' failed").format(self.url)
) from e
def load(self):
try:
@ -215,10 +224,17 @@ class SignatureDataController:
except (SignatureDataOutdatedException, SignatureDataNoDefaultsException):
self.fetch_signatures_from_web()
self.write_to_cache()
except (SignatureDataMalformedException, SignatureDataVersionMismatchException) as e:
logging.critical(_("scanner cache is malformed! You can clear it with: '{clear}'").format(
except (
SignatureDataMalformedException,
SignatureDataVersionMismatchException,
) as e:
logging.critical(
_(
"scanner cache is malformed! You can clear it with: '{clear}'"
).format(
clear='rm -r {}'.format(common.get_config()['cachedir_scanner'])
))
)
)
raise e
def load_from_defaults(self):
@ -244,7 +260,13 @@ class SignatureDataController:
Right now this function does just a basic key sanitation.
"""
self.check_data_version()
valid_keys = ['timestamp', 'last_updated', 'version', 'signatures', 'cache_duration']
valid_keys = [
'timestamp',
'last_updated',
'version',
'signatures',
'cache_duration',
]
for k in list(self.data.keys()):
if k not in valid_keys:
@ -266,7 +288,11 @@ class SignatureDataController:
class ExodusSignatureDataController(SignatureDataController):
def __init__(self):
super().__init__('Exodus signatures', 'exodus.json', 'https://reports.exodus-privacy.eu.org/api/trackers')
super().__init__(
'Exodus signatures',
'exodus.json',
'https://reports.exodus-privacy.eu.org/api/trackers',
)
self.cache_duration = timedelta(days=1) # refresh exodus cache after one day
self.has_trackers_json_key = True
@ -294,7 +320,7 @@ class ExodusSignatureDataController(SignatureDataController):
# exodus also provides network signatures, unused atm.
# "network_signatures": [tracker["network_signature"]],
"AntiFeatures": ["Tracking"], # TODO
"license": "NonFree" # We assume all trackers in exodus
"license": "NonFree", # We assume all trackers in exodus
# are non-free, although free
# trackers like piwik, acra,
# etc. might be listed by exodus
@ -315,9 +341,7 @@ class EtipSignatureDataController(ExodusSignatureDataController):
class SUSSDataController(SignatureDataController):
def __init__(self):
super().__init__(
'SUSS',
'suss.json',
'https://fdroid.gitlab.io/fdroid-suss/suss.json'
'SUSS', 'suss.json', 'https://fdroid.gitlab.io/fdroid-suss/suss.json'
)
def load_from_defaults(self):
@ -332,7 +356,9 @@ class ScannerTool:
self.scanner_data_lookup()
options = common.get_options()
options_refresh_scanner = hasattr(options, "refresh_scanner") and options.refresh_scanner
options_refresh_scanner = (
hasattr(options, "refresh_scanner") and options.refresh_scanner
)
if options_refresh_scanner or common.get_config().get('refresh_scanner'):
self.refresh()
@ -342,8 +368,9 @@ class ScannerTool:
def scanner_data_lookup(self):
sigsources = common.get_config().get('scanner_signature_sources', [])
logging.debug(
"scanner is configured to use signature data from: '{}'"
.format("', '".join(sigsources))
"scanner is configured to use signature data from: '{}'".format(
"', '".join(sigsources)
)
)
self.sdcs = []
for i, source_url in enumerate(sigsources):
@ -361,11 +388,13 @@ class ScannerTool:
"Has to be a valid HTTPS-URL or match a predefined "
"constants: 'suss', 'exodus'".format(source_url)
)
self.sdcs.append(SignatureDataController(
self.sdcs.append(
SignatureDataController(
source_url,
'{}_{}'.format(i, os.path.basename(u.path)),
source_url,
))
)
)
def load(self):
for sdc in self.sdcs:
@ -381,13 +410,21 @@ class ScannerTool:
for sdc in self.sdcs:
for signame, sigdef in sdc.data.get('signatures', {}).items():
for sig in sigdef.get('code_signatures', []):
self.regexs['err_code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
self.regexs['err_code_signatures'][sig] = re.compile(
'.*' + sig, re.IGNORECASE
)
for sig in sigdef.get('gradle_signatures', []):
self.regexs['err_gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
self.regexs['err_gradle_signatures'][sig] = re.compile(
'.*' + sig, re.IGNORECASE
)
for sig in sigdef.get('warn_code_signatures', []):
self.regexs['warn_code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
self.regexs['warn_code_signatures'][sig] = re.compile(
'.*' + sig, re.IGNORECASE
)
for sig in sigdef.get('warn_gradle_signatures', []):
self.regexs['warn_gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
self.regexs['warn_gradle_signatures'][sig] = re.compile(
'.*' + sig, re.IGNORECASE
)
def refresh(self):
for sdc in self.sdcs:
@ -430,9 +467,17 @@ def scan_binary(apkfile):
logging.debug("Problem: found class '%s'" % classname)
problems += 1
if warnings:
logging.warning(_("Found {count} warnings in {filename}").format(count=warnings, filename=apkfile))
logging.warning(
_("Found {count} warnings in {filename}").format(
count=warnings, filename=apkfile
)
)
if problems:
logging.critical(_("Found {count} problems in {filename}").format(count=problems, filename=apkfile))
logging.critical(
_("Found {count} problems in {filename}").format(
count=problems, filename=apkfile
)
)
return problems
@ -453,7 +498,10 @@ def scan_source(build_dir, build=metadata.Build(), json_per_build=None):
if r.match(s):
yield n
allowed_repos = [re.compile(r'^https://' + re.escape(repo) + r'/*') for repo in [
allowed_repos = (
[
re.compile(r'^https://' + re.escape(repo) + r'/*')
for repo in [
'repo1.maven.org/maven2', # mavenCentral()
'jcenter.bintray.com', # jcenter()
'jitpack.io',
@ -474,10 +522,14 @@ def scan_source(build_dir, build=metadata.Build(), json_per_build=None):
'plugins.gradle.org/m2', # Gradle plugin repo
'maven.google.com', # Google Maven Repo, https://developer.android.com/studio/build/dependencies.html#google-maven
]
] + [re.compile(r'^file://' + re.escape(repo) + r'/*') for repo in [
]
+ [
re.compile(r'^file://' + re.escape(repo) + r'/*')
for repo in [
'/usr/share/maven-repo', # local repo on Debian installs
]
]
)
scanignore = common.getpaths_map(build_dir, build.scanignore)
scandelete = common.getpaths_map(build_dir, build.scandelete)
@ -515,7 +567,7 @@ def scan_source(build_dir, build=metadata.Build(), json_per_build=None):
-------
0 as we explicitly ignore the file, so don't count an error
"""
msg = ('Ignoring %s at %s' % (what, path_in_build_dir))
msg = 'Ignoring %s at %s' % (what, path_in_build_dir)
logging.info(msg)
if json_per_build is not None:
json_per_build.infos.append([msg, path_in_build_dir])
@ -537,7 +589,7 @@ def scan_source(build_dir, build=metadata.Build(), json_per_build=None):
-------
0 as we deleted the offending file
"""
msg = ('Removing %s at %s' % (what, path_in_build_dir))
msg = 'Removing %s at %s' % (what, path_in_build_dir)
logging.info(msg)
if json_per_build is not None:
json_per_build.infos.append([msg, path_in_build_dir])
@ -598,14 +650,16 @@ def scan_source(build_dir, build=metadata.Build(), json_per_build=None):
return warnproblem(what, path_in_build_dir, json_per_build)
if options and 'json' in vars(options) and options.json:
json_per_build.errors.append([what, path_in_build_dir])
if options and (options.verbose or not ('json' in vars(options) and options.json)):
if options and (
options.verbose or not ('json' in vars(options) and options.json)
):
logging.error('Found %s at %s' % (what, path_in_build_dir))
return 1
def is_executable(path):
return os.path.exists(path) and os.access(path, os.X_OK)
textchars = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7f})
textchars = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7F})
def is_binary(path):
d = None
@ -614,7 +668,9 @@ def scan_source(build_dir, build=metadata.Build(), json_per_build=None):
return bool(d.translate(None, textchars))
# False positives patterns for files that are binary and executable.
safe_paths = [re.compile(r) for r in [
safe_paths = [
re.compile(r)
for r in [
r".*/drawable[^/]*/.*\.png$", # png drawables
r".*/mipmap[^/]*/.*\.png$", # png mipmaps
]
@ -637,14 +693,12 @@ def scan_source(build_dir, build=metadata.Build(), json_per_build=None):
# Iterate through all files in the source code
for root, dirs, files in os.walk(build_dir, topdown=True):
# It's topdown, so checking the basename is enough
for ignoredir in ('.hg', '.git', '.svn', '.bzr'):
if ignoredir in dirs:
dirs.remove(ignoredir)
for curfile in files:
if curfile in ['.DS_Store']:
continue
@ -733,13 +787,17 @@ def scan_source(build_dir, build=metadata.Build(), json_per_build=None):
if is_used_by_gradle(line):
for name in suspects_found(line):
count += handleproblem(
"usual suspect \'%s\'" % (name),
"usual suspect '%s'" % (name),
path_in_build_dir,
filepath,
json_per_build,
)
noncomment_lines = [line for line in lines if not common.gradle_comment.match(line)]
no_comments = re.sub(r'/\*.*?\*/', '', ''.join(noncomment_lines), flags=re.DOTALL)
noncomment_lines = [
line for line in lines if not common.gradle_comment.match(line)
]
no_comments = re.sub(
r'/\*.*?\*/', '', ''.join(noncomment_lines), flags=re.DOTALL
)
for url in MAVEN_URL_REGEX.findall(no_comments):
if not any(r.match(url) for r in allowed_repos):
count += handleproblem(
@ -756,7 +814,9 @@ def scan_source(build_dir, build=metadata.Build(), json_per_build=None):
)
elif is_executable(filepath):
if is_binary(filepath) and not (safe_path(path_in_build_dir) or is_image_file(filepath)):
if is_binary(filepath) and not (
safe_path(path_in_build_dir) or is_image_file(filepath)
):
warnproblem(
_('executable binary, possibly code'),
path_in_build_dir,
@ -781,15 +841,36 @@ def main():
usage="%(prog)s [options] [(APPID[:VERCODE] | path/to.apk) ...]"
)
common.setup_global_opts(parser)
parser.add_argument("appid", nargs='*', help=_("application ID with optional versionCode in the form APPID[:VERCODE]"))
parser.add_argument("-f", "--force", action="store_true", default=False,
help=_("Force scan of disabled apps and builds."))
parser.add_argument("--json", action="store_true", default=False,
help=_("Output JSON to stdout."))
parser.add_argument("-r", "--refresh", dest="refresh_scanner", action="store_true", default=False,
help=_("fetch the latest version of signatures from the web"))
parser.add_argument("-e", "--exit-code", action="store_true", default=False,
help=_("Exit with a non-zero code if problems were found"))
parser.add_argument(
"appid",
nargs='*',
help=_("application ID with optional versionCode in the form APPID[:VERCODE]"),
)
parser.add_argument(
"-f",
"--force",
action="store_true",
default=False,
help=_("Force scan of disabled apps and builds."),
)
parser.add_argument(
"--json", action="store_true", default=False, help=_("Output JSON to stdout.")
)
parser.add_argument(
"-r",
"--refresh",
dest="refresh_scanner",
action="store_true",
default=False,
help=_("fetch the latest version of signatures from the web"),
)
parser.add_argument(
"-e",
"--exit-code",
action="store_true",
default=False,
help=_("Exit with a non-zero code if problems were found"),
)
metadata.add_metadata_arguments(parser)
options = common.parse_args(parser)
metadata.warnings_action = options.W
@ -840,7 +921,6 @@ def main():
extlib_dir = os.path.join(build_dir, 'extlib')
for appid, app in apps.items():
json_per_appid = dict()
if app.Disabled and not options.force:
@ -861,14 +941,20 @@ def main():
# Set up vcs interface and make sure we have the latest code...
vcs = common.getvcs(app.RepoType, app.Repo, build_dir)
else:
logging.info(_("{appid}: no builds specified, running on current source state")
.format(appid=appid))
logging.info(
_(
"{appid}: no builds specified, running on current source state"
).format(appid=appid)
)
json_per_build = MessageStore()
json_per_appid['current-source-state'] = json_per_build
count = scan_source(build_dir, json_per_build=json_per_build)
if count > 0:
logging.warning(_('Scanner found {count} problems in {appid}:')
.format(count=count, appid=appid))
logging.warning(
_('Scanner found {count} problems in {appid}:').format(
count=count, appid=appid
)
)
probcount += count
app['Builds'] = []
@ -877,32 +963,42 @@ def main():
json_per_appid[build.versionCode] = json_per_build
if build.disable and not options.force:
logging.info("...skipping version %s - %s" % (
build.versionName, build.get('disable', build.commit[1:])))
logging.info(
"...skipping version %s - %s"
% (build.versionName, build.get('disable', build.commit[1:]))
)
continue
logging.info("...scanning version " + build.versionName)
# Prepare the source code...
common.prepare_source(vcs, app, build,
build_dir, srclib_dir,
extlib_dir, False)
common.prepare_source(
vcs, app, build, build_dir, srclib_dir, extlib_dir, False
)
count = scan_source(build_dir, build, json_per_build=json_per_build)
if count > 0:
logging.warning(_('Scanner found {count} problems in {appid}:{versionCode}:')
.format(count=count, appid=appid, versionCode=build.versionCode))
logging.warning(
_(
'Scanner found {count} problems in {appid}:{versionCode}:'
).format(
count=count, appid=appid, versionCode=build.versionCode
)
)
probcount += count
except BuildException as be:
logging.warning('Could not scan app %s due to BuildException: %s' % (
appid, be))
logging.warning(
'Could not scan app %s due to BuildException: %s' % (appid, be)
)
probcount += 1
except VCSException as vcse:
logging.warning('VCS error while scanning app %s: %s' % (appid, vcse))
probcount += 1
except Exception:
logging.warning('Could not scan app %s due to unknown error: %s' % (
appid, traceback.format_exc()))
logging.warning(
'Could not scan app %s due to unknown error: %s'
% (appid, traceback.format_exc())
)
probcount += 1
for k, v in json_per_appid.items():