Skip to content

Commit

Permalink
initial commit. it's working!
Browse files Browse the repository at this point in the history
TODO:
* sig
* labeler repo and declaration record
  https://atproto.com/specs/label#labeler-service-identity
  https://docs.bsky.app/docs/advanced-guides/moderation#labelers
* filter out bsky.app self labels that are already shown there
  • Loading branch information
snarfed committed Oct 20, 2024
1 parent bea9184 commit 6f76920
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 1 deletion.
14 changes: 14 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
.coverage
/build/
/docs/_build/
/dist/
/.eggs/
/*.egg-info
/l
/local*/
flask_secret_key
private_notes
service_account_creds.json
TAGS
*.pyc
.DS_Store
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
# self-labeler
A Bluesky/AT Protocol mod service (labeler) that emits custom self-labels

A [Bluesky](https://bsky.social/)/[AT Protocol](https://atproto.com/) [labeler, aka mod service](https://bsky.social/about/blog/4-13-2023-moderation), that emits the custom [self-labels](https://atproto.com/specs/label#self-labels-in-records) that already exist inside records.

Apart from the self-labels built into [bsky.app](https://bsky.app/), custom self-labels are often not displayed or handled by clients. This service surfaces those labels and makes them visible.

[Background discussion.](https://github.com/bluesky-social/atproto/discussions/2885)
106 changes: 106 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Self-labeler Flask app and implementation.
Uses jetstream:
https://github.com/bluesky-social/jetstream
Example command line:
websocat 'wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post'
"""
from datetime import datetime
import json
import logging
import os
from pathlib import Path
from queue import Queue
import threading
from threading import Lock, Thread

from flask import Flask
import lexrpc.flask_server
import lexrpc.server
import simple_websocket

# Module-level logger. basicConfig() sets up default root logging.
logger = logging.getLogger(__name__)
logging.basicConfig()

# One Queue per connected subscribeLabels client. The jetstream consumer puts
# one dict per event onto each queue, with 'seq' and 'labels' keys, where
# 'labels' is a list of dict com.atproto.label.defs objects to emit.
subscribers = []
# Held while a subscribeLabels handler registers its queue in subscribers.
subscribers_lock = Lock()


# Flask app
app = Flask(__name__)
app.json.compact = False
app_dir = Path(__file__).parent
# Load config.py from this file's directory.
app.config.from_pyfile(app_dir / 'config.py')


# ATProto XRPC server
xrpc_server = lexrpc.server.Server(validate=True)
lexrpc.flask_server.init_flask(xrpc_server, app)


def _labels_for_event(msg):
    """Converts one jetstream event into label objects for its self-labels.

    Args:
      msg (dict): decoded jetstream event

    Returns:
      (str, list of dict) tuple: the record's AT URI and its labels. The URI
      is None and the list is empty if the event isn't a create or update
      commit; the list is empty if the record has no self-labels.
    """
    from datetime import timezone

    commit = msg.get('commit') or {}
    if (msg.get('kind') != 'commit'
            or commit.get('operation') not in ('create', 'update')):
        return None, []

    uri = f'at://{msg["did"]}/{commit["collection"]}/{commit["rkey"]}'
    # tolerate records with a missing or null labels field instead of raising
    # and killing the consumer thread
    self_labels = (commit.get('record') or {}).get('labels') or {}
    # atproto datetimes must include a timezone, so use an aware UTC timestamp
    now = datetime.now(timezone.utc).isoformat()

    return uri, [{
        'ver': 1,
        'src': msg['did'],
        'uri': uri,
        'cid': commit['cid'],
        'val': label['val'],
        'cts': now,
        # 'sig': , TODO
    } for label in self_labels.get('values') or []]


def jetstream():
    """Consumes the jetstream firehose and fans self-labels out to subscribers.

    Connects to the jetstream host in the ``JETSTREAM_HOST`` environment
    variable, subscribed to posts and profiles. For each created or updated
    record with self-labels, puts a ``{'seq': ..., 'labels': [...]}`` dict on
    every queue in ``subscribers``.

    Runs forever. When the websocket closes, waits briefly and reconnects.

    Raises:
      KeyError: if ``JETSTREAM_HOST`` isn't set
    """
    from time import sleep

    reconnect_delay_secs = 1

    host = os.environ['JETSTREAM_HOST']
    url = f'wss://{host}/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.actor.profile'

    ws = None
    while True:
        try:
            if ws is None:
                logger.info(f'connecting to jetstream at {host}')
                ws = simple_websocket.Client(url)

            msg = json.loads(ws.receive())
            uri, labels = _labels_for_event(msg)
            if not labels:
                continue

            # snapshot under the lock so that handlers registering or removing
            # their queues concurrently can't disturb the iteration
            with subscribers_lock:
                subs = list(subscribers)

            logger.info(f'emitting to {len(subs)} subscribers: {uri} {[l["val"] for l in labels]}')
            for sub in subs:
                sub.put({
                    'seq': msg['time_us'],
                    'labels': labels,
                })

        except simple_websocket.ConnectionClosed as cc:
            logger.info(f'reconnecting after jetstream disconnect: {cc}')
            # a closed client raises ConnectionClosed on every receive(), so
            # drop it and open a new connection on the next iteration
            ws = None
            sleep(reconnect_delay_secs)


@xrpc_server.method('com.atproto.label.subscribeLabels')
def subscribe_labels(cursor=None):
    """XRPC handler for ``com.atproto.label.subscribeLabels``.

    Registers a new queue in ``subscribers`` and streams everything the
    jetstream consumer puts on it until the client disconnects.

    Args:
      cursor (int): currently ignored; the stream always starts at head

    Yields:
      dict: ``{'seq': ..., 'labels': [...]}`` messages
    """
    if cursor is not None:
        logger.info(f'ignoring cursor {cursor}, starting at head')
        # raise NotImplementedError('cursor not yet supported')

    labels = Queue()
    with subscribers_lock:
        subscribers.append(labels)

    try:
        while True:
            yield labels.get()
    finally:
        # runs when the generator is closed, ie the client went away
        with subscribers_lock:
            subscribers.remove(labels)


# start jetstream consumer
# if LOCAL_SERVER or not DEBUG:
#
# This module must only start one consumer per process. The thread is a daemon
# because jetstream() loops forever; a non-daemon thread would keep the
# interpreter from ever exiting, eg after a test run that imports this module.
assert 'jetstream' not in [t.name for t in threading.enumerate()]
Thread(target=jetstream, name='jetstream', daemon=True).start()


@app.get('/liveness_check')
@app.get('/readiness_check')
def health_check():
    """Answers App Engine Flex liveness and readiness probes.

    Both probe paths share this handler.

    https://cloud.google.com/appengine/docs/flexible/reference/app-yaml?tab=python#updated_health_checks

    Returns:
      str: the fixed response body
    """
    return 'OK'
39 changes: 39 additions & 0 deletions app.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# https://cloud.google.com/appengine/docs/flexible/reference/app-yaml?tab=python
#
# gcloud -q app deploy --project bridgy-federated app.yaml

# application: bridgy-federated

service: default
env: flex
runtime: python
runtime_config:
operating_system: ubuntu22
runtime_version: "3.12"

resources:
cpu: 1
memory_gb: 1.6

# can't be internal because Bluesky services (eg the AppView) need to be able
# to connect to us over websocket for subscribeLabels
network:
instance_ip_mode: external

env_variables:
PLC_HOST: plc.directory
APPVIEW_HOST: api.bsky.app
RELAY_HOST: bsky.network
JETSTREAM_HOST: jetstream2.us-west.bsky.network

manual_scaling:
instances: 1

liveness_check:
# default 300; does lowering this speed up deploy? seems like maybe, a bit?
# https://cloud.google.com/appengine/docs/flexible/reference/app-yaml?tab=python#liveness_checks
initial_delay_sec: 30

# https://cloud.google.com/appengine/docs/flexible/python/runtime#application_startup
# https://docs.gunicorn.org/en/latest/settings.html#timeout
entrypoint: gunicorn --workers 1 --threads 50 -b :$PORT app:app
26 changes: 26 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Flask config and env vars.
https://flask.palletsprojects.com/en/latest/config/
"""
import logging
import os

# Flip to True for local development with a throwaway secret key.
if False:
    ENV = 'development'
    SECRET_KEY = 'sooper seekret'
else:
    ENV = 'production'
    # Look for the key file next to this config file, not in the current
    # working directory, so that loading works wherever the app is started.
    with open(os.path.join(os.path.dirname(__file__), 'flask_secret_key')) as f:
        SECRET_KEY = f.read().strip()

logging.getLogger().setLevel(logging.INFO)
# if logging_client := getattr(appengine_config, 'logging_client'):
#   logging_client.setup_logging(log_level=logging.INFO)

# libraries that should log at debug level
for name in ('lexrpc',):
    logging.getLogger(name).setLevel(logging.DEBUG)

# Placeholder hosts, used only when the environment doesn't already set them.
os.environ.setdefault('APPVIEW_HOST', 'api.bsky.local')
os.environ.setdefault('RELAY_HOST', 'bsky.network.local')
os.environ.setdefault('PLC_HOST', 'plc.bsky.local')
os.environ.setdefault('JETSTREAM_HOST', 'jetstream.local')
35 changes: 35 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
git+https://github.com/snarfed/lexrpc.git#egg=lexrpc
git+https://github.com/snarfed/arroba.git#egg=arroba

bases==0.3.0
cachetools==5.5.0
carbox==0.3
cbor2==5.6.5
certifi==2024.8.30
cffi==1.17.1
charset-normalizer==3.4.0
cryptography==43.0.3
dag-cbor==0.3.3
dag-json==0.3
dnspython==2.7.0
Flask==3.0.3
flask-sock==0.7.0
grapheme==0.6.0
h11==0.14.0
idna==3.10
iterators==0.2.0
Jinja2==3.1.4
lexrpc==1.0
libipld==1.2.3
MarkupSafe==3.0.2
multiformats==0.3.1.post4
multiformats-config==0.3.1
pycparser==2.22
pyjwt==2.9.0
requests==2.32.3
simple-websocket==1.1.0
typing-extensions==4.12.2
typing-validation==1.2.11.post4
urllib3==2.2.3
Werkzeug==3.0.4
wsproto==1.2.0
19 changes: 19 additions & 0 deletions test_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Unit tests for app.py."""
from threading import Thread
from unittest import TestCase

from lexrpc.tests.test_client import FakeWebsocketClient
import simple_websocket

from app import app, jetstream


class AppTest(TestCase):
    """Tests for the jetstream consumer, run against a fake websocket client."""

    def setUp(self):
        # app.jetstream looks up simple_websocket.Client at call time, so
        # replacing the attribute here routes it to the fake
        simple_websocket.Client = FakeWebsocketClient
        FakeWebsocketClient.sent = []
        FakeWebsocketClient.to_receive = []

    def test_label(self):
        # TODO: queue jetstream events and assert on the emitted labels. This
        # currently only checks that the consumer starts.
        FakeWebsocketClient.to_receive = []
        Thread(target=jetstream, daemon=True).start()

0 comments on commit 6f76920

Please sign in to comment.