Skip to content

Commit

Permalink
Improve smart_open integration with GEDS.
Browse files Browse the repository at this point in the history
Signed-off-by: Pascal Spörri <[email protected]>
  • Loading branch information
pspoerri committed Apr 17, 2024
1 parent cb37f70 commit e1a433e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .travis/build-geds.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ if [ "${TRAVIS:-""}" == "true" ]; then
-e BUILD_TARGET=${DOCKER_BUILD_TARGET} \
-e BUILD_TYPE=${CMAKE_BUILD_TYPE} \
-e GEDS_VERSION=${GEDS_VERSION} \
-e GEDS_INSTALL_PREFIX="/install/" \
-e GEDS_INSTALL_PREFIX="/install/geds" \
-e ARTIFACTS_PREFIX="/src/geds/travis_artifacts/" \
-w "/build/geds" \
-t docker.io/python:3.10-buster \
Expand Down
4 changes: 2 additions & 2 deletions package_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ BUILD_TARGET=$(echo ${BUILD_TARGET} | awk '{print tolower($0)}')
BUILD_TYPE=$(echo ${BUILD_TYPE} | awk '{print tolower($0)}')

ARTIFACTS_DIR="${ARTIFACTS_DIR:-"${ROOT}/travis_artifacts"}"
GEDS_INSTALL_PREFIX=${GEDS_INSTALL_PREFIX:-"${ROOT}/travis_install"}
GEDS_INSTALL_PREFIX=${GEDS_INSTALL_PREFIX:-"${ROOT}/travis_install/geds"}
BUILD_LOC=$(mktemp -d /tmp/gedspy_XXX)

cp -a src/python/geds_smart_open/ "${BUILD_LOC}/"
cd "${BUILD_LOC}/geds_smart_open"
cp "${GEDS_INSTALL_PREFIX}/geds/python/pygeds.so" src/geds_smart_open/
cp "${GEDS_INSTALL_PREFIX}/python/pygeds.so" src/geds_smart_open/
sed -i "s/SNAPSHOT/${GEDS_VERSION}/g" "pyproject.toml"

pip install 'build[virtualenv]'
Expand Down
29 changes: 27 additions & 2 deletions src/python/geds_smart_open/src/geds_smart_open/geds.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io
import os
import threading
import atexit
from typing import Iterable

try:
Expand Down Expand Up @@ -59,7 +60,7 @@ def fileno(self) -> int:

def close(self) -> None:
"""Flush and close this stream"""
if self.is_writeable:
if not self.closed and self.is_writeable:
self.file.seal()
self.file = None

Expand All @@ -72,7 +73,7 @@ def closed(self) -> bool:
return self.file is None

def flush(self):
if self.is_writeable:
if not self.closed and self.is_writeable:
self.file.seal()

def isatty(self) -> bool:
Expand All @@ -88,6 +89,10 @@ def checkReadable(self):
if not self.readable:
raise OSError("The file is not readable")

def checkClosed(self):
    """Raise ``IOError`` when this stream has already been closed.

    Does nothing while ``self.closed`` is false.
    """
    if not self.closed:
        return
    raise IOError("the file is already closed!")

def seekable(self) -> bool:
    """Report whether the stream supports seeking; this one always does."""
    return True

Expand All @@ -112,6 +117,7 @@ def read(self, limit: int = -1):
of file. If the object is in non-blocking mode and no bytes are
available, None is returned.
"""
self.checkClosed()
self.checkReadable()
maxcount = self.file.size - self.position
assert maxcount >= 0
Expand All @@ -128,11 +134,16 @@ def read(self, limit: int = -1):

def readinto(self, buffer):
    """Read up to ``len(buffer)`` bytes into ``buffer`` from the current position.

    The stream position is advanced by the count the underlying file reports.

    Args:
        buffer: Pre-allocated writable buffer that receives the data.

    Returns:
        The count returned by ``self.file.read``.

    Raises:
        OSError: If the stream is not readable or has already been closed.
    """
    self.checkReadable()
    # A closed stream is an error, exactly as in read()/readall()/write().
    # Returning -1 here is not a valid readinto() result and would be
    # misread by callers as a (negative) byte count.
    self.checkClosed()
    count = self.file.read(buffer, self.position, len(buffer))
    self.position += count
    return count

def readline(self, limit: int = -1) -> bytes:
if self.closed:
return -1

previous_position = self.position
print("readline " + limit)
if limit != -1:
Expand All @@ -154,24 +165,29 @@ def readline(self, limit: int = -1) -> bytes:
return line.getvalue()

def readall(self) -> bytes:
    """Read everything from the current position to the end of the file.

    Returns:
        The bytes that were read; empty when the position is at or past the
        end of the file.

    Raises:
        OSError: If the stream has already been closed (via ``checkClosed``).
    """
    self.checkClosed()

    # A position beyond the end of the file would give a negative length and
    # make bytearray() raise ValueError; treat it as "nothing left to read".
    remaining = max(self.file.size - self.position, 0)
    buffer = bytearray(remaining)
    count = self.readinto(buffer)
    # Honour the declared return type: hand back immutable bytes rather than
    # a bytearray slice.
    return bytes(buffer[:count])

def write(self, b):
    """Write ``b`` at the current position and advance the position.

    Returns:
        ``len(b)``.

    Raises:
        IOError: If the stream is closed (via ``checkClosed``) or is not
            writeable.
    """
    self.checkClosed()
    if self.is_writeable:
        size = len(b)
        self.file.write(b, self.position, size)
        self.position += size
        return size
    raise IOError("write is not allowed: the file is not writeable!")

def writelines(self, lines: Iterable) -> None:
    """Write every item of ``lines``, each followed by the line terminator.

    Each item is passed to ``self.write`` and is then followed by a separate
    write of ``self.line_terminator``.
    """
    self.checkClosed()
    for entry in lines:
        self.write(entry)
        self.write(self.line_terminator)

def truncate(self, size=None) -> int:
self.checkClosed()
if not self.is_writeable:
raise IOError("truncate not allowed: the file is not writeable!")
if size is None:
Expand Down Expand Up @@ -224,6 +240,12 @@ def get(cls) -> pygeds.GEDS:
cls.init_geds()
return cls._geds

@classmethod
def handle_shutdown(cls) -> None:
    """Stop the GEDS instance held in ``cls._geds``, if there is one.

    Prints a notice and calls ``stop()`` on the instance; does nothing when
    ``cls._geds`` is ``None``.
    """
    # Identity test: `!= None` would go through the object's own __ne__.
    if cls._geds is not None:
        print("Stopping GEDS --> Spilling data.")
        cls._geds.stop()

@classmethod
def register_object_store(
cls, bucket: str, endpoint_url: str, access_key: str, secret_key: str
Expand All @@ -244,6 +266,9 @@ def register_object_store(
def object_store_mapped(cls, bucket: str) -> bool:
    """Return whether ``bucket`` is present in ``cls._known_s3_buckets``."""
    known = cls._known_s3_buckets
    return bucket in known

def handle_shutdown():
    """Interpreter-exit hook that forwards to ``GEDSInstance.handle_shutdown``."""
    GEDSInstance.handle_shutdown()


# Registered explicitly; equivalent to decorating with @atexit.register,
# which returns the function unchanged.
atexit.register(handle_shutdown)

def register_object_store(
bucket: str, endpoint_url: str, access_key: str, secret_key: str
Expand Down
3 changes: 2 additions & 1 deletion src/python/wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ PYBIND11_MODULE(pygeds, m) {
.def_readwrite("port", &GEDSConfig::port)
.def_readwrite("port_http_server", &GEDSConfig::portHttpServer)
.def_readwrite("local_storage_path", &GEDSConfig::localStoragePath)
.def_readwrite("cache_block_size", &GEDSConfig::cacheBlockSize);
.def_readwrite("cache_block_size", &GEDSConfig::cacheBlockSize)
.def_readwrite("cache_objects_from_s3", &GEDSConfig::cache_objects_from_s3);

py::class_<GEDS, std::shared_ptr<GEDS>>(m, "GEDS")
.def_property_readonly_static(
Expand Down

0 comments on commit e1a433e

Please sign in to comment.