Last active
December 27, 2024 05:05
-
-
Save ToxicCrack/94d7e6ba858bfc6568dcd7cd0b361a19 to your computer and use it in GitHub Desktop.
download megascans purchased assets for https://gist.github.com/maalrron/877b2edb23cc5d99d6a6b4c22f708e58
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import sys | |
import concurrent.futures | |
import requests | |
import pprint | |
import shutil | |
######## INITIAL SETUP ######## | |
# Define the token, download path, and target category | |
token = "UPDATEME" | |
# Update with the correct path to the directory Example: "C:/Users/MyAccount/Documents/QuixelZips" | |
download_path = "UPDATEME" | |
# Update with the correct path to the ms_asset_categories.json file, not the directory. Example: :C:/Users/MyAccount/Documents/QuixelZips/ms_asset_categoies.json" | |
json_file_path = "./ms_asset_categories.json" | |
# Pick a folder to store the download cache at (a text file). Then create the text file cache.txt there. Paste it below. IE: "C:/Users/MyAccount/Documents/Quixel/cache.txt" | |
cache_file_path = "./cache.txt" | |
#Download from https://github.com/WAUthethird/quixel-megascans-scripts -> complete_asset_metadata.tar.zst | |
asset_metadata_path = "UPDATEME/asset_metadata.json" | |
## Set target download category/categories. | |
#working: 3d asset, 3d plant, surface, brush, displacement, imperfection, decal | |
target_category = "3d asset" | |
#create subdirectories based on the category | |
save_in_subdirectories = True | |
#specify the image type for each texture | |
#albedo, displacement, normal, roughness, metalness, ao, bump, specular, gloss, normalbump, cavity | |
mime_texture_types = { | |
"default": "image/jpeg", #image/jpeg or image/x-exr | |
"displacement": "image/x-exr" | |
} | |
#Download HighPoly? | |
highpoly = False | |
#Donwload ztool (ZBrush) file? | |
ztool = False | |
#Use to overwrite existing cached items. (Example if you want to downlaod a different size texture. Or if they messed up and you had to adjust script to try again.) | |
overwrite = False | |
######## SETUP - FINISHED ######## | |
#-----------------------------------------------------------------------------------------------------------------------------------# | |
######## VARS - DON'T TOUCH ######## | |
headers = { | |
"Authorization": "Bearer " + token, | |
} | |
downloadCount = 0 | |
selectedSize = 0 | |
selectedTier = 0 | |
######## ENDVARS - DON'T TOUCH ######## | |
#-----------------------------------------------------------------------------------------------------------------------------------# | |
def requestExtendedInfo(asset_id): | |
url = f"https://quixel.com/v1/assets/{asset_id}/extended" | |
response = requests.get(url, stream=True) | |
if response.status_code == 400: | |
print(f"Error 400: {response.text}") | |
else: | |
uassets = response.json()["uasset"] | |
for asset in uassets: | |
if (asset["tier"] == selectedTier): | |
return None ## This is testing and not needed currently | |
# Function to normalize category strings | |
def normalize_category(category): | |
return category.strip().lower().rstrip('s') | |
# Function to load asset categories from JSON file | |
def load_asset_categories(json_file): | |
with open(json_file, 'r') as f: | |
return json.load(f) | |
# Function to convert categories to a space-separated string | |
def categories_to_string(categories, separator=" "): | |
result = [] | |
if isinstance(categories, dict): | |
for key, value in categories.items(): | |
if isinstance(value, dict) and value: | |
subcategories = categories_to_string(value, separator) | |
if subcategories: | |
result.append(f"{key}{separator}{subcategories}") | |
else: | |
result.append(key) | |
elif isinstance(value, list): | |
for item in value: | |
result.append(normalize_category(item)) | |
else: | |
result.append(normalize_category(key)) | |
return separator.join(result) | |
# Load the asset categories from the JSON file | |
asset_categories_dict = load_asset_categories(json_file_path) | |
# Print what it's trying to match against | |
print(f"Trying to match against target category: {target_category}") | |
def clickDownloadButton(asset_id): | |
#Not needed publicly yet. For testing | |
url = "https://quixel.com/v1/assets/xbmobcz/extended" | |
def get_asset_download_id(asset_id): | |
url = "https://quixel.com/v1/downloads" | |
paramToPass = get_asset_payload(asset_id) | |
trys = 0 | |
paramLen = len(paramToPass["components"]) | |
while(trys < paramLen-1): | |
trys += 1 | |
download_url_response = requests.post(url, headers=headers, json=paramToPass) | |
if download_url_response.status_code == 200: | |
print(f"Found Download URL for {asset_id}") | |
return download_url_response.json()["id"] | |
elif download_url_response.status_code == 401: | |
print("Possible expired token. Please get a new one from https://quixel.com/megascans/home/ and then update the script. \n") | |
print("If you just ran the script and downloaded stuff prior to this, just re-run the script and try again.") | |
input("Press any key to quit. \n") | |
os._exit(0) | |
else: | |
print(f"Failed to get asset download url for id: {asset_id}") | |
response = download_url_response.json() | |
if(response["code"] == "INVALID_PAYLOAD"): | |
if("type not found" in response["msg"]): | |
payload = json.loads(response["msg"].replace(" type not found", "")) | |
if(payload): | |
for idx, v in enumerate(paramToPass["components"]): | |
if(paramToPass["components"][idx]["type"] == payload["type"]): | |
paramToPass["components"].pop(idx) | |
print("Removed "+payload["type"]+" from payload. Trying again...") | |
break | |
else: | |
print("DEBUG_ERROR: " + str(response)) | |
return None | |
else: | |
print("DEBUG_ERROR: " + str(response)) | |
return None | |
#os._exit(0) | |
#return None | |
def download_asset(download_id, download_directory): | |
os.makedirs(download_directory, exist_ok=True) | |
url = f"https://assetdownloads.quixel.com/download/{download_id}?preserveStructure=True&url=https%3A%2F%2Fquixel.com%2Fv1%2Fdownloads" | |
##print(f"Attempting to download from: {url}") | |
response = requests.get(url, stream=True) | |
if response.status_code == 400: | |
print(f"Error 400: {response.text}") # Print the response to see what's causing the issue | |
attempt_count = 0 | |
delay = 5 | |
max_attempts = 5 | |
while attempt_count < max_attempts: | |
response = requests.get(url, stream=True) | |
if response.status_code == 200: | |
content_disposition = response.headers.get("Content-Disposition") | |
filename = content_disposition.split("filename=")[-1].strip('"') if content_disposition else download_id | |
file_path = os.path.join(download_directory, filename) | |
print(f"### Downloading file: {filename} ###") | |
try: ##ctrl-q for multi line | |
with requests.get(url, stream=True) as r: | |
with open(file_path, 'wb') as f: | |
shutil.copyfileobj(r.raw, f) | |
#with open(file_path, "wb") as file: | |
# for chunk in response.iter_content(chunk_size=4096): | |
# file.write(chunk) | |
print(f"### File downloaded successfully: {file_path} ###") | |
return True | |
except requests.exceptions.ChunkedEncodingError as e: | |
print(f"Error during download: {e}") | |
time.sleep(delay) | |
delay += 5 | |
attempt_count += 1 | |
else: | |
print(f"Failed to download asset {download_id}, status code: {response.status_code}") | |
print(f"Response: {response.text}") # Print the response content for more details | |
return False | |
print(f"Exceeded maximum retry attempts for asset {download_id}") | |
return False | |
match input("Do you want to redownload existing cached downloads?\n" | |
"Please enter y or n\n"): | |
case "y": | |
overwrite = True | |
case "n": | |
overwrite = False | |
# print(overwrite) | |
# Load cached assets | |
cached_assets = set() | |
if os.path.exists(cache_file_path): | |
with open(cache_file_path, "r") as cache_file: | |
cached_assets = set(cache_file.read().splitlines()) | |
# Normalize target category for matching | |
normalized_target_categories = [normalize_category(part) for part in target_category.split("/")] | |
matching_asset_ids = [] | |
# Check matches for each asset in the loaded categorie s | |
for asset_id, categories in asset_categories_dict.items(): | |
# Convert the categories to a single string for matching | |
categories_str = categories_to_string(categories) | |
categories_path = categories_to_string(categories, "/") | |
# Check if all parts of target_category exist in the categories string | |
matches = all(normalize_category(part) in categories_str.lower() for part in normalized_target_categories) | |
## matches and not in. -> add | |
## matches and in and overwrite -> add | |
if matches and asset_id not in cached_assets: | |
print(f"Asset ID: {asset_id} matches target category: {target_category}") | |
matching_asset_ids.append({"asset_id": asset_id, "path": categories_path}) | |
elif matches and asset_id in cached_assets and overwrite: | |
print(f"Asset ID: {asset_id} matches target category: {target_category}") | |
matching_asset_ids.append({"asset_id": asset_id, "path": categories_path}) | |
if not matching_asset_ids: | |
print("No new assets found for the target category.") | |
exit() | |
# Ask the user for confirmation before downloading | |
print(f"{len(matching_asset_ids)} assets found.") | |
downloadCount = len(matching_asset_ids) | |
confirmation = input(f"Do you want to download these {len(matching_asset_ids)} {target_category} assets? (y/n): ").strip().lower() | |
if confirmation != "y": | |
print("Download canceled.") | |
exit() | |
# Function to handle downloading for threading | |
def download_asset_with_id(asset): | |
asset_id = asset["asset_id"] | |
path = "" | |
if(save_in_subdirectories): | |
path = asset["path"].lower() | |
download_id = get_asset_download_id(asset_id) | |
if download_id: | |
return download_asset(download_id, download_path+path) | |
else: | |
print(f"No download id found for {asset_id}.") | |
return False | |
def get_asset_payload(asset_id): | |
asset = [asset for asset in asset_metadata["asset_metadata"].values() if asset["full_metadata"]["id"] == asset_id] | |
if len(asset) <=0: | |
print(f"Asset {asset_id} not found in asset_metadata.json!") | |
return False | |
asset = asset[0] | |
if "components" in asset["full_metadata"]: # Find all components for this asset, necessary to explicitly request .exr | |
type_list = list(set([component["type"] for component in asset["full_metadata"]["components"]])) | |
else: | |
type_list = list(set([component["type"] for component in asset["full_metadata"]["maps"]])) | |
type_list.sort() | |
asset_components = [{"type": image_map, "mimeType": mime_texture_types[image_map] if image_map in mime_texture_types else mime_texture_types["default"]} for image_map in type_list] | |
payload = {"asset": asset_id, | |
"config": {"highpoly": highpoly, | |
"lowerlod_meshes": True, | |
"lowerlod_normals": True, | |
"ztool": ztool, | |
"brushes": True, | |
"meshMimeType": "application/x-fbx", | |
"albedo_lods": True}, | |
"components": asset_components} | |
return payload | |
asset_metadata = None | |
try: | |
with open(asset_metadata_path, "r", encoding="utf-8") as f: | |
asset_metadata = json.load(f) | |
except FileNotFoundError: | |
print(f"\n\nCouldn't find asset_metadata.json in the directory you selected, {asset_metadata_path}") | |
input("Press Enter to exit...") | |
exit(0) | |
#Open the cache file for appending new downloads | |
with open(cache_file_path, "a+") as cache_file: | |
# Use threading for faster downloading | |
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: | |
futures = {executor.submit(download_asset_with_id, asset): asset for asset in matching_asset_ids} | |
for future in concurrent.futures.as_completed(futures): | |
asset = futures[future] | |
asset_id = asset["asset_id"] | |
try: | |
result = future.result() | |
if result: | |
downloadCount-=1; | |
print(f"{downloadCount} remaining items to download.") | |
# Add the asset to the cache file after successful download | |
cache_file.write(f"{asset_id}\n") | |
cache_file.flush() | |
except Exception as e: | |
print(f"Error downloading asset {asset_id}: {e}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, re | |
from glob import glob | |
import subprocess | |
#This script checks each zip file recursively if it is valid. If not, it gets removed from the cache.txt and will get deleted | |
#After that, try downloading again (with re-downloading cached files NO) | |
download_path = "PATH/TO/QUIXEL/ZIPS" | |
cache_file_path = "./cache.txt" | |
print("Checking for almost empty zipfiles...") | |
for filename in glob(download_path+'**/*__*.zip', recursive=True): | |
matches = re.search('_([a-zA-Z0-9]*)__', filename) | |
asset_id = matches.group(1) | |
print("Removing %s from cache file" % asset_id) | |
cmd = f"sed -i '/{asset_id}/d' "+cache_file_path | |
os.system(cmd) | |
os.remove(filename) | |
print("Checking for corrupt zipfiles...") | |
for filename in glob(download_path+'**/*.zip', recursive=True): | |
result = subprocess.run(['zip', '-T', filename], capture_output=True) | |
if result.returncode != 0: | |
matches = re.search('_([a-z0-9]*)_(2|4|8)K', filename) | |
asset_id = matches.group(1) | |
print("Zipfile invalid! Removing %s from cache file" % asset_id) | |
cmd = f"sed -i '/{asset_id}/d' "+cache_file_path | |
os.system(cmd) | |
os.remove(filename) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Some small points that would help improve this gist.
Join paths correctly
L259:
return download_asset(download_id, os.path.join(download_path, path))
Name the zip the name of the asset
L151 the function should include
asset_id
for an argument, and when the function is called on L259, you need to addasset_id
there as well.Then between L171 and L172:
Integrate zipfile checking during download
Add the
check_zip_file
function:Then check it in your
download_asset
function in the try block. Looking for the specific exception. Example of said function with zipfile checking: