API Hooking in Python

Author: cadaver (cred)

# patcher.py
# handles patching and unpatching of process memory.
# public domain code.

from ctypes import *
from win32api import *
from pytcc import pytcc
from struct import pack, unpack, calcsize
from win32gui import PyGetString, PySetMemory, PySetString
from distorm import Decode

DEBUG = True
def DB (msg):
global DEBUG
if DEBUG: print (msg)

def OpenProcess (pid=GetCurrentProcessId()):

"""Opens a process by pid."""
DB ("[openProcess] pid:%s."%pid)

phandle = windll.kernel32.OpenProcess (\

pid )

assert phandle, "Failed to open process!\n%s" % WinError (GetLastError ()) [1]
return phandle

def readMemory (phandle, address, size):
"""readMemory (address, size, phandle):"""

cbuffer = c_buffer (size)

success = windll.kernel32.ReadProcessMemory (\

0 )

assert success, "Failed to read memory!\n%s" % WinError (GetLastError()) [1]
return cbuffer.raw

def writeMemory (phandle, address=None, data=None):
"""Writes data to memory and returns the address."""
assert data
size = len (data) if isinstance (data, str) else sizeof (data)
cdata = c_buffer (data) if isinstance (data, str) else byref (data)

if not address: address = allocate (size, phandle)

success = windll.kernel32.WriteProcessMemory (\

0 )

assert success, "Failed to write process memory!\n%s" % WinError (GetLastError()) [1]
DB ("[write memory] :%s OK." % address)

return address

def allocate (size, phandle):
"""Allocates memory of size in phandle."""

address = windll.kernel32.VirtualAllocEx (\


assert address, "Failed to allocate memory!\n%s" % WinError(GetLastError()) [1]
DB ("[memory allocation] :%s" % address)

return address

def releaseMemory (address, size, phandle):
"""Releases memory by address."""

return windll.kernel32.VirtualFreeEx (\


assert success, "Failed to read process memory!\n%s" % WinError(GetLastError()) [1]

return cbuffer.raw

def transport (data, phandle):

size = len (data)
memory = allocate (size, phandle)
writeMemory (phandle, memory, data)
return memory

def get_patch (destination, params_size=0):

"""mov eax, destination
call eax
retn params_size

if isinstance (destination, (int,long)): destination = pack ("i", destination)
if isinstance (params_size, (int,long)): params_size = pack ("h", params_size)

return '\xb8%s\xff\xd0\xc2%s' % (destination, params_size)

def get_cparams_size (cparams):

if not cparams: return 0

s = ''

for param in cparams:
s += "size += sizeof (%s);\n" % param
c_code = """
int getsize ()
int size = 0;
return size;
}""" % s

#DB (c_code)
ccompiler = pytcc ()
ccompiler.compile (c_code)
ccompiler.relocate ()
getsize = ccompiler.get_function ("getsize")
size = getsize ()
# ccompiler.delete ()
return size

def get_cparams_size_b (cparams):
return sum (map (calcsize, [param._type_ for param in cparams]))

def find_good_spot_to_patch (apiaddress, needed_size, maxscan=4000):
"""find_good_spot_to_patch (apiaddress, needed_size, maxscan=4000):
Searches the instructions inside an API for a good place to patch."""

if DEBUG == 2:
bytes = PyGetString (apiaddress, needed_size * 2)
dprint (apiaddress, bytes)
# # # #

aoffset = 0
found_space = 0
position = apiaddress

while found_space < needed_size:

bytes = PyGetString (position, 24)
# DB ("found_space: %s. aoffset: %s. apiaddress: %s." % (found_space, aoffset, hex(position)))
# if does_code_end_function (bytes): raise "Function end found before enough space was found!"
offset, size, instruction, hexstr = Decode (position, bytes) [0]

if "ret" in instruction.lower (): raise "Function end found before enough space was found!"

if not filter (lambda x:x.lower() in instruction.lower(), ["call", "jmp"]):
found_space += size
found_space = 0

aoffset += size
if aoffset >= maxscan: raise "Maxscan exceeded while searching for a good spot to patch!"
position += size

return apiaddress + (aoffset - found_space)

class patcher:

source = None
destination = None
jmp_asm = None
original_bytes = None
params_size = 0
pid = None
phandle = None

duplicate_api = None
original_api = None

def __init__ (self,

pid=GetCurrentProcessId () ):

self.set_pid (pid)
self.set_source (source)
self.set_destination (destination)
self.set_params_size (params_size)

def set_pid (self, pid):

self.close ()
self.phandle = OpenProcess (pid)
self.pid = pid

def set_source (self, source): self.source = source
def set_destination (self, destination): self.destination = destination
def set_params_size (self, size): self.params_size = size
def set_source_as_api (self, apiname, dllname="kernel32.dll", free=True):

module = LoadLibrary (dllname)
procedure = GetProcAddress (module, apiname)
if free: FreeLibrary (module)
assert procedure
self.original_api = eval ("windll.%s.%s" % (dllname.strip(".dll"), apiname))

self.source = find_good_spot_to_patch (procedure, len (get_patch (0, self.params_size)))
if DEBUG: DB ("found good spot to patch: %s %s. Offset from original api address: %s." \
%(self.source, hex (self.source), self.source - procedure))

def patch (self):

assert all ((self.phandle, self.source, self.destination)), "Patch source or destination not set!"
assert not self.original_bytes, "Already patched!"

self.jmp_asm = get_patch (self.destination, self.params_size)
jmp_asm_size = len (self.jmp_asm)

self.original_bytes = PyGetString (self.source, jmp_asm_size)
assert self.original_bytes, "Failed to capture original_bytes."

writeMemory (\

msg = "[jmp_asm]:%s\n[jmp_asm_size]:%s\n[original_bytes]:%s\n" \
% (repr (self.jmp_asm), jmp_asm_size, repr (self.original_bytes))
DB (msg)

def unpatch (self):

if not self.original_bytes: raise "Not patched!"
assert all ((self.phandle, self.source, self.destination)), "Not initialized!"

writeMemory (\

data=self.original_bytes )

self.original_bytes = None

def close (self):

if self.phandle:
windll.kernel32.CloseHandle (self.phandle)
self.phandle = None

def release (self):
if self.phandle and self.duplicate_api:
releaseMemory (self.duplicate_api, 0, self.phandle)

def call_original_api (self, *args, **kwargs): return self.original_api (*args, **kwargs)

def call_duplicate_api (self, types, *args, **kwargs):

return WINFUNCTYPE (c_void_p, types) (self.duplicate_api) (*args, **kwargs)

def __del__ (self):

try:self.unpatch ()
try:self.release ()
try:self.close ()

def dprint (a, c):
"""Pretty prints disassembled bytes. dprint (offset, bytes)."""
x = Decode (a, c)

print "[deci addr : hexi addr] [size] instruction\n"

for offset, size, instruction, hexstr in x:

print "[%s : %s] [%s] %s" % (a,hex (a), size, instruction)
a += size



# tramper.py
# Relocates bytes of an API and creates a jump from those bytes to the original API affectively negating a hook.
# TODO !Recalculate Relocated Relative jmp and call addresses.
# public domain code.

from ctypes import *
from win32api import *
from pytcc import pytcc
from struct import pack, unpack
from win32gui import PyGetString, PySetMemory, PySetString

from distorm import Decode
from patcher import OpenProcess, readMemory, writeMemory, allocate, transport

DEBUG = True
def DB (msg):
global DEBUG
if DEBUG: print (msg)

def tramper (apiaddress, hook_size, apiname=None, dllname="kernel32"):
"""tramper (apiaddress, hook_size, apiname=None, dllname="kernel32"):
Creates a duplicate API using the trampoline method and returns its address.

if DEBUG: global hprocess, landing_offset, instructions, landing_address, tramp_memory, tramp_code, original_bytes

if not apiaddress:
dll = LoadLibrary (dllname)
apiaddress = GetProcAddress (dll, apiname)

landing_offset = 0
hprocess = OpenProcess ()
original_bytes = PyGetString (apiaddress, 300)

tramp_memory = allocate (len (original_bytes) + 50, hprocess)
print "Tramp memory: %s %s." % (tramp_memory, hex (tramp_memory))

instructions = Decode (apiaddress, original_bytes)
sizes = iter ([X[1] for X in instructions])

while landing_offset < hook_size:

landing_offset += sizes.next ()

landing_address = apiaddress + landing_offset

DB ("Landing offset : %s %s" % (landing_offset, hex (landing_offset)))
DB ("Landing address: %s %s" % (landing_address, hex (landing_address)))

distance = landing_address - (tramp_memory +landing_offset)
DB ("Distance: %s %s." % (distance, hex (distance)))

tramp_code = original_bytes [:landing_offset] # api start - past hook - to start of instruction
instructions = Decode (apiaddress, tramp_code)

boffset = 0
for offset, size, instruction, hexstr in instructions:

if filter (lambda x:x.lower() in instruction.lower(), ["call", "jmp"]):
raise "[not supported yet] Cannot relocate CALL/JMP Instructions. Address: %s"% (apiaddress + boffset)

boffset += size

# TODO !Recalculate Relocated Relative jmp and call addresses.

jump_code = '\xe9' + pack ("i", distance - 5) # bytes = jmp (distance - size of jump)
tramp_code += jump_code

DB ("Tramp [size]: %s [bytes]; %s" % (len(tramp_code), (repr(tramp_code))))
DB ("Tramper api decode.")
if DEBUG: dprint (apiaddress, tramp_code)
# # # #

writeMemory (hprocess, tramp_memory, tramp_code)
CloseHandle (hprocess)

return tramp_memory

def dprint (a, c):
""" pretty print disassembled bytes. dprint (offset, bytes)."""

x = Decode (a, c)
print "[deci addr : hexi addr] [size] instruction\n"

for offset, size, instruction, hexstr in x:

print "[%s : %s] [%s] %s" % (a,hex (a), size, instruction)
a += size

if __name__ == "__main__":

# Test.

lib = LoadLibrary ("kernel32")
OpenProcessAddr = GetProcAddress (lib, "OpenProcess")
FreeLibrary (lib)

trampAddr = tramper (\

apiaddress=OpenProcessAddr, # (optional if apiname is defined) API address to duplicate.
hook_size=10, # size of our API jmp code. (minimum size of relocated API bytes)
apiname=None, # (optional)
dllname="kernel32") # (optional / defaults to kernel32)

# Prototype the OpenProcess trampoline.
duplicate_OpenProcess = WINFUNCTYPE (c_int, c_int, c_int, c_int) (trampAddr)

pid = GetCurrentProcessId ()

print "Calling duplicate OpenProcess with pid: %s" % pid
phandle = duplicate_OpenProcess (0x1f0fff, 0, pid)
print "Return value: %s." %phandle

if phandle: CloseHandle (phandle)


# hooker.py
# deals with hooking of win32 APIs.
# public domain code.

from patcher import *
from tramper import tramper
from win32api import *
from pytcc import pytcc

def create_hook (duplicate_api, cparam_types='', prelogic="", postlogic="", restype="int"):
""" create_hook (pat, duplicate_api, cparam_types='', prelogic="", postlogic="", restype="int"):

c_code =\
%s function (int caller, %s)
%s RET = DUPE ( %s );
return RET;

cargs = ''
symbols = ''
for arg, char in zip (cparam_types, "ABCDEFGHIJKLMNOPQRSTUVWXYZ"):

symbols += "%s, " % char
cargs += "%s %s, " % (arg, char)

symbols = symbols [:-2]
cargs = cargs [:-2]

c_code = c_code % (restype, cargs, prelogic, restype, symbols, postlogic)
ccompiler = pytcc ()
ccompiler.add_lib_proc ("msvcrt.dll", "memset")
ccompiler.add_symbol ("DUPE", duplicate_api)
ccompiler.compile (c_code)
ccompiler.relocate ()

hook = ccompiler.get_symbol ("function")

return (c_code, hook)

def hooker (apiname, cparam_types=list(), restype="int", prelogic='', postlogic='', pid=GetCurrentProcessId(), dllname="kernel32"):
"""hooker (apiname, cparam_types=list(), restype="int", prelogic='', postlogic='', pid=GetCurrentProcessId(), dllname="kernel32"):

pat = patcher ()

params_size = get_cparams_size (cparam_types)
pat.set_params_size (params_size)

pat.set_source_as_api (apiname, dllname)

hook_size = len (get_patch (pat.destination, pat.params_size))
tramp = tramper (pat.source, hook_size)
pat.duplicate_api = tramp

hook_ccode, hooks = create_hook (tramp, cparam_types, prelogic, postlogic, restype)
pat.c_code = hook_ccode
pat.set_destination (hooks)

return pat

if __name__ == '__main__':

# Test.

hook = hooker (\

# API to hook

# the DLL the API is in. (defaults to kernel32)

# (required) API parameter types. In our hook these get translated to the names A,B,C...respectively.
cparam_types=["int", "int", "int"],

# (required) the API return type.

# (optional) this is the code in our hook wich is executed Before the real API.
prelogic="if (C==1) {return 1111;}",

# (optional) this is the code in our hook wich is executed After the real API. The real API's return value is named RET.
postlogic="if (RET) {return 0;}"

# hook API.
# hook automatically unhooks itself and cleans up when it isnt refered to anymore.
hook.patch ()

print "Calling hooked OpenProcess api with process id as 1."
ret = windll.kernel32.OpenProcess (0x1f0fff, 0, 1)

print "Return value: %s" % ret
if ret == 1111: print "This test was sucesful."
else: print "Return value is unexpected."

# unhook API.
# hook.unpatch ()




Sursa: API Hooking in Python - rohitab.com - Forums

