import base64 import binascii import datetime import hashlib # Disable Signifiy Debugging Logging import logging import struct from io import BytesIO import pefile from signify.authenticode import SignedPEFile from signify.exceptions import ( AuthenticodeParseError, AuthenticodeVerificationError, CertificateVerificationError, SignedPEParseError, SignerInfoParseError, SignerInfoVerificationError, VerificationError, ) from strelka import strelka logger = logging.getLogger("signify") logger.propagate = False # Ref: https://ebourg.github.io/jsign/apidocs/src-html/net/jsign/pe/SectionFlag.html CHARACTERISTICS_DLL = { 0x0020: "HIGH_ENTROPY_VA", 0x0040: "DYNAMIC_BASE", 0x0080: "FORCE_INTEGRITY", 0x0100: "NX_COMPAT", 0x0200: "NO_ISOLATION", 0x0400: "NO_SEH", 0x0800: "NO_BIND", 0x1000: "APPCONTAINER", 0x2000: "WDM_DRIVER", 0x4000: "GUARD_CF", 0x8000: "TERMINAL_SERVER_AWARE", } CHARACTERISTICS_IMAGE = { 0x0001: "RELOCS_STRIPPED", 0x0002: "EXECUTABLE_IMAGE", 0x0004: "LINE_NUMS_STRIPPED", 0x0008: "LOCAL_SYMS_STRIPPED", 0x0010: "AGGRESIVE_WS_TRIM", 0x0020: "LARGE_ADDRESS_AWARE", 0x0040: "16BIT_MACHINE", 0x0080: "BYTES_REVERSED_LO", 0x0100: "32BIT_MACHINE", 0x0200: "DEBUG_STRIPPED", 0x0400: "REMOVABLE_RUN_FROM_SWAP", 0x0800: "NET_RUN_FROM_SWAP", 0x1000: "SYSTEM", 0x2000: "DLL", 0x4000: "UP_SYSTEM_ONLY", 0x8000: "BYTES_REVERSED_HI", } CHARACTERISTICS_SECTION = { 0x00000000: "TYPE_REG", 0x00000001: "TYPE_DSECT", 0x00000002: "TYPE_NOLOAD", 0x00000004: "TYPE_GROUP", 0x00000008: "TYPE_NO_PAD", 0x00000010: "TYPE_COPY", 0x00000020: "CNT_CODE", 0x00000040: "CNT_INITIALIZED_DATA", 0x00000080: "CNT_UNINITIALIZED_DATA", 0x00000100: "LNK_OTHER", 0x00000200: "LNK_INFO", 0x00000400: "LNK_OVER", 0x00000800: "LNK_REMOVE", 0x00001000: "LNK_COMDAT", 0x00004000: "MEM_PROTECTED|NO_DEFER_SPEC_EXC", 0x00008000: "GPREL|MEM_FARDATA", 0x00010000: "MEM_SYSHEAP", 0x00020000: "MEM_PURGEABLE|MEM_16BIT", 0x00040000: "MEM_LOCKED", 0x00080000: "MEM_PRELOAD", 0x00100000: "ALIGN_1BYTES", 0x00200000: "ALIGN_2BYTES", 0x00300000: "ALIGN_4BYTES", 0x00400000: "ALIGN_8BYTES", 0x00500000: "ALIGN_16BYTES", 0x00600000: "ALIGN_32BYTES", 0x00700000: "ALIGN_64BYTES", 0x00800000: "ALIGN_128BYTES", 0x00900000: "ALIGN_256BYTES", 0x00A00000: "ALIGN_512BYTES", 0x00B00000: "ALIGN_1024BYTES", 0x00C00000: "ALIGN_2048BYTES", 0x00D00000: "ALIGN_4096BYTES", 0x00E00000: "ALIGN_8192BYTES", 0x00F00000: "ALIGN_MASK", 0x01000000: "LNK_NRELOC_OVFL", 0x02000000: "MEM_DISCARDABLE", 0x04000000: "MEM_NOT_CACHED", 0x08000000: "MEM_NOT_PAGED", 0x10000000: "MEM_SHARED", 0x20000000: "MEM_EXECUTE", 0x40000000: "MEM_READ", 0x80000000: "MEM_WRITE", } # https://docs.microsoft.com/en-us/windows/win32/api/verrsrc/ns-verrsrc-tagvs_fixedfileinfo FIXED_FILE_INFO_FLAGS = { 0x00000001: "DEBUG", 0x00000010: "INFOINFERRED", 0x00000004: "PATCHED", 0x00000002: "PRERELEASE", 0x00000008: "PRIVATEBUILD", 0x00000020: "SPECIALBUILD", } FIXED_FILE_INFO_OS = { 0x00000000: "UNKNOWN", 0x00000001: "WINDOWS16", 0x00000002: "PM16", 0x00000003: "PM32", 0x00000004: "WINDOWS32", 0x00010000: "DOS", 0x00040000: "NT", 0x00020000: "OS216", 0x00030000: "OS232", } FIXED_FILE_INFO_SUBTYPE = { (0x00000003, 0x00000000): "UNKNOWN", (0x00000003, 0x00000001): "DRV_PRINTER", (0x00000003, 0x00000002): "DRV_KEYBOARD", (0x00000003, 0x00000003): "DRV_LANGUAGE", (0x00000003, 0x00000004): "DRV_DISPLAY", (0x00000003, 0x00000005): "DRV_MOUSE", (0x00000003, 0x00000006): "DRV_NETWORK", (0x00000003, 0x00000007): "DRV_SYSTEM", (0x00000003, 0x00000008): "DRV_INSTALLABLE", (0x00000003, 0x00000009): "DRV_SOUND", (0x00000003, 0x0000000A): "DRV_COMM", (0x00000003, 0x0000000C): "DRV_VERSIONED_PRINTER", (0x00000004, 0x00000000): "UNKNOWN", (0x00000004, 0x00000001): "FONT_RASTER", (0x00000004, 0x00000002): "FONT_VECTOR", (0x00000004, 0x00000003): "FONT_TRUETYPE", } FIXED_FILE_INFO_TYPE = { 0x00000000: "UNKNOWN", 0x00000001: "APP", 0x00000002: "DLL", 0x00000003: "DRV", 0x00000004: "FONT", 0x00000005: "VXD", 0x00000007: "STATIC_LIB", } MAGIC_DOS = { 0x5A4D: "DOS", 0x4D5A: "DOSZM", 0x454E: "NE", 0x454C: "LE", 0x584C: "LX", 0x5A56: "TE", 0x00004550: "NT", } MAGIC_IMAGE = { 0x10B: "32_BIT", 0x20B: "64_BIT", 0x107: "ROM_IMAGE", } VAR_FILE_INFO_LANGS = { 0x0401: "Arabic", 0x0415: "Polish", 0x0402: "Bulgarian", 0x0416: "Portuguese (Brazil)", 0x0403: "Catalan", 0x0417: "Rhaeto-Romanic", 0x0404: "Traditional Chinese", 0x0418: "Romanian", 0x0405: "Czech", 0x0419: "Russian", 0x0406: "Danish", 0x041A: "Croato-Serbian (Latin)", 0x0407: "German", 0x041B: "Slovak", 0x0408: "Greek", 0x041C: "Albanian", 0x0409: "U.S. English", 0x041D: "Swedish", 0x040A: "Castilian Spanish", 0x041E: "Thai", 0x040B: "Finnish", 0x041F: "Turkish", 0x040C: "French", 0x0420: "Urdu", 0x040D: "Hebrew", 0x0421: "Bahasa", 0x040E: "Hungarian", 0x0804: "Simplified Chinese", 0x040F: "Icelandic", 0x0807: "Swiss German", 0x0410: "Italian", 0x0809: "U.K. English", 0x0411: "Japanese", 0x080A: "Spanish (Mexico)", 0x0412: "Korean", 0x080C: "Belgian French", 0x0413: "Dutch", 0x0C0C: "Canadian French", 0x0414: "Norwegian – Bokmal", 0x100C: "Swiss French", 0x0810: "Swiss Italian", 0x0816: "Portuguese (Portugal)", 0x0813: "Belgian Dutch", 0x081A: "Serbo-Croatian (Cyrillic)", 0x0814: "Norwegian – Nynorsk", } VAR_FILE_INFO_CHARS = { 0: "7-bit ASCII", 932: "Japan (Shift – JIS X-0208)", 949: "Korea (Shift – KSC 5601)", 950: "Taiwan (Big5)", 1200: "Unicode", 1250: "Latin-2 (Eastern European)", 1251: "Cyrillic", 1252: "Multilingual", 1253: "Greek", 1254: "Turkish", 1255: "Hebrew", 1256: "Arabic", } COMMON_FILE_INFO_NAMES = { "Assembly Version": "assembly_version", "Build Description": "build_description", "Comments": "comments", "CompanyName": "company_name", "FileDescription": "file_description", "FileVersion": "file_version", "InternalName": "internal_name", "LegalCopyright": "legal_copyright", "LegalTrademarks": "legal_trademarks", "License": "license", "OLESelfRegister": "ole_self_register", "OriginalFilename": "original_filename", "PrivateBuild": "private_build", "ProductName": "product_name", "ProductVersion": "product_version", } def parse_rich(pe): try: if rich_data := pe.parse_rich_header(): rich_dict = { "key": rich_data["key"].hex(), "clear_data": { "data": base64.b64encode(rich_data["clear_data"]), "md5": hashlib.md5(rich_data["clear_data"]).hexdigest(), }, "raw_data": { "data": base64.b64encode(rich_data["raw_data"]), "md5": hashlib.md5(rich_data["raw_data"]).hexdigest(), }, } values = rich_data["values"] rich_dict.update({"info": []}) for i in range(0, len(values), 2): rich_dict["info"].append( { "toolid": values[i] >> 16, "version": values[i] & 0xFFFF, "count": values[i + 1], } ) return rich_dict except pefile.PEFormatError: logging.error("pe_format_error") return def parse_certificates(data): # set up string io as we get data buffer = BytesIO() buffer.write(data) buffer.seek(0) try: pe = SignedPEFile(buffer) try: signed_datas = list(pe.signed_datas) except strelka.ScannerTimeout: raise except Exception: return "no_certs_found" except ( SignedPEParseError, SignerInfoParseError, AuthenticodeParseError, VerificationError, CertificateVerificationError, SignerInfoVerificationError, AuthenticodeVerificationError, ): return "pe_certificate_error" cert_list = [] signer_list = [] counter_signer_list = [] for signed_data in signed_datas: try: certs = signed_data.certificates for cert in certs: asn1 = cert.to_asn1crypto issuer = asn1.issuer.native try: cert_dict = { "country_name": issuer.get("country_name"), "organization_name": issuer.get("organization_name"), "organizational_unit_name": issuer.get( "organizational_unit_name" ), "common_name": issuer.get("common_name"), "serial_number": str(cert.serial_number), "issuer_dn": cert.issuer.dn, "subject_dn": cert.subject.dn, "valid_from": cert.valid_from.isoformat(), "valid_to": cert.valid_to.isoformat(), "signature_algorithim": str( cert.signature_algorithm["algorithm"] ), } cert_list.append(cert_dict) except strelka.ScannerTimeout: raise except Exception: return "exception parsing certificate exception" signer_dict = { "issuer_dn": signed_data.signer_info.issuer.dn, "serial_number": str(signed_data.signer_info.serial_number), "program_name": signed_data.signer_info.program_name, "more_info": signed_data.signer_info.more_info, } # signer information signer_list.append(signer_dict) if signed_data.signer_info.countersigner: if hasattr(signed_data.signer_info.countersigner, "issuer"): counter_signer_issuer_dn = ( signed_data.signer_info.countersigner.issuer.dn ) else: counter_signer_issuer_dn = ( signed_data.signer_info.countersigner.signer_info.issuer.dn ) if hasattr(signed_data.signer_info.countersigner, "serial_number"): counter_signer_sn = ( signed_data.signer_info.countersigner.serial_number ) else: counter_signer_sn = ( signed_data.signer_info.countersigner.signer_info.serial_number ) counter_signer_dict = { "issuer_dn": counter_signer_issuer_dn, "serial_number": str(counter_signer_sn), "signing_time": signed_data.signer_info.countersigner.signing_time.isoformat(), } counter_signer_list.append(counter_signer_dict) except SignedPEParseError: return "no certificate in signed data" security_dict = { "certificates": cert_list, "signers": signer_list, "counter_signers": counter_signer_list, } try: pe.verify() security_dict["verification"] = True except strelka.ScannerTimeout: raise except Exception as e: security_dict["verification"] = False security_dict["verification_error"] = str(e) return security_dict class ScanPe(strelka.Scanner): """Collects metadata from PE files.""" def scan(self, data, file, options, expire_at): extract_overlay = options.get("extract_overlay", False) try: pe = pefile.PE(data=data) if not pe: self.flags.append("pe_load_error") return except pefile.PEFormatError: self.flags.append("pe_format_error") return except AttributeError: self.flags.append("pe_attribute_error") return if rich_dict := parse_rich(pe): if type(rich_dict) is not str: self.event["rich"] = rich_dict else: self.flags.append(rich_dict) if cert_dict := parse_certificates(data): if type(cert_dict) is not str: self.event["security"] = cert_dict else: self.flags.append(cert_dict) self.event["total"] = { "libraries": 0, "resources": 0, "sections": len(pe.sections), "symbols": 0, } self.event["summary"] = {} offset = pe.get_overlay_data_start_offset() if offset and len(data[offset:]) > 0: self.event["overlay"] = {"size": len(data[offset:]), "extracted": False} self.flags.append("overlay") if extract_overlay: # Send extracted file back to Strelka self.emit_file(data[offset:], name="pe_overlay") self.event["overlay"].update({"extracted": True}) if hasattr(pe, "DIRECTORY_ENTRY_DEBUG"): for d in pe.DIRECTORY_ENTRY_DEBUG: try: data = pe.get_data(d.struct.AddressOfRawData, d.struct.SizeOfData) if data.find(b"RSDS") != -1 and len(data) > 24: pdb = data[data.find(b"RSDS") :] self.event["debug"] = { "type": "rsds", "guid": b"%s-%s-%s-%s" % ( binascii.hexlify(pdb[4:8]), binascii.hexlify(pdb[8:10]), binascii.hexlify(pdb[10:12]), binascii.hexlify(pdb[12:20]), ), "age": struct.unpack(" 16: pdb = data[data.find(b"NB10") + 8 :] self.event["debug"] = { "type": "nb10", "created": struct.unpack("