1. Introduction
This article proposes a client-side data management method using event sourcing.
The proposal aims to address the following requirements:
- Reduce database server maintenance costs and allow the application to hold data independently in the initial stages
- Design that can flexibly accommodate future needs for synchronization with servers
- Design that can flexibly respond to changes in data aggregation requirements
- Easily usable from React-implemented client applications
This article will demonstrate that the proposed implementation meets the above requirements and can ultimately be used from React-based client applications in the following manner:
Recording events
const { addEvent } = useListenEvent();
addEvent({ itemId });
Using aggregated data
const { totals } = useListenEvent();
const total = totals[itemId];
2. Technologies Used
To realize this proposal, we adopted event sourcing and CQRS.
Event sourcing is a technique that records state changes in an application as events and reconstructs the current state by replaying these events. This method has the following advantages:
- Maintaining a complete audit trail
- Ability to rewind the state to any point in time
- Flexible analysis and derivation of models based on events
CQRS is an architectural pattern that separates the responsibilities of data updates (commands) and reads (queries). This pattern has the following advantages:
- Ability to optimize reading and writing independently
- Simplification of complex domain models
3. System Design
The proposed system consists of the following components:
- EventStore: Responsible for storing and retrieving events
- EventAggregator: Responsible for aggregating events and managing state
- React Hooks: Responsible for connecting UI and data layer
By combining these components, we realize event sourcing and CQRS on the client side.
4. Implementation Details
4.1 Data Structures
The basic data structures are as follows:
// Base type for aggregatable events
interface Aggregatable {}

// Type for data recorded in the local database
interface StoredEvent<V extends Aggregatable> {
  localId: number;  // Auto-increment primary key
  globalId: string; // UUIDv7 for global primary key
  value: V;
}

// Base type for aggregated data
interface AggregatedValue<T> {
  aggregationKey: T;
}
Events are defined as Aggregatable types, are stored as StoredEvent records when written to the local database, and become AggregatedValue records after aggregation.
Interfaces that extend Aggregatable and AggregatedValue are free to add whatever properties they need.
StoredEvent carries a localId, an auto-incremented local key, and a globalId in UUIDv7 format. Together these allow events to be processed in roughly chronological order even if the data is later uploaded to and processed on a server.
Based on these interfaces, specific events and aggregated values are defined.
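For illustration only (a hypothetical page-view counter, not the music playback example used in Section 5), concrete types simply extend these base interfaces:

// Hypothetical event type: one page view
interface PageViewEvent extends Aggregatable {
  pageId: string;
}

// Hypothetical aggregated value: total views per page, keyed by pageId
interface PageViewCount extends AggregatedValue<string> {
  count: number;
}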
4.2 EventStore
The EventStore class provides functionality to store events using IndexedDB and retrieve them efficiently. The main features are as follows:
- Adding events
- Retrieving events before and after a specified ID
- Event subscription functionality
import { Aggregatable, StoredEvent } from "./types";
import { v7 as uuidv7 } from 'uuid';

export interface GetEventsOptions {
  limit?: number;
}

const DatabaseVersion = 1;
const DataStoreName = 'Events';

export class EventStore<V extends Aggregatable> {
  private db: IDBDatabase | null = null;
  private listeners: Set<(msg: StoredEvent<V>) => Promise<void>> = new Set();

  constructor(public databaseName: string) {}

  async initialize(): Promise<void> {
    await this.initializeDatabase();
  }

  async add(value: V): Promise<void> {
    if (!this.db) throw new Error('Database not initialized');
    return new Promise((resolve, reject) => {
      const trx = this.db!.transaction([DataStoreName], 'readwrite');
      const store = trx.objectStore(DataStoreName);
      const globalId = uuidv7();
      const request = store.add({ globalId, value });
      request.onerror = () => reject(new Error(`Add error: ${request.error?.message || 'Unknown error'}`));
      request.onsuccess = () => {
        const localId = request.result as number;
        const storedEvent: StoredEvent<V> = { localId, globalId, value };
        this.broadcastAddEvent(storedEvent);
        resolve();
      };
      trx.onerror = () => reject(new Error(`Transaction error: ${trx.error?.message || 'Unknown error'}`));
      trx.onabort = () => reject(new Error('Transaction aborted'));
    });
  }

  async getEventsAfter(localId: number, options?: GetEventsOptions): Promise<StoredEvent<V>[]> {
    return this.getEvents('next', IDBKeyRange.lowerBound(localId, true), options);
  }

  async getEventsBefore(localId: number, options?: GetEventsOptions): Promise<StoredEvent<V>[]> {
    return this.getEvents('prev', IDBKeyRange.upperBound(localId, true), options);
  }

  private async getEvents(
    direction: IDBCursorDirection,
    range: IDBKeyRange,
    options?: GetEventsOptions
  ): Promise<StoredEvent<V>[]> {
    if (!this.db) throw new Error('Database not initialized');
    return new Promise((resolve, reject) => {
      const trx = this.db!.transaction([DataStoreName], 'readonly');
      const store = trx.objectStore(DataStoreName);
      const results: StoredEvent<V>[] = [];
      const request = store.openCursor(range, direction);
      request.onerror = () => reject(request.error);
      request.onsuccess = (event) => {
        const cursor = (event.target as IDBRequest<IDBCursorWithValue>).result;
        if (cursor) {
          const storedEvent: StoredEvent<V> = {
            localId: cursor.key as number,
            globalId: cursor.value.globalId,
            value: cursor.value.value
          };
          results.push(storedEvent);
          if (!options?.limit || results.length < options.limit) {
            cursor.continue();
          } else {
            resolve(results);
          }
        } else {
          resolve(results);
        }
      };
    });
  }

  private async initializeDatabase(): Promise<void> { ...snip... }

  async hasAnyRecord(): Promise<boolean> { ...snip... }

  private broadcastAddEvent(event: StoredEvent<V>) { ...snip... }

  subscribe(listener: (msg: StoredEvent<V>) => Promise<void>): () => void { ...snip... }

  dispose() { ...snip... }
}
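As a usage sketch of the class above (the PageViewEvent type and database name are hypothetical; error handling and the timing of dispose are omitted for brevity):

import { EventStore } from "./eventStore";
import { Aggregatable } from "./types";

interface PageViewEvent extends Aggregatable {
  pageId: string;
}

async function eventStoreDemo(): Promise<void> {
  const store = new EventStore<PageViewEvent>('PageViewEvents');
  await store.initialize();

  // React to events recorded from now on
  const unsubscribe = store.subscribe(async (ev) => {
    console.log('stored', ev.localId, ev.globalId, ev.value.pageId);
  });

  // Record an event; localId (auto-increment) and globalId (UUIDv7) are assigned by the store
  await store.add({ pageId: 'top' });

  // Read back up to 10 events with localId greater than 0, i.e. from the beginning
  const events = await store.getEventsAfter(0, { limit: 10 });
  console.log(events.length);

  unsubscribe();
  store.dispose();
}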
4.3 EventAggregator
The EventAggregator class aggregates events and manages the current state. It is an abstract class, and concrete classes inheriting from it need to implement the functions marked as abstract. The main features are as follows:
- Processing new events
- Batch processing for event aggregation
- Managing processed ranges
- Providing aggregation results
import { AggregatedValue, Aggregatable, StoredEvent } from "./types";
import { EventStore, GetEventsOptions } from "./eventStore";

interface ProcessedRange {
  start: number;
  end: number;
}

const MetadataStoreName = 'Metadata';
const ProcessedRangesKey = 'ProcessedRanges';

export type AggregatorChangeListener<V> = (changedItem: V) => void;

export abstract class EventAggregator<V extends Aggregatable, A extends AggregatedValue<string>> {
  protected db: IDBDatabase | null = null;
  private processedRanges: ProcessedRange[] = [];
  private processingIntervalId: number | null = null;
  private listeners: Set<AggregatorChangeListener<A>> = new Set();

  constructor(
    protected eventStore: EventStore<V>,
    protected databaseName: string,
    protected databaseVersion: number = 1,
    protected batchSize: number = 100,
    protected processingInterval: number = 1000,
  ) {}

  async initialize(): Promise<void> { ...snip... }

  // Implement these abstract functions in inherited classes
  protected abstract applyMigrations(db: IDBDatabase, oldVersion: number, newVersion: number): void;
  protected abstract processEvent(trx: IDBTransaction, event: StoredEvent<V>): Promise<A>;

  private applyMetadataMigrations(db: IDBDatabase, oldVersion: number, newVersion: number): void {
    if (oldVersion < 1) {
      db.createObjectStore(MetadataStoreName, { keyPath: 'key' });
    }
  }

  private async handleNewEvent(ev: StoredEvent<V>): Promise<void> {
    if (!this.db) throw new Error('Database not initialized');
    const tx = this.db.transaction(this.db.objectStoreNames, 'readwrite');
    try {
      const changedItem = await this.processEvent(tx, ev);
      this.updateProcessedRanges(ev.localId, ev.localId);
      await this.saveProcessedRanges(tx);
      if (changedItem) {
        this.notifyListeners([changedItem]);
      }
      return new Promise<void>((resolve, reject) => {
        tx.oncomplete = () => resolve();
        tx.onerror = () => reject(tx.error);
      });
    } catch (err) {
      console.error("Error processing new event:", err);
      tx.abort();
    }
  }

  private async processEvents(): Promise<void> {
    if (!this.db) throw new Error('Database not initialized');
    if (this.isFullyCovered()) {
      this.stopProcessing();
      return;
    }
    const range = await this.findRangeToProcess();
    if (!range) {
      return;
    }
    const options: GetEventsOptions = {
      limit: Math.min(this.batchSize, range.end - range.start - 1)
    };
    const eventsBefore = await this.eventStore.getEventsBefore(range.end, options);
    if (eventsBefore.length === 0) {
      return;
    }
    const maxId = eventsBefore[0].localId;
    const minId = eventsBefore[eventsBefore.length - 1].localId;
    const changedItems: A[] = [];
    const tx = this.db.transaction(this.db.objectStoreNames, 'readwrite');
    try {
      for (const ev of eventsBefore) {
        const changedItem = await this.processEvent(tx, ev);
        if (changedItem) {
          changedItems.push(changedItem);
        }
      }
      if (eventsBefore.length < this.batchSize) {
        this.updateProcessedRanges(1, maxId);
      } else {
        this.updateProcessedRanges(minId, maxId);
      }
      await this.saveProcessedRanges(tx);
      // Notify listeners of the batch results (mirrors handleNewEvent) before awaiting completion
      this.notifyListeners(changedItems);
      return new Promise<void>((resolve, reject) => {
        tx.oncomplete = () => resolve();
        tx.onerror = () => reject(tx.error);
      });
    } catch (err) {
      console.error("Error processing events:", err);
      tx.abort();
    }
  }

  private async findRangeToProcess(): Promise<ProcessedRange | null> {
    const size = this.processedRanges.length;
    if (size === 0) {
      return { start: 0, end: Number.MAX_SAFE_INTEGER };
    }
    const rangeEnd = this.processedRanges[size - 1].start;
    if (rangeEnd === 1) {
      return null;
    }
    if (rangeEnd === 0) {
      throw new Error('Unexpected value');
    }
    if (1 < size) {
      const rangeStart = this.processedRanges[size - 2].end;
      return { start: rangeStart, end: rangeEnd };
    }
    // size === 1
    return { start: 0, end: rangeEnd };
  }

  private isFullyCovered(): boolean {
    if (this.processedRanges.length === 0) {
      // Note: the result of hasAnyRecord() is not awaited here; when no ranges
      // have been recorded yet this conservatively returns false.
      this.eventStore.hasAnyRecord().then((hasAnyRecord) => {
        return !hasAnyRecord;
      });
      return false;
    }
    return this.processedRanges.length === 1 && this.processedRanges[0].start === 0;
  }

  private updateProcessedRanges(start: number, end: number): void {
    const newRange: ProcessedRange = { start, end };
    const allRanges = [...this.processedRanges, newRange];
    allRanges.sort((a, b) => a.start - b.start);
    const mergedRanges: ProcessedRange[] = [];
    let currentRange = allRanges[0];
    for (let i = 1; i < allRanges.length; i++) {
      const nextRange = allRanges[i];
      if (currentRange.end + 1 >= nextRange.start) {
        currentRange.end = Math.max(currentRange.end, nextRange.end);
      } else {
        mergedRanges.push(currentRange);
        currentRange = nextRange;
      }
    }
    mergedRanges.push(currentRange);
    this.processedRanges = mergedRanges;
  }

  private async loadProcessedRanges(): Promise<void> { ...snip... }

  private async saveProcessedRanges(tx: IDBTransaction): Promise<void> { ...snip... }

  startProcessing(): void { ...snip... }

  stopProcessing(): void { ...snip... }

  private notifyListeners(changes: A[]): void { ...snip... }

  subscribe(listener: AggregatorChangeListener<A>): () => void { ...snip... }

  private async initializeDatabase(): Promise<void> { ...snip... }

  dispose() { ...snip... }
}
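To make the range bookkeeping concrete, here is an illustrative trace of processedRanges for a store that already holds events with localId 1..204 from earlier sessions; a new event (localId 205) is then recorded while the aggregator is running, with the default batchSize of 100 (the values are chosen purely for illustration):

// A live event (localId 205) is recorded and handled by handleNewEvent:
//   processedRanges = [{ start: 205, end: 205 }]
// 1st background batch reads the 100 newest unprocessed events (localId 204..105):
//   processedRanges = [{ start: 105, end: 205 }]   // adjacent ranges are merged
// 2nd batch reads localId 104..5:
//   processedRanges = [{ start: 5, end: 205 }]
// 3rd batch reads the remaining localId 4..1 (fewer than batchSize):
//   processedRanges = [{ start: 1, end: 205 }]
// findRangeToProcess() now returns null, so there is nothing left to aggregate.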
5. Specific Use Case
To demonstrate the effectiveness of this system, we present an example of its use in a music playback application.
5.1 Recording and Aggregating Music Playback Events
MusicListenEventStore stores events indicating that music has been played. MusicListenEventAggregator aggregates the total number of plays for each track.
MusicListenEventStore and MusicListenEventAggregator Classes
import { Aggregatable, AggregatedValue, StoredEvent } from "../../types";
import { EventStore } from "../../eventStore";
import { EventAggregator } from "../../eventAggregator";

const TotalStoreName = 'Total';

export interface MusicListenEvent extends Aggregatable {
  itemId: string;
}

export interface MusicListenAggregationValue extends AggregatedValue<string> {
  total: number;
}

export class MusicListenEventStore extends EventStore<MusicListenEvent> {}

export class MusicListenEventAggregator extends EventAggregator<MusicListenEvent, MusicListenAggregationValue> {
  constructor(
    protected eventStore: EventStore<MusicListenEvent>,
    protected databaseName: string,
    protected databaseVersion: number = 1,
    protected batchSize: number = 100,
    protected processingInterval: number = 1000,
  ) {
    ...snip...
  }

  protected applyMigrations(db: IDBDatabase, oldVersion: number, newVersion: number): void {
    if (oldVersion < 1) {
      db.createObjectStore(TotalStoreName, { keyPath: 'aggregationKey' });
    }
  }

  protected async processEvent(trx: IDBTransaction, event: StoredEvent<MusicListenEvent>): Promise<MusicListenAggregationValue> {
    return new Promise((resolve, reject) => {
      const store = trx.objectStore(TotalStoreName);
      const aggregationKey = event.value.itemId;
      const getReq = store.get(aggregationKey);
      getReq.onerror = () => reject(getReq.error);
      getReq.onsuccess = () => {
        const data = getReq.result as MusicListenAggregationValue | undefined;
        const total = (data?.total ?? 0) + 1;
        const updated: MusicListenAggregationValue = { aggregationKey: aggregationKey, total };
        const putReq = store.put(updated);
        putReq.onerror = () => reject(putReq.error);
        putReq.onsuccess = () => resolve(updated);
      };
    });
  }

  async getAggregatedTotal(itemIds: string[]): Promise<{ [key: string]: number }> {
    if (!this.db) throw new Error('Database not initialized');
    return new Promise((resolve, reject) => {
      const trx = this.db!.transaction([TotalStoreName], 'readonly');
      const store = trx.objectStore(TotalStoreName);
      const getItemData = (itemId: string): Promise<[string, number]> =>
        new Promise((resolveItem, rejectItem) => {
          const request = store.get(itemId);
          request.onerror = () => rejectItem(new Error(`Error fetching data for item ${itemId}: ${request.error}`));
          request.onsuccess = () => {
            const data = request.result as MusicListenAggregationValue;
            resolveItem([itemId, data ? data.total : 0]);
          };
        });
      Promise.all(itemIds.map(getItemData))
        .then(entries =>
          entries.reduce((acc, [key, value]) => ({ ...acc, [key]: value }), {})
        )
        .then(resolve)
        .catch(reject);
      trx.onerror = () => {
        reject(new Error(`Transaction error: ${trx.error}`));
      };
    });
  }
}
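Outside React, the two classes can be wired together directly. The following is a minimal sketch (the database names are examples; error handling and dispose are omitted):

import { MusicListenEventStore, MusicListenEventAggregator } from './listenEvent';

async function listenDemo(): Promise<void> {
  const store = new MusicListenEventStore('MusicListenEvents');
  const aggregator = new MusicListenEventAggregator(store, 'MusicListenAggregator');
  await store.initialize();
  await aggregator.initialize();

  // Start the background batch that aggregates events recorded in earlier sessions
  aggregator.startProcessing();

  // Record one playback of a track
  await store.add({ itemId: 'track-42' });

  // Read the aggregated play count; new events are aggregated asynchronously,
  // so the value may lag for a moment after add()
  const totals = await aggregator.getAggregatedTotal(['track-42']);
  console.log(totals['track-42']);
}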
If the aggregation logic needs to change after release, or new aggregation targets need to be added, a new Aggregator can simply be created.
For example, consider a MusicListenEventAggregatorV2 that additionally aggregates the time each track was first played.
In that case, passing the same MusicListenEventStore instance to the constructor of MusicListenEventAggregatorV2, exactly as with MusicListenEventAggregator, lets the new aggregator re-aggregate every event already accumulated in MusicListenEventStore. A rough sketch of such a V2 aggregator follows.
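The sketch below is not part of the implementation above; the TotalV2 object store name and the uuidV7Timestamp helper are our additions. It writes to its own object store (so it can run alongside the V1 aggregator) and recovers the first-play time from the UUIDv7 globalId, which encodes a millisecond timestamp in its first 48 bits:

import { AggregatedValue, StoredEvent } from "../../types";
import { EventAggregator } from "../../eventAggregator";
import { MusicListenEvent } from "./listenEvent";

const TotalStoreNameV2 = 'TotalV2';

// UUIDv7 stores a Unix epoch-millisecond timestamp in its first 48 bits (12 hex digits)
function uuidV7Timestamp(uuid: string): number {
  return parseInt(uuid.replace(/-/g, '').slice(0, 12), 16);
}

export interface MusicListenAggregationValueV2 extends AggregatedValue<string> {
  total: number;
  firstListenedAt: number; // epoch milliseconds of the earliest recorded play
}

export class MusicListenEventAggregatorV2 extends EventAggregator<MusicListenEvent, MusicListenAggregationValueV2> {
  protected applyMigrations(db: IDBDatabase, oldVersion: number, newVersion: number): void {
    if (oldVersion < 1) {
      db.createObjectStore(TotalStoreNameV2, { keyPath: 'aggregationKey' });
    }
  }

  protected async processEvent(trx: IDBTransaction, event: StoredEvent<MusicListenEvent>): Promise<MusicListenAggregationValueV2> {
    return new Promise((resolve, reject) => {
      const store = trx.objectStore(TotalStoreNameV2);
      const key = event.value.itemId;
      const getReq = store.get(key);
      getReq.onerror = () => reject(getReq.error);
      getReq.onsuccess = () => {
        const data = getReq.result as MusicListenAggregationValueV2 | undefined;
        const updated: MusicListenAggregationValueV2 = {
          aggregationKey: key,
          total: (data?.total ?? 0) + 1,
          // keep the earliest time seen across all replayed events
          firstListenedAt: Math.min(data?.firstListenedAt ?? Infinity, uuidV7Timestamp(event.globalId)),
        };
        const putReq = store.put(updated);
        putReq.onerror = () => reject(putReq.error);
        putReq.onsuccess = () => resolve(updated);
      };
    });
  }
}

Constructing it with the same MusicListenEventStore instance, for example new MusicListenEventAggregatorV2(store, 'MusicListenAggregatorV2'), is enough for it to replay and re-aggregate every stored event.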
useListenEvent Hook
With a React custom hook like the following, React components can add events and read aggregation results from the classes above with very little code.
'use client';

import React, { createContext, useContext, useState, useEffect, ReactNode, useMemo } from 'react';
import { MusicListenAggregationValue, MusicListenEvent, MusicListenEventAggregator } from './listenEvent';
import { EventStore } from "../../eventStore";

type ListenEventContextType = {
  addEvent: (event: MusicListenEvent) => Promise<void>;
  totals: { [key: string]: number };
  isInitializing: boolean;
  isSyncing: boolean;
  error: Error | null;
};

export const ListenEventContext = createContext<ListenEventContextType | undefined>(undefined);

export type ListenEventContextProps = {
  keys: string[];
  children: ReactNode;
};

export const ListenEventProvider: React.FC<ListenEventContextProps> = ({ keys, children }) => {
  const [totals, setTotals] = useState<{ [key: string]: number }>({});
  const [isInitializing, setIsInitializing] = useState(true);
  const [isSyncing, setIsSyncing] = useState(false);
  const [error, setError] = useState<Error | null>(null);

  const { eventStore, aggregator } = useMemo(() => {
    const eventStore = new EventStore<MusicListenEvent>('MusicListenEvents');
    const aggregator = new MusicListenEventAggregator(eventStore, 'MusicListenAggregator_V2');
    (async () => {
      await eventStore.initialize();
      await aggregator.initialize();
      aggregator.startProcessing();
    })().then(() => {
      setIsInitializing(false);
      setIsSyncing(true);
    }).catch((err) => {
      setError(new Error(`Failed to initialize EventStore/Aggregator: ${err}`));
      setIsInitializing(false);
      setIsSyncing(false);
    });
    return { eventStore, aggregator };
  }, []);

  useEffect(() => {
    const handleUpdate = (updated: MusicListenAggregationValue) => {
      setTotals(prevTotals => ({
        ...prevTotals,
        [updated.aggregationKey]: updated.total
      }));
    };
    const unsubscribe = aggregator.subscribe(handleUpdate);
    return () => {
      if (typeof unsubscribe === 'function') {
        unsubscribe();
      }
    };
  }, [eventStore, aggregator]);

  useEffect(() => {
    if (isInitializing || error != null) return;
    const fetchTotals = async () => {
      try {
        const result = await aggregator.getAggregatedTotal(keys);
        if (result != null) {
          setTotals(result);
        }
      } catch (err) {
        setError(err instanceof Error ? err : new Error('Failed to fetch totals'));
      }
      setIsSyncing(false);
    };
    fetchTotals();
  }, [keys, isInitializing]);

  return (
    <ListenEventContext.Provider value={{
      addEvent: eventStore.add.bind(eventStore),
      totals,
      isInitializing,
      isSyncing,
      error
    }}>
      {children}
    </ListenEventContext.Provider>
  );
};

export const useListenEvent = () => {
  const context = useContext(ListenEventContext);
  if (context === undefined) {
    throw new Error('useListenEvent must be used within a ListenEventProvider');
  }
  return context;
};
The usage is as shown at the beginning:
Recording an event
const { addEvent } = useListenEvent();
addEvent({ itemId });
Using aggregated data
const { totals } = useListenEvent();
const total = totals[itemId];
However, because React Context is used, the calls above must be made inside the ListenEventProvider shown below:
<ListenEventProvider keys={listenKeys}>
  {/* addEvent and totals must be used inside this provider */}
</ListenEventProvider>
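For example, a component tree could look like the following (PlayCount, Page, and the import path are illustrative; only ListenEventProvider and useListenEvent come from the code above):

import React from 'react';
import { ListenEventProvider, useListenEvent } from './listenEventContext'; // hypothetical path

const PlayCount: React.FC<{ itemId: string }> = ({ itemId }) => {
  const { addEvent, totals, isInitializing } = useListenEvent();
  if (isInitializing) return <span>Loading...</span>;
  return (
    <button onClick={() => addEvent({ itemId })}>
      Play ({totals[itemId] ?? 0} plays so far)
    </button>
  );
};

export const Page: React.FC<{ listenKeys: string[] }> = ({ listenKeys }) => (
  <ListenEventProvider keys={listenKeys}>
    {listenKeys.map((id) => (
      <PlayCount key={id} itemId={id} />
    ))}
  </ListenEventProvider>
);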
6. Challenges
For the implementation based on this proposal to effectively integrate with the server side, there are the following challenges:
- Implementing a mechanism to upload events recorded on the client to the server
- Efficient processing and aggregation of event data on the server
- Ensuring consistency between client-side and server-side aggregation results
7. Conclusion
This article proposed a method for implementing event sourcing and CQRS on the client side.
This proposal makes it possible to balance the reduction of database server maintenance costs in the initial stages with ensuring future extensibility.
We hope this proposal serves as a useful reference for mobile application and SPA developers looking for an efficient way to manage data on the client.