Commit
Signed-off-by: Pascal Spörri <[email protected]>
Showing 6 changed files with 268 additions and 0 deletions.
README.md
@@ -0,0 +1,20 @@
# GEDS AI Workload Examples

Setup:

- Create the Python environment: `python3 -m venv venv`
- Source the Python environment: `source venv/bin/activate`
- Install GEDS smart_open: download the latest GEDS smart_open wheel and install it:

```
VERSION=1.0.5
wget https://github.com/IBM/GEDS/releases/download/v${VERSION}/geds_smart_open-${VERSION}-py3-none-any.whl.ubuntu22.04-release
mv geds_smart_open-${VERSION}-py3-none-any.whl.ubuntu22.04-release geds_smart_open-${VERSION}-py3-none-any.whl
pip install geds_smart_open-${VERSION}-py3-none-any.whl
```
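If the install succeeded, the packages used by the examples import cleanly inside the venv; a quick sanity check (the print line is just illustrative):

```python
# Sanity check: both packages used by the examples should import.
import geds_smart_open
from smart_open import open

print("geds_smart_open is importable")
```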
Run the examples:

- `ai_training_example.py`: Simulates snapshotting from multiple GPUs and storing the data on S3 in the background.
- `write_data.py`: Writes data to GEDS.
- `read_data.py`: Reads the written data back over the network (the data is created by `write_data.py`).
- `write_data_spilling.py`: Writes data and then explicitly spills it to S3 to measure spilling throughput.
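All of the examples follow the same pattern: register the S3 backing store for a bucket from the `AWS_*` environment variables, then read and write `geds://` paths through `smart_open`. A minimal round-trip sketch, with an illustrative object key (`hello.bin` is not part of the examples):

```python
import os

from smart_open import open
import geds_smart_open

bucket = 'geds-test'  # the bucket name used throughout the examples
geds_smart_open.register_object_store(
    bucket,
    os.getenv('AWS_ENDPOINT_URL'),
    os.getenv('AWS_ACCESS_KEY_ID'),
    os.getenv('AWS_SECRET_ACCESS_KEY'))

# Write an object through GEDS, then read it back.
with open(f'geds://{bucket}/hello.bin', 'wb') as f:
    f.write(b'hello GEDS')
with open(f'geds://{bucket}/hello.bin', 'rb') as f:
    assert f.read() == b'hello GEDS'
```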
ai_training_example.py
@@ -0,0 +1,67 @@
import os
import time
from threading import Thread

from smart_open import open
import geds_smart_open

bucket = 'geds-test'

AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_ENDPOINT_URL = os.getenv('AWS_ENDPOINT_URL')
geds_smart_open.register_object_store(bucket, AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

output_csv = open('file://ai_training.csv', 'w')
output_csv.write("Threads,Time,Data Read,Data Written,Data Spilled\n")


def read_file(tid, buffers):
    # Read a fake model file from GEDS into this thread's buffer slot.
    path = f'geds://{bucket}/fake_ml_model_{tid}.bin'
    with open(path, 'rb') as f:
        buffers[tid] = f.read()


def write_checkpoints(tid, buffers):
    # Write the buffer back to GEDS as a checkpoint object.
    path = f'geds://{bucket}/checkpoint/checkpoint_{tid}.bin'
    with open(path, 'wb') as f:
        f.write(buffers[tid])


def persist():
    # Relocate (spill) locally stored objects to S3.
    geds_smart_open.relocate()


def benchmark_threads(num_threads, csv):
    buffers = [None] * num_threads

    # Read one checkpoint per thread.
    start_time = time.time_ns()
    threads = [Thread(target=read_file, args=(i, buffers)) for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    duration = (time.time_ns() - start_time) / (1000**3)
    length = sum(len(b) for b in buffers)
    print(f'{num_threads}: Reading {length} took {duration}')
    csv.write(f'{num_threads},{duration},{length},0,0\n')

    # Write the checkpoints back.
    start_time = time.time_ns()
    threads = [Thread(target=write_checkpoints, args=(i, buffers)) for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    duration = (time.time_ns() - start_time) / (1000**3)
    print(f'{num_threads}: Writing {length} took {duration}')
    csv.write(f'{num_threads},{duration},0,{length},0\n')

    # Release the buffers, then spill the checkpoints to S3.
    buffers = None
    start_time = time.time_ns()
    persist()
    duration = (time.time_ns() - start_time) / (1000**3)
    print(f'{num_threads}: Relocating {length} took {duration}')
    csv.write(f'{num_threads},{duration},0,0,{length}\n')


for t in [1, 2, 4, 6]:
    benchmark_threads(t, output_csv)
    output_csv.flush()

output_csv.close()
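Each call to `benchmark_threads` appends three rows to `ai_training.csv` (read, write, spill), with the inactive byte columns set to 0 and `Time` in seconds. A minimal sketch of summarizing the output (assumes `pandas` is installed; it is not a dependency of the examples):

```python
import pandas as pd

# Columns: Threads, Time, Data Read, Data Written, Data Spilled.
df = pd.read_csv('ai_training.csv')

# Each row records one phase; the two inactive byte columns are 0,
# so the max picks out the active phase and its byte count.
byte_cols = ['Data Read', 'Data Written', 'Data Spilled']
df['Bytes'] = df[byte_cols].max(axis=1)
df['Phase'] = df[byte_cols].idxmax(axis=1)
df['MB/s'] = df['Bytes'] / df['Time'] / (1024**2)
print(df[['Threads', 'Phase', 'MB/s']])
```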
read_data.py
@@ -0,0 +1,51 @@
import os
import time
from threading import Thread

from smart_open import open
import geds_smart_open

bucket = 'geds-test'

folder_in = '100g'              # not used in this benchmark
folder_out = 'output/terasort'  # not used in this benchmark

AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_ENDPOINT_URL = os.getenv('AWS_ENDPOINT_URL')
geds_smart_open.register_object_store(bucket, AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

MAX_THREADS = 16
MAX_SIZE = 32  # 2**32 bytes == 4 GiB (use 28 for 256 MiB)

output_csv = open('file://read_data.csv', 'w')
output_csv.write("Payload Size,Thread Count,Throughput\n")


def read_file(tid, max_threads, num_threads, size):
    # Object keys follow the pattern written by write_data.py.
    path = f'geds://{bucket}/benchmark/{num_threads}_{tid}_{size}'
    # Alternative: read the objects written with the maximum thread count.
    # path = f'geds://{bucket}/benchmark/{max_threads}_{tid}_{size}'
    with open(path, 'rb') as f:
        buf = f.read()
        assert len(buf) == size


for t in range(MAX_THREADS):
    num_threads = t + 1
    for s in range(1, MAX_SIZE + 1):
        size = 2**s
        threads = [Thread(target=read_file, args=(i, MAX_THREADS, num_threads, size)) for i in range(num_threads)]
        start_time = time.time_ns()
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        duration = (time.time_ns() - start_time) / (1000**3)
        rate = float(num_threads) * size / duration / (1024**2)
        print(f"{size}, {num_threads}: {rate} MB/s")
        output_csv.write(f"{size},{num_threads},{rate}\n")

output_csv.close()
@@ -0,0 +1,10 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
cd "$SCRIPT_DIR"

export GEDS_METADATASERVER=flex12:4381
export GEDS_TMP=/mnt/psp/GEDS_XXXXXX
export GEDS_AVAILABLE_STORAGE=$(( 600 * 1024 * 1024 * 1024 ))
python "$@"
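The wrapper exports the GEDS client configuration (metadata server, local scratch directory, local storage budget) before handing its arguments to `python`. The same launch can be reproduced from Python; a minimal sketch, with the values copied from the script above:

```python
import os
import subprocess

# GEDS client configuration, as exported by the wrapper script above.
env = dict(os.environ,
           GEDS_METADATASERVER='flex12:4381',
           GEDS_TMP='/mnt/psp/GEDS_XXXXXX',
           GEDS_AVAILABLE_STORAGE=str(600 * 1024**3))  # 600 GiB

# Run one of the examples with this environment.
subprocess.run(['python', 'write_data.py'], env=env, check=True)
```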
write_data.py
@@ -0,0 +1,54 @@
import os
import time
from threading import Thread

import numpy as np
from smart_open import open
import geds_smart_open

bucket = 'geds-test'

AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_ENDPOINT_URL = os.getenv('AWS_ENDPOINT_URL')
geds_smart_open.register_object_store(bucket, AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

MAX_THREADS = 16
MAX_SIZE = 33  # 2**33 bytes == 8 GiB (use 28 for 256 MiB)

output_csv = open('file://write_data.csv', 'w')
output_csv.write("Payload Size,Thread Count,Throughput\n")


def write_file(tid, nthreads, buffer, size):
    # Object keys encode thread count, thread id, and payload size
    # so that read_data.py can locate the objects.
    path = f'geds://{bucket}/benchmark/{nthreads}_{tid}_{size}'
    with open(path, 'wb') as f:
        f.write(buffer)
        # The context manager flushes and closes the object on exit.


for t in range(MAX_THREADS):
    num_threads = t + 1
    # Only benchmark 1 thread and even thread counts.
    if num_threads > 1 and num_threads % 2 == 1:
        continue
    for s in range(1, MAX_SIZE + 1):
        size = 2**s
        buffers = [np.random.randint(0, high=255, size=size, dtype=np.uint8) for _ in range(num_threads)]
        threads = [Thread(target=write_file, args=(i, num_threads, buffers[i], size)) for i in range(num_threads)]
        start_time = time.time_ns()
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        duration = (time.time_ns() - start_time) / (1000**3)
        rate = float(num_threads) * size / duration / (1024**2)
        print(f"{size}, {num_threads}: {rate} MB/s")
        output_csv.write(f"{size},{num_threads},{rate}\n")
        output_csv.flush()

output_csv.close()
# Keep GEDS open to allow reading data.
time.sleep(1000000000)
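The final `sleep` keeps the process alive so the written objects stay available in GEDS for other readers, as `read_data.py` relies on. A minimal read-back sketch from a second process, with the key chosen to match the pattern above for one thread writing 1024 bytes:

```python
import os

from smart_open import open
import geds_smart_open

bucket = 'geds-test'
geds_smart_open.register_object_store(
    bucket,
    os.getenv('AWS_ENDPOINT_URL'),
    os.getenv('AWS_ACCESS_KEY_ID'),
    os.getenv('AWS_SECRET_ACCESS_KEY'))

# Key pattern from write_file above: {nthreads}_{tid}_{size}.
with open(f'geds://{bucket}/benchmark/1_0_1024', 'rb') as f:
    assert len(f.read()) == 1024
```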
write_data_spilling.py
@@ -0,0 +1,66 @@
import os
import time
from threading import Thread

import numpy as np
from smart_open import open
import geds_smart_open

bucket = 'geds-test'

AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_ENDPOINT_URL = os.getenv('AWS_ENDPOINT_URL')
geds_smart_open.register_object_store(bucket, AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

MAX_THREADS = 16
MAX_SIZE = 33  # 2**33 bytes == 8 GiB (use 28 for 256 MiB)

output_csv = open('file://write_data.csv', 'w')
output_csv.write("Payload Size,Thread Count,Throughput\n")

spill_csv = open('file://write_data_spill.csv', 'w')
spill_csv.write("Payload Size,Object Count,Throughput\n")


def write_file(tid, nthreads, buffer, size):
    # Same key pattern as write_data.py: {nthreads}_{tid}_{size}.
    path = f'geds://{bucket}/benchmark/{nthreads}_{tid}_{size}'
    with open(path, 'wb') as f:
        f.write(buffer)
        # The context manager flushes and closes the object on exit.


for t in range(MAX_THREADS):
    num_threads = t + 1
    # Only benchmark 1 thread and even thread counts.
    if num_threads > 1 and num_threads % 2 == 1:
        continue
    for s in range(1, MAX_SIZE + 1):
        size = 2**s
        # if num_threads == 16 and s == MAX_SIZE:
        #     continue
        buffers = [np.random.randint(0, high=255, size=size, dtype=np.uint8) for _ in range(num_threads)]
        threads = [Thread(target=write_file, args=(i, num_threads, buffers[i], size)) for i in range(num_threads)]
        start_time = time.time_ns()
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        duration = (time.time_ns() - start_time) / (1000**3)
        rate = float(num_threads) * size / duration / (1024**2)
        print(f"{size}, {num_threads}: {rate} MB/s")
        output_csv.write(f"{size},{num_threads},{rate}\n")
        output_csv.flush()

        # Explicitly spill the freshly written objects to S3 and measure the rate.
        start_time = time.time_ns()
        geds_smart_open.relocate()
        duration = (time.time_ns() - start_time) / (1000**3)
        rel_size = size * num_threads
        rate = float(rel_size) / duration / (1024**2)
        print(f"Relocation {rel_size}, {num_threads}: {rate} MB/s")
        spill_csv.write(f"{rel_size},{num_threads},{rate}\n")
        spill_csv.flush()

spill_csv.close()
output_csv.close()