Skip to content

Instantly share code, notes, and snippets.

@klezVirus
Last active August 21, 2024 12:35
Show Gist options
  • Save klezVirus/ae28e23335374985c0570ab7c71a7b62 to your computer and use it in GitHub Desktop.
Save klezVirus/ae28e23335374985c0570ab7c71a7b62 to your computer and use it in GitHub Desktop.
Attempts to hook an NT function while also trying to recover partial information about the caller (module, address).
import frida
import sys
import subprocess
import ctypes
import threading
import multiprocessing
import argparse
def inject_dummy():
    """Run a harmless in-process "injection" for the allocation hook to observe.

    Meant to be the target of a ``multiprocessing.Process``. After an initial
    10 second delay it allocates a small read/write/execute region in the
    current process, copies a 16-byte NOP buffer into it and starts a thread
    at that address.

    Any failure, including running on a platform where ``kernel32`` cannot be
    loaded, is reported on stderr and ends the process with exit status 1.
    """
    mem_commit_reserve = 0x3000  # MEM_COMMIT | MEM_RESERVE
    page_execute_readwrite = 0x40  # PAGE_EXECUTE_READWRITE
    try:
        # Private WinDLL instance so the prototypes declared below do not leak
        # into the process-wide ``ctypes.windll.kernel32`` object.
        kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

        # Without explicit prototypes ctypes passes and returns C ints, which
        # truncates handles and pointers on 64-bit Windows.
        kernel32.Sleep.argtypes = [ctypes.c_ulong]
        kernel32.Sleep.restype = None
        kernel32.VirtualAllocEx.argtypes = [
            ctypes.c_void_p,
            ctypes.c_void_p,
            ctypes.c_size_t,
            ctypes.c_ulong,
            ctypes.c_ulong,
        ]
        kernel32.VirtualAllocEx.restype = ctypes.c_void_p
        kernel32.WriteProcessMemory.argtypes = [
            ctypes.c_void_p,
            ctypes.c_void_p,
            ctypes.c_void_p,
            ctypes.c_size_t,
            ctypes.POINTER(ctypes.c_size_t),
        ]
        kernel32.WriteProcessMemory.restype = ctypes.c_int
        kernel32.CreateThread.argtypes = [
            ctypes.c_void_p,
            ctypes.c_size_t,
            ctypes.c_void_p,
            ctypes.c_void_p,
            ctypes.c_ulong,
            ctypes.POINTER(ctypes.c_ulong),
        ]
        kernel32.CreateThread.restype = ctypes.c_void_p
        kernel32.WaitForSingleObject.argtypes = [ctypes.c_void_p, ctypes.c_ulong]
        kernel32.WaitForSingleObject.restype = ctypes.c_ulong

        # (HANDLE)-1 is the pseudo-handle for the current process.
        current_process = ctypes.c_void_p(-1)

        # NOTE(review): presumably this delay gives the parent time to attach
        # Frida before the allocation happens - confirm against the caller.
        kernel32.Sleep(10000)

        # Define shellcode to be executed (NOPs only)
        shellcode = b"\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90"

        # Allocate memory in target process
        base_address = kernel32.VirtualAllocEx(
            current_process,
            None,
            len(shellcode),
            mem_commit_reserve,
            page_execute_readwrite,
        )
        if not base_address:
            raise ctypes.WinError(ctypes.get_last_error())

        # Write shellcode to allocated memory
        written = ctypes.c_size_t(0)
        if not kernel32.WriteProcessMemory(
            current_process,
            base_address,
            shellcode,
            len(shellcode),
            ctypes.byref(written),
        ):
            raise ctypes.WinError(ctypes.get_last_error())

        # Create a thread to execute the shellcode
        thread_id = ctypes.c_ulong(0)
        thread_handle = kernel32.CreateThread(
            None, 0, base_address, None, 0, ctypes.byref(thread_id)
        )
        if not thread_handle:
            raise ctypes.WinError(ctypes.get_last_error())

        # Waiting on the current-process pseudo-handle cannot be satisfied
        # while this code is running, so this is effectively a 2 second grace
        # period before the function returns.
        kernel32.WaitForSingleObject(current_process, 2000)
    except Exception as exc:
        # Top-level boundary of the child process: report and exit.
        # Process.terminate() cannot be used here because, inside the process
        # itself, the Process object has no underlying Popen to signal.
        print("[-] Dummy injection failed: %s" % (exc,), file=sys.stderr)
        sys.exit(1)
def spawn_process():
    """Start ``inject_dummy`` in a child process and return it.

    Returns:
        The already-started ``multiprocessing.Process``; its ``pid``
        attribute identifies the child.
    """
    # A multiprocessing child (not subprocess.Popen) runs the dummy injection.
    child = multiprocessing.Process(target=inject_dummy)
    child.start()
    return child
def on_message(message, data):
    """Print a message received from the injected script.

    Args:
        message: The message object delivered to the ``'message'`` handler.
        data: The payload delivered alongside it.
    """
    line = "[{}] => {}".format(message, data)
    print(line)
def frida_attach(target_process):
    """Attach Frida to a process and trace its ``NtAllocateVirtualMemory`` calls.

    The injected script logs, for every call: the raw arguments, a FUZZY
    backtrace where each frame is annotated with the module containing it, a
    warning for frames that lie outside every loaded module, and the outcome
    of the call. After loading the script this function blocks until stdin
    reaches EOF; the session is detached on the way out, including when an
    exception (for example ``KeyboardInterrupt``) interrupts the wait.

    Args:
        target_process: Either a PID (``int``) or an object exposing a ``pid``
            attribute, such as a started ``multiprocessing.Process``.
    """
    if isinstance(target_process, int):
        target_pid = target_process
    else:
        target_pid = target_process.pid

    # Raw string: backslash escapes such as \n and \t must reach the
    # JavaScript engine untouched instead of being expanded by Python.
    hook_source = r"""
    /*
     Width of the address column: "0x" followed by up to 16 hex digits
    */
    const ADDRESS_WIDTH = 18;

    /*
     Address-to-module lookup. A ModuleMap is a snapshot of the loaded
     modules, so it is refreshed whenever a frame cannot be resolved:
     the module may have been loaded after the snapshot was taken.
    */
    const moduleMap = new ModuleMap();

    /*
     Utility resolveFrames: pair every address with the name of the module
     containing it, or with null when no loaded module contains it
    */
    function resolveFrames(addresses) {
        let frames = addresses.map(addr => [moduleMap.findName(addr), addr]);
        if (frames.some(([name]) => name === null)) {
            moduleMap.update();
            frames = addresses.map(addr => [moduleMap.findName(addr), addr]);
        }
        return frames;
    }

    /*
     Utility format_stack: render the resolved call stack, one frame per line
    */
    function format_stack(stacktrace) {
        let callstack = `\n`;
        for (const [name, addr] of stacktrace) {
            const column = addr.toString().padStart(ADDRESS_WIDTH);
            callstack += `\t${column} - ${name === null ? "UNKNOWN" : name}\n`;
        }
        return callstack;
    }

    /*
     Detect unbacked modules: report frames that belong to no loaded module
    */
    function detect_unbacked_modules(stacktrace) {
        for (const [name, addr] of stacktrace) {
            if (name !== null) {
                continue;
            }
            const mi = Process.findRangeByAddress(addr);
            // The FUZZY backtracer can yield values that are not mapped at
            // all; there is no memory range to describe for those.
            if (mi === null) {
                continue;
            }
            console.log(`[!] Call from unbacked ${mi.protection} memory at: ${mi.base} (Size: ${mi.size})`);
        }
    }

    /*
     Utility readSizeT: read a pointer-sized unsigned integer
    */
    function readSizeT(p) {
        return Process.pointerSize === 8 ? p.readU64() : p.readU32();
    }

    // Intercept allocations in NTDLL by hooking NtAllocateVirtualMemory
    const pNtAllocateVirtualMemory = Module.findExportByName("ntdll.dll", "NtAllocateVirtualMemory");
    if (pNtAllocateVirtualMemory === null) {
        throw new Error("NtAllocateVirtualMemory not found in ntdll.dll");
    }

    // Define Interceptor
    Interceptor.attach(pNtAllocateVirtualMemory, {
        onEnter: function (args) {
            // Second arg (PVOID *BaseAddress) and fourth arg (PSIZE_T RegionSize)
            // are in/out pointers; keep them so onLeave can read the final values.
            this.basePtr = args[1];
            this.sizePtr = args[3];

            // Print the call to NtAllocateVirtualMemory with its arguments
            console.log(`[*] NtAllocateVirtualMemory(${args[0]}, ${args[1]}, ${args[2]}, ${args[3]}, ${args[4]}, ${args[5]})`);

            /*
             Note: Backtracer.ACCURATE can give more accurate results, but it
             was observed to freeze for so long that FUZZY is used instead.
            */
            const raw_stacktrace = Thread.backtrace(this.context, Backtracer.FUZZY);
            const stacktrace = resolveFrames(raw_stacktrace);

            // Callstack resolved
            console.log('[>] Call stack: ' + format_stack(stacktrace));

            // Detect call from unbacked module and print its memory information
            detect_unbacked_modules(stacktrace);
        },
        onLeave(retval) {
            // NT_SUCCESS: an NTSTATUS is a success when it is non-negative
            // as a signed 32-bit integer.
            if (retval.toInt32() >= 0) {
                const base = this.basePtr.readPointer();
                const size = readSizeT(this.sizePtr);
                console.log('[+] NtAllocateVirtualMemory successfully allocated ' + size + ' bytes at: ' + base);
            } else {
                console.log('[-] NtAllocateVirtualMemory failed.');
            }
            console.log(' -------------------------------------------------------- ');
        }
    });
    """

    session = frida.attach(target_pid)
    try:
        script = session.create_script(hook_source)
        script.on('message', on_message)
        script.load()
        print("[!] Ctrl+D on UNIX, Ctrl+Z on Windows/cmd.exe to detach from instrumented program.\n\n")
        # Block until EOF on stdin; the hooks keep running in the target.
        sys.stdin.read()
    finally:
        session.detach()
def main():
    """Parse the command line and trace the selected process.

    ``-p/--pid`` attaches to an existing process. ``-T/--test`` spawns a child
    that performs a dummy injection and attaches to that child; when both
    options are given, ``--test`` takes precedence. With neither option the
    usage help is printed and the function returns.

    On Ctrl+C every live ``multiprocessing`` child is killed and the program
    exits with status 0.
    """
    parser = argparse.ArgumentParser(description='Frier - A simple script to hook and trace NtAllocateVirtualMemory')
    parser.add_argument('-p', '--pid', help='PID of the process to be hooked', type=int)
    parser.add_argument('-T', '--test', help='Auto spawn and inject for testing purposes', action='store_true', default=False)
    args = parser.parse_args()

    if args.test:
        print("[!] No PID specified. Spawning a new process and performing simple dummy injection...")
        target_process = spawn_process()
    elif args.pid is not None:
        # Compare against None so an explicit ``-p 0`` is not mistaken for a
        # missing option.
        target_process = args.pid
    else:
        print("[-] No PID specified. Use -h for help.")
        parser.print_help()
        return

    try:
        frida_attach(target_process)
    except KeyboardInterrupt:
        print("[!] Ctrl+C pressed. Exiting...")
        for child in multiprocessing.active_children():
            child.kill()
        sys.exit(0)
# Run only when executed as a script. The guard also matters because the
# script starts a multiprocessing child (see spawn_process): the
# multiprocessing guidelines require that importing the main module does not
# re-run main().
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment