/* Aerostat Beam Coder - Node.js native bindings to FFmpeg Copyright (C) 2019 Streampunk Media Ltd. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . https://www.streampunk.media/ mailto:[email protected] 14 Ormiscaig, Aultbea, Achnasheen, IV22 2JJ U.K. */ const beamcoder = require('bindings')('beamcoder'); const { Writable, Readable, Transform } = require('stream'); const doTimings = false; const timings = []; function frameDicer(encoder, isAudio) { let sampleBytes = 4; // Assume floating point 4 byte samples for now... const numChannels = encoder.channels; const dstNumSamples = encoder.frame_size; let dstFrmBytes = dstNumSamples * sampleBytes; const doDice = false === beamcoder.encoders()[encoder.name].capabilities.VARIABLE_FRAME_SIZE; let lastFrm = null; let lastBuf = []; const nullBuf = []; for (let b = 0; b < numChannels; ++b) nullBuf.push(Buffer.alloc(0)); const addFrame = srcFrm => { let result = []; let dstFrm; let curStart = 0; if (!lastFrm) { lastFrm = beamcoder.frame(srcFrm.toJSON()); lastBuf = nullBuf; dstFrmBytes = dstNumSamples * sampleBytes; } if (lastBuf[0].length > 0) dstFrm = beamcoder.frame(lastFrm.toJSON()); else dstFrm = beamcoder.frame(srcFrm.toJSON()); dstFrm.nb_samples = dstNumSamples; dstFrm.pkt_duration = dstNumSamples; while (curStart + dstFrmBytes - lastBuf[0].length <= srcFrm.nb_samples * sampleBytes) { const resFrm = beamcoder.frame(dstFrm.toJSON()); resFrm.data = lastBuf.map((d, i) => Buffer.concat([ d, srcFrm.data[i].slice(curStart, curStart + dstFrmBytes - d.length)], dstFrmBytes)); result.push(resFrm); dstFrm.pts += dstNumSamples; dstFrm.pkt_dts += dstNumSamples; curStart += dstFrmBytes - lastBuf[0].length; lastFrm.pts = 0; lastFrm.pkt_dts = 0; lastBuf = nullBuf; } lastFrm.pts = dstFrm.pts; lastFrm.pkt_dts = dstFrm.pkt_dts; lastBuf = srcFrm.data.map(d => d.slice(curStart, srcFrm.nb_samples * sampleBytes)); return result; }; const getLast = () => { let result = []; if (lastBuf[0].length > 0) { const resFrm = beamcoder.frame(lastFrm.toJSON()); resFrm.data = lastBuf.map(d => d.slice(0)); resFrm.nb_samples = lastBuf[0].length / sampleBytes; resFrm.pkt_duration = resFrm.nb_samples; lastFrm.pts = 0; lastBuf = nullBuf; result.push(resFrm); } return result; }; this.dice = (frames, flush = false) => { if (isAudio && doDice) { let result = frames.reduce((muxFrms, frm) => { addFrame(frm).forEach(f => muxFrms.push(f)); return muxFrms; }, []); if (flush) getLast().forEach(f => result.push(f)); return result; } return frames; }; } function serialBalancer(numStreams) { let pending = []; // initialise with negative ts and no pkt // - there should be no output until each stream has sent its first packet for (let s = 0; s < numStreams; ++s) pending.push({ ts: -Number.MAX_VALUE, streamIndex: s }); const adjustTS = (pkt, srcTB, dstTB) => { const adj = (srcTB[0] * dstTB[1]) / (srcTB[1] * dstTB[0]); pkt.pts = Math.round(pkt.pts * adj); pkt.dts = Math.round(pkt.dts * adj); pkt.duration > 0 ? Math.round(pkt.duration * adj) : Math.round(adj); }; const pullPkts = (pkt, streamIndex, ts) => { return new Promise(resolve => { Object.assign(pending[streamIndex], { pkt: pkt, ts: ts, resolve: resolve }); const minTS = pending.reduce((acc, pend) => Math.min(acc, pend.ts), Number.MAX_VALUE); // console.log(streamIndex, pending.map(p => p.ts), minTS); const nextPend = pending.find(pend => pend.pkt && (pend.ts === minTS)); if (nextPend) nextPend.resolve(nextPend.pkt); if (!pkt) resolve(); }); }; this.writePkts = (packets, srcStream, dstStream, writeFn, final = false) => { if (packets && packets.packets.length) { return packets.packets.reduce(async (promise, pkt) => { await promise; pkt.stream_index = dstStream.index; adjustTS(pkt, srcStream.time_base, dstStream.time_base); const pktTS = pkt.pts * dstStream.time_base[0] / dstStream.time_base[1]; return writeFn(await pullPkts(pkt, dstStream.index, pktTS)); }, Promise.resolve()); } else if (final) return pullPkts(null, dstStream.index, Number.MAX_VALUE); }; } function parallelBalancer(params, streamType, numStreams) { let resolveGet = null; const tag = 'video' === streamType ? 'v' : 'a'; const pending = []; // initialise with negative ts and no pkt // - there should be no output until each stream has sent its first packet for (let s = 0; s < numStreams; ++s) pending.push({ ts: -Number.MAX_VALUE, streamIndex: s }); const makeSet = resolve => { if (resolve) { // console.log('makeSet', pending.map(p => p.ts)); const nextPends = pending.every(pend => pend.pkt) ? pending : null; const final = pending.filter(pend => true === pend.final); if (nextPends) { nextPends.forEach(pend => pend.resolve()); resolve({ value: nextPends.map(pend => { return { name: `in${pend.streamIndex}:${tag}`, frames: [ pend.pkt ] }; }), done: false }); resolveGet = null; pending.forEach(pend => Object.assign(pend, { pkt: null, ts: Number.MAX_VALUE })); } else if (final.length > 0) { final.forEach(f => f.resolve()); resolve({ done: true }); } else { resolveGet = resolve; } } }; const pushPkt = async (pkt, streamIndex, ts) => new Promise(resolve => { Object.assign(pending[streamIndex], { pkt: pkt, ts: ts, final: pkt ? false : true, resolve: resolve }); makeSet(resolveGet); }); const pullSet = async () => new Promise(resolve => makeSet(resolve)); const readStream = new Readable({ objectMode: true, highWaterMark: params.highWaterMark ? params.highWaterMark || 4 : 4, read() { (async () => { const start = process.hrtime(); const reqTime = start[0] * 1e3 + start[1] / 1e6; const result = await pullSet(); if (result.done) this.push(null); else { result.value.timings = result.value[0].frames[0].timings; result.value.timings[params.name] = { reqTime: reqTime, elapsed: process.hrtime(start)[1] / 1000000 }; this.push(result.value); } })(); }, }); readStream.pushPkts = (packets, stream, streamIndex, final = false) => { if (packets && packets.frames.length) { return packets.frames.reduce(async (promise, pkt) => { await promise; const ts = pkt.pts * stream.time_base[0] / stream.time_base[1]; pkt.timings = packets.timings; return pushPkt(pkt, streamIndex, ts); }, Promise.resolve()); } else if (final) { return pushPkt(null, streamIndex, Number.MAX_VALUE); } }; return readStream; } function teeBalancer(params, numStreams) { let resolvePush = null; const pending = []; for (let s = 0; s < numStreams; ++s) pending.push({ frames: null, resolve: null, final: false }); const pullFrame = async index => { return new Promise(resolve => { if (pending[index].frames) { resolve({ value: pending[index].frames, done: false }); Object.assign(pending[index], { frames: null, resolve: null }); } else if (pending[index].final) resolve({ done: true }); else pending[index].resolve = resolve; if (resolvePush && pending.every(p => null === p.frames)) { resolvePush(); resolvePush = null; } }); }; const readStreams = []; for (let s = 0; s < numStreams; ++s) readStreams.push(new Readable({ objectMode: true, highWaterMark: params.highWaterMark ? params.highWaterMark || 4 : 4, read() { (async () => { const start = process.hrtime(); const reqTime = start[0] * 1e3 + start[1] / 1e6; const result = await pullFrame(s); if (result.done) this.push(null); else { result.value.timings[params.name] = { reqTime: reqTime, elapsed: process.hrtime(start)[1] / 1000000 }; this.push(result.value); } })(); }, })); readStreams.pushFrames = frames => { return new Promise(resolve => { pending.forEach((p, index) => { if (frames.length) p.frames = frames[index].frames; else p.final = true; }); pending.forEach(p => { if (p.resolve) { if (p.frames) { p.frames.timings = frames.timings; p.resolve({ value: p.frames, done: false }); } else if (p.final) p.resolve({ done: true }); } Object.assign(p, { frames: null, resolve: null }); }); resolvePush = resolve; }); }; return readStreams; } function transformStream(params, processFn, flushFn, reject) { return new Transform({ objectMode: true, highWaterMark: params.highWaterMark ? params.highWaterMark || 4 : 4, transform(val, encoding, cb) { (async () => { const start = process.hrtime(); const reqTime = start[0] * 1e3 + start[1] / 1e6; const result = await processFn(val); result.timings = val.timings; if (result.timings) result.timings[params.name] = { reqTime: reqTime, elapsed: process.hrtime(start)[1] / 1000000 }; cb(null, result); })().catch(cb); }, flush(cb) { (async () => { const result = flushFn ? await flushFn() : null; if (result) result.timings = {}; cb(null, result); })().catch(cb); } }).on('error', err => reject(err)); } const calcStats = (arr, elem, prop) => { const mean = arr.reduce((acc, cur) => cur[elem] ? acc + cur[elem][prop] : acc, 0) / arr.length; const stdDev = Math.pow(arr.reduce((acc, cur) => cur[elem] ? acc + Math.pow(cur[elem][prop] - mean, 2) : acc, 0) / arr.length, 0.5); const max = arr.reduce((acc, cur) => cur[elem] ? Math.max(cur[elem][prop], acc) : acc, 0); const min = arr.reduce((acc, cur) => cur[elem] ? Math.min(cur[elem][prop], acc) : acc, Number.MAX_VALUE); return { mean: mean, stdDev: stdDev, max: max, min: min }; }; function writeStream(params, processFn, finalFn, reject) { return new Writable({ objectMode: true, highWaterMark: params.highWaterMark ? params.highWaterMark || 4 : 4, write(val, encoding, cb) { (async () => { const start = process.hrtime(); const reqTime = start[0] * 1e3 + start[1] / 1e6; const result = await processFn(val); if ('mux' === params.name) { const pktTimings = val.timings; pktTimings[params.name] = { reqTime: reqTime, elapsed: process.hrtime(start)[1] / 1000000 }; if (doTimings) timings.push(pktTimings); } cb(null, result); })().catch(cb); }, final(cb) { (async () => { const result = finalFn ? await finalFn() : null; if (doTimings && ('mux' === params.name)) { const elapsedStats = {}; Object.keys(timings[0]).forEach(k => elapsedStats[k] = calcStats(timings.slice(10, -10), k, 'elapsed')); console.log('elapsed:'); console.table(elapsedStats); const absArr = timings.map(t => { const absDelays = {}; const keys = Object.keys(t); keys.forEach((k, i) => absDelays[k] = { reqDelta: i > 0 ? t[k].reqTime - t[keys[i-1]].reqTime : 0 }); return absDelays; }); const absStats = {}; Object.keys(absArr[0]).forEach(k => absStats[k] = calcStats(absArr.slice(10, -10), k, 'reqDelta')); console.log('request time delta:'); console.table(absStats); const totalsArr = timings.map(t => { const total = (t.mux && t.read) ? t.mux.reqTime - t.read.reqTime + t.mux.elapsed : 0; return { total: { total: total }}; }); console.log('total time:'); console.table(calcStats(totalsArr.slice(10, -10), 'total', 'total')); } cb(null, result); })().catch(cb); } }).on('error', err => reject(err)); } function readStream(params, demuxer, ms, index) { const time_base = demuxer.streams[index].time_base; const end_pts = ms ? ms.end * time_base[1] / time_base[0] : Number.MAX_SAFE_INTEGER; async function getPacket() { let packet = null; do { packet = await demuxer.read(); } while (packet && packet.stream_index !== index); return packet; } return new Readable({ objectMode: true, highWaterMark: params.highWaterMark ? params.highWaterMark || 4 : 4, read() { (async () => { const start = process.hrtime(); const reqTime = start[0] * 1e3 + start[1] / 1e6; const packet = await getPacket(); if (packet && (packet.pts < end_pts)) { packet.timings = {}; packet.timings.read = { reqTime: reqTime, elapsed: process.hrtime(start)[1] / 1000000 }; this.push(packet); } else this.push(null); })(); } }); } function createBeamWritableStream(params, governor) { const beamStream = new Writable({ highWaterMark: params.highwaterMark || 16384, write: (chunk, encoding, cb) => { (async () => { await governor.write(chunk); cb(); })(); } }); return beamStream; } function demuxerStream(params) { const governor = new beamcoder.governor({}); const stream = createBeamWritableStream(params, governor); stream.on('finish', () => governor.finish()); stream.on('error', console.error); stream.demuxer = options => { options.governor = governor; // delay initialisation of demuxer until stream has been written to - avoids lock-up return new Promise(resolve => setTimeout(async () => resolve(await beamcoder.demuxer(options)), 20)); }; return stream; } function createBeamReadableStream(params, governor) { const beamStream = new Readable({ highWaterMark: params.highwaterMark || 16384, read: size => { (async () => { const chunk = await governor.read(size); if (0 === chunk.length) beamStream.push(null); else beamStream.push(chunk); })(); } }); return beamStream; } function muxerStream(params) { const governor = new beamcoder.governor({ highWaterMark: 1 }); const stream = createBeamReadableStream(params, governor); stream.on('end', () => governor.finish()); stream.on('error', console.error); stream.muxer = options => { options.governor = governor; return beamcoder.muxer(options); }; return stream; } async function makeSources(params) { if (!params.video) params.video = []; if (!params.audio) params.audio = []; params.video.forEach(p => p.sources.forEach(src => { if (src.input_stream) { const demuxerStream = beamcoder.demuxerStream({ highwaterMark: 1024 }); src.input_stream.pipe(demuxerStream); src.format = demuxerStream.demuxer({ iformat: src.iformat, options: src.options }); } else src.format = beamcoder.demuxer({ url: src.url, iformat: src.iformat, options: src.options }); })); params.audio.forEach(p => p.sources.forEach(src => { if (src.input_stream) { const demuxerStream = beamcoder.demuxerStream({ highwaterMark: 1024 }); src.input_stream.pipe(demuxerStream); src.format = demuxerStream.demuxer({ iformat: src.iformat, options: src.options }); } else src.format = beamcoder.demuxer({ url: src.url, iformat: src.iformat, options: src.options }); })); await params.video.reduce(async (promise, p) => { await promise; return p.sources.reduce(async (promise, src) => { await promise; src.format = await src.format; if (src.ms && !src.input_stream) src.format.seek({ time: src.ms.start }); return src.format; }, Promise.resolve()); }, Promise.resolve()); await params.audio.reduce(async (promise, p) => { await promise; return p.sources.reduce(async (promise, src) => { await promise; src.format = await src.format; if (src.ms && !src.input_stream) src.format.seek({ time: src.ms.start }); return src.format; }, Promise.resolve()); }, Promise.resolve()); params.video.forEach(p => p.sources.forEach(src => src.stream = readStream({ highWaterMark : 1 }, src.format, src.ms, src.streamIndex))); params.audio.forEach(p => p.sources.forEach(src => src.stream = readStream({ highWaterMark : 1 }, src.format, src.ms, src.streamIndex))); } function runStreams(streamType, sources, filterer, streams, mux, muxBalancer) { return new Promise((resolve, reject) => { if (!sources.length) return resolve(); const timeBaseStream = sources[0].format.streams[sources[0].streamIndex]; const filterBalancer = parallelBalancer({ name: 'filterBalance', highWaterMark : 1 }, streamType, sources.length); sources.forEach((src, srcIndex) => { const decStream = transformStream({ name: 'decode', highWaterMark : 1 }, pkts => src.decoder.decode(pkts), () => src.decoder.flush(), reject); const filterSource = writeStream({ name: 'filterSource', highWaterMark : 1 }, pkts => filterBalancer.pushPkts(pkts, src.format.streams[src.streamIndex], srcIndex), () => filterBalancer.pushPkts(null, src.format.streams[src.streamIndex], srcIndex, true), reject); src.stream.pipe(decStream).pipe(filterSource); }); const streamTee = teeBalancer({ name: 'streamTee', highWaterMark : 1 }, streams.length); const filtStream = transformStream({ name: 'filter', highWaterMark : 1 }, frms => { if (filterer.cb) filterer.cb(frms[0].frames[0].pts); return filterer.filter(frms); }, () => {}, reject); const streamSource = writeStream({ name: 'streamSource', highWaterMark : 1 }, frms => streamTee.pushFrames(frms), () => streamTee.pushFrames([], true), reject); filterBalancer.pipe(filtStream).pipe(streamSource); streams.forEach((str, i) => { const dicer = new frameDicer(str.encoder, 'audio' === streamType); const diceStream = transformStream({ name: 'dice', highWaterMark : 1 }, frms => dicer.dice(frms), () => dicer.dice([], true), reject); const encStream = transformStream({ name: 'encode', highWaterMark : 1 }, frms => str.encoder.encode(frms), () => str.encoder.flush(), reject); const muxStream = writeStream({ name: 'mux', highWaterMark : 1 }, pkts => muxBalancer.writePkts(pkts, timeBaseStream, str.stream, pkts => mux.writeFrame(pkts)), () => muxBalancer.writePkts(null, timeBaseStream, str.stream, pkts => mux.writeFrame(pkts), true), reject); muxStream.on('finish', resolve); streamTee[i].pipe(diceStream).pipe(encStream).pipe(muxStream); }); }); } async function makeStreams(params) { params.video.forEach(p => { p.sources.forEach(src => src.decoder = beamcoder.decoder({ demuxer: src.format, stream_index: src.streamIndex })); }); params.audio.forEach(p => { p.sources.forEach(src => src.decoder = beamcoder.decoder({ demuxer: src.format, stream_index: src.streamIndex })); }); params.video.forEach(p => { p.filter = beamcoder.filterer({ filterType: 'video', inputParams: p.sources.map((src, i) => { const stream = src.format.streams[src.streamIndex]; return { name: `in${i}:v`, width: stream.codecpar.width, height: stream.codecpar.height, pixelFormat: stream.codecpar.format, timeBase: stream.time_base, pixelAspect: stream.sample_aspect_ratio }; }), outputParams: p.streams.map((str, i) => { return { name: `out${i}:v`, pixelFormat: str.codecpar.format }; }), filterSpec: p.filterSpec }); }); const vidFilts = await Promise.all(params.video.map(p => p.filter)); params.video.forEach((p, i) => p.filter = vidFilts[i]); // params.video.forEach(p => console.log(p.filter.graph.dump())); params.audio.forEach(p => { p.filter = beamcoder.filterer({ filterType: 'audio', inputParams: p.sources.map((src, i) => { const stream = src.format.streams[src.streamIndex]; return { name: `in${i}:a`, sampleRate: src.decoder.sample_rate, sampleFormat: src.decoder.sample_fmt, channelLayout: src.decoder.channel_layout, timeBase: stream.time_base }; }), outputParams: p.streams.map((str, i) => { return { name: `out${i}:a`, sampleRate: str.codecpar.sample_rate, sampleFormat: str.codecpar.format, channelLayout: str.codecpar.channel_layout }; }), filterSpec: p.filterSpec }); }); const audFilts = await Promise.all(params.audio.map(p => p.filter)); params.audio.forEach((p, i) => p.filter = audFilts[i]); // params.audio.forEach(p => console.log(p.filter.graph.dump())); let mux; if (params.out.output_stream) { let muxerStream = beamcoder.muxerStream({ highwaterMark: 1024 }); muxerStream.pipe(params.out.output_stream); mux = muxerStream.muxer({ format_name: params.out.formatName }); } else mux = beamcoder.muxer({ format_name: params.out.formatName }); params.video.forEach(p => { p.streams.forEach((str, i) => { const encParams = p.filter.graph.filters.find(f => f.name === `out${i}:v`).inputs[0]; str.encoder = beamcoder.encoder({ name: str.name, width: encParams.w, height: encParams.h, pix_fmt: encParams.format, sample_aspect_ratio: encParams.sample_aspect_ratio, time_base: encParams.time_base, // framerate: [encParams.time_base[1], encParams.time_base[0]], // bit_rate: 2000000, // gop_size: 10, // max_b_frames: 1, // priv_data: { preset: 'slow' } priv_data: { crf: 23 } }); // ... more required ... }); }); params.audio.forEach(p => { p.streams.forEach((str, i) => { const encParams = p.filter.graph.filters.find(f => f.name === `out${i}:a`).inputs[0]; str.encoder = beamcoder.encoder({ name: str.name, sample_fmt: encParams.format, sample_rate: encParams.sample_rate, channel_layout: encParams.channel_layout, flags: { GLOBAL_HEADER: mux.oformat.flags.GLOBALHEADER } }); str.codecpar.frame_size = str.encoder.frame_size; }); }); params.video.forEach(p => { p.streams.forEach(str => { str.stream = mux.newStream({ name: str.name, time_base: str.time_base, interleaved: true }); // Set to false for manual interleaving, true for automatic Object.assign(str.stream.codecpar, str.codecpar); }); }); params.audio.forEach(p => { p.streams.forEach(str => { str.stream = mux.newStream({ name: str.name, time_base: str.time_base, interleaved: true }); // Set to false for manual interleaving, true for automatic Object.assign(str.stream.codecpar, str.codecpar); }); }); return { run: async () => { await mux.openIO({ url: params.out.url ? params.out.url : '', flags: params.out.flags ? params.out.flags : {} }); await mux.writeHeader({ options: params.out.options ? params.out.options : {} }); const muxBalancer = new serialBalancer(mux.streams.length); const muxStreamPromises = []; params.video.forEach(p => muxStreamPromises.push(runStreams('video', p.sources, p.filter, p.streams, mux, muxBalancer))); params.audio.forEach(p => muxStreamPromises.push(runStreams('audio', p.sources, p.filter, p.streams, mux, muxBalancer))); await Promise.all(muxStreamPromises); await mux.writeTrailer(); } }; } module.exports = { demuxerStream, muxerStream, makeSources, makeStreams };