Skip to content

Commit

Permalink
Add support for multiple period timeframes
Browse files Browse the repository at this point in the history
  • Loading branch information
notVitaliy committed Nov 30, 2020
1 parent f063035 commit 4be16e5
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 158 deletions.
5 changes: 4 additions & 1 deletion src/core/strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ export class StrategyCore {
this.baseStrategy = new (strategyLoader(strategy))(exchange, symbol, this.strategyConfig)

const storeOpts = { exchange, symbol, strategy }
PeriodStore.instance.addSymbol(storeOpts, { period, lookbackSize: 250 })

const periods = strategyConfig.periods ? strategyConfig.periods.split(',') : [period]

PeriodStore.instance.addSymbol(storeOpts, { periods, lookbackSize: 250 })

this.orderEngine = new OrderEngine(this.exchangeProvider, strategyConfig)
this.initStrategyWallet()
Expand Down
1 change: 1 addition & 0 deletions src/lib/db/db.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ export interface StrategyConfig {

days: number
period: string
periods: string
markDn: number
markUp: number
orderPollInterval: number
Expand Down
32 changes: 14 additions & 18 deletions src/store/period.store.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@ describe('PeriodStore', () => {

beforeAll(() => {
periodStore = PeriodStore.instance
periodStore.addSymbol(storeOpts, { period: '1m', lookbackSize: 2 })
periodStore.addSymbol(storeOpts, { periods: ['1m'], lookbackSize: 2 })
periodStore.start(storeOpts)
})

afterEach(() => {
periodStore.clearPeriods('test.test.test')
periodStore.clearPeriods('test.test.test', '1m')
})

it('should add trades', () => {
periodStore.addTrade('test.test.test', makeNewOrder(now) as Trade)
expect(periodStore.periods.get('test.test.test').length).toEqual(1)
expect(periodStore.periods.get('test.test.test').get('1m').length).toEqual(1)
})

it('should set new periods', async (done) => {
const trades = [...Array(9).fill(0)].map((v, i) => makeNewOrder(time(now).sub.s(i * 10))).reverse()
trades.forEach((trade) => periodStore.addTrade('test.test.test', trade as Trade))

expect(periodStore.periods.get('test.test.test').length).toEqual(2)
expect(periodStore.periods.get('test.test.test').get('1m').length).toEqual(2)

done()
})
Expand All @@ -38,35 +38,31 @@ describe('PeriodStore', () => {
trades.forEach((trade, index) => {
periodStore.addTrade('test.test.test', trade as Trade)
// periodStore should keep 2 lookbacks and the third period (first inserted) should be dropped
if (index < 2) expect(periodStore.periods.get('test.test.test').length).toEqual(index + 1)
else expect(periodStore.periods.get('test.test.test').length).toEqual(2)
if (index < 2) expect(periodStore.periods.get('test.test.test').get('1m').length).toEqual(index + 1)
else expect(periodStore.periods.get('test.test.test').get('1m').length).toEqual(2)
})
done()
})

it('check for order of periods', async (done) => {
const trades = [...Array(4).fill(0)].map((v, i) => makeNewOrder(time(now).sub.m(i + 1))).reverse()

const buckets = trades.map((trade) =>
timebucket(trade.timestamp)
.resize('1m')
.toMilliseconds()
)
const buckets = trades.map((trade) => timebucket(trade.timestamp).resize('1m').toMilliseconds())

periodStore.addTrade('test.test.test', trades[0] as Trade)
expect(periodStore.periods.get('test.test.test')[0].time).toEqual(buckets[0])
expect(periodStore.periods.get('test.test.test').get('1m')[0].time).toEqual(buckets[0])

periodStore.addTrade('test.test.test', trades[1] as Trade)
expect(periodStore.periods.get('test.test.test')[0].time).toEqual(buckets[1])
expect(periodStore.periods.get('test.test.test')[1].time).toEqual(buckets[1] - 60000)
expect(periodStore.periods.get('test.test.test').get('1m')[0].time).toEqual(buckets[1])
expect(periodStore.periods.get('test.test.test').get('1m')[1].time).toEqual(buckets[1] - 60000)

periodStore.addTrade('test.test.test', trades[2] as Trade)
expect(periodStore.periods.get('test.test.test')[0].time).toEqual(buckets[2])
expect(periodStore.periods.get('test.test.test')[1].time).toEqual(buckets[2] - 60000)
expect(periodStore.periods.get('test.test.test').get('1m')[0].time).toEqual(buckets[2])
expect(periodStore.periods.get('test.test.test').get('1m')[1].time).toEqual(buckets[2] - 60000)

periodStore.addTrade('test.test.test', trades[3] as Trade)
expect(periodStore.periods.get('test.test.test')[0].time).toEqual(buckets[3])
expect(periodStore.periods.get('test.test.test')[1].time).toEqual(buckets[3] - 60000)
expect(periodStore.periods.get('test.test.test').get('1m')[0].time).toEqual(buckets[3])
expect(periodStore.periods.get('test.test.test').get('1m')[1].time).toEqual(buckets[3] - 60000)

done()
})
Expand Down
165 changes: 93 additions & 72 deletions src/store/period.store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { logger } from '../util/logger'
const singleton = Symbol()

interface PeriodConf {
period: string
periods: string[]
lookbackSize: number
}

Expand All @@ -25,13 +25,13 @@ export class PeriodStore {
return this[singleton]
}

public periods: Map<string, PeriodItem[]> = new Map()
public periods: Map<string, Map<string, PeriodItem[]>> = new Map()

private periodConfigs: Map<string, PeriodConf> = new Map()
private updateEmitters: Map<string, EventBusEmitter<PeriodItem[]>> = new Map()
private periodEmitters: Map<string, EventBusEmitter<PeriodItem[]>> = new Map()
private tradeEventTimeouts: Map<string, NodeJS.Timer> = new Map()
private periodTimer: Map<string, NodeJS.Timer> = new Map()
private updateEmitters: Map<string, Map<string, EventBusEmitter<PeriodItem[]>>> = new Map()
private periodEmitters: Map<string, Map<string, EventBusEmitter<PeriodItem[]>>> = new Map()
private tradeEventTimeouts: Map<string, Map<string, NodeJS.Timer>> = new Map()
private periodTimer: Map<string, Map<string, NodeJS.Timer>> = new Map()
private periodStates: Map<string, PERIOD_STATE> = new Map()

private constructor() {}
Expand All @@ -43,70 +43,92 @@ export class PeriodStore {

logger.debug(`Adding ${idStr} to period store.`)

this.periods.set(idStr, [])
this.periodConfigs.set(idStr, conf)
this.tradeEventTimeouts.set(idStr, null)
this.tradeEventTimeouts.set(idStr, new Map())
this.periodTimer.set(idStr, new Map())

this.periodStates.set(idStr, PERIOD_STATE.PREROLL)

const tradeListener: EventBusListener<Trade> = eventBus.get(EVENT.XCH_TRADE)(exchange)(symbol).listen
/* istanbul ignore next */
tradeListener((trade: Trade) => this.addTrade(idStr, trade))

this.updateEmitters.set(idStr, eventBus.get(EVENT.PERIOD_UPDATE)(exchange)(symbol)(strategy).emit)
this.periodEmitters.set(idStr, eventBus.get(EVENT.PERIOD_NEW)(exchange)(symbol)(strategy).emit)
this.updateEmitters.set(idStr, new Map())
this.periodEmitters.set(idStr, new Map())
this.periods.set(idStr, new Map())

const updateEmitters = this.updateEmitters.get(idStr)
const periodEmitters = this.periodEmitters.get(idStr)
const periods = this.periods.get(idStr)

conf.periods.forEach((period) => {
// tslint:disable-next-line: no-shadowed-variable
updateEmitters.set(period, (periods) => eventBus.get(EVENT.PERIOD_UPDATE)(exchange)(symbol)(strategy).emit({ period, periods }))
periodEmitters.set(period, () => eventBus.get(EVENT.PERIOD_NEW)(exchange)(symbol)(strategy).emit(period))
periods.set(period, [])
})
}

/* istanbul ignore next */
public start(storeOpts: StoreOpts) {
const idStr = this.makeIdStr(storeOpts)
this.periodStates.set(idStr, PERIOD_STATE.RUNNING)

this.periodTimer.set(
idStr,
setTimeout(() => this.publishPeriod(idStr), this.getPeriodTimeout(idStr))
)
const conf = this.periodConfigs.get(idStr)

conf.periods.forEach((period) => {
const timer = setTimeout(() => this.publishPeriod(idStr, period), this.getPeriodTimeout(idStr, period))

this.periodTimer.get(idStr).set(period, timer)
})
}

public stop(storeOpts: StoreOpts) {
const idStr = this.makeIdStr(storeOpts)
this.periodStates.set(idStr, PERIOD_STATE.STOPPED)
clearTimeout(this.periodTimer.get(idStr))

const conf = this.periodConfigs.get(idStr)

conf.periods.forEach((period) => {
clearTimeout(this.periodTimer.get(idStr).get(period))
})
}

public addTrade(idStr: string, trade: Trade) {
const periodState = this.periodStates.get(idStr)

if (periodState === PERIOD_STATE.STOPPED) return

const { period } = this.periodConfigs.get(idStr)
const periods = this.periods.get(idStr)
const { amount, price, timestamp } = trade
const bucket = timebucket(timestamp).resize(period).toMilliseconds()

// aka prerolled data
const tradeBasedPeriodChange = !periods.length || periods[0].time < bucket

if (tradeBasedPeriodChange) this.newPeriod(idStr, bucket)

// special handling if the trade is the first within the period
if (!periods[0].open) periods[0].open = price
periods[0].low = !periods[0].low ? price : Math.min(price, periods[0].low)
periods[0].high = Math.max(price, periods[0].high)
periods[0].close = price
periods[0].volume += amount
this.emitTrades(idStr)
if (!tradeBasedPeriodChange) return

// should only happen if timer did something wrong or on Preroll
this.checkPeriodWithoutTrades(idStr)
this.emitTradeImmediate(idStr)
this.periodEmitters.get(idStr)()
const conf = this.periodConfigs.get(idStr)
conf.periods.forEach((period) => {
const periods = this.periods.get(idStr).get(period)
const { amount, price, timestamp } = trade
const bucket = timebucket(timestamp).resize(period).toMilliseconds()

// aka prerolled data
const tradeBasedPeriodChange = !periods.length || periods[0].time < bucket

if (tradeBasedPeriodChange) this.newPeriod(idStr, period, bucket)

// special handling if the trade is the first within the period
if (!periods[0].open) periods[0].open = price
periods[0].low = !periods[0].low ? price : Math.min(price, periods[0].low)
periods[0].high = Math.max(price, periods[0].high)
periods[0].close = price
periods[0].volume += amount
this.emitTrades(idStr, period)
if (!tradeBasedPeriodChange) return

// should only happen if timer did something wrong or on Preroll
this.checkPeriodWithoutTrades(idStr, period)
this.emitTradeImmediate(idStr, period)
this.periodEmitters.get(idStr).get(period)()
})
}

public newPeriod(idStr: string, bucket: number) {
public newPeriod(idStr: string, period: string, bucket: number) {
const { lookbackSize } = this.periodConfigs.get(idStr)
const periods = this.periods.get(idStr)
const periods = this.periods.get(idStr).get(period)

// create an empty period-skeleton
periods.unshift({
Expand All @@ -122,41 +144,41 @@ export class PeriodStore {
if (periods.length > lookbackSize) periods.pop()
}

public clearPeriods(idStr: string) {
this.periods.set(idStr, [])
public clearPeriods(idStr: string, period: string) {
this.periods.get(idStr).set(period, [])
}

/**
* This method is throttling the `PERIOD_UPDATE` events to not result in a calculate call for every trade.
* Instead its called after 100ms timeout after a trade income was recognized
* @param idStr period identifier
*/
private emitTrades(idStr: string) {
private emitTrades(idStr: string, period: string) {
if (this.periodStates.get(idStr) === PERIOD_STATE.STOPPED) return

if (this.tradeEventTimeouts.has(idStr)) return
const fn = () => this.emitTradeImmediate(idStr)
this.tradeEventTimeouts.set(idStr, setTimeout(fn, 100))
if (this.tradeEventTimeouts.get(idStr).has(period)) return
const fn = () => this.emitTradeImmediate(idStr, period)
this.tradeEventTimeouts.get(idStr).set(period, setTimeout(fn, 100))
}

/**
* used to emit a immediate `PERIOD_UPDATE` event.
* Could be used if a period was finished and there is a calculation needed
* @param idStr period identifier
*/
private emitTradeImmediate(idStr: string) {
private emitTradeImmediate(idStr: string, period: string) {
if (this.periodStates.get(idStr) === PERIOD_STATE.STOPPED) return

const periods = this.periods.get(idStr)
this.updateEmitters.get(idStr)([...periods])
const periods = this.periods.get(idStr).get(period)
this.updateEmitters.get(idStr).get(period)([...periods])

// @todo(notVitaliy): Find a better place for this
wsServer.broadcast('period-update', { ...this.parseIdStr(idStr), period: periods[0] })

/* istanbul ignore else */
if (this.tradeEventTimeouts.has(idStr)) {
clearTimeout(this.tradeEventTimeouts.get(idStr))
this.tradeEventTimeouts.delete(idStr)
if (this.tradeEventTimeouts.get(idStr).has(period)) {
clearTimeout(this.tradeEventTimeouts.get(idStr).get(period))
this.tradeEventTimeouts.get(idStr).delete(period)
}
}

Expand All @@ -165,53 +187,52 @@ export class PeriodStore {
}

/* istanbul ignore next */
private getPeriodTimeout(idStr: string) {
const period = this.periodConfigs.get(idStr).period
private getPeriodTimeout(idStr: string, period: string) {
const bucketEnd = timebucket(period).getEndOfBucketMS()
const now = new Date().getTime()
logger.silly(`Next ${idStr} period will be published at ${new Date(bucketEnd).toTimeString()}`)
return bucketEnd > now ? bucketEnd - now : 0
}

/* istanbul ignore next */
private checkPeriodWithoutTrades(idStr: string) {
private checkPeriodWithoutTrades(idStr: string, period: string) {
if (this.periodStates.get(idStr) === PERIOD_STATE.STOPPED) return

const periods = this.periods.get(idStr)
const periods = this.periods.get(idStr).get(period)
if (periods.length < 2) return

const [period, lastPeriod] = periods
const [thisPeriod, lastPeriod] = periods
const { close } = lastPeriod

// period has no open price, so there wasnt any trade.
if (!period.open && close) {
period.open = close
period.high = close
period.low = close
period.close = close
period.volume = 0
if (!thisPeriod.open && close) {
thisPeriod.open = close
thisPeriod.high = close
thisPeriod.low = close
thisPeriod.close = close
thisPeriod.volume = 0
logger.silly(`Emitting PERIOD_UPDATE for empty period without any trade.`)
this.emitTrades(idStr)
this.emitTrades(idStr, period)
}
}

/* istanbul ignore next */
private publishPeriod(idStr: string) {
clearTimeout(this.periodTimer.get(idStr))
private publishPeriod(idStr: string, period: string) {
clearTimeout(this.periodTimer.get(idStr).get(period))
if (this.periodStates.get(idStr) === PERIOD_STATE.STOPPED) return

this.checkPeriodWithoutTrades(idStr)
this.checkPeriodWithoutTrades(idStr, period)
// Publish period to event bus
this.periodEmitters.get(idStr)()
this.periodEmitters.get(idStr).get(period)()

// @todo(notVitaliy): Find a better place for this
wsServer.broadcast('period-new', { ...this.parseIdStr(idStr), period: this.periods.get(idStr)[0] })
wsServer.broadcast('period-new', { ...this.parseIdStr(idStr), period: this.periods.get(idStr).get(period)[0] })

// prepare new period
this.newPeriod(idStr, timebucket(this.periodConfigs.get(idStr).period).toMilliseconds())
this.newPeriod(idStr, period, timebucket(period).toMilliseconds())

const fn = () => this.publishPeriod(idStr)
this.periodTimer.set(idStr, setTimeout(fn, this.getPeriodTimeout(idStr)))
const fn = () => this.publishPeriod(idStr, period)
this.periodTimer.get(idStr).set(period, setTimeout(fn, this.getPeriodTimeout(idStr, period)))
}

private parseIdStr(idStr: string): StoreOpts {
Expand Down
4 changes: 2 additions & 2 deletions src/strategy/strategies/aroon/aroon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export class Aroon extends BaseStrategy<AroonOptions, void> {
this.options = { ...this.options, ...options }
}

public calculate(periods: PeriodItem[]) {
public calculate(period: string, periods: PeriodItem[]) {
if (!periods.length) return

const up = AroonUp.calculate(periods, this.options.periods)
Expand All @@ -87,7 +87,7 @@ export class Aroon extends BaseStrategy<AroonOptions, void> {
this.periods[0] = { up, down }
}

public onPeriod() {
public onPeriod(period: string) {
const signal = this.getSignal()
const [{ up, down }] = this.periods

Expand Down
Loading

0 comments on commit 4be16e5

Please sign in to comment.