Storage + Data Model Refactor (fixes webrecorder#3):
 - Add default vs custom (s3) storage
 - K8S: All storages correspond to secrets
 - K8S: Default storages initialized via helm
 - K8S: Custom storage results in custom secret (per archive)
 - K8S: Don't add secret per crawl config
 - API for changing storage per archive
 - Docker: default storage just hard-coded from env vars (only one for now)
 - Validate custom storage via aiobotocore before confirming (see the sketch after this list)
 - Data Model: remove usage from users
 - Data Model: support adding multiple files per crawl for parallel crawls
 - Data Model: track completions for parallel crawls
 - Data Model: initial support for tags per crawl, add collection as 'coll' tag
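
The aiobotocore pre-flight check itself is not visible in the diff excerpt below, so here is a minimal sketch of what such a verification might look like, assuming a recent aiobotocore; the function name, bucket argument, and test-object key are illustrative, not code from this commit:

```python
# Hypothetical sketch of validating custom S3 credentials before
# accepting them; not the exact code from this commit.
from aiobotocore.session import get_session


async def verify_storage(storage, bucket: str, key: str = ".btrix-verify") -> bool:
    """Attempt a tiny test upload so bad credentials or endpoints
    are rejected before the storage is saved."""
    session = get_session()
    async with session.create_client(
        "s3",
        endpoint_url=storage.endpoint_url,
        aws_access_key_id=storage.access_key,
        aws_secret_access_key=storage.secret_key,
    ) as client:
        try:
            await client.put_object(Bucket=bucket, Key=key, Body=b"")
            return True
        # pylint: disable=broad-except
        except Exception:
            return False
```

Only if a check along these lines succeeds would the custom storage be saved and its per-archive secret created.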

README fixes
ikreymer committed Oct 10, 2021
1 parent b6d1e49 commit 19879fe
Showing 15 changed files with 348 additions and 163 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -37,13 +37,13 @@ Note: When deployed in local Docker, failed crawls are not retried currently. Sc

To deploy to K8s, `helm` is required. Browsertrix Cloud comes with a helm chart, which can be installed as follows:

`helm install -f ./chart/values.yaml btrix`
`helm install -f ./chart/values.yaml btrix ./chart/`

This will create a `browsertrix-cloud` service in the default namespace.

For a quick update, the following is recommended:

`helm upgrade -f ./chart/values.yaml btrix ./chart/ --recreate-pods`
`helm upgrade -f ./chart/values.yaml btrix ./chart/`


Note: When deployed in Kubernetes, failed crawls are automatically retried. Scheduling is handled via Kubernetes Cronjobs, and crawl jobs are run in the `crawlers` namespace.
84 changes: 43 additions & 41 deletions backend/archives.py
@@ -1,12 +1,10 @@
"""
Archive API handling
"""
import os
import uuid
from datetime import datetime

from typing import Optional, Dict

from typing import Dict, Union, Literal

from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException
@@ -36,15 +34,24 @@ class UpdateRole(InviteRequest):
"""Update existing role for user"""


# ============================================================================
class DefaultStorage(BaseModel):
""" Storage reference """

type: Literal["default"] = "default"
name: str
path: str = ""


# ============================================================================
class S3Storage(BaseModel):
"""S3 Storage Model"""

type: str = "S3Storage"
type: Literal["s3"] = "s3"

endpoint_url: str
access_key: str
secret_key: str
is_public: Optional[bool]


# ============================================================================
@@ -55,7 +62,7 @@ class Archive(BaseMongoModel):

users: Dict[str, UserRole]

storage: S3Storage
storage: Union[S3Storage, DefaultStorage]

usage: Dict[str, int] = {}

@@ -81,9 +88,10 @@ def _is_auth(self, user, value):

def serialize_for_user(self, user: User):
"""Serialize based on current user access"""
exclude = set()
exclude = {"storage"}

if not self.is_owner(user):
exclude = {"users", "storage"}
exclude.add("users")

if not self.is_crawler(user):
exclude.add("usage")
@@ -108,46 +116,33 @@ def __init__(self, db, email):

self.router = None
self.archive_crawl_dep = None
self.archive_owner_dep = None

async def add_archive(self, archive: Archive):
"""Add new archive"""
return await self.archives.insert_one(archive.to_dict())

@staticmethod
def get_endpoint_url(base, id_):
"""Get endpoint for a specific archive from base"""
return os.path.join(base, id_) + "/"

async def create_new_archive_for_user(
self,
archive_name: str,
base_endpoint_url: str,
access_key: str,
secret_key: str,
storage_name,
user: User,
):
# pylint: disable=too-many-arguments
"""Create new archive with default storage for new user"""

id_ = str(uuid.uuid4())

endpoint_url = self.get_endpoint_url(base_endpoint_url, id_)

storage = S3Storage(
endpoint_url=endpoint_url,
access_key=access_key,
secret_key=secret_key,
name="default",
)
storage_path = id_ + "/"

archive = Archive(
id=id_,
name=archive_name,
users={str(user.id): UserRole.OWNER},
storage=storage,
storage=DefaultStorage(name=storage_name, path=storage_path),
)

print(f"Created New Archive with storage at {endpoint_url}")
print(f"Created New Archive with storage {storage_name} / {storage_path}")
await self.add_archive(archive)

async def get_archives_for_user(self, user: User, role: UserRole = UserRole.VIEWER):
@@ -174,6 +169,14 @@ async def update(self, archive: Archive):
"""Update existing archive"""
self.archives.replace_one({"_id": archive.id}, archive.to_dict())

async def update_storage(
self, archive: Archive, storage: Union[S3Storage, DefaultStorage]
):
""" Update storage on an existing archive """
return await self.archives.find_one_and_update(
{"_id": archive.id}, {"$set": {"storage": storage.dict()}}
)

async def add_new_user_invite(
self, new_user_invite: NewUserInvite, inviter_email, archive_name
):
@@ -251,6 +254,17 @@ async def archive_crawl_dep(

return archive

async def archive_owner_dep(
archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
):
if not archive.is_owner(user):
raise HTTPException(
status_code=403,
detail="User does not have permission to perform this action",
)

return archive
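
The route that exposes the new per-archive storage API is not shown in this excerpt; a rough sketch of how `update_storage` and `archive_owner_dep` might be wired together (the path, response shape, and placement are assumptions, not taken from this commit):

```python
# Hypothetical sketch, not the exact endpoint from this commit:
# an owner-only route that swaps an archive's storage config.
@router.patch("/storage", tags=["archives"])
async def change_storage(
    storage: Union[S3Storage, DefaultStorage],
    archive: Archive = Depends(archive_owner_dep),
):
    # a custom s3 storage would first be verified via aiobotocore
    await ops.update_storage(archive, storage)
    return {"updated": True}
```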

router = APIRouter(
prefix="/archives/{aid}",
dependencies=[Depends(archive_dep)],
@@ -259,6 +273,7 @@ async def archive_crawl_dep(

ops.router = router
ops.archive_crawl_dep = archive_crawl_dep
ops.archive_owner_dep = archive_owner_dep

@app.get("/archives", tags=["archives"])
async def get_archives(user: User = Depends(user_dep)):
@@ -274,16 +289,9 @@ async def get_archive(
@router.post("/invite", tags=["invites"])
async def invite_user(
invite: InviteRequest,
archive: Archive = Depends(archive_dep),
archive: Archive = Depends(archive_owner_dep),
user: User = Depends(user_dep),
):

if not archive.is_owner(user):
raise HTTPException(
status_code=403,
detail="User does not have permission to invite other users",
)

invite_code = uuid.uuid4().hex

invite_pending = InvitePending(
@@ -323,16 +331,10 @@ async def invite_user(
@router.patch("/user-role", tags=["invites"])
async def set_role(
update: UpdateRole,
archive: Archive = Depends(archive_dep),
archive: Archive = Depends(archive_owner_dep),
user: User = Depends(user_dep),
):

if not archive.is_owner(user):
raise HTTPException(
status_code=403,
detail="User does not have permission to invite other users",
)

other_user = await users.db.get_by_email(update.email)
if not other_user:
raise HTTPException(
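
A note on the `storage: Union[S3Storage, DefaultStorage]` field introduced above: because each model carries a distinct `Literal` `type`, pydantic can recover the right class when an archive document comes back from Mongo. A minimal self-contained sketch (the `ArchiveDoc` wrapper is illustrative, not from this commit):

```python
# Sketch: the Literal "type" field acts as a discriminator when
# pydantic validates the storage union.
from typing import Literal, Union

from pydantic import BaseModel


class DefaultStorage(BaseModel):
    type: Literal["default"] = "default"
    name: str
    path: str = ""


class S3Storage(BaseModel):
    type: Literal["s3"] = "s3"
    endpoint_url: str
    access_key: str
    secret_key: str


class ArchiveDoc(BaseModel):
    storage: Union[S3Storage, DefaultStorage]


doc = ArchiveDoc(storage={"type": "default", "name": "default", "path": "id-1234/"})
assert isinstance(doc.storage, DefaultStorage)

doc = ArchiveDoc(storage={
    "type": "s3",
    "endpoint_url": "https://s3.example.com/bucket/",
    "access_key": "key",
    "secret_key": "secret",
})
assert isinstance(doc.storage, S3Storage)
```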
4 changes: 2 additions & 2 deletions backend/crawlconfigs.py
@@ -136,9 +136,9 @@ async def add_crawl_config(
crawlconfig = CrawlConfig.from_dict(data)

await self.crawl_manager.add_crawl_config(
crawlconfig=crawlconfig,
storage=archive.storage,
crawlconfig=crawlconfig, storage=archive.storage
)

return result

async def update_crawl_schedule(self, cid: str, update: UpdateSchedule):
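
Per the commit notes, crawl configs no longer get a dedicated secret: the config now just carries the archive's storage reference, and the crawl manager resolves that to an already-existing secret. A rough illustration of that resolution (the naming scheme here is an assumption, not taken from this commit):

```python
# Hypothetical sketch: map a storage reference onto an existing K8s
# secret instead of minting a new secret per crawl config.
def storage_secret_name(aid: str, storage) -> str:
    if storage.type == "default":
        # shared secrets provisioned by helm at install time
        return f"storage-{storage.name}"
    # a custom s3 storage uses the secret created for its archive
    return f"storage-{aid}"
```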
43 changes: 32 additions & 11 deletions backend/crawls.py
@@ -28,6 +28,15 @@ class CrawlScale(BaseModel):
scale: int = 1


# ============================================================================
class CrawlFile(BaseModel):
""" output of a crawl """

filename: str
hash: str
size: int


# ============================================================================
class Crawl(BaseMongoModel):
""" Store State of a Crawl (Finished or Running) """
@@ -45,12 +54,13 @@ class Crawl(BaseMongoModel):
state: str

scale: int = 1
completions: Optional[int] = 0

stats: Optional[Dict[str, str]]

filename: Optional[str]
size: Optional[int]
hash: Optional[str]
files: Optional[List[CrawlFile]]

tags: Optional[Dict[str, str]]


# ============================================================================
@@ -90,19 +100,31 @@ async def init_redis(self, redis_url):

async def on_handle_crawl_complete(self, msg: CrawlCompleteIn):
""" Handle completed crawl, add to crawls db collection, also update archive usage """
crawl = await self.crawl_manager.validate_crawl_complete(msg)
crawl, crawl_file = await self.crawl_manager.process_crawl_complete(msg)
if not crawl:
print("Not a valid crawl complete msg!", flush=True)
return

await self.store_crawl(crawl, update_existing=True)
await self.store_crawl(crawl, crawl_file)

async def store_crawl(self, crawl: Crawl, crawl_file: CrawlFile = None):
"""Add finished crawl to db, increment archive usage.
If crawl file provided, update and add file"""
if crawl_file:
crawl_update = {
"$set": crawl.to_dict(exclude={"files", "completions"}),
"$push": {"files": crawl_file.dict()},
}

async def store_crawl(self, crawl: Crawl, update_existing=False):
""" Add finished crawl to db, increment archive usage """
if update_existing:
await self.crawls.find_one_and_replace(
{"_id": crawl.id}, crawl.to_dict(), upsert=True
if crawl.state == "complete":
crawl_update["$inc"] = {"completions": 1}

await self.crawls.find_one_and_update(
{"_id": crawl.id},
crawl_update,
upsert=True,
)

else:
try:
await self.crawls.insert_one(crawl.to_dict())
@@ -112,7 +134,6 @@ async def store_crawl(self, crawl: Crawl, update_existing=False):

dura = int((crawl.finished - crawl.started).total_seconds())

print(crawl, flush=True)
print(f"Duration: {dura}", flush=True)

await self.archives.inc_usage(crawl.aid, dura)
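
The `$push`/`$inc` upsert above is what lets several parallel crawler instances report into a single crawl record. For a completed instance, the update document ends up shaped roughly like this (values illustrative):

```python
# Illustrative shape of the Mongo update for one finished instance of a
# parallel crawl: "files" grows by one entry per instance, and
# "completions" counts how many instances have reached "complete".
crawl_update = {
    "$set": {
        # everything from crawl.to_dict(exclude={"files", "completions"})
        "state": "complete",
        "scale": 2,
    },
    "$push": {
        "files": {"filename": "crawl-part-1.wacz", "hash": "...", "size": 1024},
    },
    "$inc": {"completions": 1},
}
```

Comparing `completions` against `scale` would then tell whether every instance of the crawl has finished.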
4 changes: 2 additions & 2 deletions backend/db.py
@@ -45,8 +45,8 @@ def serialize(self):
"""convert Archive to dict"""
return self.dict(exclude_unset=True, exclude_defaults=True, exclude_none=True)

def to_dict(self):
def to_dict(self, **opts):
"""convert to dict for mongo"""
res = self.dict()
res = self.dict(**opts)
res["_id"] = res.pop("id", "")
return res
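
The new `**opts` pass-through is what lets callers forward pydantic `.dict()` options, as `store_crawl` does when building its `$set` payload; for example (assuming `crawl` is a `Crawl` instance):

```python
# Forward pydantic .dict() options through to_dict():
doc = crawl.to_dict(exclude={"files", "completions"})
# doc carries Mongo's "_id" (renamed from "id"); the excluded fields
# are left to the $push / $inc operators instead
```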
