Last active
February 25, 2020 14:22
-
-
Save fakuivan/33e681a4ae4892ff6dd3c77ae6eeda2e to your computer and use it in GitHub Desktop.
Script for determining the block_id/chunk density for a specific minecraft world
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from nbt.nbt import NBTFile | |
from nbt.chunk import McRegionChunk | |
from nbt.world import WorldFolder, AnvilWorldFolder, McRegionWorldFolder | |
from pathlib import Path | |
from pprint import pprint | |
from typing import List, Dict, Tuple, Optional, Generator, Iterable, Union, NamedTuple, DefaultDict, Any | |
from collections import defaultdict | |
from io import TextIOWrapper | |
import json | |
import argparse | |
class Chunk(NamedTuple): | |
x: int | |
z: int | |
# {<block_id>:{[<chunk_coord>]:<density>}} | |
ChunkDensityMapping = DefaultDict[Chunk, int] | |
ChunkDensityPerItemMapping = DefaultDict[int, ChunkDensityMapping] | |
# {<block_id>:{<chunk_x>:{<chunk_z>:{<density>}}}} | |
MappingExport = DefaultDict[int, DefaultDict[int, DefaultDict[int, int]]] | |
def export_mapping(mapping: ChunkDensityPerItemMapping) -> MappingExport: | |
export: MappingExport = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) | |
for block_id, density_per_chunk in mapping.items(): | |
for chunk, density in density_per_chunk.items(): | |
export[block_id][chunk.x][chunk.z] = density | |
return export | |
def import_mapping(export: Any) -> ChunkDensityPerItemMapping: | |
if not isinstance(export, dict): | |
raise ValueError("Imported object is not a dictionary") | |
imported: ChunkDensityPerItemMapping = defaultdict(lambda: defaultdict(int)) | |
for block_id, density_per_chunk_x in export.items(): | |
block_id = convert_int(block_id, "Block id is not an integer") | |
if not isinstance(density_per_chunk_x, dict): | |
raise ValueError("Density mapping is not a dictionary") | |
for chunk_x, density_per_chunk_xz in density_per_chunk_x.items(): | |
chunk_x = convert_int(chunk_x, "Chunk.x is not an integer") | |
if not isinstance(density_per_chunk_xz, dict): | |
raise ValueError("Density mapping for x is not a dictionary") | |
for chunk_z, chunk_density in density_per_chunk_xz.items(): | |
chunk_z = convert_int(chunk_z, "Chunk.z is not an integer") | |
chunk_density = convert_int(chunk_density, "Chunk density is not an integer") | |
imported[block_id][Chunk(chunk_x, chunk_z)] = chunk_density | |
return imported | |
def convert_int(value: Any, message: str) -> int: | |
try: | |
return int(value) | |
except ValueError: | |
raise ValueError(message) | |
def main() -> int: | |
parser = argparse.ArgumentParser(description="Find and rank occurrences of items in a per-chunk basis") | |
subparsers = parser.add_subparsers(help="Data gathering", dest="subparser_name") | |
world_parser = subparsers.add_parser("world", help="Gather block data from world directory") | |
world_parser.add_argument("map_dir", metavar="MAP_DIR", type=Path, help="Path to the map directory") | |
world_parser.add_argument("--block_ids", type=int, required=True, nargs="+", help="Block IDs to detect") | |
infile_parser = subparsers.add_parser("cache", help="Gather block data from cache file") | |
infile_parser.add_argument("infile", type=argparse.FileType('r'), metavar="INFILE", | |
help="Input file to process already collected world data") | |
parser.add_argument("--output", type=argparse.FileType('w'), | |
help="Output file to dump mapping data (in json format)") | |
args = parser.parse_args() | |
subparser: Optional[str] = getattr(args, "subparser_name", None) | |
blocks_in_chunks: ChunkDensityPerItemMapping = defaultdict(lambda: defaultdict(int)) | |
if subparser is None: | |
parser.print_usage() | |
parser.exit(1, f"{parser.prog}: No subcommand specified") | |
elif subparser == "world": | |
map_dir: Path = Path(args.map_dir) | |
block_ids: List[int] = [int(_id) for _id in list(args.block_ids)] | |
world = WorldFolder(str(map_dir)) | |
if isinstance(world, AnvilWorldFolder): | |
print("Processing world with format Anvil...") | |
anvil_process_world(world, block_ids, blocks_in_chunks) | |
elif isinstance(world, McRegionWorldFolder): | |
print("Processing world with format Region...") | |
mcregion_process_world(world, block_ids, blocks_in_chunks) | |
elif subparser == "infile": | |
infile: TextIOWrapper = args.infile | |
blocks_in_chunks = import_mapping(json.load(infile)) | |
output: Optional[TextIOWrapper] = args.output | |
if output is not None: | |
print(f"Writing to output file...") | |
json.dump(export_mapping(blocks_in_chunks), output, indent=4) | |
else: | |
pprint(blocks_in_chunks) | |
return 0 | |
def anvil_process_world(world: AnvilWorldFolder, | |
ids: List[int], | |
blocks_in_chunks: ChunkDensityPerItemMapping) -> None: | |
regions = len(world.get_regionfiles()) | |
for region_number, region in enumerate(world.iter_regions()): | |
print(f"Processing region file {region.filename}... {region_number}/{regions} files processed. ") | |
for chunk_coords in (Chunk(int(chk['x']), int(chk['z'])) for chk in region.get_chunks()): | |
#print(f" Processing chunk {chunk_coords}") | |
chunk = region.get_nbt(chunk_coords.x, chunk_coords.z) | |
for blocks, y_pos in anvil_iter_chunk_block_sections(chunk): | |
#print(f" Processing section {y_pos}") | |
process_blocks(blocks, ids, chunk_coords, blocks_in_chunks) | |
print(f"Processing finished for world {world}") | |
def anvil_iter_chunk_block_sections(chunk: NBTFile) -> \ | |
Generator[Tuple[Optional[Union[bytearray, List[int]]], int], None, None]: | |
for section in chunk["Level"]["Sections"]: # type: TAG_Compound | |
y_val: int = section["Y"].value | |
try: | |
blocks_tag = section["Blocks"] # type: TAG_Byte_Array | |
blocks = blocks_tag.value | |
except (KeyError, AttributeError): | |
yield None, y_val | |
continue | |
try: | |
add_tag = section["Add"] # type: TAG_Byte_Array | |
add = array_4bit_to_byte(add_tag.value) | |
except (KeyError, AttributeError): | |
yield blocks, y_val | |
else: | |
yield [(value + 256 * add[index]) for index, value in enumerate(blocks)], y_val | |
# Copied from | |
# https://github.com/twoolie/NBT/blob/3e501d123267e69d571f5f6c3619c31a3c498d59/examples/anvil_blockdata.py#L26-L37 | |
def array_4bit_to_byte(array): | |
"""Convert a 2048-byte array of 4096 4-bit values to an array of 4096 1-byte values. | |
The result is of type bytearray(). | |
Note that the first byte of the created arrays contains the LEAST significant | |
bits of the first byte of the Data. NOT to the MOST significant bits, as you | |
might expected. This is because Minecraft stores data in that way. | |
""" | |
def iterarray(_array): | |
for b in _array: | |
yield(b & 15) # Little end of the byte | |
yield((b >> 4) & 15) # Big end of the byte | |
return bytearray(iterarray(array)) | |
def mcregion_process_world(world: McRegionWorldFolder, | |
ids: List[int], | |
blocks_in_chunks: ChunkDensityPerItemMapping) -> None: | |
for chunk in world.iter_chunks(): # type: McRegionChunk | |
chunk_coords: Chunk = Chunk(*chunk.get_coords()) | |
process_blocks(chunk.blocks, ids, chunk_coords, blocks_in_chunks) | |
def process_blocks(blocks: Iterable[int], | |
ids: Iterable[int], | |
chunk_coords: Chunk, | |
blocks_in_chunks: ChunkDensityPerItemMapping) -> None: | |
for block_id in blocks: | |
if block_id not in ids: | |
continue | |
blocks_in_chunks[block_id][chunk_coords] += 1 | |
if __name__ == "__main__": | |
import sys as _sys | |
_sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment