|
| 1 | +import { render } from 'preact' |
| 2 | +import { useCallback, useEffect, useRef, useState } from 'preact/hooks' |
| 3 | +import ReconnectingWebSocket from 'reconnecting-websocket' |
| 4 | +import { |
| 5 | + type CollabData, |
| 6 | + ControlUI, |
| 7 | + GlobalStyle, |
| 8 | + type StreamwallConnection, |
| 9 | + useStreamwallState, |
| 10 | + useYDoc, |
| 11 | +} from 'streamwall-control-ui' |
| 12 | +import { |
| 13 | + type ControlCommand, |
| 14 | + stateDiff, |
| 15 | + type StreamwallState, |
| 16 | +} from 'streamwall-shared' |
| 17 | +import * as Y from 'yjs' |
| 18 | + |
| 19 | +function useStreamwallWebsocketConnection( |
| 20 | + wsEndpoint: string, |
| 21 | +): StreamwallConnection { |
| 22 | + const wsRef = useRef<{ |
| 23 | + ws: ReconnectingWebSocket |
| 24 | + msgId: number |
| 25 | + responseMap: Map<number, (msg: object) => void> |
| 26 | + }>() |
| 27 | + const [isConnected, setIsConnected] = useState(false) |
| 28 | + const { |
| 29 | + docValue: sharedState, |
| 30 | + doc: stateDoc, |
| 31 | + setDoc: setStateDoc, |
| 32 | + } = useYDoc<CollabData>(['views']) |
| 33 | + const [streamwallState, setStreamwallState] = useState<StreamwallState>() |
| 34 | + const appState = useStreamwallState(streamwallState) |
| 35 | + |
| 36 | + useEffect(() => { |
| 37 | + let lastStateData: StreamwallState | undefined |
| 38 | + const ws = new ReconnectingWebSocket(wsEndpoint, [], { |
| 39 | + maxReconnectionDelay: 5000, |
| 40 | + minReconnectionDelay: 1000 + Math.random() * 500, |
| 41 | + reconnectionDelayGrowFactor: 1.1, |
| 42 | + }) |
| 43 | + ws.binaryType = 'arraybuffer' |
| 44 | + ws.addEventListener('open', () => setIsConnected(true)) |
| 45 | + ws.addEventListener('close', () => { |
| 46 | + setStateDoc(new Y.Doc()) |
| 47 | + setIsConnected(false) |
| 48 | + }) |
| 49 | + ws.addEventListener('message', (ev) => { |
| 50 | + if (ev.data instanceof ArrayBuffer) { |
| 51 | + return |
| 52 | + } |
| 53 | + const msg = JSON.parse(ev.data) |
| 54 | + if (msg.response && wsRef.current != null) { |
| 55 | + const { responseMap } = wsRef.current |
| 56 | + const responseCb = responseMap.get(msg.id) |
| 57 | + if (responseCb) { |
| 58 | + responseMap.delete(msg.id) |
| 59 | + responseCb(msg) |
| 60 | + } |
| 61 | + } else if (msg.type === 'state' || msg.type === 'state-delta') { |
| 62 | + let state: StreamwallState |
| 63 | + if (msg.type === 'state') { |
| 64 | + state = msg.state |
| 65 | + } else { |
| 66 | + // Clone so updated object triggers React renders |
| 67 | + state = stateDiff.clone( |
| 68 | + stateDiff.patch(lastStateData, msg.delta), |
| 69 | + ) as StreamwallState |
| 70 | + } |
| 71 | + lastStateData = state |
| 72 | + setStreamwallState(state) |
| 73 | + } else { |
| 74 | + console.warn('unexpected ws message', msg) |
| 75 | + } |
| 76 | + }) |
| 77 | + wsRef.current = { ws, msgId: 0, responseMap: new Map() } |
| 78 | + }, []) |
| 79 | + |
| 80 | + const send = useCallback( |
| 81 | + (msg: ControlCommand, cb?: (msg: unknown) => void) => { |
| 82 | + if (!wsRef.current) { |
| 83 | + throw new Error('Websocket not initialized') |
| 84 | + } |
| 85 | + const { ws, msgId, responseMap } = wsRef.current |
| 86 | + ws.send( |
| 87 | + JSON.stringify({ |
| 88 | + ...msg, |
| 89 | + id: msgId, |
| 90 | + }), |
| 91 | + ) |
| 92 | + if (cb) { |
| 93 | + responseMap.set(msgId, cb) |
| 94 | + } |
| 95 | + wsRef.current.msgId++ |
| 96 | + }, |
| 97 | + [], |
| 98 | + ) |
| 99 | + |
| 100 | + useEffect(() => { |
| 101 | + if (!wsRef.current) { |
| 102 | + throw new Error('Websocket not initialized') |
| 103 | + } |
| 104 | + const { ws } = wsRef.current |
| 105 | + |
| 106 | + function sendUpdate(update: Uint8Array, origin: string) { |
| 107 | + if (origin === 'server') { |
| 108 | + return |
| 109 | + } |
| 110 | + wsRef.current?.ws.send(update) |
| 111 | + } |
| 112 | + |
| 113 | + function receiveUpdate(ev: MessageEvent) { |
| 114 | + if (!(ev.data instanceof ArrayBuffer)) { |
| 115 | + return |
| 116 | + } |
| 117 | + Y.applyUpdate(stateDoc, new Uint8Array(ev.data), 'server') |
| 118 | + } |
| 119 | + |
| 120 | + stateDoc.on('update', sendUpdate) |
| 121 | + ws.addEventListener('message', receiveUpdate) |
| 122 | + return () => { |
| 123 | + stateDoc.off('update', sendUpdate) |
| 124 | + ws.removeEventListener('message', receiveUpdate) |
| 125 | + } |
| 126 | + }, [stateDoc]) |
| 127 | + |
| 128 | + return { |
| 129 | + ...appState, |
| 130 | + isConnected, |
| 131 | + send, |
| 132 | + sharedState, |
| 133 | + stateDoc, |
| 134 | + } |
| 135 | +} |
| 136 | + |
| 137 | +function App() { |
| 138 | + const { BASE_URL } = import.meta.env |
| 139 | + |
| 140 | + const connection = useStreamwallWebsocketConnection( |
| 141 | + (BASE_URL === '/' ? `ws://${location.host}` : BASE_URL) + '/client/ws', |
| 142 | + ) |
| 143 | + |
| 144 | + return ( |
| 145 | + <> |
| 146 | + <GlobalStyle /> |
| 147 | + <ControlUI connection={connection} /> |
| 148 | + </> |
| 149 | + ) |
| 150 | +} |
| 151 | + |
| 152 | +render(<App />, document.body) |
0 commit comments