@@ -23,6 +23,8 @@ import Crypto_ from "node:crypto";
2323import FS_ from "node:fs" ;
2424import Path_ from "node:path" ;
2525import Stream_ from "node:stream" ;
26+ import { StringDecoder } from "node:string_decoder" ;
27+
2628//import * as I from "immutable";
2729
2830
@@ -3803,12 +3805,22 @@ Ait.filter = p => async function* (ix) {
38033805
38043806// strict left-associative fold
38053807
3806- Ait . foldl = f => acc => async function ( ix ) {
3808+ Ait . foldl = f => acc => async ix => {
38073809 for await ( const x of ix ) acc = f ( acc , x ) ;
38083810 return acc ;
38093811} ;
38103812
38113813
3814+ // non-strict left-associative fold
3815+
3816+ Ait . accuml = f => acc => async function * ( ix ) {
3817+ for await ( const x of ix ) {
3818+ acc = f ( acc , x ) ;
3819+ yield ( acc ) ;
3820+ }
3821+ } ;
3822+
3823+
38123824Ait . append = ix => async function * ( iy ) {
38133825 for await ( const x of ix ) yield x ;
38143826 for await ( const y of iy ) yield y ;
@@ -3976,34 +3988,45 @@ Ait.interpolate = ({sep, trailing = false}) => async function* (ix) {
39763988//█████ Stream Files ██████████████████████████████████████████████████████████
39773989
39783990
3979- /* Take a path and read the respective file chunk by chunk from a readable
3980- stream, parse each chunk and yield each parsing result to a downstream process.
3981- Works with promises internally but returns an asynchronous continuation type as
3982- its result. */
3991+ // asynchronously delegate a file stream to a downstream process
3992+
3993+ Ait . streamFile = async function * ( path ) {
3994+ const stream = Node . FS . createReadStream ( path , { encoding : "utf8" } ) ;
3995+
3996+ try {
3997+ yield * stream ;
3998+ }
3999+
4000+ catch ( e ) {
4001+ stream . destroy ( e ) ;
4002+ throw e ;
4003+ }
4004+ } ;
4005+
4006+
4007+ /* Asynchronously delegate a virtual file stream that is composed of multiple
4008+ individual file streams to a downstream process in a sequential manner. */
39834009
3984- Ait . streamFile = ( { chunk : { read, parse} , rootPath = "" } ) => path => {
3985- async function * go ( ) {
3986- const stream = fs . createReadStream ( rootPath + path , { encoding : "utf8" } ) ,
3987- ix = read ( Ait . from ( stream ) ) ;
4010+ Ait . streamFiles = async function * ( paths ) {
4011+ for ( const path of paths ) {
4012+ const stream = Node . FS . createReadStream ( path , { encoding : "utf8" } ) ;
39884013
3989- try {
3990- for await ( const chunk of ix ) yield parse ( chunk ) ;
4014+ try {
4015+ yield * stream ;
39914016 }
39924017
39934018 catch ( e ) {
3994- stream . destroy ( )
4019+ stream . destroy ( e ) ;
39954020 throw e ;
39964021 }
3997- } ;
3998-
3999- return Cont . fromPromise ( go ( ) ) ;
4022+ }
40004023} ;
40014024
40024025
4003- /* Take a writable stream and an async iterator and write the yielded chunks to
4004- a file using the stream. The function is meant to be used with `Ait.streamFile` . */
4026+ /* Take a writable stream and an async iterator and write the yielded chunks of
4027+ the latter to a file specified by the stream . */
40054028
4006- Ait . writeFile = stream => async ix => {
4029+ Ait . writeStream = stream => async ix => {
40074030 try {
40084031 for await ( const chunk of ix ) {
40094032
@@ -4023,17 +4046,6 @@ Ait.writeFile = stream => async ix => {
40234046} ;
40244047
40254048
4026- /* Merge individual files to a single virtual file by asynchronously yielding
4027- each file stream one by one. */
4028-
4029- Ait . mergeFiles = async function * ( paths ) {
4030- for ( const path of paths ) {
4031- const stream = createReadStream ( path ) ;
4032- yield * stream ;
4033- }
4034- } ;
4035-
4036-
40374049/*█████████████████████████████████████████████████████████████████████████████
40384050███████████████████████████████████████████████████████████████████████████████
40394051███████████████████████████ ITERATOR :: IDEMPOTENT ████████████████████████████
0 commit comments