-
Notifications
You must be signed in to change notification settings - Fork 9
/
read_data.py
51 lines (40 loc) · 1.52 KB
/
read_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import os
import sys
import time
import io
import numpy as np
from threading import Thread
from smart_open import open
import geds_smart_open
from geds_smart_open import GEDS
# --- Object-store layout ---------------------------------------------------
bucket = 'geds-test'
folder_in = '100g'
folder_out = 'output/terasort'

# --- Credentials -----------------------------------------------------------
# Taken from the environment; each value is None when the variable is unset.
AWS_ENDPOINT_URL = os.environ.get('AWS_ENDPOINT_URL')
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

# Register the bucket with GEDS before any geds:// path is opened.
geds_smart_open.register_object_store(
    bucket,
    AWS_ENDPOINT_URL,
    AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY,
)

# --- Benchmark limits ------------------------------------------------------
MAX_THREADS = 16
# Exponent of the largest payload: 2**32 bytes = 4 GiB.
# (Use 28 instead for a 256 MiB cap.)
MAX_SIZE = 32

# --- Result file -----------------------------------------------------------
# `open` here is smart_open's, hence the file:// URI.
output_csv = open('file://read_data.csv', 'w')
output_csv.write("Payload Size,Thread Count,Throughput\n")
def read_file(tid: int, max_threads: int, num_threads: int, size: int) -> None:
    """Read one benchmark object in full and verify its length.

    The object read is
    ``geds://{bucket}/benchmark/{num_threads}_{tid}_{size}``.

    Args:
        tid: Index of the calling worker; part of the object name.
        max_threads: Not used by the current naming scheme. Kept so that
            existing call sites remain valid (an alternative layout keys
            the object name on ``max_threads`` instead of ``num_threads``).
        num_threads: Number of workers in the current run; part of the
            object name.
        size: Expected object length in bytes; part of the object name.

    Raises:
        AssertionError: If the number of bytes read differs from ``size``.
    """
    path = f'geds://{bucket}/benchmark/{num_threads}_{tid}_{size}'
    with open(path, 'rb') as f:
        buf = f.read()
    # Raise explicitly instead of using `assert`, so the check is not
    # stripped when the interpreter runs with -O, and so a failure names
    # the object and both sizes.
    if len(buf) != size:
        raise AssertionError(
            f"{path}: expected {size} bytes, read {len(buf)}"
        )
# Benchmark sweep: for every thread count 1..MAX_THREADS and every payload
# size 2**1..2**MAX_SIZE bytes, start one reader thread per worker, wait for
# all of them, and record the aggregate throughput.
#
# The `with` block closes the CSV even when the sweep raises or is
# interrupted, so rows written so far are not lost.
with output_csv:
    for num_threads in range(1, MAX_THREADS + 1):
        for exponent in range(1, MAX_SIZE + 1):
            size = 2**exponent
            threads = [
                Thread(
                    target=read_file,
                    args=(tid, MAX_THREADS, num_threads, size),
                )
                for tid in range(num_threads)
            ]

            # perf_counter_ns is monotonic, so the measured interval cannot
            # be distorted by wall-clock adjustments during the run.
            start_time = time.perf_counter_ns()
            for thread in threads:
                thread.start()
            # NOTE: join() does not re-raise an exception from a worker, so
            # a failed read still produces a data point for this run.
            for thread in threads:
                thread.join()
            duration = (time.perf_counter_ns() - start_time) / (1000**3)  # ns -> s

            # Total bytes read by all workers per second, in units of 2**20 bytes.
            rate = float(num_threads) * size / duration / (1024**2)
            print(f"{size}, {num_threads}: {rate} MB/s")
            output_csv.write(f"{size},{num_threads},{rate}\n")