Skip to content

Commit

Permalink
Intro "global" eventEmitter for loosely coupled inter-module communic…
Browse files Browse the repository at this point in the history
…ation (DeviaVir#1324)

* setup basics for eventEmitter driven module communication

* get rid of unused dependency

* document silent option

* put conf loading before command line overrides
  • Loading branch information
krystophv authored and DeviaVir committed Feb 14, 2018
1 parent f5acbe6 commit 607a72c
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 193 deletions.
97 changes: 40 additions & 57 deletions boot.js
Original file line number Diff line number Diff line change
@@ -1,80 +1,63 @@
var _ = require('lodash')
var path = require('path')
var minimist = require('minimist')
var minimist = require('minimist')
var version = require('./package.json').version
var EventEmitter = require('events')

module.exports = function (cb) {
var zenbot = require('./')
var c = getConfiguration()
var zenbot = { version }
var args = minimist(process.argv.slice(3))
var conf

var defaults = require('./conf-sample')
Object.keys(defaults).forEach(function (k) {
if (typeof c[k] === 'undefined') {
c[k] = defaults[k]
if(!_.isUndefined(args.conf)){
try {
conf = require(path.resolve(process.cwd(), args.conf))
} catch (err) {
console.log('Fall back to conf.js, ' + err)
conf = require('./conf')
}
})
zenbot.conf = c

function withMongo () {
cb(null, zenbot)
} else {
conf = require('./conf')
}

var authStr = '', authMechanism

if(c.mongo.username){
authStr = encodeURIComponent(c.mongo.username)
var defaults = require('./conf-sample')
_.defaultsDeep(conf, defaults)
zenbot.conf = conf

var eventBus = new EventEmitter()
conf.eventBus = eventBus

var authStr = '', authMechanism, connectionString

if(c.mongo.password) authStr += ':' + encodeURIComponent(c.mongo.password)
if(conf.mongo.username){
authStr = encodeURIComponent(conf.mongo.username)

if(conf.mongo.password) authStr += ':' + encodeURIComponent(conf.mongo.password)

authStr += '@'

// authMechanism could be a conf.js parameter to support more mongodb authentication methods
authMechanism = 'DEFAULT'
}

var u = (function() {
if (c.mongo.connectionString) {
return c.mongo.connectionString
}

return 'mongodb://' + authStr + c.mongo.host + ':' + c.mongo.port + '/' + c.mongo.db + '?' +
(c.mongo.replicaSet ? '&replicaSet=' + c.mongo.replicaSet : '' ) +
if (conf.mongo.connectionString) {
connectionString = conf.mongo.connectionString
} else {
connectionString = 'mongodb://' + authStr + conf.mongo.host + ':' + conf.mongo.port + '/' + conf.mongo.db + '?' +
(conf.mongo.replicaSet ? '&replicaSet=' + conf.mongo.replicaSet : '' ) +
(authMechanism ? '&authMechanism=' + authMechanism : '' )
})()
require('mongodb').MongoClient.connect(u, function (err, client) {
}

require('mongodb').MongoClient.connect(connectionString, function (err, client) {
if (err) {
//zenbot.set('zenbot:db.mongo', null)
console.error('WARNING: MongoDB Connection Error: ', err)
console.error('WARNING: without MongoDB some features (such as backfilling/simulation) may be disabled.')
console.error('Attempted authentication string: ' + u)
return withMongo()
console.error('Attempted authentication string: ' + connectionString)
cb(null, zenbot)
return
}
var db = client.db(c.mongo.db)
var db = client.db(conf.mongo.db)
_.set(zenbot, 'conf.db.mongo', db)
withMongo()
cb(null, zenbot)
})

function getConfiguration() {
var args = minimist(process.argv.slice(3))
var conf = undefined

try {
if(!_.isUndefined(args.conf)){
try {
conf = require(path.resolve(process.cwd(), args.conf))
} catch (ee) {
console.log('Fall back to conf.js, ' + ee)
conf = require('./conf')
}
} else {
conf = require('./conf')
}
}
catch (e) {
console.log('Fall back to sample-conf.js, ' + e)
conf = {}
}

// prevent modifying cached module with a clone
return _.cloneDeep(conf)
}
}
}
58 changes: 33 additions & 25 deletions commands/sim.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var tb = require('timebucket')
, objectifySelector = require('../lib/objectify-selector')
, engineFactory = require('../lib/engine')
, collectionService = require('../lib/services/collection-service')
, _ = require('lodash')

module.exports = function (program, conf) {
program
Expand Down Expand Up @@ -41,16 +42,24 @@ module.exports = function (program, conf) {
.option('--enable_stats', 'enable printing order stats')
.option('--backtester_generation <generation>','creates a json file in simulations with the generation number', Number, -1)
.option('--verbose', 'print status lines on every period')
.option('--silent', 'only output on completion (can speed up sim)')
.action(function (selector, cmd) {
var s = {options: minimist(process.argv)}
var s = { options: minimist(process.argv) }
var so = s.options
delete so._
if (cmd.conf) {
var overrides = require(path.resolve(process.cwd(), cmd.conf))
Object.keys(overrides).forEach(function (k) {
so[k] = overrides[k]
})
}
Object.keys(conf).forEach(function (k) {
if (typeof cmd[k] !== 'undefined') {
if (!_.isUndefined(cmd[k])) {
so[k] = cmd[k]
}
})
var tradesCollection = collectionService(conf).getTrades()
var eventBus = conf.eventBus

if (so.start) {
so.start = moment(so.start, 'YYYYMMDDhhmm').valueOf()
Expand All @@ -77,12 +86,6 @@ module.exports = function (program, conf) {
so.selector = objectifySelector(selector || conf.selector)
so.mode = 'sim'

if (cmd.conf) {
var overrides = require(path.resolve(process.cwd(), cmd.conf))
Object.keys(overrides).forEach(function (k) {
so[k] = overrides[k]
})
}
var engine = engineFactory(s, conf)
if (!so.min_periods) so.min_periods = 1
var cursor, reversing, reverse_point
Expand Down Expand Up @@ -197,7 +200,6 @@ module.exports = function (program, conf) {
fs.writeFileSync(out_target, out)
console.log('wrote', out_target)
}

process.exit(0)
}

Expand Down Expand Up @@ -230,34 +232,40 @@ module.exports = function (program, conf) {
if (!opts.query.time) opts.query.time = {}
opts.query.time['$gte'] = query_start
}
tradesCollection.find(opts.query).sort(opts.sort).limit(opts.limit).toArray(function (err, trades) {
if (err) throw err
if (!trades.length) {
var collectionCursor = tradesCollection.find(opts.query).sort(opts.sort).stream()
var numTrades = 0
var lastTrade

collectionCursor.on('data', function(trade){
lastTrade = trade
numTrades++
if (so.symmetrical && reversing) {
trade.orig_time = trade.time
trade.time = reverse_point + (reverse_point - trade.time)
}
eventBus.emit('trade', trade)
})

collectionCursor.on('end', function(){
if(numTrades === 0){
if (so.symmetrical && !reversing) {
reversing = true
reverse_point = cursor
return getNext()
}
engine.exit(exitSim)
}
if (so.symmetrical && reversing) {
trades.forEach(function (trade) {
trade.orig_time = trade.time
trade.time = reverse_point + (reverse_point - trade.time)
})
}
engine.update(trades, function (err) {
if (err) throw err
} else {
if (reversing) {
cursor = trades[trades.length - 1].orig_time
cursor = lastTrade.orig_time
}
else {
cursor = trades[trades.length - 1].time
cursor = lastTrade.time
}
setImmediate(getNext)
})
}
setImmediate(getNext)
})
}

getNext()
})
}
Expand Down
13 changes: 7 additions & 6 deletions commands/trade.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ module.exports = function (program, conf) {
var botStartTime = moment().add(so.run_for, 'm')
}
delete so._
if (cmd.conf) {
var overrides = require(path.resolve(process.cwd(), cmd.conf))
Object.keys(overrides).forEach(function (k) {
so[k] = overrides[k]
})
}
Object.keys(conf).forEach(function (k) {
if (typeof cmd[k] !== 'undefined') {
so[k] = cmd[k]
Expand All @@ -70,12 +76,7 @@ module.exports = function (program, conf) {
so.debug = cmd.debug
so.stats = !cmd.disable_stats
so.mode = so.paper ? 'paper' : 'live'
if (cmd.conf) {
var overrides = require(path.resolve(process.cwd(), cmd.conf))
Object.keys(overrides).forEach(function (k) {
so[k] = overrides[k]
})
}

so.selector = objectifySelector(selector || conf.selector)
s.exchange = require(`../extensions/exchanges/${so.selector.exchange_id}/exchange`)(conf)
if (!s.exchange) {
Expand Down
13 changes: 7 additions & 6 deletions commands/train.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ module.exports = function (program, conf) {
var s = {options: minimist(process.argv)}
var so = s.options
delete so._
if (cmd.conf) {
var overrides = require(path.resolve(process.cwd(), cmd.conf))
Object.keys(overrides).forEach(function (k) {
so[k] = overrides[k]
})
}
Object.keys(conf).forEach(function (k) {
if (typeof cmd[k] !== 'undefined') {
so[k] = cmd[k]
Expand Down Expand Up @@ -117,12 +123,7 @@ module.exports = function (program, conf) {
}
so.selector = objectifySelector(selector || conf.selector)
so.mode = 'train'
if (cmd.conf) {
var overrides = require(path.resolve(process.cwd(), cmd.conf))
Object.keys(overrides).forEach(function (k) {
so[k] = overrides[k]
})
}

var engine = engineFactory(s, conf)

if (!so.min_periods) so.min_periods = 1
Expand Down
3 changes: 0 additions & 3 deletions index.js

This file was deleted.

Loading

0 comments on commit 607a72c

Please sign in to comment.