-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathreplicator.js
More file actions
112 lines (89 loc) · 2.33 KB
/
replicator.js
File metadata and controls
112 lines (89 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
var Stream = require('stream');
var net = require('net');
var stdin = process.openStdin();
var broadCastStream = new Stream();
broadCastStream.writable = true;
var peers = {};
var events = {};
var dataStore = {};
var myPort = process.argv[2];
broadCastStream.write = function (data) {
var result = JSON.parse(data.toString());
if (!events[result.id]) {
events[result.id] = data;
dataStore[result.key] = result.value;
for (p in peers) {
if (result.source === p) {
continue;
}
peers[p].write(data);
}
}
};
var commands = {
'data': function () {
console.log(dataStore);
},
'me': function () {
console.log(myPort);
},
'events': function () {
console.log(Object.keys(events));
},
'peers': function () {
console.log(Object.keys(peers));
},
'save': function (data) {
broadCastStream.write(new Buffer(JSON.stringify(data)));
}
}
stdin.on('data', function (data) {
var input = data.toString().trim();
var matches = input.match(/(\w+)=(.*)/);
if (matches) {
commands.save({ source: myPort, id: new Date().getTime(), key: matches[1], value: matches[2] });
}
else if (commands[input]) {
commands[input]();
}
});
var connect = function (others) {
others.forEach(function (o) {
if (peers[o] || o === myPort) {
return;
}
var socket = peers[o] = net.connect(o);
socket.once('data', function (data) {
var result = JSON.parse(data);
socket.write(JSON.stringify({ port: myPort, peers: Object.keys(peers) }));
dataStore = result.data;
connect(result.peers);
socket.pipe(broadCastStream, {end: false});
});
function cleanup () {
delete peers[o];
socket.removeListener('error', cleanup);
socket.removeListener('end', cleanup);
}
socket.on('error', cleanup);
socket.on('end', cleanup);
});
};
var server = net.createServer(function (socket) {
var port;
socket.write(JSON.stringify({ data: dataStore, peers: Object.keys(peers) }));
socket.once('data', function (data) {
var result = JSON.parse(data);
port = result.port;
peers[port] = socket;
connect(result.peers);
socket.pipe(broadCastStream, {end: false});
});
socket.on('end', function () {
if (port) {
delete peers[port];
}
});
});
connect(process.argv.slice(3));
server.listen(myPort);