Jump to content
Nytro

process_memory_scanner.py

Recommended Posts

Posted

[h=1]process_memory_scanner.py[/h]By

[h=3]cadaver[/h]

# process_memory_scanner.py  by Cadaver
# this is written for Python 2.7 and tested on windows 7 64bit with 32bit processes.
# it can scan an entire process for a integer value in under half a second.
# it can also scan for any ctype within python ctypes or ctypes.wintypes modules \
# simply by creating an instance of the type and passing it as the target to the Scanner class.
# license: public domain



import gc
from ctypes import *
from time import time, sleep
from ctypes.wintypes import *
from threading import Thread
from struct import calcsize, pack, unpack, Struct
from win32gui import PyGetString, PySetString
from win32process import GetProcessMemoryInfo
from win32api import GetCurrentProcessId, GetCurrentProcess, CloseHandle


from win32con import MEM_FREE, PROCESS_VM_READ, PROCESS_VM_WRITE, PROCESS_QUERY_INFORMATION
from win32con import PAGE_READWRITE, PAGE_WRITECOPY, PAGE_EXECUTE_READWRITE, PAGE_EXECUTE_WRITECOPY
from win32con import PAGE_EXECUTE_READ, PAGE_READONLY, PROCESS_VM_OPERATION, PROCESS_ALL_ACCESS


DEBUG_QUERY = False # print VirtualQueryEx info.
DEBUG_SLICE = False # print Python string slice detailed used for comparison.
DEBUG_EXCEPTIONS = False # print some ignored exceptions that might occure.
DEBUG_ALIGNMENT = False # print alignment info.
DEBUG_COMPARE = False # print compare info
ASSUME_ALIGNMENT = True # assume any non string type could be byte aligned.


ACCESS = PROCESS_VM_READ | PROCESS_VM_OPERATION | PROCESS_VM_WRITE | PROCESS_QUERY_INFORMATION
WRITABLE = PAGE_READWRITE | PAGE_WRITECOPY |PAGE_EXECUTE_READWRITE | PAGE_EXECUTE_WRITECOPY
READABLE = PAGE_EXECUTE_READ | PAGE_EXECUTE_READWRITE | PAGE_READONLY | PAGE_READWRITE


MEMORY_STATES = {0x1000: "MEM_COMMIT", 0x10000: "MEM_FREE", 0x2000: "MEM_RESERVE"}
MEMORY_PROTECTIONS = {0x10: "PAGE_EXECUTE", 0x20: "PAGE_EXECUTE_READ", 0x40: "PAGEEXECUTE_READWRITE",
0x80: "PAGE_EXECUTE_WRITECOPY", 0x01: "PAGE_NOACCESS", 0x04: "PAGE_READWRITE", 0x08: "PAGE_WRITECOPY"}
MEMORY_TYPES = {0x1000000: "MEM_IMAGE", 0x40000: "MEM_MAPPED", 0x20000: "MEM_PRIVATE"}




# debug function to time an operation.
def timeit (func):
import time
t1 = time.time ()
ret = func()
print "Time taken:", time.time () - t1
return ret




class MEMORY_BASIC_INFORMATION (Structure):
_fields_ = [
("BaseAddress", c_void_p),
("AllocationBase", c_void_p),
("AllocationProtect", DWORD),
("RegionSize", UINT),
("State", DWORD),
("Protect", DWORD),
("Type", DWORD)
]




class PyMEMORY_BASIC_INFORMATION:

def __init__ (self, MBI):
self.MBI = MBI
self.set_attributes ()


def set_attributes (self):

self.BaseAddress = self.MBI.BaseAddress
self.AllocationBase = self.MBI.AllocationBase
self.AllocationProtect = MEMORY_PROTECTIONS.get (self.MBI.AllocationProtect, self.MBI.AllocationProtect)
self.RegionSize = self.MBI.RegionSize
self.State = MEMORY_STATES.get (self.MBI.State, self.MBI.State)
#self.Protect = self.MBI.Protect # uncomment this and comment next line if you want to do a bitwise check on Protect.
self.Protect = MEMORY_PROTECTIONS.get (self.MBI.Protect, self.MBI.Protect)
self.Type = MEMORY_TYPES.get (self.MBI.Type, self.MBI.Type)





class SYSTEM_INFO(Structure):

_fields_ = [("wProcessorArchitecture", WORD),
("wReserved", WORD),
("dwPageSize", DWORD),
("lpMinimumApplicationAddress", DWORD),
("lpMaximumApplicationAddress", DWORD),
("dwActiveProcessorMask", DWORD),
("dwNumberOfProcessors", DWORD),
("dwProcessorType", DWORD),
("dwAllocationGranularity", DWORD),
("wProcessorLevel", WORD),
("wProcessorRevision", WORD)]


class TARGET:

"""Given a ctype (initialized or not) this coordinates all the information needed to read, write and compare."""

def __init__ (self, ctype):

self.alignment = 1
self.ctype = ctype
# size of target data
self.size = sizeof (ctype)
self.type = ctype._type_

# get the format type needed for struct.unpack/pack.
while hasattr (self.type, "_type_"):
self.type = self.type._type_
# string_buffers and char arrays have _type_ 'c'
# but that makes it slightly slower to unpack
# so swap is for 's'.
if self.type == "c":
self.type = "s"
# calculate byte alignment. this speeds up scanning substantially
# because we can read and compare every alignment bytes
# instead of every single byte.
# although if we are scanning for a string the alignment is defaulted to 1 \
# (im not sure if this is correct).
elif ASSUME_ALIGNMENT:
# calc alignment
divider = 1
for i in xrange (4):
divider *= 2
if not self.size % divider:
self.alignment = divider

# size of target ctype.
self.type_size = calcsize (self.type)
# length of target / array length.
self.length = self.size / self.type_size
self.value = getattr (ctype, "raw", ctype.value)
# the format string used for struct.pack/unpack.
self.format = str (self.length) + self.type
# efficient packer / unpacker for our own format.
self.packer = Struct (self.format)




def get_packed (self):

"""Gets the byte representation of the ctype value for use with WriteProcessMemory."""
return self.packer.pack (self.value)


def __str__ (self):
return str (self.ctype) [:10] + "..." + " <" + str (self.value)[:10]+ "..." + ">"




class LOCK:

"""Locks a value to process memory by writing to it in a loop."""

def __init__ (self, process_handle, address, target, interval=0.001):
self.process_handle = process_handle
self.address = address
self.target = target
self.interval = interval
self.lock_ = True


def thread_lock (self):
self.lock_ = True
process_handle = self.process_handle
address = self.address
target = self.target
interval = self.interval
while self.lock_:
write_target (process_handle, address, target)
sleep (interval)


def stop (self):
self.lock_ = False


def start (self):
self.lock_ = True
Thread (None, self.thread_lock).start ()


def __del__ (self):
self.stop ()




def OpenProcess (pid=GetCurrentProcessId()):

phandle = windll.kernel32.OpenProcess (\
ACCESS, #PROCESS_ALL_ACCESS, # alternative access right for debugging.
False,
pid )

assert phandle, "Failed to open process!\n%s" % WinError (GetLastError ()) [1]
return phandle




def VirtualQueryEx (process_handle, address):

MBI = MEMORY_BASIC_INFORMATION ()
MBI_pointer = byref (MBI)
size = sizeof (MBI)
success = windll.kernel32.VirtualQueryEx (
process_handle,
address,
MBI_pointer,
size )

assert success, "VirtualQueryEx Failed with success == %s.\n%s" % (success, WinError (GetLastError ()) [1])
assert success == size, "VirtualQueryEx Failed because not all data was written."
return PyMEMORY_BASIC_INFORMATION (MBI)



def VirtualProtectEx (process_handle, address, size, new_protection):

old_protection = DWORD ()
old_protection_pointer = byref (old_protection)

success = windll.kernel32.VirtualProtectEx (
process_handle,
address,
size,
new_protection,
old_protection_pointer )

assert success, "VirtualProtectEx Failed with success == %s.\n%s" % (success, WinError (GetLastError ()) [1])
return old_protection.value




def ReadMemory (process_handle, address, size):

cbuffer = c_buffer (size)
success = windll.kernel32.ReadProcessMemory (
process_handle,
address,
cbuffer,
size,
0 )

assert success, "ReadMemory Failed with success == %s and address == %s and size == %s.\n%s" % (success, address, size, WinError (GetLastError()) [1])
return cbuffer.raw




def WriteMemory (process_handle, address, data):

"""Writes string data to process and returns success."""

size = len (data)

success = windll.kernel32.WriteProcessMemory (\
process_handle,
address,
data,
size,
0 )

assert success, "Failed to write process memory!\n%s" % WinError (GetLastError()) [1]
return success




def write_target (process_handle, address, target):

# target can be a ctype initialized with value.
"""Writes TARGET instance to address."""

if not isinstance (target, TARGET):
target = TARGET (target)

return WriteMemory (process_handle, address, target.get_packed())


def read_target (process_handle, address, target):

# target can be a ctype initialized with value.
"""Reads TARGET instance from address and returns the unpacked python equivalent."""

if not isinstance (target, TARGET):
target = TARGET (target)

bytes_ = ReadMemory (process_handle, address, target.size)
return target.packer.unpack (bytes_) [0]




def scan_page (process_handle, page_address, target):

# target must be TARGET instance. eg TARGET (c_int (123)).
"""Scans the entire page for TARGET instance and returns the next page address and found addresses."""

information = VirtualQueryEx (process_handle, page_address)
base_address = information.BaseAddress
region_size = information.RegionSize
next_region = base_address + region_size
size = target.size
format_ = target.format
target_value = target.value
step = target.alignment
unpacker = target.packer.unpack
found = []

# Filter out any pages that are not readable by returning the next_region address
# and an empty list to represent no addresses found.
if information.Type != "MEM_PRIVATE" or \
region_size < size or \
information.State != "MEM_COMMIT" or \
information.Protect not in ["PAGE_EXECUTE_READ", "PAGEEXECUTE_READWRITE", "PAGE_READWRITE"]:
return next_region, []

if DEBUG_QUERY:
print vars (information)

# read the whole page into buffer.
page_bytes = ReadMemory (process_handle, base_address, region_size)

# iterate over the page buffer bytes in steps of alignment
# and unpack those bytes into the format described by
# TARGET instance, then compare with TARGET.
for i in xrange (0, (region_size - size), step):

partial = page_bytes [i:i + size]

if unpacker (partial) [0] == target_value:

found.append (base_address + i)

if DEBUG_SLICE:
print "Found at address: %s i: %s Partial: %s Len_Partial: %s Alignment: %s." \
% (base_address+i, i, partial, len(partial), step)
if DEBUG_COMPARE:
print "Unpacked: %s Partial: %s" % (repr(unpacker (partial)[0]), repr(target_value))

del page_bytes # free the buffer
return next_region, found


def scan_addresses (process_handle, addresses, target):

"""Scans addresses and returns a dictionary of {address: value}."""

values = {}

for address in addresses:

try:
value = read_target (process_handle, address, target)
except Exception, error:
# the process may of freed the page while we are scanning.
if DEBUG_EXCEPTIONS:

raise
continue

values [address] = value
return values




def scan_addresses_for_target (process_handle, addresses, target):

"""Scan addresses and returns a list of those addresses that match the target value."""

target_value = target.value
found = list ()

values = scan_addresses (process_handle, addresses, target)

for address, value in values.items ():

if value == target_value:
found.append (address)

return found




def scan_addresses_for_change (process_handle, addresses, target):

"""scan addresses and returns a list of those addresses that don't match the target value."""

target_value = target.value
found = list ()

values = scan_addresses (process_handle, addresses, target)

for address, value in values.items ():

if value != target_value:
found.append (address)

return found




def scan_process (process_handle, target):

"""scans a processes pages for the target value."""

if not isinstance (target, TARGET):
target = TARGET (target)

if DEBUG_ALIGNMENT:
print "Target: %s Size: %s Alignment: %s." % (target.ctype, target.size, target.alignment)

si = SYSTEM_INFO ()
psi = byref (si)
windll.kernel32.GetSystemInfo (psi)
# get the first address of the first page to scan so we know where to start scanning
base_address = si.lpMinimumApplicationAddress
# get the last address to scan so we know when to stop scanning.
max_address = si.lpMaximumApplicationAddress
found = list ()
page_address = base_address

while page_address < max_address:

next_page, f = scan_page (process_handle, page_address, target)
found.extend (f)
page_address = next_page

if len (found) >= 60000000:
print "[Warning] Scan ended early because too many addresses were found to hold the target data."
break


gc.collect ()
return found




class Scanner:

def __init__ (self, pid, ctype=None):

self.pid = pid
self.process_handle = OpenProcess (pid)
self.target = None
self.found = list ()
self.locks = list ()
if ctype:
self.set_target (ctype)


def set_target (self, ctype):

self.target = TARGET (ctype)


def scan_process(self):

self.found = scan_process (self.process_handle, self.target)
print len (self.found), "Addresses with value %s found." % (self.target)


def scan_for_change (self):

self.found = scan_addresses_for_change (self.process_handle, self.found, self.target)
print len (self.found), "Addresses with NOT value %s found." % (self.target)


def scan_for_new_target (self, ctype):

target = TARGET (ctype)
self.found = scan_addresses_for_target (self.process_handle, self.found, target)
print len (self.found), "Addresses with new value %s found." % (self.target)


def scan_addresses (self, addresses=None):

if not addresses:
addresses = self.found

return scan_addresses (self.process_handle, addresses, self.target)


def write_target (self, address, target):

return write_target (self.process_handle, address, target)


def read_target (self, address, target):

return read_target (self.process_handle, address, target)


def lock_target (self, address, target):

if not isinstance (target, TARGET):
target = TARGET (target)

lock = LOCK (self.process_handle, address, target)
lock.start ()
self.locks.append (lock)


def stop_locks (self):

for lock in self.locks:
lock.stop ()

self.locks = list ()


def __del__ (self):

del self.found

if self.process_handle:
CloseHandle (self.process_handle)



def test ():

global PID, p, s, a, na
PID = GetCurrentProcessId ()
#p = OpenProcess ()

s = Scanner (PID, create_string_buffer ("teststringthing"))
# s = Scanner (PID, c_int (123))
print "Scanning process."
timeit (s.scan_process)
a = s.found [0]
print "Locking found[0]."
s.lock_target (a, create_string_buffer ("new str"))
print "Writing target."
s.write_target (a, create_string_buffer ("new str 2"))
print "Scanning for changes."
s.scan_for_change ()
print "Scanning for new target."
s.scan_for_new_target (c_int (1))
print "Scanning addresses."

s.found = [a]
na=s.scan_addresses ()
print "Reading target."
r = s.read_target (a, c_int)
print "Writing target."
s.write_target (a, create_string_buffer ("test"))
s.write_target (a, c_double (88.8))
s.write_target (a, c_int (123))
s.write_target (a, c_float (1.2))
s.stop_locks ()



Scanner.__doc__ = """
class Scanner:
This is the main scanner class that attempts to provide a tidy API for scanning a remote process for values.
def __init__ (self, pid, ctype=None):

This is the initializer that takes the process iD and any ctype from the ctype module or wintype
from the wintype module as the type and value we want to scan for
eg c_int (1) - if we want to search for a integer of value 1, or
create_string_buffer ("test") - if we want to search for a string of value 'test'.

def set_target (self, ctype):
Sets the ctype (with value) of the data we want to search for.

def scan_process(self):
Scans the process for target and saves the found addresses in self.found.

def scan_for_change (self):
Scans the addresses in self.found for any value that isnt the target value
and saves these new addresses in self.found.

def scan_for_new_target (self, ctype):
Scans the addresses in self.found for a new value and returns the ones that match.

def scan_addresses (self, addresses=None):
Scans the addresses and returns a dictionary of {address, value}.

def write_target (self, address, target):
Writes the data described by target to the address.

def read_target (self, address, target):
Reads the data described by target from address.

def lock_target (self, address, target):
Locks address to target value.

def stop_locks (self):
Terminates all current locks.
"""

if __name__ == "__main__":
print Scanner.__doc__
raw_input ("[paused]")


This is written in Python 2.7 and requires Pythons win32 extensions.

Sursa: process_memory_scanner.py - rohitab.com - Forums

Join the conversation

You can post now and register later. If you have an account, sign in now to post with your account.

Guest
Reply to this topic...

×   Pasted as rich text.   Paste as plain text instead

  Only 75 emoji are allowed.

×   Your link has been automatically embedded.   Display as a link instead

×   Your previous content has been restored.   Clear editor

×   You cannot paste images directly. Upload or insert images from URL.



×
×
  • Create New...