@ToxicCrack
Last active December 27, 2024
import os
import json
import time
import concurrent.futures
import requests
import shutil
######## INITIAL SETUP ########
# Define the token, download path, and target category
# Get a fresh token from https://quixel.com/megascans/home/ (it expires periodically)
token = "UPDATEME"
# Update with the correct path to the download directory. Example: "C:/Users/MyAccount/Documents/QuixelZips"
download_path = "UPDATEME"
# Update with the correct path to the ms_asset_categories.json file itself, not its directory. Example: "C:/Users/MyAccount/Documents/QuixelZips/ms_asset_categories.json"
json_file_path = "./ms_asset_categories.json"
# Pick a folder for the download cache (a plain text file). Create cache.txt there and paste the path below, e.g. "C:/Users/MyAccount/Documents/Quixel/cache.txt"
cache_file_path = "./cache.txt"
# Download complete_asset_metadata.tar.zst from https://github.com/WAUthethird/quixel-megascans-scripts and extract it
asset_metadata_path = "UPDATEME/asset_metadata.json"
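# A minimal sketch for extracting asset_metadata.json from that archive (assumes
# the third-party `zstandard` package is installed; run once, then point
# asset_metadata_path at the extracted file):
#
#   import tarfile, zstandard
#   with open("complete_asset_metadata.tar.zst", "rb") as src:
#       with zstandard.ZstdDecompressor().stream_reader(src) as reader:
#           with tarfile.open(fileobj=reader, mode="r|") as tar:
#               tar.extractall("UPDATEME")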
## Set target download category/categories.
# Working: 3d asset, 3d plant, surface, brush, displacement, imperfection, decal
target_category = "3d asset"
# Create subdirectories based on the category
save_in_subdirectories = True
# Specify the image format for each texture type
# Types: albedo, displacement, normal, roughness, metalness, ao, bump, specular, gloss, normalbump, cavity
mime_texture_types = {
    "default": "image/jpeg",  # image/jpeg or image/x-exr
    "displacement": "image/x-exr"
}
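# For example, to also fetch EXR normal maps (a hypothetical override, using the
# texture type names listed above):
#   mime_texture_types["normal"] = "image/x-exr"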
# Download HighPoly?
highpoly = False
# Download ztool (ZBrush) file?
ztool = False
# Set to True to overwrite existing cached items (e.g. to download a different
# texture size, or to retry after a run where the script needed adjusting).
overwrite = False
######## SETUP - FINISHED ########
#-----------------------------------------------------------------------------------------------------------------------------------#
######## VARS - DON'T TOUCH ########
headers = {
    "Authorization": "Bearer " + token,
}
downloadCount = 0
selectedSize = 0
selectedTier = 0
######## ENDVARS - DON'T TOUCH ########
#-----------------------------------------------------------------------------------------------------------------------------------#
def requestExtendedInfo(asset_id):
    url = f"https://quixel.com/v1/assets/{asset_id}/extended"
    response = requests.get(url, stream=True)
    if response.status_code == 400:
        print(f"Error 400: {response.text}")
    else:
        uassets = response.json()["uasset"]
        for asset in uassets:
            if asset["tier"] == selectedTier:
                return None  # This is testing and not needed currently
# Function to normalize category strings
def normalize_category(category):
    return category.strip().lower().rstrip('s')

# Function to load asset categories from JSON file
def load_asset_categories(json_file):
    with open(json_file, 'r') as f:
        return json.load(f)
# Function to convert nested categories to a separator-joined string
def categories_to_string(categories, separator=" "):
    result = []
    if isinstance(categories, dict):
        for key, value in categories.items():
            if isinstance(value, dict) and value:
                subcategories = categories_to_string(value, separator)
                if subcategories:
                    result.append(f"{key}{separator}{subcategories}")
                else:
                    result.append(key)
            elif isinstance(value, list):
                for item in value:
                    result.append(normalize_category(item))
            else:
                result.append(normalize_category(key))
    return separator.join(result)
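# For example (with a hypothetical categories entry), a nested dict such as
#   {"3D asset": {"food": ["Fruits"]}}
# flattens to "3D asset fruit" with the default separator, or "3D asset/fruit"
# with separator="/" (leaf items are normalized: lowercased, trailing "s" stripped).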
# Load the asset categories from the JSON file
asset_categories_dict = load_asset_categories(json_file_path)
# Print what we're trying to match against
print(f"Trying to match against target category: {target_category}")

def clickDownloadButton(asset_id):
    # Not needed publicly yet. For testing
    url = "https://quixel.com/v1/assets/xbmobcz/extended"
def get_asset_download_id(asset_id):
    url = "https://quixel.com/v1/downloads"
    paramToPass = get_asset_payload(asset_id)
    if not paramToPass:  # get_asset_payload returns False when the asset is missing from the metadata
        return None
    trys = 0
    paramLen = len(paramToPass["components"])
    # Allow one attempt per component; the payload shrinks by one type on each retry
    while trys < paramLen:
        trys += 1
        download_url_response = requests.post(url, headers=headers, json=paramToPass)
        if download_url_response.status_code == 200:
            print(f"Found download URL for {asset_id}")
            return download_url_response.json()["id"]
        elif download_url_response.status_code == 401:
            print("Possibly expired token. Please get a new one from https://quixel.com/megascans/home/ and update the script.\n")
            print("If you just ran the script and downloaded assets before this, simply re-run the script and try again.")
            input("Press Enter to quit.\n")
            os._exit(0)
        else:
            print(f"Failed to get asset download URL for id: {asset_id}")
            response = download_url_response.json()
            if response["code"] == "INVALID_PAYLOAD" and "type not found" in response["msg"]:
                # The API rejects unknown component types; strip the offending type and retry
                payload = json.loads(response["msg"].replace(" type not found", ""))
                if payload:
                    for idx, v in enumerate(paramToPass["components"]):
                        if paramToPass["components"][idx]["type"] == payload["type"]:
                            paramToPass["components"].pop(idx)
                            print("Removed " + payload["type"] + " from payload. Trying again...")
                            break
            else:
                print("DEBUG_ERROR: " + str(response))
                return None
    return None
def download_asset(download_id, download_directory):
    os.makedirs(download_directory, exist_ok=True)
    url = f"https://assetdownloads.quixel.com/download/{download_id}?preserveStructure=True&url=https%3A%2F%2Fquixel.com%2Fv1%2Fdownloads"
    response = requests.get(url, stream=True)
    if response.status_code == 400:
        print(f"Error 400: {response.text}")  # Print the response to see what's causing the issue
    attempt_count = 0
    delay = 5
    max_attempts = 5
    while attempt_count < max_attempts:
        response = requests.get(url, stream=True)
        if response.status_code == 200:
            # Derive the filename from the Content-Disposition header, falling back to the download id
            content_disposition = response.headers.get("Content-Disposition")
            filename = content_disposition.split("filename=")[-1].strip('"') if content_disposition else download_id
            file_path = os.path.join(download_directory, filename)
            print(f"### Downloading file: {filename} ###")
            try:
                with requests.get(url, stream=True) as r:
                    with open(file_path, 'wb') as f:
                        shutil.copyfileobj(r.raw, f)
                print(f"### File downloaded successfully: {file_path} ###")
                return True
            except requests.exceptions.ChunkedEncodingError as e:
                # Transient network error: back off and retry
                print(f"Error during download: {e}")
                time.sleep(delay)
                delay += 5
                attempt_count += 1
        else:
            print(f"Failed to download asset {download_id}, status code: {response.status_code}")
            print(f"Response: {response.text}")  # Print the response content for more details
            return False
    print(f"Exceeded maximum retry attempts for asset {download_id}")
    return False
match input("Do you want to redownload existing cached downloads?\n"
            "Please enter y or n\n"):
    case "y":
        overwrite = True
    case "n":
        overwrite = False
# Load cached assets
cached_assets = set()
if os.path.exists(cache_file_path):
    with open(cache_file_path, "r") as cache_file:
        cached_assets = set(cache_file.read().splitlines())
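# cache.txt simply holds one already-downloaded asset id per line, e.g.:
#   xbmobcz
# (the same id format that appears in the API URLs above)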
# Normalize target category for matching
normalized_target_categories = [normalize_category(part) for part in target_category.split("/")]
matching_asset_ids = []
# Check matches for each asset in the loaded categories
for asset_id, categories in asset_categories_dict.items():
    # Convert the categories to a single string for matching
    categories_str = categories_to_string(categories)
    categories_path = categories_to_string(categories, "/")
    # Check if all parts of target_category exist in the categories string
    matches = all(normalize_category(part) in categories_str.lower() for part in normalized_target_categories)
    # Add when it matches and isn't cached, or when it matches, is cached, and overwrite is set
    if matches and (asset_id not in cached_assets or overwrite):
        print(f"Asset ID: {asset_id} matches target category: {target_category}")
        matching_asset_ids.append({"asset_id": asset_id, "path": categories_path})
if not matching_asset_ids:
    print("No new assets found for the target category.")
    exit()

# Ask the user for confirmation before downloading
print(f"{len(matching_asset_ids)} assets found.")
downloadCount = len(matching_asset_ids)
confirmation = input(f"Do you want to download these {len(matching_asset_ids)} {target_category} assets? (y/n): ").strip().lower()
if confirmation != "y":
    print("Download canceled.")
    exit()
# Function to handle downloading for threading
def download_asset_with_id(asset):
    asset_id = asset["asset_id"]
    path = ""
    if save_in_subdirectories:
        path = asset["path"].lower()
    download_id = get_asset_download_id(asset_id)
    if download_id:
        return download_asset(download_id, download_path + path)
    else:
        print(f"No download id found for {asset_id}.")
        return False
def get_asset_payload(asset_id):
    matches = [asset for asset in asset_metadata["asset_metadata"].values() if asset["full_metadata"]["id"] == asset_id]
    if not matches:
        print(f"Asset {asset_id} not found in asset_metadata.json!")
        return False
    asset = matches[0]
    # Find all components for this asset; necessary to explicitly request .exr
    if "components" in asset["full_metadata"]:
        type_list = sorted(set(component["type"] for component in asset["full_metadata"]["components"]))
    else:
        type_list = sorted(set(component["type"] for component in asset["full_metadata"]["maps"]))
    asset_components = [{"type": image_map,
                         "mimeType": mime_texture_types.get(image_map, mime_texture_types["default"])}
                        for image_map in type_list]
    payload = {"asset": asset_id,
               "config": {"highpoly": highpoly,
                          "lowerlod_meshes": True,
                          "lowerlod_normals": True,
                          "ztool": ztool,
                          "brushes": True,
                          "meshMimeType": "application/x-fbx",
                          "albedo_lods": True},
               "components": asset_components}
    return payload
asset_metadata = None
try:
    with open(asset_metadata_path, "r", encoding="utf-8") as f:
        asset_metadata = json.load(f)
except FileNotFoundError:
    print(f"\n\nCouldn't find asset_metadata.json at the path you set: {asset_metadata_path}")
    input("Press Enter to exit...")
    exit(0)
# Open the cache file for appending new downloads
with open(cache_file_path, "a+") as cache_file:
    # Use a thread pool for downloading (raise max_workers for parallel downloads)
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        futures = {executor.submit(download_asset_with_id, asset): asset for asset in matching_asset_ids}
        for future in concurrent.futures.as_completed(futures):
            asset = futures[future]
            asset_id = asset["asset_id"]
            try:
                result = future.result()
                if result:
                    downloadCount -= 1
                    print(f"{downloadCount} remaining items to download.")
                    # Add the asset to the cache file after a successful download
                    cache_file.write(f"{asset_id}\n")
                    cache_file.flush()
            except Exception as e:
                print(f"Error downloading asset {asset_id}: {e}")
#-----------------------------------------------------------------------------------------------------------------------------------#
######## ZIP CHECKER - SEPARATE SCRIPT ########
import os, re
from glob import glob
import subprocess

# This script recursively checks each zip file for validity. Invalid zips are
# removed from cache.txt and deleted from disk.
# Afterwards, run the downloader again (and answer "n" to re-downloading cached files).
download_path = "PATH/TO/QUIXEL/ZIPS"
cache_file_path = "./cache.txt"

print("Checking for almost empty zipfiles...")
# Filenames containing a double underscore are treated as broken, near-empty downloads
for filename in glob(download_path + '**/*__*.zip', recursive=True):
    matches = re.search('_([a-zA-Z0-9]*)__', filename)
    asset_id = matches.group(1)
    print("Removing %s from cache file" % asset_id)
    cmd = f"sed -i '/{asset_id}/d' " + cache_file_path
    os.system(cmd)
    os.remove(filename)

print("Checking for corrupt zipfiles...")
for filename in glob(download_path + '**/*.zip', recursive=True):
    result = subprocess.run(['zip', '-T', filename], capture_output=True)
    if result.returncode != 0:
        matches = re.search('_([a-z0-9]*)_(2|4|8)K', filename)
        asset_id = matches.group(1)
        print("Zipfile invalid! Removing %s from cache file" % asset_id)
        cmd = f"sed -i '/{asset_id}/d' " + cache_file_path
        os.system(cmd)
        os.remove(filename)
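# Note: `sed -i` requires a Unix-like shell, so the cache cleanup above won't
# work on Windows. A pure-Python replacement (a minimal sketch with the same
# effect) could be:
#
#   def remove_from_cache(asset_id, cache_path=cache_file_path):
#       with open(cache_path) as f:
#           lines = [line for line in f if asset_id not in line]
#       with open(cache_path, "w") as f:
#           f.writelines(lines)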
@PolyShifter

Some small points that would help improve this gist.

Join paths correctly

In download_asset_with_id, join the base download path and the category subdirectory with os.path.join:

    return download_asset(download_id, os.path.join(download_path, path))

Name the zip the name of the asset

The download_asset function should take asset_id as an argument, and the call in download_asset_with_id needs to pass asset_id along as well.
Then, right after filename is derived from the Content-Disposition header (asset_metadata is module-level, so it is visible here):

            asset = asset_metadata["asset_metadata"].get(asset_id)
            if asset:
                asset_name = asset["name"].replace(" ", "_")
                ext = os.path.splitext(filename)[-1]
                base_path = os.path.dirname(filename)
                filename = os.path.join(base_path, asset_name + ext)

Integrate zipfile checking during download

Add the check_zip_file function (this needs import zipfile at the top). Note that testzip() only returns the name of the first bad member; raising BadZipFile here is what lets the caller's except clause actually catch a corrupt archive:

def check_zip_file(file_path):
    the_zip_file = zipfile.ZipFile(file_path)  # raises zipfile.BadZipFile if the file isn't a zip at all
    result = the_zip_file.testzip()
    if result is not None:
        print("First bad file in zip: %s" % result)
        raise zipfile.BadZipFile("Corrupt member in %s" % file_path)

Then call it in the try block of your download_asset function and catch the specific exception. Here is an example of the function with zipfile checking:

def download_asset(download_id, download_directory):
    lower_target_category = target_category.lower().replace(' ', '_')
    download_directory = os.path.join(download_directory, lower_target_category)
    os.makedirs(download_directory, exist_ok=True)

    url = f"https://assetdownloads.quixel.com/download/{download_id}?preserveStructure=true&url=https%3A%2F%2Fquixel.com%2Fv1%2Fdownloads"

    print(f"Attempting to download from: {url}")
    
    response = requests.get(url, stream=True)
    
    if response.status_code == 400:
        print(f"Error 400: {response.text}")  # Print the response to see what's causing the issue

    attempt_count = 0
    delay = 5
    max_attempts = 5

    while attempt_count < max_attempts:
        response = requests.get(url, stream=True)
        if response.status_code == 200:
            content_disposition = response.headers.get("Content-Disposition")
            filename = content_disposition.split("filename=")[-1].strip('"') if content_disposition else download_id
            file_path = os.path.join(download_directory, filename)

            print(f"Downloading file: {file_path}")

            try:
                with open(file_path, "wb") as file:
                    for chunk in response.iter_content(chunk_size=8192):
                        file.write(chunk)
                check_zip_file(file_path.replace('\\', '/'))
                print(f"File downloaded successfully: {file_path}")
                return True
            except requests.exceptions.ChunkedEncodingError as e:
                print(f"Error during download: {e}")
                time.sleep(delay)
                delay += 5
                attempt_count += 1
            except zipfile.BadZipFile as e:
                print("####### Bad zip file found, removing zip file and attempting to redownload...")
                os.remove(file_path)
                attempt_count += 1  # count this attempt too, so a persistently corrupt download can't loop forever
        else:
            print(f"Failed to download asset {download_id}, status code: {response.status_code}")
            print(f"Response: {response.text}")  # Print the response content for more details
            return False

    print(f"Exceeded maximum retry attempts for asset {download_id}")
    return False
