User:WindPower/DemParser.py
From the Portal Wiki
More actions
Non-finished, non-functional.
<nowiki># .dem file parser thing
import os
import struct
import cStringIO
class CStreamReader:
def __init__(self, stream):
if type(stream) is type(''):
self.stream = cStringIO.StringIO(stream)
else:
self.stream = stream
self.position = 0
self.streamEnded = False
def seek(self, offset):
self.stream.seek(offset, os.SEEK_CUR)
def rewind(self, offset):
self.seek(-offset)
def skip(self, offset): # Just an alias for seek, really
self.seek(offset)
def isEOF(self):
return self.streamEnded
def stripNull(self, s):
if s.find('\x00') != -1:
s = s[:s.find('\x00')]
return s
def readBinary(self, format):
length = struct.calcsize(format)
self.position += length
contents = self.stream.read(length)
if not contents and length:
print 'EOF reached!'
streamEnded = True
return ''
return struct.unpack(format, contents)
def readRaw(self, length):
return self.readBinary(str(length) + 's')[0]
def readByte(self):
return self.readBinary('c')[0]
def readString(self, length):
return self.stripNull(self.readRaw(length))
def readInt(self):
return self.readBinary('<i')[0]
def readShort(self):
return self.readBinary('<h')[0]
def readFloat(self):
return self.readBinary('f')[0]
def isSpace(self, c):
return c in (' ', '\r', '\n', '\t')
def printByte(self):
pad = lambda s, length: ' ' * (length - len(str(s))) + str(s)
b = self.readByte()
i = ord(b)
decimal = pad(i, 3)
hexadecimal = pad(hex(i), 4)
binary = pad(bin(i), 10)
r = pad(b.__repr__(), 6)
if not i:
r = '\'NULL\''
print 'Byte #' + pad(self.position, 8) + ':', r, '| Decimal:', decimal, '| Hexadecimal:', hexadecimal, '| Binary:', binary
return b
def printNext(self, bytes):
for i in range(bytes):
self.printByte()
class StringTableParser(CStreamReader):
def __init__(self, data, textMode=False):
CStreamReader.__init__(self, data)
self.textMode = textMode
def readVariableString(self, maxLength):
if self.textMode:
self.eatWhiteSpace()
s = ''
read = 0
while not self.isEOF() and read < maxLength:
b = self.readByte()
if self.isSpace(b) or not b or b == '\x00':
break
s += b
read += 1
return s
def eatWhiteSpace(self):
b = self.readByte()
while self.isSpace(b):
b = self.readByte()
self.rewind(1)
def parse(self):
numberOfTables = ord(self.readByte())
print 'Number of tables:', numberOfTables
for table in range(numberOfTables):
print 'Reading table', table
tableName = self.readVariableString(256)
print 'Table name is', tableName
numOfStrings = ord(self.readByte())
self.skip(1) # Skip a null byte here
print 'Number of strings:', numOfStrings
for s in range(numOfStrings):
keyName = self.readVariableString(4096)
print 'Key name:', keyName.__repr__()
hasUserData = self.readByte()
print 'Has user data:', hasUserData.__repr__()
if hasUserData == '\x01':
userDataLength = self.readShort()
print 'Has user data; Length:', userDataLength, 'bytes'
userData = self.readRaw(userDataLength)
print 'User data:', userData.__repr__()
else:
print 'Doesn\'t have user data.'
self.printNext(1024)
raw_input('Done')
class DemParser(CStreamReader):
MAX_OSPATH = 260
dem_signon = '\x01'
dem_packet = '\x02'
dem_synctick = '\x03'
dem_consolecmd = '\x04'
dem_usercmd = '\x05'
dem_datatables = '\x06'
dem_stop = '\x07'
dem_customdata = '\x08'
dem_stringtables = '\x09'
def __init__(self, f):
CStreamReader.__init__(self, open(f, 'rb'))
self.file = f
def readHeader(self):
self.headerInfo = {
'stamp': self.readString(8),
'demoprotocol': self.readInt(),
'networkprotocol': self.readInt(),
'servername': self.readString(DemParser.MAX_OSPATH),
'clientname': self.readString(DemParser.MAX_OSPATH),
'mapname': self.readString(DemParser.MAX_OSPATH),
'gamedirectory': self.readString(DemParser.MAX_OSPATH),
'playback_time': self.readFloat(),
'playback_ticks': self.readInt(),
'playback_frames': self.readInt(),
'signonlength': self.readInt()
}
return self.headerInfo
def readSignOn(self):
print 'Signon length = ' + str(self.headerInfo['signonlength']) + ' bytes; skipping.'
self.skip(self.headerInfo['signonlength'])
def readCommand(self):
command = self.readByte()
tick = self.readInt()
self.readByte() # Wut
reprCommand = hex(ord(command))
print 'NEXT COMMAND:', reprCommand, '@ Tick', tick
if command == DemParser.dem_signon:
self.readSignOn()
return True
elif command == DemParser.dem_stringtables:
length = self.readInt()
print 'String table length:', length, 'bytes'
tableData = self.readRaw(length)
tables = StringTableParser(tableData)
print tables.parse()
return False
elif command == DemParser.dem_consolecmd:
length = self.readInt()
print 'Console command:', self.readString(length)
return True
elif command == DemParser.dem_packet:
self.skip(160) # Length of IPv4 packet header
length = self.readInt()
print 'Packet data length:', length, 'bytes. Skipping.'
data = self.readRaw(length)
if data.find('.wav') != -1:
raw_input('Contains .wav!')
return True
elif command == DemParser.dem_usercmd:
self.skip(4)
length = self.readInt()
data = self.readRaw(length)
print 'User command:', data.__repr__()
if data.find('.wav') != -1:
raw_input('Contains .wav!')
return True
elif command == DemParser.dem_stop:
print 'End of demo reached.'
return False
def readPacket(self):
packetLength = self.readInt()
print 'Packet length:', packetLength
packet = self.readRaw(packetLength)
print 'Packet contents:', packet
p = DemParser(r'D:\steamapps\common\portal 2\portal2\p2itch_6.dem')
print p.readHeader()
while p.readCommand():
pass
p.printNext(256)</nowiki>