# Posted by Nytro, May 10, 2011
# API Hooking in Python
# Author: cadaver (cred)

# patcher.py
# handles patching and unpatching of process memory.
# public domain code.
#
# NOTE: this is Python 2 code (it relies on `long` and on byte strings
# being plain `str`).

from ctypes import *
from win32api import *
from pytcc import pytcc
from struct import pack, unpack, calcsize
from win32gui import PyGetString, PySetMemory, PySetString
from win32con import (MEM_COMMIT, MEM_RESERVE, MEM_RELEASE,
                      PAGE_EXECUTE_READWRITE, PROCESS_ALL_ACCESS)
from distorm import Decode

DEBUG = True


def DB(msg):
    """Print msg when the module-level DEBUG flag is truthy."""
    if DEBUG:
        print(msg)


def OpenProcess(pid=GetCurrentProcessId()):
    """Opens a process by pid.

    Returns the handle from kernel32.OpenProcess requested with
    PROCESS_ALL_ACCESS. Fails with AssertionError when the handle is 0.
    The default pid is evaluated once, when this module is imported.
    """
    DB("[openProcess] pid:%s." % pid)
    phandle = windll.kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)
    assert phandle, "Failed to open process!\n%s" % WinError(GetLastError())[1]
    return phandle


def readMemory(phandle, address, size):
    """readMemory (phandle, address, size):
    Reads size bytes at address from the process phandle and returns them
    as a raw byte string. Fails with AssertionError when the read fails.
    """
    cbuffer = c_buffer(size)
    success = windll.kernel32.ReadProcessMemory(
        phandle, address, cbuffer, size, 0)
    assert success, "Failed to read memory!\n%s" % WinError(GetLastError())[1]
    return cbuffer.raw


def writeMemory(phandle, address=None, data=None):
    """Writes data to memory and returns the address.

    data is either a byte string or a ctypes object. When address is
    falsy, a fresh region of the right size is allocated in phandle first.
    Fails with AssertionError when data is empty or the write fails.
    """
    assert data
    if isinstance(data, str):
        size = len(data)
        cdata = c_buffer(data)
    else:
        size = sizeof(data)
        cdata = byref(data)
    if not address:
        address = allocate(size, phandle)
    success = windll.kernel32.WriteProcessMemory(
        phandle, address, cdata, size, 0)
    assert success, "Failed to write process memory!\n%s" % WinError(GetLastError())[1]
    DB("[write memory] :%s OK." % address)
    return address


def allocate(size, phandle):
    """Allocates memory of size in phandle.

    The region is committed and reserved with PAGE_EXECUTE_READWRITE.
    Returns its address; fails with AssertionError when allocation fails.
    """
    address = windll.kernel32.VirtualAllocEx(
        phandle, 0, size, MEM_RESERVE | MEM_COMMIT, PAGE_EXECUTE_READWRITE)
    assert address, "Failed to allocate memory!\n%s" % WinError(GetLastError())[1]
    DB("[memory allocation] :%s" % address)
    return address


def releaseMemory(address, size, phandle):
    """Releases memory by address.

    Returns the result of kernel32.VirtualFreeEx called with MEM_RELEASE
    (non-zero on success). No exception is raised on failure; callers that
    care must check the return value.
    """
    # MEM_RELEASE was previously used without being imported, so every call
    # raised NameError. The statements that used to follow the return were
    # unreachable leftovers pasted from readMemory and have been removed.
    return windll.kernel32.VirtualFreeEx(phandle, address, size, MEM_RELEASE)


def transport(data, phandle):
    """Allocates len(data) bytes in phandle, copies data there and returns
    the address of the copy."""
    memory = allocate(len(data), phandle)
    writeMemory(phandle, memory, data)
    return memory


def get_patch(destination, params_size=0):
    """mov eax, destination
    call eax
    retn params_size

    Returns the machine code for the sequence above as a byte string.
    Integer arguments are packed little-endian (32-bit destination, 16-bit
    params_size); any other value is interpolated into the result as-is.
    """
    if isinstance(destination, (int, long)):
        # Mask and pack unsigned: the bytes are identical to the former
        # signed "i" for in-range values, but addresses >= 0x80000000 (and
        # negative addresses returned through a c_int) no longer fail.
        destination = pack("<I", destination & 0xFFFFFFFF)
    if isinstance(params_size, (int, long)):
        params_size = pack("<H", params_size & 0xFFFF)
    return '\xb8%s\xff\xd0\xc2%s' % (destination, params_size)


def get_cparams_size(cparams):
    """Returns the summed C sizeof() of the type names in cparams.

    The sum is computed by compiling a tiny C function with pytcc and
    calling it. Returns 0 when cparams is empty.
    NOTE(review): this is a plain sizeof sum; types narrower than a stack
    slot may need padding before use as a retn operand -- confirm.
    """
    if not cparams:
        return 0
    s = ''.join("size += sizeof (%s);\n" % param for param in cparams)
    c_code = """int getsize ()
{
    int size = 0;
    %s
    return size;
}""" % s
    # DB (c_code)
    ccompiler = pytcc()
    ccompiler.compile(c_code)
    ccompiler.relocate()
    getsize = ccompiler.get_function("getsize")
    size = getsize()
    # ccompiler.delete ()
    return size


def get_cparams_size_b(cparams):
    """Returns the summed struct.calcsize of each parameter's _type_
    attribute (presumably ctypes simple types; verify against callers)."""
    return sum(map(calcsize, [param._type_ for param in cparams]))


def find_good_spot_to_patch(apiaddress, needed_size, maxscan=4000):
    """find_good_spot_to_patch (apiaddress, needed_size, maxscan=4000):
    Searches the instructions inside an API for a good place to patch.

    Walks the code one decoded instruction at a time looking for a run of
    at least needed_size consecutive bytes that contains no call/jmp
    instruction, and returns the address where that run starts.
    Raises RuntimeError when a "ret" instruction is met first or when
    maxscan bytes have been scanned.
    """
    # DEBUG
    if DEBUG == 2:
        dprint(apiaddress, PyGetString(apiaddress, needed_size * 2))
    # # # #
    aoffset = 0        # bytes scanned since apiaddress
    found_space = 0    # length of the current call/jmp-free run
    position = apiaddress
    while found_space < needed_size:
        code = PyGetString(position, 24)
        # DB ("found_space: %s. aoffset: %s. apiaddress: %s." % (found_space, aoffset, hex(position)))
        # if does_code_end_function (code): raise "Function end found before enough space was found!"
        decoded = Decode(position, code)[0]
        size = decoded[1]
        mnemonic = decoded[2].lower()
        if "ret" in mnemonic:
            # String exceptions are a TypeError on Python >= 2.6.
            raise RuntimeError("Function end found before enough space was found!")
        if any(op in mnemonic for op in ("call", "jmp")):
            # A branch cannot be overwritten/relocated: restart the run.
            found_space = 0
        else:
            found_space += size
        aoffset += size
        if aoffset >= maxscan:
            raise RuntimeError("Maxscan exceeded while searching for a good spot to patch!")
        position += size
    return apiaddress + (aoffset - found_space)


class patcher:
    """Overwrites code at `source` with a stub that calls `destination`,
    and restores the original bytes on unpatch().

    The instance opens a process handle for `pid` on construction and
    best-effort unpatches, releases and closes in __del__.
    """

    source = None           # address that gets overwritten
    destination = None      # address the stub calls
    jmp_asm = None          # stub bytes written by patch()
    original_bytes = None   # bytes saved by patch(); None when unpatched
    params_size = 0         # operand of the stub's retn
    pid = None
    phandle = None
    duplicate_api = None    # address released by release()
    original_api = None     # ctypes function set by set_source_as_api()

    def __init__(self, source=None, destination=None, params_size=0,
                 pid=GetCurrentProcessId()):
        self.set_pid(pid)
        self.set_source(source)
        self.set_destination(destination)
        self.set_params_size(params_size)

    def set_pid(self, pid):
        """Closes any handle already held, then opens the process pid."""
        self.close()
        self.phandle = OpenProcess(pid)
        self.pid = pid

    def set_source(self, source):
        self.source = source

    def set_destination(self, destination):
        self.destination = destination

    def set_params_size(self, size):
        self.params_size = size

    def set_source_as_api(self, apiname, dllname="kernel32.dll", free=True):
        """Points `source` at a patchable spot inside dllname!apiname and
        stores the ctypes function for the API in `original_api`.

        dllname may be given with or without a ".dll" suffix. When free is
        true the module reference taken by LoadLibrary is dropped again.
        Fails with AssertionError when the export cannot be resolved.
        """
        module = LoadLibrary(dllname)
        procedure = GetProcAddress(module, apiname)
        if free:
            FreeLibrary(module)
        assert procedure
        # str.strip(".dll") strips a *character set* from both ends (it
        # turned "dbghelp.dll" into "bghelp"); cut the suffix instead, and
        # resolve the attribute with getattr rather than eval.
        libname = dllname
        if libname.lower().endswith(".dll"):
            libname = libname[:-4]
        self.original_api = getattr(getattr(windll, libname), apiname)
        self.source = find_good_spot_to_patch(
            procedure, len(get_patch(0, self.params_size)))
        DB("found good spot to patch: %s %s. Offset from original api address: %s."
           % (self.source, hex(self.source), self.source - procedure))

    def patch(self):
        """Saves the bytes at `source` and overwrites them with the stub.

        Fails with AssertionError when handle/source/destination are not
        all set, when already patched, or when the bytes cannot be read.
        """
        assert all((self.phandle, self.source, self.destination)), "Patch source or destination not set!"
        assert not self.original_bytes, "Already patched!"
        self.jmp_asm = get_patch(self.destination, self.params_size)
        jmp_asm_size = len(self.jmp_asm)
        self.original_bytes = PyGetString(self.source, jmp_asm_size)
        assert self.original_bytes, "Failed to capture original_bytes."
        writeMemory(phandle=self.phandle, address=self.source, data=self.jmp_asm)
        msg = "[jmp_asm]:%s\n[jmp_asm_size]:%s\n[original_bytes]:%s\n" \
              % (repr(self.jmp_asm), jmp_asm_size, repr(self.original_bytes))
        DB(msg)

    def unpatch(self):
        """Writes the saved bytes back to `source`.

        Raises RuntimeError when nothing is patched.
        """
        if not self.original_bytes:
            # String exceptions are a TypeError on Python >= 2.6.
            raise RuntimeError("Not patched!")
        assert all((self.phandle, self.source, self.destination)), "Not initialized!"
        writeMemory(phandle=self.phandle, address=self.source,
                    data=self.original_bytes)
        self.original_bytes = None

    def close(self):
        """Closes the process handle, if one is held."""
        if self.phandle:
            windll.kernel32.CloseHandle(self.phandle)
            self.phandle = None

    def release(self):
        """Frees the memory at `duplicate_api`, if any."""
        if self.phandle and self.duplicate_api:
            releaseMemory(self.duplicate_api, 0, self.phandle)
            # Forget the pointer so a second call cannot free it twice.
            self.duplicate_api = None

    def call_original_api(self, *args, **kwargs):
        return self.original_api(*args, **kwargs)

    def call_duplicate_api(self, types, *args, **kwargs):
        """Calls the code at `duplicate_api` as a WINFUNCTYPE function
        returning c_void_p.

        types is a single ctypes argument type, or a list/tuple of them.
        """
        # WINFUNCTYPE takes argtypes as separate positional arguments, so a
        # sequence has to be unpacked; a single type is passed through.
        if not isinstance(types, (list, tuple)):
            types = (types,)
        prototype = WINFUNCTYPE(c_void_p, *types)
        return prototype(self.duplicate_api)(*args, **kwargs)

    def __del__(self):
        # Best-effort cleanup: each step may legitimately fail (e.g. "Not
        # patched!"), and one failure must not prevent the next step.
        for cleanup in (self.unpatch, self.release, self.close):
            try:
                cleanup()
            except Exception:
                pass


def dprint(a, c):
    """Pretty prints disassembled bytes. dprint (offset, bytes)."""
    x = Decode(a, c)
    print("[deci addr : hexi addr] [size] instruction\n")
    for offset, size, instruction, hexstr in x:
        print("[%s : %s] [%s] %s" % (a, hex(a), size, instruction))
        a += size
    print("")

#cad

# tramper.py
# Relocates bytes of an API and creates a jump from those bytes to the original API effectively negating a hook.
# TODO !Recalculate Relocated Relative jmp and call addresses.
# public domain code.
#
# NOTE: this is Python 2 code (byte strings are plain `str`).

from ctypes import *
from win32api import *
from pytcc import pytcc
from struct import pack, unpack
from win32gui import PyGetString, PySetMemory, PySetString
from win32con import MEM_COMMIT, MEM_RESERVE, PAGE_EXECUTE_READWRITE, PROCESS_ALL_ACCESS
from distorm import Decode
from patcher import OpenProcess, readMemory, writeMemory, allocate, transport

DEBUG = True


def DB(msg):
    """Print msg when the module-level DEBUG flag is truthy."""
    if DEBUG:
        print(msg)


def tramper(apiaddress, hook_size, apiname=None, dllname="kernel32"):
    """tramper (apiaddress, hook_size, apiname=None, dllname="kernel32"):
    Creates a duplicate API using the trampoline method and returns its address.

    The first whole instructions of the API covering at least hook_size
    bytes are copied into freshly allocated memory, followed by a relative
    jmp back into the API just past the copied bytes. When apiaddress is
    falsy the address is looked up from dllname/apiname.

    Raises RuntimeError when the copied bytes contain a CALL/JMP (relative
    branches are not relocated yet) or when fewer than hook_size bytes of
    instructions could be decoded.
    """
    # The working values are published as module globals so they can be
    # inspected after a run. `global` is a compile-time declaration, so it
    # applied to the whole function even when written under `if DEBUG:`.
    global hprocess, landing_offset, instructions, landing_address, \
        tramp_memory, tramp_code, original_bytes
    if not apiaddress:
        dll = LoadLibrary(dllname)
        apiaddress = GetProcAddress(dll, apiname)
    landing_offset = 0
    hprocess = OpenProcess()
    try:
        original_bytes = PyGetString(apiaddress, 300)
        instructions = Decode(apiaddress, original_bytes)

        # Advance instruction by instruction until the hook is covered, so
        # the landing point falls on an instruction boundary.
        sizes = iter([entry[1] for entry in instructions])
        while landing_offset < hook_size:
            try:
                landing_offset += next(sizes)
            except StopIteration:
                raise RuntimeError(
                    "Ran out of decoded instructions before covering hook_size (%s)."
                    % hook_size)
        landing_address = apiaddress + landing_offset
        DB("Landing offset : %s %s" % (landing_offset, hex(landing_offset)))
        DB("Landing address: %s %s" % (landing_address, hex(landing_address)))

        # api start - past hook - to start of instruction
        tramp_code = original_bytes[:landing_offset]

        # Validate before allocating, so a rejected API leaks no memory.
        instructions = Decode(apiaddress, tramp_code)
        boffset = 0
        for offset, size, instruction, hexstr in instructions:
            mnemonic = instruction.lower()
            if any(op in mnemonic for op in ("call", "jmp")):
                # String exceptions are a TypeError on Python >= 2.6.
                raise RuntimeError(
                    "[not supported yet] Cannot relocate CALL/JMP Instructions. Address: %s"
                    % (apiaddress + boffset))
            boffset += size
        #
        # TODO !Recalculate Relocated Relative jmp and call addresses.
        #

        tramp_memory = allocate(len(original_bytes) + 50, hprocess)
        print("Tramp memory: %s %s." % (tramp_memory, hex(tramp_memory)))
        distance = landing_address - (tramp_memory + landing_offset)
        DB("Distance: %s %s." % (distance, hex(distance)))

        # bytes = jmp (distance - size of jump)
        # Masked and packed unsigned: same bytes as a signed pack for
        # in-range values, without failing when the displacement wraps.
        jump_code = '\xe9' + pack("<I", (distance - 5) & 0xFFFFFFFF)
        tramp_code += jump_code

        # DEBUG
        DB("Tramp [size]: %s [bytes]; %s" % (len(tramp_code), repr(tramp_code)))
        DB("Tramper api decode.")
        if DEBUG:
            dprint(apiaddress, tramp_code)
        # # # #

        writeMemory(hprocess, tramp_memory, tramp_code)
    finally:
        # Previously the handle leaked whenever an exception was raised.
        CloseHandle(hprocess)
    return tramp_memory


def dprint(a, c):
    """pretty print disassembled bytes. dprint (offset, bytes)."""
    x = Decode(a, c)
    print("[deci addr : hexi addr] [size] instruction\n")
    for offset, size, instruction, hexstr in x:
        print("[%s : %s] [%s] %s" % (a, hex(a), size, instruction))
        a += size


if __name__ == "__main__":
    # Test.
    lib = LoadLibrary("kernel32")
    OpenProcessAddr = GetProcAddress(lib, "OpenProcess")
    FreeLibrary(lib)
    trampAddr = tramper(
        apiaddress=OpenProcessAddr,  # (optional if apiname is defined) API address to duplicate.
        hook_size=10,                # size of our API jmp code. (minimum size of relocated API bytes)
        apiname=None,                # (optional)
        dllname="kernel32")          # (optional / defaults to kernel32)
    # Prototype the OpenProcess trampoline.
    duplicate_OpenProcess = WINFUNCTYPE(c_int, c_int, c_int, c_int)(trampAddr)
    pid = GetCurrentProcessId()
    print("Calling duplicate OpenProcess with pid: %s" % pid)
    phandle = duplicate_OpenProcess(0x1f0fff, 0, pid)
    print("Return value: %s." % phandle)
    if phandle:
        CloseHandle(phandle)

#cad

# hooker.py
# deals with hooking of win32 APIs.
# public domain code.

from patcher import *
from tramper import tramper
from win32api import *
from pytcc import pytcc


def create_hook(duplicate_api, cparam_types='', prelogic="", postlogic="", restype="int"):
    """create_hook (duplicate_api, cparam_types='', prelogic="", postlogic="", restype="int"):
    Generates and compiles (with pytcc) the C hook function.

    The generated function takes `int caller` followed by one parameter per
    entry of cparam_types, named A, B, C, ... in order. It runs prelogic,
    calls DUPE (bound to duplicate_api) with A, B, C, ..., stores the
    result in RET, runs postlogic and returns RET.

    Returns (c_code, hook): the generated C source and the compiled
    symbol "function".
    """
    # zip stops at the shorter input, so at most 26 parameters get names.
    pairs = list(zip(cparam_types, "ABCDEFGHIJKLMNOPQRSTUVWXYZ"))
    # Join the parameter list instead of appending to "int caller, ": with
    # no parameters the old template produced "(int caller, )", which is
    # not valid C.
    cargs = ", ".join(["int caller"] + ["%s %s" % (ctype, name) for ctype, name in pairs])
    symbols = ", ".join(name for ctype, name in pairs)
    c_code = """%s function (%s)
{
    %s
    %s RET = DUPE ( %s );
    %s
    return RET;
}""" % (restype, cargs, prelogic, restype, symbols, postlogic)
    ccompiler = pytcc()
    ccompiler.add_lib_proc("msvcrt.dll", "memset")
    ccompiler.add_symbol("DUPE", duplicate_api)
    ccompiler.compile(c_code)
    ccompiler.relocate()
    hook = ccompiler.get_symbol("function")
    return (c_code, hook)


def hooker(apiname, cparam_types=list(), restype="int", prelogic='', postlogic='',
           pid=GetCurrentProcessId(), dllname="kernel32"):
    """hooker (apiname, cparam_types=list(), restype="int", prelogic='', postlogic='', pid=GetCurrentProcessId(), dllname="kernel32"):
    Builds a ready-to-apply hook for dllname!apiname and returns the
    patcher object; call .patch () on it to install the hook and
    .unpatch () to remove it.

    cparam_types -- C type names of the API parameters, in order.
    restype      -- C return type of the API.
    prelogic     -- C code run in the hook before the real API is called.
    postlogic    -- C code run after it; the API's result is named RET.

    NOTE(review): pid is accepted but not used; the patcher is created for
    the current process. Confirm whether it was meant to be forwarded.
    """
    pat = patcher()
    params_size = get_cparams_size(cparam_types)
    pat.set_params_size(params_size)
    pat.set_source_as_api(apiname, dllname)
    # Only the stub's length is needed here. The destination is not known
    # yet (it used to be passed as None, which gave the right length only
    # because "None" happens to be four characters), so use a placeholder.
    hook_size = len(get_patch(0, pat.params_size))
    tramp = tramper(pat.source, hook_size)
    pat.duplicate_api = tramp
    hook_ccode, hooks = create_hook(tramp, cparam_types, prelogic, postlogic, restype)
    pat.c_code = hook_ccode
    pat.set_destination(hooks)
    return pat


if __name__ == '__main__':
    # Test.
    hook = hooker(
        # API to hook
        apiname="OpenProcess",
        # the DLL the API is in. (defaults to kernel32)
        dllname="kernel32",
        # (required) API parameter types. In our hook these get translated to the names A,B,C...respectively.
        cparam_types=["int", "int", "int"],
        # (required) the API return type.
        restype="int",
        # (optional) this is the code in our hook which is executed Before the real API.
        prelogic="if (C==1) {return 1111;}",
        # (optional) this is the code in our hook which is executed After the real API. The real API's return value is named RET.
        postlogic="if (RET) {return 0;}"
    )
    # hook API.
    # hook automatically unhooks itself and cleans up when it isn't referred to anymore.
    hook.patch()
    print("Calling hooked OpenProcess api with process id as 1.")
    ret = windll.kernel32.OpenProcess(0x1f0fff, 0, 1)
    print("Return value: %s" % ret)
    if ret == 1111:
        print("This test was sucesful.")
    else:
        print("Return value is unexpected.")
    # unhook API.
    # hook.unpatch ()

#cad
# Download: http://www.rohitab.com/discuss/index.php?app=core&module=attach&section=attach&attach_id=3110
# Source: API Hooking in Python - rohitab.com - Forums