feat(ingest): maintain ordering in file-backed dict (#11346)
hsheth2 authored Sep 10, 2024
1 parent 80970b1 commit 311ea10
Showing 2 changed files with 65 additions and 11 deletions.
40 changes: 29 additions & 11 deletions metadata-ingestion/src/datahub/utilities/file_backed_collections.py
@@ -183,6 +183,12 @@ class FileBackedDict(MutableMapping[str, _VT], Closeable, Generic[_VT]):
     """A dict-like object that stores its data in a temporary SQLite database.
     This is useful for storing large amounts of data that don't fit in memory.
+
+    Like a standard Python dict / OrderedDict, it maintains insertion order.
+
+    It maintains a small in-memory cache to avoid having to serialize/deserialize
+    data from the database too often. This is an implementation detail that isn't
+    exposed to the user.
     """
 
     # Use a predefined connection, able to be shared across multiple FileBacked* objects
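The guarantee called out in the new docstring can be exercised directly. A minimal usage sketch, assuming the constructor arguments used by the test added below (serializer/deserializer) and the module path from the file header above:

from datahub.utilities.file_backed_collections import FileBackedDict

d = FileBackedDict[int](serializer=str, deserializer=int)
d["b"] = 2
d["a"] = 1
d["c"] = 3

# Keys come back in insertion order, like a built-in dict / OrderedDict.
assert list(d.keys()) == ["b", "a", "c"]
d.close()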
@@ -212,8 +218,9 @@ def __post_init__(self) -> None:
             self.cache_eviction_batch_size > 0
         ), "cache_eviction_batch_size must be positive"
 
-        assert "key" not in self.extra_columns, '"key" is a reserved column name'
-        assert "value" not in self.extra_columns, '"value" is a reserved column name'
+        for reserved_column in ("key", "value", "rowid"):
+            if reserved_column in self.extra_columns:
+                raise ValueError(f'"{reserved_column}" is a reserved column name')
 
         if self.shared_connection:
             self._conn = self.shared_connection
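A quick sketch of the new validation behavior, assuming pytest is available and that extra_columns maps column names to value-extractor callables (an assumption about the API, which this diff only hints at):

import pytest

from datahub.utilities.file_backed_collections import FileBackedDict

# "rowid" is now reserved alongside "key" and "value", and a clash raises
# ValueError instead of tripping an assert (hypothetical extractor callable).
with pytest.raises(ValueError, match="reserved column name"):
    FileBackedDict[int](
        serializer=str,
        deserializer=int,
        extra_columns={"rowid": lambda v: v},
    )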
@@ -227,10 +234,13 @@ def __post_init__(self) -> None:
         self._active_object_cache = collections.OrderedDict()
 
         # Create the table.
+        # We could use the built-in sqlite `rowid` column, but that can get changed
+        # if a VACUUM is performed and would break our ordering guarantees.
         if_not_exists = "IF NOT EXISTS" if self._conn.allow_table_name_reuse else ""
         self._conn.execute(
             f"""CREATE TABLE {if_not_exists} {self.tablename} (
-                key TEXT PRIMARY KEY,
+                rowid INTEGER PRIMARY KEY AUTOINCREMENT,
+                key TEXT UNIQUE,
                 value BLOB
                 {''.join(f', {column_name} BLOB' for column_name in self.extra_columns.keys())}
             )"""
@@ -280,13 +290,20 @@ def _prune_cache(self, num_items_to_prune: int) -> None:
                 items_to_write.append(tuple(values))
 
         if items_to_write:
+            # Tricky: By using an INSERT INTO ... ON CONFLICT (key) structure, we can
+            # ensure that the rowid remains the same if a value is updated, but is
+            # autoincremented when new rows are inserted.
             self._conn.executemany(
-                f"""INSERT OR REPLACE INTO {self.tablename} (
+                f"""INSERT INTO {self.tablename} (
                     key,
                     value
                     {''.join(f', {column_name}' for column_name in self.extra_columns.keys())}
                 )
-                VALUES ({', '.join(['?'] *(2 + len(self.extra_columns)))})""",
+                VALUES ({', '.join(['?'] *(2 + len(self.extra_columns)))})
+                ON CONFLICT (key) DO UPDATE SET
+                    value = excluded.value
+                    {''.join(f', {column_name} = excluded.{column_name}' for column_name in self.extra_columns.keys())}
+                """,
                 items_to_write,
             )

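The comment's claim about rowid stability is observable with plain sqlite3. A standalone sketch contrasting the upsert with the old INSERT OR REPLACE, which resolves conflicts by deleting and re-inserting the row (and so would move an updated key to the end of the ordering):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE t (rowid INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT UNIQUE, value BLOB)"
)
conn.execute("INSERT INTO t (key, value) VALUES ('a', 'old')")  # rowid 1

# INSERT OR REPLACE deletes the conflicting row and inserts a fresh one,
# so 'a' picks up a new autoincremented rowid.
conn.execute("INSERT OR REPLACE INTO t (key, value) VALUES ('a', 'new')")
assert conn.execute("SELECT rowid FROM t WHERE key = 'a'").fetchone()[0] == 2

# The ON CONFLICT ... DO UPDATE form updates the row in place, keeping its rowid.
conn.execute(
    "INSERT INTO t (key, value) VALUES ('a', 'newer') "
    "ON CONFLICT (key) DO UPDATE SET value = excluded.value"
)
assert conn.execute("SELECT rowid FROM t WHERE key = 'a'").fetchone()[0] == 2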
@@ -356,14 +373,15 @@ def mark_dirty(self, key: str) -> None:
             self._active_object_cache[key] = self._active_object_cache[key][0], True
 
     def __iter__(self) -> Iterator[str]:
-        # Cache should be small, so safe set cast to avoid mutation during iteration
-        cache_keys = set(self._active_object_cache.keys())
-        yield from cache_keys
+        self.flush()
 
-        cursor = self._conn.execute(f"SELECT key FROM {self.tablename}")
+        # Our active object cache should now be empty, so it's fine to
+        # just pull from the DB.
+        cursor = self._conn.execute(
+            f"SELECT key FROM {self.tablename} ORDER BY rowid ASC"
+        )
         for row in cursor:
-            if row[0] not in cache_keys:
-                yield row[0]
+            yield row[0]
 
     def items_snapshot(
         self, cond_sql: Optional[str] = None
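Putting the pieces together: flushing first means the upsert has already assigned (or preserved) a rowid for every live key, so a single ordered SELECT is both complete and correctly ordered. A sketch with hypothetical keys, reusing the constructor arguments from the test below:

from datahub.utilities.file_backed_collections import FileBackedDict

d = FileBackedDict[int](serializer=str, deserializer=int, cache_max_size=2)
d["a"] = 1
d["b"] = 2
d["a"] = 100  # update: the upsert keeps "a"'s original rowid
d["c"] = 3    # may still live only in the in-memory cache

# __iter__ flushes the cache, so SELECT ... ORDER BY rowid ASC sees every
# key exactly once, in insertion order.
assert list(d) == ["a", "b", "c"]
d.close()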
@@ -186,6 +186,42 @@ def test_file_dict_stores_counter() -> None:
         assert in_memory_counters[i].most_common(2) == cache[str(i)].most_common(2)
 
 
+def test_file_dict_ordering() -> None:
+    """
+    We require that FileBackedDict maintains insertion order, similar to Python's
+    built-in dict. This test makes one of each and validates that they behave the same.
+    """
+
+    cache = FileBackedDict[int](
+        serializer=str,
+        deserializer=int,
+        cache_max_size=1,
+    )
+    data = {}
+
+    num_items = 14
+
+    for i in range(num_items):
+        cache[str(i)] = i
+        data[str(i)] = i
+
+    assert list(cache.items()) == list(data.items())
+
+    # Try some deletes.
+    for i in range(3, num_items, 3):
+        del cache[str(i)]
+        del data[str(i)]
+
+    assert list(cache.items()) == list(data.items())
+
+    # And some updates + inserts.
+    for i in range(2, num_items, 2):
+        cache[str(i)] = i * 10
+        data[str(i)] = i * 10
+
+    assert list(cache.items()) == list(data.items())
+
+
 @dataclass
 class Pair:
     x: int