~blowry/steamarchiver

1e13b541aa9275642996c49d81df2714acbd9a41 — Benjamin Lowry a month ago a310b12
use the Chunkstore class for all operations on chunkstores
4 files changed, 115 insertions(+), 93 deletions(-)

A chunkstore.py
M depot_extractor.py
M pack_sis.py
M unpack_sis.py
A chunkstore.py => chunkstore.py +62 -0
@@ 0,0 1,62 @@
#!/usr/bin/env python3
from binascii import hexlify, unhexlify
from os import path
from struct import iter_unpack, pack
from sys import argv

class Chunkstore():
    def __init__(self, filename, depot=None, is_encrypted=None):
        filename = filename.replace(".csd","").replace(".csm","")
        self.csmname = filename + ".csm"
        self.csdname = filename + ".csd"
        self.chunks = {}
        if path.exists(self.csdname) and path.exists(self.csmname):
            with open(self.csmname, "rb") as csmfile:
                self.csm = csmfile.read()
                if self.csm[:4] != b"SCFS":
                    print("not a CSM file: " + (filename + ".csm"))
                    return False
                self.depot = int.from_bytes(self.csm[0xc:0x10], byteorder='little', signed=False)
                self.is_encrypted = (self.csm[0x8:0xa] == b'\x03\x00')
                if is_encrypted != None and self.is_encrypted != is_encrypted:
                    raise Exception("chunkstore " + self.csdname + " already exists and contains " + ("encrypted" if self.is_encrypted else "decrypted") + " chunks")
                if depot != None and self.depot != depot:
                    raise Exception("chunkstore " + self.csdname + " already exists and lists a different depot (" + str(self.depot) + " instead of " + str(depot) + ")")
        elif depot != None and is_encrypted != None:
            self.depot = depot
            self.is_encrypted = is_encrypted
        else:
            raise Exception("Need to specify depot and encryption if file doesn't already exist")
    def __repr__(self):
        return f"Depot {self.depot} (encrypted: {self.is_encrypted}, chunks: {len(self.chunks)}) from CSD file {self.csdname}"
    def unpack(self, unpacker=None):
        if unpacker: assert callable(unpacker)
        self.chunks = {}
        with open(self.csmname, "rb") as csmfile: csm=csmfile.read()[0x14:]
        for sha, offset, _, length in iter_unpack("<20s Q L L", csm):
            self.chunks[sha] = (offset, length)
            if unpacker: unpacker(self, sha, offset, length)
    def write_csm(self):
        # write CSM header
        with open(self.csmname, "wb") as csmfile:
            csmfile.write(b"SCFS\x14\x00\x00\x00")
            if self.is_encrypted:
                csmfile.write(b"\x03\x00\x00\x00")
            else:
                csmfile.write(b"\x02\x00\x00\x00")
            csmfile.write(pack("<L L", self.depot, len(self.chunks)))
            csmfile.seek(0, 2) # make sure we're at the end of the csm file (in case we're writing to an existing csm)
            # iterate over chunks
            for sha, (offset, length) in self.chunks.items():
                csmfile.write(sha)
                csmfile.write(pack("<Q L L", offset, 0, length))
    def get_chunk(self, sha):
        with open(self.csdname, "rb") as csdfile:
            csdfile.seek(self.chunks[sha][0])
            return csdfile.read(self.chunks[sha][1])

if __name__ == "__main__":
    if len(argv) > 1:
        chunkstore = Chunkstore(argv[1])
        chunkstore.unpack()
        print(chunkstore)

M depot_extractor.py => depot_extractor.py +11 -5
@@ 27,7 27,7 @@ if __name__ == "__main__": # exit before we import our shit if the args are wron

from steam.core.manifest import DepotManifest
from steam.core.crypto import symmetric_decrypt
from unpack_sis import Chunkstore
from chunkstore import Chunkstore

if __name__ == "__main__":
    path = "./depots/%s/" % args.depotid


@@ 43,10 43,13 @@ if __name__ == "__main__":
                with open("./depot_keys.txt", "r", encoding="utf-8") as f:
                    for line in f.read().split("\n"):
                        line = line.split("\t")
                        if int(line[0]) == args.depotid:
                            args.depotkey = bytes.fromhex(line[2])
                            manifest.decrypt_filenames(args.depotkey)
                            break
                        try:
                            if int(line[0]) == args.depotid:
                                args.depotkey = bytes.fromhex(line[2])
                                manifest.decrypt_filenames(args.depotkey)
                                break
                        except ValueError:
                            pass
                    if not args.depotkey:
                        print("ERROR: manifest has encrypted filenames, but no depot key was specified and no key for this depot exists in depot_keys.txt")
                        exit(1)


@@ 110,6 113,9 @@ if __name__ == "__main__":
                        else:
                            print("ERROR: chunk %s is encrypted, but no depot key was specified" % chunkhex)
                            exit(1)
                    else:
                        decrypted = chunk_data
                        chunk_data = None

                else:
                    if exists(path + chunkhex):

M pack_sis.py => pack_sis.py +22 -34
@@ 1,36 1,31 @@
#!/usr/bin/env python3
from argparse import ArgumentParser
from binascii import hexlify, unhexlify
from os import scandir, makedirs
from os import scandir, makedirs, remove
from os.path import exists
from struct import pack, unpack, iter_unpack
from vdf import dumps
from sys import stderr
from chunkstore import Chunkstore

def pack_backup(depot, destdir, decrypted=False, no_update=False):
    csd_target = destdir + "/" + str(depot) + "_depotcache_1.csd"
    csm_target = destdir + "/" + str(depot) + "_depotcache_1.csm"
    depot_dir = "./depots/" + str(depot)
    previous_chunks = []
    existing_number_chunks = 0
    mode = "wb"

    if exists(csm_target) and exists(csd_target) and not no_update:
        with open(csd_target, "rb") as csd, open(csm_target, "rb") as csm:
            if csm.read(8) != b"SCFS\x14\x00\x00\x00":
                print("error: target", csm_target, "already exists and is not a CSM file", file=stderr)
                exit(1)
            if csm.read(1) == 0x03 and decrypted:
                print("error: target", csm_target, "already exists and contains encrypted chunks", file=stderr)
                exit(1)
            csm.seek(0xC)
            existing_depot, existing_number_chunks = unpack("<L L", csm.read(8))
            if existing_depot != depot:
                print("error: target", csm_target, "already exists and lists a different depot", file=stderr)
                exit(1)
            for sha, offset, _, length in iter_unpack("<20s Q L L", csm.read()):
                previous_chunks.append(hexlify(sha).decode())
        mode = "r+b"
    if exists(csm_target) and exists(csd_target):
        if no_update: # don't want to update the old files, delete them
            remove(csd_target)
            remove(csm_target)
            mode = "wb"
        else:
            chunkstore = Chunkstore(csd_target, depot, not decrypted)
            chunkstore.unpack()
            mode = "ab"
    else:
        chunkstore = Chunkstore(csd_target, depot, not decrypted)


    if decrypted:
        chunk_match = lambda chunk: chunk.endswith("_decrypted")


@@ 47,17 42,9 @@ def pack_backup(depot, destdir, decrypted=False, no_update=False):
    chunks = [chunk.name for chunk in scandir(depot_dir) if chunk.is_file()
            and not chunk.name.endswith(".zip")
            and chunk_match(chunk.name)
            and is_hex(chunk.name)
            and not chunk.name in previous_chunks]
    with open(csd_target, mode) as csd, open(csm_target, mode) as csm:
        # write CSM header
        csm.write(b"SCFS\x14\x00\x00\x00")
        if decrypted:
            csm.write(b"\x02\x00\x00\x00")
        else:
            csm.write(b"\x03\x00\x00\x00")
        csm.write(pack("<L L", depot, len(chunks) + existing_number_chunks))
        csm.seek(0, 2) # make sure we're at the end of the csm file (in case we're writing to an existing csm)
            and is_hex(chunk.name.replace("_decrypted",""))
            and not unhexlify(chunk.name.replace("_decrypted","")) in chunkstore.chunks.keys()]
    with open(csd_target, mode) as csd:
        # iterate over chunks
        chunks_added = 0
        for chunk in chunks:


@@ 75,12 62,13 @@ def pack_backup(depot, destdir, decrypted=False, no_update=False):

            # write chunk location to csm
            if decrypted:
                csm.write(unhexlify(chunk.replace("_decrypted","")))
                chunkstore.chunks[unhexlify(chunk.replace("_decrypted",""))] = (offset, length)
            else:
                csm.write(unhexlify(chunk))
            csm.write(pack("<Q L L", offset, 0, length))
                chunkstore.chunks[unhexlify(chunk)] = (offset, length)
            chunks_added += 1
            print(f"depot {depot}: added chunk {chunk} ({chunks_added}/{len(chunks)})")
        print("writing index...")
        chunkstore.write_csm()
        print("packed", len(chunks), "chunk" if len(chunks) == 1 else "chunks")
        csd.seek(0, 2)
        return csd.tell()


@@ 91,7 79,7 @@ if __name__ == "__main__":
    parser.add_argument("-d", dest="depots", metavar=('depot', 'manifest'), action="append", type=int, help="Depot ID to pack, can be used multiple times. Include a manifest ID too if generating an sku.sis", nargs='+')
    parser.add_argument("-n", dest="name", default="steamarchiver backup", type=str, help="Backup name")
    parser.add_argument("--decrypted", action='store_true', help="Use decrypted chunks to pack backup", dest="decrypted")
    parser.add_argument("--no-update", action='store_true', help="If an existing backup is found, delete it instead of updating it", dest="no_update")
    parser.add_argument("--no-update", action='store_true', help="If an existing backup is found, DELETE it instead of updating it", dest="no_update")
    parser.add_argument("--destdir", help="Directory to put sis/csm/csd files in", default=".")
    args = parser.parse_args()
    makedirs(args.destdir, exist_ok=True)

M unpack_sis.py => unpack_sis.py +20 -54
@@ 5,67 5,33 @@ from io import BytesIO
from os import path, makedirs
from re import sub
from steam.core.crypto import symmetric_encrypt, symmetric_encrypt_with_iv
from struct import iter_unpack
from struct import iter_unpack, pack
from sys import argv
from vdf import loads

class Chunkstore():
    def __init__(self, filename):
        filename = filename.replace(".csd","").replace(".csm","")
        self.csmname = filename + ".csm"
        self.csdname = filename + ".csd"
        self.chunks = {}
        self.csdfile = None
        self.open()
        with open(self.csmname, "rb") as csmfile:
            self.csm = csmfile.read()
            if self.csm[:4] != b"SCFS":
                print("not a CSM file: " + (filename + ".csm"))
                return False
            self.depot = int.from_bytes(self.csm[0xc:0x10], byteorder='little', signed=False)
            self.is_encrypted = (self.csm[0x8:0xa] == b'\x03\x00')
    def __repr__(self):
        return f"Depot {self.depot} (encrypted: {self.is_encrypted}) from CSD file {self.csdname}"
    def unpack(self, unpacker=None):
        if unpacker: assert callable(unpacker)
        self.open()
        csm = self.csm[0x14:]
        for sha, offset, _, length in iter_unpack("<20s Q L L", csm):
            self.chunks[sha] = (offset, length)
            if unpacker: unpacker(self, sha, offset, length)
    def open(self):
        if not self.csdfile:
            self.csdfile = open(self.csdname, "rb")
    def close(self):
        if self.csdfile:
            self.csdfile.close()
        self.csdfile = None
    def get_chunk(self, sha):
        self.open()
        self.csdfile.seek(self.chunks[sha][0])
        return self.csdfile.read(self.chunks[sha][1])
from chunkstore import Chunkstore

def unpack_chunkstore(target, key=None, key_hex=None):
        if key == True:
            key, key_hex = find_key(depot)
        def unpacker(chunkstore, sha, offset, length):
            print("extracting chunk %s from offset %s in file %s" % (hexlify(sha).decode(), offset, target + ".csd"))
            chunkstore.csdfile.seek(offset)
            if key:
                with open("./depots/%s/%s" % (chunkstore.depot, hexlify(sha).decode()), "wb") as f:
                    print("writing %s bytes re-encrypted using key %s and random IV" % (length, key_hex))
                    f.write(symmetric_encrypt(chunkstore.csdfile.read(length), key))
            elif chunkstore.is_encrypted:
                with open("./depots/%s/%s" % (chunkstore.depot, hexlify(sha).decode()), "wb") as f:
                    print("writing %s bytes encrypted" % length)
                    f.write(chunkstore.csdfile.read(length))
            else:
                with open("./depots/%s/%s_decrypted" % (chunkstore.depot, hexlify(sha).decode()), "wb") as f:
                    print("writing %s bytes unencrypted" % length)
                    f.write(chunkstore.csdfile.read(length))
        chunkstore = Chunkstore(target)
        makedirs("./depots/%s" % chunkstore.depot, exist_ok=True)
        chunkstore.unpack(unpacker)
        with open(chunkstore.csdname, "rb") as csdfile:
            def unpacker(chunkstore, sha, offset, length):
                print("extracting chunk %s from offset %s in file %s" % (hexlify(sha).decode(), offset, target + ".csd"))
                csdfile.seek(offset)
                if key:
                    with open("./depots/%s/%s" % (chunkstore.depot, hexlify(sha).decode()), "wb") as f:
                        print("writing %s bytes re-encrypted using key %s and random IV" % (length, key_hex))
                        f.write(symmetric_encrypt(csdfile.read(length), key))
                elif chunkstore.is_encrypted:
                    with open("./depots/%s/%s" % (chunkstore.depot, hexlify(sha).decode()), "wb") as f:
                        print("writing %s bytes encrypted" % length)
                        f.write(csdfile.read(length))
                else:
                    with open("./depots/%s/%s_decrypted" % (chunkstore.depot, hexlify(sha).decode()), "wb") as f:
                        print("writing %s bytes unencrypted" % length)
                        f.write(csdfile.read(length))
            makedirs("./depots/%s" % chunkstore.depot, exist_ok=True)
            chunkstore.unpack(unpacker)

def find_key(depot):
    if path.exists("./depot_keys.txt"):