'''
This script aims to be the most generic and the most explicit possible.
It works with OWASP ZAP API Python client.
To use it, you have to load the Python API client module and start ZAP
Before starting this script for the first time: Open ZAP, go to
Tools -> Options -> API -> Generate random Key, copy and paste the key in the
variable "apiKey" of the configuration area
This script is divided into two parts : a configuration area, where you have to
change variables according to your needs, and the part with API calls.
Author : aine-rb on Github, from Sopra Steria - modified for Sn1per by @xer0dayz
'''
import time
from pprint import pprint
from zapv2 import ZAPv2
import sys, getopt
targetURL = str(sys.argv[1])
apiKey=''
localProxy = {"http": "http://127.0.0.1:8081", "https": "http://127.0.0.1:8081"}
isNewSession = True
sessionName = 'WebgoatSession'
globalExcludeUrl = ['^(?:(?!http:\/\/localhost:8081).*).$']
useProxyChain = False
proxyAddress = 'my.corp.proxy'
proxyPort = '8080'
skipProxyAddresses = ('127.0.0.1;'
'localhost')
useProxyChainAuth = False
proxyUsername = ''
proxyPassword = ''
proxyRealm = ''
useProxyScript = False
proxyScriptName = 'proxyScript.js'
proxyScriptEngine = 'Oracle Nashorn'
proxyScriptFileName = '/zap/scripts/proxy/proxyScript.js'
proxyScriptDescription = 'This is a description'
useContextForScan = False
defineNewContext = False
contextName = 'WebGoat_script-based'
contextId = 0
contextIncludeURL = [targetURL + '.*']
contextExcludeURL = ['http://localhost:8081/WebGoat/j_spring_security_logout',
'http://localhost:8081/WebGoat/logout.mvc']
sessionManagement = 'cookieBasedSessionManagement'
authMethod = 'scriptBasedAuthentication'
authScriptName = 'TwoStepAuthentication.js'
authScriptEngine = 'Oracle Nashorn'
authScriptFileName = '/zap/scripts/authentication/TwoStepAuthentication.js'
authScriptDescription = 'This is a description'
authParams = ('scriptName=' + authScriptName + '&'
'Submission Form URL=http://localhost:8081/WebGoat/j_spring_security_check&'
'Username field=username&'
'Password field=password&'
'Target URL=http://localhost:8081/WebGoat/welcome.mvc')
isLoggedInIndicator = False
indicatorRegex = '\QLocation: http://localhost:8081/WebGoat/login.mvc\E'
createUser = False
userList = [
{'name': 'guest', 'credentials': 'Username=guest&Password=guest'},
{'name': 'webgoat', 'credentials': 'Username=webgoat&Password=webgoat'}
]
userIdList = []
target = targetURL
applicationURL = ['']
useScanPolicy = False
scanPolicyName = 'SQL Injection and XSS'
isWhiteListPolicy = False
ascanIds = [40018, 40019, 40020, 40021, 40022, 40024, 90018,
40012, 40014, 40016, 40017]
alertThreshold = 'Medium'
attackStrength = 'Low'
useAjaxSpider = True
shutdownOnceFinished = False
sys.stdout = open("/usr/share/sniper/bin/zap-report.txt", "w")
zap = ZAPv2(proxies=localProxy, apikey=apiKey)
core = zap.core
if isNewSession:
pprint('Create ZAP session: ' + sessionName + ' -> ' +
core.new_session(name=sessionName, overwrite=True))
else:
pprint('Load ZAP session: ' + sessionName + ' -> ' +
core.load_session(name=sessionName))
print('Add Global Exclude URL regular expressions:')
for regex in globalExcludeUrl:
pprint(regex + ' ->' + core.exclude_from_proxy(regex=regex))
pprint('Enable outgoing proxy chain: ' + str(useProxyChain) + ' -> ' +
core.set_option_use_proxy_chain(boolean=useProxyChain))
if useProxyChain:
pprint('Set outgoing proxy name: ' + proxyAddress + ' -> ' +
core.set_option_proxy_chain_name(string=proxyAddress))
pprint('Set outgoing proxy port: ' + proxyPort + ' -> ' +
core.set_option_proxy_chain_port(integer=proxyPort))
pprint('Skip names for outgoing proxy: ' + skipProxyAddresses + ' -> ' +
core.set_option_proxy_chain_skip_name(string=skipProxyAddresses))
pprint('Set outgoing proxy chain authentication: ' +
str(useProxyChainAuth) + ' -> ' +
core.set_option_use_proxy_chain_auth(boolean=useProxyChainAuth))
if useProxyChainAuth:
pprint('Set outgoing proxy username -> ' +
core.set_option_proxy_chain_user_name(string=proxyUsername))
pprint('Set outgoing proxy password -> ' +
core.set_option_proxy_chain_password(string=proxyPassword))
pprint('Set outgoing proxy realm: ' + proxyRealm + ' -> ' +
core.set_option_proxy_chain_realm(string=proxyRealm))
if useProxyScript:
script = zap.script
script.remove(scriptname=proxyScriptName)
pprint('Load proxy script: ' + proxyScriptName + ' -> ' +
script.load(scriptname=proxyScriptName, scripttype='proxy',
scriptengine=proxyScriptEngine,
filename=proxyScriptFileName,
scriptdescription=proxyScriptDescription))
pprint('Enable proxy script: ' + proxyScriptName + ' -> ' +
script.enable(scriptname=proxyScriptName))
if useContextForScan:
context = zap.context
if defineNewContext:
contextId = context.new_context(contextname=contextName)
pprint('Use context ID: ' + contextId)
print('Include URL in context:')
for url in contextIncludeURL:
pprint(url + ' -> ' +
context.include_in_context(contextname=contextName,
regex=url))
print('Exclude URL from context:')
for url in contextExcludeURL:
pprint(url + ' -> ' +
context.exclude_from_context(contextname=contextName,
regex=url))
pprint('Set session management method: ' + sessionManagement + ' -> ' +
zap.sessionManagement.set_session_management_method(
contextid=contextId, methodname=sessionManagement,
methodconfigparams=None))
if authMethod == 'scriptBasedAuthentication':
script = zap.script
script.remove(scriptname=authScriptName)
pprint('Load script: ' + authScriptName + ' -> ' +
script.load(scriptname=authScriptName,
scripttype='authentication',
scriptengine=authScriptEngine,
filename=authScriptFileName,
scriptdescription=authScriptDescription))
auth = zap.authentication
pprint('Set authentication method: ' + authMethod + ' -> ' +
auth.set_authentication_method(contextid=contextId,
authmethodname=authMethod,
authmethodconfigparams=authParams))
if isLoggedInIndicator:
pprint('Define Loggedin indicator: ' + indicatorRegex + ' -> ' +
auth.set_logged_in_indicator(contextid=contextId,
loggedinindicatorregex=indicatorRegex))
else:
pprint('Define Loggedout indicator: ' + indicatorRegex + ' -> ' +
auth.set_logged_out_indicator(contextid=contextId,
loggedoutindicatorregex=indicatorRegex))
users = zap.users
if createUser:
for user in userList:
userName = user.get('name')
print('Create user ' + userName + ':')
userId = users.new_user(contextid=contextId, name=userName)
userIdList.append(userId)
pprint('User ID: ' + userId + '; username -> ' +
users.set_user_name(contextid=contextId, userid=userId,
name=userName) +
'; credentials -> ' +
users.set_authentication_credentials(contextid=contextId,
userid=userId,
authcredentialsconfigparams=user.get('credentials')) +
'; enabled -> ' +
users.set_user_enabled(contextid=contextId, userid=userId,
enabled=True))
pprint('Enable all passive scanners -> ' +
zap.pscan.enable_all_scanners())
ascan = zap.ascan
if useScanPolicy:
ascan.remove_scan_policy(scanpolicyname=scanPolicyName)
pprint('Add scan policy ' + scanPolicyName + ' -> ' +
ascan.add_scan_policy(scanpolicyname=scanPolicyName))
for policyId in range(0, 5):
ascan.set_policy_alert_threshold(id=policyId,
alertthreshold=alertThreshold,
scanpolicyname=scanPolicyName)
ascan.set_policy_attack_strength(id=policyId,
attackstrength=attackStrength,
scanpolicyname=scanPolicyName)
if isWhiteListPolicy:
pprint('Disable all scanners -> ' +
ascan.disable_all_scanners(scanpolicyname=scanPolicyName))
pprint('Enable given scan IDs -> ' +
ascan.enable_scanners(ids=ascanIds,
scanpolicyname=scanPolicyName))
else:
pprint('Enable all scanners -> ' +
ascan.enable_all_scanners(scanpolicyname=scanPolicyName))
pprint('Disable given scan IDs -> ' +
ascan.disable_scanners(ids=ascanIds,
scanpolicyname=scanPolicyName))
else:
print('No custom policy used for scan')
scanPolicyName = None
pprint('Access target URL ' + target)
core.access_url(url=target, followredirects=True)
for url in applicationURL:
pprint('Access URL ' + url)
core.access_url(url=url, followredirects=True)
time.sleep(2)
forcedUser = zap.forcedUser
spider = zap.spider
ajax = zap.ajaxSpider
scanId = 0
print('Starting Scans on target: ' + target)
if useContextForScan:
for userId in userIdList:
print('Starting scans with User ID: ' + userId)
scanId = spider.scan_as_user(contextid=contextId, userid=userId,
url=target, maxchildren=None, recurse=True, subtreeonly=None)
print('Start Spider scan with user ID: ' + userId +
'. Scan ID equals: ' + scanId)
time.sleep(2)
while (int(spider.status(scanId)) < 100):
print('Spider progress: ' + spider.status(scanId) + '%')
time.sleep(2)
print('Spider scan for user ID ' + userId + ' completed')
if useAjaxSpider:
pprint('Set forced user mode enabled -> ' +
forcedUser.set_forced_user_mode_enabled(boolean=True))
pprint('Set user ID: ' + userId + ' for forced user mode -> ' +
forcedUser.set_forced_user(contextid=contextId,
userid=userId))
pprint('Ajax Spider the target with user ID: ' + userId + ' -> ' +
ajax.scan(url=target, inscope=None))
time.sleep(10)
while (ajax.status != 'stopped'):
print('Ajax Spider is ' + ajax.status)
time.sleep(5)
for url in applicationURL:
pprint('Ajax Spider the URL: ' + url + ' with user ID: ' +
userId + ' -> ' +
ajax.scan(url=url, inscope=None))
time.sleep(10)
while (ajax.status != 'stopped'):
print('Ajax Spider is ' + ajax.status)
time.sleep(5)
pprint('Set forced user mode disabled -> ' +
forcedUser.set_forced_user_mode_enabled(boolean=False))
print('Ajax Spider scan for user ID ' + userId + ' completed')
scanId = ascan.scan_as_user(url=target, contextid=contextId,
userid=userId, recurse=True, scanpolicyname=scanPolicyName,
method=None, postdata=True)
print('Start Active Scan with user ID: ' + userId +
'. Scan ID equals: ' + scanId)
time.sleep(2)
while (int(ascan.status(scanId)) < 100):
print('Active Scan progress: ' + ascan.status(scanId) + '%')
time.sleep(2)
print('Active Scan for user ID ' + userId + ' completed')
else:
scanId = spider.scan(url=target, maxchildren=None, recurse=True,
contextname=None, subtreeonly=None)
print('Scan ID equals ' + scanId)
time.sleep(2)
while (int(spider.status(scanId)) < 100):
print('Spider progress ' + spider.status(scanId) + '%')
time.sleep(2)
print('Spider scan completed')
if useAjaxSpider:
pprint('Start Ajax Spider -> ' + ajax.scan(url=target, inscope=None))
time.sleep(10)
while (ajax.status != 'stopped'):
print('Ajax Spider is ' + ajax.status)
time.sleep(5)
for url in applicationURL:
pprint('Ajax Spider the URL: ' + url + ' -> ' +
ajax.scan(url=url, inscope=None))
time.sleep(10)
while (ajax.status != 'stopped'):
print('Ajax Spider is ' + ajax.status)
time.sleep(5)
print('Ajax Spider scan completed')
scanId = zap.ascan.scan(url=target, recurse=True, inscopeonly=None,
scanpolicyname=scanPolicyName, method=None, postdata=True)
print('Start Active scan. Scan ID equals ' + scanId)
while (int(ascan.status(scanId)) < 100):
print('Active Scan progress: ' + ascan.status(scanId) + '%')
time.sleep(5)
print('Active Scan completed')
time.sleep(5)
print('HTML report:')
pprint(core.htmlreport())
print('XML report')
pprint(core.xmlreport())
if shutdownOnceFinished:
pprint('Shutdown ZAP -> ' + core.shutdown())
sys.stdout.close()