Skip to content

Commit

Permalink
Add AI Training Examples.
Browse files Browse the repository at this point in the history
Signed-off-by: Pascal Spörri <[email protected]>
  • Loading branch information
pspoerri committed May 23, 2024
1 parent aa516b4 commit 200ea22
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 0 deletions.
20 changes: 20 additions & 0 deletions examples/ai-workload/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# GEDS AI Workload Examples

Setup:

- Create the python environment: `python3 -m venv venv`
- Source the python environment: `source venv/bin/activate`
- Install GEDS smart open: Download the latest GEDS smart\_open whl and install it:
```
VERSION=1.0.5
wget https://github.com/IBM/GEDS/releases/download/v${VERSION}/geds_smart_open-${VERSION}-py3-none-any.whl.ubuntu22.04-release
mv geds_smart_open-${VERSION}-py3-none-any.whl.ubuntu22.04-release geds_smart_open-${VERSION}-py3-none-any.whl
pip install geds_smart_open-${VERSION}-py3-none-any.whl
```

Run the examples:

- `ai_training_example.py`: Simulates snapshotting from multiple GPUs and storing the data on S3 in the background.
- `write_data.py`: Write data to GEDS
- `read_data.py`: Read written data from the network (created by `write_data`)
- `write_data_spilling.py`: Write data + explicit spilling to S3 to determine spilling throughput.

67 changes: 67 additions & 0 deletions examples/ai-workload/ai_training_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import os
import sys
import time
import io
import numpy as np
from threading import Thread


from smart_open import open
import geds_smart_open
from geds_smart_open import GEDS

bucket = 'geds-test'

# S3 credentials/endpoint come from the environment; registering the bucket
# lets geds:// URLs for it resolve through GEDS with this object store.
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_ENDPOINT_URL = os.getenv('AWS_ENDPOINT_URL')
geds_smart_open.register_object_store(bucket, AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

# Local results CSV: one row per benchmark phase (read / write / spill).
output_csv = open('file://ai_training.csv', 'w')
output_csv.write("Threads,Time,Data Read,Data Written,Data Spilled\n")

def read_file(tid, buffers):
    """Fetch the fake model object for worker *tid* into buffers[tid]."""
    object_uri = f'geds://{bucket}/fake_ml_model_{tid}.bin'
    with open(object_uri, 'rb') as source:
        buffers[tid] = source.read()

def write_checkpoints(tid, buffers):
    """Persist buffers[tid] as the checkpoint object for worker *tid*."""
    checkpoint_uri = f'geds://{bucket}/checkpoint/checkpoint_{tid}.bin'
    with open(checkpoint_uri, 'wb') as sink:
        sink.write(buffers[tid])

def persist():
    """Relocate (spill) locally held GEDS objects to the registered object store."""
    geds_smart_open.relocate()

def _run_all(threads):
    """Start every thread, then block until all of them have finished."""
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def benchmark_threads(num_threads, csv):
    """Benchmark one simulated checkpoint cycle with *num_threads* workers.

    Emits one CSV row per phase (columns: Threads,Time,Data Read,
    Data Written,Data Spilled):
      1. read one fake model object per thread,
      2. write one checkpoint object per thread,
      3. relocate (spill) the written data via ``persist()``.

    Durations are wall-clock seconds; sizes are total bytes across threads.
    """
    buffers = [None] * num_threads

    # Phase 1: read the per-thread model files concurrently.
    start_time = time.time_ns()
    _run_all([Thread(target=read_file, args=(i, buffers)) for i in range(num_threads)])
    duration = (time.time_ns() - start_time) / (1000**3)  # ns -> s
    # NOTE(review): if a read fails, buffers[i] stays None and len() raises here.
    length = sum(len(b) for b in buffers)
    print(f'{num_threads}: Reading {length} took {duration}')
    csv.write(f'{num_threads},{duration},{length},0,0\n')

    # Phase 2: write the checkpoints concurrently.
    start_time = time.time_ns()
    _run_all([Thread(target=write_checkpoints, args=(i, buffers)) for i in range(num_threads)])
    duration = (time.time_ns() - start_time) / (1000**3)
    print(f'{num_threads}: Writing {length} took {duration}')
    csv.write(f'{num_threads},{duration},0,{length},0\n')

    # Phase 3: drop local references, then time the spill to S3.
    buffers = None
    start_time = time.time_ns()
    persist()
    duration = (time.time_ns() - start_time) / (1000**3)
    print(f'{num_threads}: Relocating {length} took {duration}')
    csv.write(f'{num_threads},{duration},0,0,{length}\n')

# Run the benchmark for increasing worker counts (one thread per simulated GPU).
for t in [1,2, 4, 6]:
    benchmark_threads(t, output_csv)
    output_csv.flush()

output_csv.close()
51 changes: 51 additions & 0 deletions examples/ai-workload/read_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import os
import sys
import time
import io
import numpy as np
from threading import Thread


from smart_open import open
import geds_smart_open
from geds_smart_open import GEDS

bucket = 'geds-test'

# Prefixes used by related benchmarks; not referenced in this script.
folder_in = '100g'
folder_out = 'output/terasort'

# S3 credentials/endpoint come from the environment; registering the bucket
# lets geds:// URLs for it resolve through GEDS.
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_ENDPOINT_URL = os.getenv('AWS_ENDPOINT_URL')
geds_smart_open.register_object_store(bucket, AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

MAX_THREADS = 16
MAX_SIZE = 28 # 256 MB
# NOTE(review): the assignment below overrides the one above; 28 is kept,
# presumably, as a quick toggle for smaller runs.
MAX_SIZE = 32 # 4GB

# Throughput results: one row per (payload size, thread count) combination.
output_csv = open('file://read_data.csv', 'w')
output_csv.write("Payload Size,Thread Count,Throughput\n")


def read_file(tid, max_threads, num_threads, size):
    """Read one benchmark object and verify that it has the expected size.

    Args:
        tid: worker index; part of the object name.
        max_threads: unused; kept for interface compatibility with callers.
        num_threads: thread count used when the object was written (names it).
        size: expected object size in bytes.

    Raises:
        ValueError: if the object is not exactly *size* bytes. (A plain
        ``assert`` was used before, but asserts are stripped under ``-O``.)
    """
    path = f'geds://{bucket}/benchmark/{num_threads}_{tid}_{size}'
    with open(path, 'rb') as f:
        buf = f.read()
    if len(buf) != size:
        raise ValueError(f'{path}: read {len(buf)} bytes, expected {size}')

# Sweep thread counts 1..MAX_THREADS and payload sizes 2**1..2**MAX_SIZE bytes,
# reading the matching benchmark objects concurrently and reporting throughput.
for t in range(0, MAX_THREADS):
    for s in range(1, MAX_SIZE+1):
        size = 2**s
        num_threads = t + 1
        threads = [Thread(target=read_file, args=[i, MAX_THREADS, num_threads, size]) for i in range(0, num_threads)]
        start_time = time.time_ns()
        [t.start() for t in threads]  # comprehension scope: outer t is untouched
        [t.join() for t in threads]
        duration = (time.time_ns() - start_time) / (1000**3)  # ns -> s
        rate = float(num_threads) * size / duration / (1024**2)  # MB/s
        print(f"{size}, {num_threads}: {rate} MB/s")
        output_csv.write(f"{size},{num_threads},{rate}\n")

output_csv.close()
10 changes: 10 additions & 0 deletions examples/ai-workload/wrapper.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Configure the GEDS environment, then run the given python script from the
# directory containing this wrapper. Usage: ./wrapper.sh <script.py> [args...]
set -euo pipefail
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
# Quote the path: an unquoted $SCRIPT_DIR breaks on directories with spaces.
cd "$SCRIPT_DIR"

export GEDS_METADATASERVER=flex12:4381
export GEDS_TMP=/mnt/psp/GEDS_XXXXXX
export GEDS_AVAILABLE_STORAGE=$(( 600 * 1024 * 1024 * 1024 ))
# "$@" preserves each argument as-is; bare $@ re-splits on whitespace.
python "$@"

54 changes: 54 additions & 0 deletions examples/ai-workload/write_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import os
import sys
import time
import io
import numpy as np
from threading import Thread


from smart_open import open
import geds_smart_open
from geds_smart_open import GEDS

bucket = 'geds-test'

# S3 credentials/endpoint come from the environment; registering the bucket
# lets geds:// URLs for it resolve through GEDS.
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_ENDPOINT_URL = os.getenv('AWS_ENDPOINT_URL')
geds_smart_open.register_object_store(bucket, AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

MAX_THREADS = 16
MAX_SIZE = 28 # 256 MB
# NOTE(review): the assignment below overrides the one above; 28 is kept,
# presumably, as a quick toggle for smaller runs.
MAX_SIZE = 33 # 8GB

# Throughput results: one row per (payload size, thread count) combination.
output_csv = open('file://write_data.csv', 'w')
output_csv.write("Payload Size,Thread Count,Throughput\n")


def write_file(tid, nthreads, buffer, size):
    """Write *buffer* to the benchmark object for worker *tid*.

    The object name encodes the thread count and payload size so that the
    matching read benchmark can locate it. The context manager flushes and
    closes the stream on exit; the previous explicit flush()/close() inside
    the with-block was redundant and made close() run twice.
    """
    path = f'geds://{bucket}/benchmark/{nthreads}_{tid}_{size}'
    with open(path, 'wb') as f:
        f.write(buffer)

# Sweep thread counts (1, then even counts only) and payload sizes from
# 2 bytes up to 2**MAX_SIZE bytes; report aggregate write throughput.
for t in range(0, MAX_THREADS):
    num_threads = t + 1
    # Skip odd thread counts greater than one to shorten the sweep.
    if num_threads > 1 and num_threads % 2 == 1:
        continue
    for s in range(1, MAX_SIZE+1):
        size = 2**s
        # One fresh random payload per thread.
        buffers = [np.random.randint(0, high=255, size=size, dtype=np.uint8) for i in range(0, num_threads)]
        threads = [Thread(target=write_file, args=[i, num_threads, buffers[i], size]) for i in range(0, num_threads)]
        start_time = time.time_ns()
        [t.start() for t in threads]  # comprehension scope: outer t is untouched
        [t.join() for t in threads]
        duration = (time.time_ns() - start_time) / (1000**3)  # ns -> s
        rate = float(num_threads) * size / duration / (1024**2)  # MB/s
        print(f"{size}, {num_threads}: {rate} MB/s")
        output_csv.write(f"{size},{num_threads},{rate}\n")
        output_csv.flush()

output_csv.close()
# Keep GEDS open to allow reading data
time.sleep(1000000000)
66 changes: 66 additions & 0 deletions examples/ai-workload/write_data_spilling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import sys
import time
import io
import numpy as np
from threading import Thread


from smart_open import open
import geds_smart_open
from geds_smart_open import GEDS

bucket = 'geds-test'

# S3 credentials/endpoint come from the environment; registering the bucket
# lets geds:// URLs for it resolve through GEDS.
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_ENDPOINT_URL = os.getenv('AWS_ENDPOINT_URL')
geds_smart_open.register_object_store(bucket, AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

MAX_THREADS = 16
MAX_SIZE = 28 # 256 MB
# NOTE(review): the assignment below overrides the one above; 28 is kept,
# presumably, as a quick toggle for smaller runs.
MAX_SIZE = 33 # 8GB

# NOTE(review): same output filename as write_data.py — running both in one
# directory overwrites that script's results; confirm this is intended.
output_csv = open('file://write_data.csv', 'w')
output_csv.write("Payload Size,Thread Count,Throughput\n")


# Spilling throughput: one row per relocation round.
spill_csv = open('file://write_data_spill.csv', 'w')
spill_csv.write("Payload Size,Object Count,Throughput\n")

def write_file(tid, nthreads, buffer, size):
    """Write *buffer* to the benchmark object for worker *tid*.

    The object name encodes the thread count and payload size. The context
    manager flushes and closes the stream on exit; the previous explicit
    flush()/close() inside the with-block was redundant and made close()
    run twice.
    """
    path = f'geds://{bucket}/benchmark/{nthreads}_{tid}_{size}'
    with open(path, 'wb') as f:
        f.write(buffer)

# Sweep thread counts (1, then even counts only) and payload sizes; after each
# concurrent write round, time an explicit relocation (spill) of that data.
for t in range(0, MAX_THREADS):
    num_threads = t + 1
    # Skip odd thread counts greater than one to shorten the sweep.
    if num_threads > 1 and num_threads % 2 == 1:
        continue
    for s in range(1, MAX_SIZE+1):
        size = 2**s
        # if num_threads == 16 and s == MAX_SIZE:
        #     continue
        # One fresh random payload per thread.
        buffers = [np.random.randint(0, high=255, size=size, dtype=np.uint8) for i in range(0, num_threads)]
        threads = [Thread(target=write_file, args=[i, num_threads, buffers[i], size]) for i in range(0, num_threads)]
        start_time = time.time_ns()
        [t.start() for t in threads]  # comprehension scope: outer t is untouched
        [t.join() for t in threads]
        duration = (time.time_ns() - start_time) / (1000**3)  # ns -> s
        rate = float(num_threads) * size / duration / (1024**2)  # MB/s
        print(f"{size}, {num_threads}: {rate} MB/s")
        output_csv.write(f"{size},{num_threads},{rate}\n")
        output_csv.flush()
        # Time the explicit spill of everything just written.
        start_time = time.time_ns()
        geds_smart_open.relocate()
        duration = (time.time_ns() - start_time) / (1000**3)
        rel_size = size * num_threads
        rate = float(rel_size) / duration / (1024**2)  # MB/s
        print(f"Relocation {rel_size}, {num_threads}: {rate}")
        spill_csv.write(f"{rel_size},{num_threads},{rate}\n")
        spill_csv.flush()
spill_csv.close()
output_csv.close()

0 comments on commit 200ea22

Please sign in to comment.