import base64
import binascii
import datetime
import hashlib
# Disable Signifiy Debugging Logging
import logging
import struct
from io import BytesIO
import pefile
from signify.authenticode import SignedPEFile
from signify.exceptions import (
AuthenticodeParseError,
AuthenticodeVerificationError,
CertificateVerificationError,
SignedPEParseError,
SignerInfoParseError,
SignerInfoVerificationError,
VerificationError,
)
from strelka import strelka
logger = logging.getLogger("signify")
logger.propagate = False
# Ref: https://ebourg.github.io/jsign/apidocs/src-html/net/jsign/pe/SectionFlag.html
CHARACTERISTICS_DLL = {
0x0020: "HIGH_ENTROPY_VA",
0x0040: "DYNAMIC_BASE",
0x0080: "FORCE_INTEGRITY",
0x0100: "NX_COMPAT",
0x0200: "NO_ISOLATION",
0x0400: "NO_SEH",
0x0800: "NO_BIND",
0x1000: "APPCONTAINER",
0x2000: "WDM_DRIVER",
0x4000: "GUARD_CF",
0x8000: "TERMINAL_SERVER_AWARE",
}
CHARACTERISTICS_IMAGE = {
0x0001: "RELOCS_STRIPPED",
0x0002: "EXECUTABLE_IMAGE",
0x0004: "LINE_NUMS_STRIPPED",
0x0008: "LOCAL_SYMS_STRIPPED",
0x0010: "AGGRESIVE_WS_TRIM",
0x0020: "LARGE_ADDRESS_AWARE",
0x0040: "16BIT_MACHINE",
0x0080: "BYTES_REVERSED_LO",
0x0100: "32BIT_MACHINE",
0x0200: "DEBUG_STRIPPED",
0x0400: "REMOVABLE_RUN_FROM_SWAP",
0x0800: "NET_RUN_FROM_SWAP",
0x1000: "SYSTEM",
0x2000: "DLL",
0x4000: "UP_SYSTEM_ONLY",
0x8000: "BYTES_REVERSED_HI",
}
CHARACTERISTICS_SECTION = {
0x00000000: "TYPE_REG",
0x00000001: "TYPE_DSECT",
0x00000002: "TYPE_NOLOAD",
0x00000004: "TYPE_GROUP",
0x00000008: "TYPE_NO_PAD",
0x00000010: "TYPE_COPY",
0x00000020: "CNT_CODE",
0x00000040: "CNT_INITIALIZED_DATA",
0x00000080: "CNT_UNINITIALIZED_DATA",
0x00000100: "LNK_OTHER",
0x00000200: "LNK_INFO",
0x00000400: "LNK_OVER",
0x00000800: "LNK_REMOVE",
0x00001000: "LNK_COMDAT",
0x00004000: "MEM_PROTECTED|NO_DEFER_SPEC_EXC",
0x00008000: "GPREL|MEM_FARDATA",
0x00010000: "MEM_SYSHEAP",
0x00020000: "MEM_PURGEABLE|MEM_16BIT",
0x00040000: "MEM_LOCKED",
0x00080000: "MEM_PRELOAD",
0x00100000: "ALIGN_1BYTES",
0x00200000: "ALIGN_2BYTES",
0x00300000: "ALIGN_4BYTES",
0x00400000: "ALIGN_8BYTES",
0x00500000: "ALIGN_16BYTES",
0x00600000: "ALIGN_32BYTES",
0x00700000: "ALIGN_64BYTES",
0x00800000: "ALIGN_128BYTES",
0x00900000: "ALIGN_256BYTES",
0x00A00000: "ALIGN_512BYTES",
0x00B00000: "ALIGN_1024BYTES",
0x00C00000: "ALIGN_2048BYTES",
0x00D00000: "ALIGN_4096BYTES",
0x00E00000: "ALIGN_8192BYTES",
0x00F00000: "ALIGN_MASK",
0x01000000: "LNK_NRELOC_OVFL",
0x02000000: "MEM_DISCARDABLE",
0x04000000: "MEM_NOT_CACHED",
0x08000000: "MEM_NOT_PAGED",
0x10000000: "MEM_SHARED",
0x20000000: "MEM_EXECUTE",
0x40000000: "MEM_READ",
0x80000000: "MEM_WRITE",
}
# https://docs.microsoft.com/en-us/windows/win32/api/verrsrc/ns-verrsrc-tagvs_fixedfileinfo
FIXED_FILE_INFO_FLAGS = {
0x00000001: "DEBUG",
0x00000010: "INFOINFERRED",
0x00000004: "PATCHED",
0x00000002: "PRERELEASE",
0x00000008: "PRIVATEBUILD",
0x00000020: "SPECIALBUILD",
}
FIXED_FILE_INFO_OS = {
0x00000000: "UNKNOWN",
0x00000001: "WINDOWS16",
0x00000002: "PM16",
0x00000003: "PM32",
0x00000004: "WINDOWS32",
0x00010000: "DOS",
0x00040000: "NT",
0x00020000: "OS216",
0x00030000: "OS232",
}
FIXED_FILE_INFO_SUBTYPE = {
(0x00000003, 0x00000000): "UNKNOWN",
(0x00000003, 0x00000001): "DRV_PRINTER",
(0x00000003, 0x00000002): "DRV_KEYBOARD",
(0x00000003, 0x00000003): "DRV_LANGUAGE",
(0x00000003, 0x00000004): "DRV_DISPLAY",
(0x00000003, 0x00000005): "DRV_MOUSE",
(0x00000003, 0x00000006): "DRV_NETWORK",
(0x00000003, 0x00000007): "DRV_SYSTEM",
(0x00000003, 0x00000008): "DRV_INSTALLABLE",
(0x00000003, 0x00000009): "DRV_SOUND",
(0x00000003, 0x0000000A): "DRV_COMM",
(0x00000003, 0x0000000C): "DRV_VERSIONED_PRINTER",
(0x00000004, 0x00000000): "UNKNOWN",
(0x00000004, 0x00000001): "FONT_RASTER",
(0x00000004, 0x00000002): "FONT_VECTOR",
(0x00000004, 0x00000003): "FONT_TRUETYPE",
}
FIXED_FILE_INFO_TYPE = {
0x00000000: "UNKNOWN",
0x00000001: "APP",
0x00000002: "DLL",
0x00000003: "DRV",
0x00000004: "FONT",
0x00000005: "VXD",
0x00000007: "STATIC_LIB",
}
MAGIC_DOS = {
0x5A4D: "DOS",
0x4D5A: "DOSZM",
0x454E: "NE",
0x454C: "LE",
0x584C: "LX",
0x5A56: "TE",
0x00004550: "NT",
}
MAGIC_IMAGE = {
0x10B: "32_BIT",
0x20B: "64_BIT",
0x107: "ROM_IMAGE",
}
VAR_FILE_INFO_LANGS = {
0x0401: "Arabic",
0x0415: "Polish",
0x0402: "Bulgarian",
0x0416: "Portuguese (Brazil)",
0x0403: "Catalan",
0x0417: "Rhaeto-Romanic",
0x0404: "Traditional Chinese",
0x0418: "Romanian",
0x0405: "Czech",
0x0419: "Russian",
0x0406: "Danish",
0x041A: "Croato-Serbian (Latin)",
0x0407: "German",
0x041B: "Slovak",
0x0408: "Greek",
0x041C: "Albanian",
0x0409: "U.S. English",
0x041D: "Swedish",
0x040A: "Castilian Spanish",
0x041E: "Thai",
0x040B: "Finnish",
0x041F: "Turkish",
0x040C: "French",
0x0420: "Urdu",
0x040D: "Hebrew",
0x0421: "Bahasa",
0x040E: "Hungarian",
0x0804: "Simplified Chinese",
0x040F: "Icelandic",
0x0807: "Swiss German",
0x0410: "Italian",
0x0809: "U.K. English",
0x0411: "Japanese",
0x080A: "Spanish (Mexico)",
0x0412: "Korean",
0x080C: "Belgian French",
0x0413: "Dutch",
0x0C0C: "Canadian French",
0x0414: "Norwegian â Bokmal",
0x100C: "Swiss French",
0x0810: "Swiss Italian",
0x0816: "Portuguese (Portugal)",
0x0813: "Belgian Dutch",
0x081A: "Serbo-Croatian (Cyrillic)",
0x0814: "Norwegian â Nynorsk",
}
VAR_FILE_INFO_CHARS = {
0: "7-bit ASCII",
932: "Japan (Shift â JIS X-0208)",
949: "Korea (Shift â KSC 5601)",
950: "Taiwan (Big5)",
1200: "Unicode",
1250: "Latin-2 (Eastern European)",
1251: "Cyrillic",
1252: "Multilingual",
1253: "Greek",
1254: "Turkish",
1255: "Hebrew",
1256: "Arabic",
}
COMMON_FILE_INFO_NAMES = {
"Assembly Version": "assembly_version",
"Build Description": "build_description",
"Comments": "comments",
"CompanyName": "company_name",
"FileDescription": "file_description",
"FileVersion": "file_version",
"InternalName": "internal_name",
"LegalCopyright": "legal_copyright",
"LegalTrademarks": "legal_trademarks",
"License": "license",
"OLESelfRegister": "ole_self_register",
"OriginalFilename": "original_filename",
"PrivateBuild": "private_build",
"ProductName": "product_name",
"ProductVersion": "product_version",
}
def parse_rich(pe):
try:
if rich_data := pe.parse_rich_header():
rich_dict = {
"key": rich_data["key"].hex(),
"clear_data": {
"data": base64.b64encode(rich_data["clear_data"]),
"md5": hashlib.md5(rich_data["clear_data"]).hexdigest(),
},
"raw_data": {
"data": base64.b64encode(rich_data["raw_data"]),
"md5": hashlib.md5(rich_data["raw_data"]).hexdigest(),
},
}
values = rich_data["values"]
rich_dict.update({"info": []})
for i in range(0, len(values), 2):
rich_dict["info"].append(
{
"toolid": values[i] >> 16,
"version": values[i] & 0xFFFF,
"count": values[i + 1],
}
)
return rich_dict
except pefile.PEFormatError:
logging.error("pe_format_error")
return
def parse_certificates(data):
# set up string io as we get data
buffer = BytesIO()
buffer.write(data)
buffer.seek(0)
try:
pe = SignedPEFile(buffer)
try:
signed_datas = list(pe.signed_datas)
except strelka.ScannerTimeout:
raise
except Exception:
return "no_certs_found"
except (
SignedPEParseError,
SignerInfoParseError,
AuthenticodeParseError,
VerificationError,
CertificateVerificationError,
SignerInfoVerificationError,
AuthenticodeVerificationError,
):
return "pe_certificate_error"
cert_list = []
signer_list = []
counter_signer_list = []
for signed_data in signed_datas:
try:
certs = signed_data.certificates
for cert in certs:
asn1 = cert.to_asn1crypto
issuer = asn1.issuer.native
try:
cert_dict = {
"country_name": issuer.get("country_name"),
"organization_name": issuer.get("organization_name"),
"organizational_unit_name": issuer.get(
"organizational_unit_name"
),
"common_name": issuer.get("common_name"),
"serial_number": str(cert.serial_number),
"issuer_dn": cert.issuer.dn,
"subject_dn": cert.subject.dn,
"valid_from": cert.valid_from.isoformat(),
"valid_to": cert.valid_to.isoformat(),
"signature_algorithim": str(
cert.signature_algorithm["algorithm"]
),
}
cert_list.append(cert_dict)
except strelka.ScannerTimeout:
raise
except Exception:
return "exception parsing certificate exception"
signer_dict = {
"issuer_dn": signed_data.signer_info.issuer.dn,
"serial_number": str(signed_data.signer_info.serial_number),
"program_name": signed_data.signer_info.program_name,
"more_info": signed_data.signer_info.more_info,
}
# signer information
signer_list.append(signer_dict)
if signed_data.signer_info.countersigner:
if hasattr(signed_data.signer_info.countersigner, "issuer"):
counter_signer_issuer_dn = (
signed_data.signer_info.countersigner.issuer.dn
)
else:
counter_signer_issuer_dn = (
signed_data.signer_info.countersigner.signer_info.issuer.dn
)
if hasattr(signed_data.signer_info.countersigner, "serial_number"):
counter_signer_sn = (
signed_data.signer_info.countersigner.serial_number
)
else:
counter_signer_sn = (
signed_data.signer_info.countersigner.signer_info.serial_number
)
counter_signer_dict = {
"issuer_dn": counter_signer_issuer_dn,
"serial_number": str(counter_signer_sn),
"signing_time": signed_data.signer_info.countersigner.signing_time.isoformat(),
}
counter_signer_list.append(counter_signer_dict)
except SignedPEParseError:
return "no certificate in signed data"
security_dict = {
"certificates": cert_list,
"signers": signer_list,
"counter_signers": counter_signer_list,
}
try:
pe.verify()
security_dict["verification"] = True
except strelka.ScannerTimeout:
raise
except Exception as e:
security_dict["verification"] = False
security_dict["verification_error"] = str(e)
return security_dict
class ScanPe(strelka.Scanner):
"""Collects metadata from PE files."""
def scan(self, data, file, options, expire_at):
extract_overlay = options.get("extract_overlay", False)
try:
pe = pefile.PE(data=data)
if not pe:
self.flags.append("pe_load_error")
return
except pefile.PEFormatError:
self.flags.append("pe_format_error")
return
except AttributeError:
self.flags.append("pe_attribute_error")
return
if rich_dict := parse_rich(pe):
if type(rich_dict) is not str:
self.event["rich"] = rich_dict
else:
self.flags.append(rich_dict)
if cert_dict := parse_certificates(data):
if type(cert_dict) is not str:
self.event["security"] = cert_dict
else:
self.flags.append(cert_dict)
self.event["total"] = {
"libraries": 0,
"resources": 0,
"sections": len(pe.sections),
"symbols": 0,
}
self.event["summary"] = {}
offset = pe.get_overlay_data_start_offset()
if offset and len(data[offset:]) > 0:
self.event["overlay"] = {"size": len(data[offset:]), "extracted": False}
self.flags.append("overlay")
if extract_overlay:
# Send extracted file back to Strelka
self.emit_file(data[offset:], name="pe_overlay")
self.event["overlay"].update({"extracted": True})
if hasattr(pe, "DIRECTORY_ENTRY_DEBUG"):
for d in pe.DIRECTORY_ENTRY_DEBUG:
try:
data = pe.get_data(d.struct.AddressOfRawData, d.struct.SizeOfData)
if data.find(b"RSDS") != -1 and len(data) > 24:
pdb = data[data.find(b"RSDS") :]
self.event["debug"] = {
"type": "rsds",
"guid": b"%s-%s-%s-%s"
% (
binascii.hexlify(pdb[4:8]),
binascii.hexlify(pdb[8:10]),
binascii.hexlify(pdb[10:12]),
binascii.hexlify(pdb[12:20]),
),
"age": struct.unpack("