Skip to content

Commit b310767

Browse files
revise async iterators
1 parent aa062d4 commit b310767

File tree

1 file changed

+73
-57
lines changed

1 file changed

+73
-57
lines changed

scriptum.js

Lines changed: 73 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3841,18 +3841,20 @@ Ait.empty = async function* () {} ();
38413841
Allows processing of vast amounts of data that don't fit in memory. */
38423842

38433843
Ait.chunk = ({sep, threshold = 65536, skipRest = false}) => ix => {

  /* `ix` may yield strings or `Buffer`s. The decoder keeps an incomplete
  multi-byte UTF-8 sequence at the end of one input until the next input
  arrives, so a character that straddles two inputs is never corrupted. */

  const decoder = new StringDecoder("utf8");

  // text received after the last separator seen so far

  let buf = "";

  return async function* () {
    for await (const x of ix) {
      const s = buf + decoder.write(x), chunks = s.split(sep);

      // the last piece is not terminated by `sep` yet, hence keep it back

      buf = chunks.pop() || "";

      // guard against input that never contains the separator

      if (buf.length > threshold) throw new Err("buffer overflow");
      for (const chunk of chunks) yield chunk;
    }

    // flush whatever the decoder still holds along with the unterminated rest

    const rest = buf + decoder.end();
    if (rest.length > 0 && !skipRest) yield rest;
  } ();
};
38583860

@@ -3971,6 +3973,67 @@ Ait.interpolate = ({sep, trailing = false}) => async function* (ix) {
39713973
};
39723974

39733975

3976+
//█████ Chunk-Wise Traversal ██████████████████████████████████████████████████
3977+
3978+
3979+
/* Take a path and read the respective file chunk by chunk from a readable
stream, parse each chunk and yield each parsing result to a downstream process.

  * `chunk.read` receives the async iterator built from the file stream and
    returns the async iterator of chunks to traverse
  * `chunk.parse` is applied to every chunk before it is yielded
  * `rootPath` is prepended to `path` by plain string concatenation

The file stream is destroyed as soon as the traversal ends, no matter whether
it ran to completion, failed or was abandoned early by the consumer.

NOTE(review): `go()` returns an async generator, not a promise. Confirm that
`Cont.fromPromise` is meant to receive it. */

Ait.streamFile = ({chunk: {read, parse}, rootPath = ""}) => path => {
  async function* go() {
    const stream = fs.createReadStream(rootPath + path, {encoding: "utf8"}),
      ix = read(Ait.from(stream));

    try {
      for await (const chunk of ix) yield parse(chunk);
    }

    // errors still propagate, `finally` only guarantees the clean-up

    finally {
      stream.destroy();
    }
  }

  return Cont.fromPromise(go());
};
4001+
4002+
4003+
/* Take a writable stream and an async iterator and write the yielded chunks to
a file using the stream. Function is meant to be used with `Ait.streamFile`.

Respects backpressure: whenever `stream.write` returns `false` the traversal
pauses until the stream emits `drain`. Ends the stream after the last chunk.
If the traversal or the stream fails, the stream is destroyed with the error
and the returned promise rejects with it. */

Ait.writeFile = stream => async ix => {

  /* Settle on whichever event comes first. Waiting for `drain` alone would
  hang forever if the stream errors or is closed while it is saturated. */

  const drained = () => new Promise((resolve, reject) => {
    const cleanup = () => {
      stream.off("drain", onDrain);
      stream.off("error", onError);
      stream.off("close", onClose);
    };

    const onDrain = () => {
      cleanup();
      resolve();
    };

    const onError = e => {
      cleanup();
      reject(e);
    };

    const onClose = () => {
      cleanup();
      reject(new Error("stream closed before drain"));
    };

    stream.on("drain", onDrain);
    stream.on("error", onError);
    stream.on("close", onClose);
  });

  try {
    for await (const chunk of ix) {

      // backpressure

      if (!stream.write(chunk)) await drained();
    }

    stream.end();
  }

  catch (e) {
    stream.destroy(e);
    throw e;
  }
};
4024+
4025+
4026+
/* Merge individual files to a single virtual file by asynchronously yielding
each file stream one by one. The files are opened lazily and strictly in the
order of `paths`: the next file is only opened once the previous one has been
consumed entirely. No encoding is set on the streams, hence the yielded chunks
are `Buffer`s. */

Ait.mergeFiles = async function* (paths) {
  for (const path of paths) {

    // same accessor as in `Ait.streamFile`

    const stream = fs.createReadStream(path);
    yield* stream;
  }
};
4035+
4036+
39744037
/*█████████████████████████████████████████████████████████████████████████████
39754038
███████████████████████████████████████████████████████████████████████████████
39764039
███████████████████████████ ITERATOR :: IDEMPOTENT ████████████████████████████
@@ -6010,6 +6073,13 @@ Parser.bic = ({_throw = false}) => s => {
60106073
};
60116074

60126075

6076+
/* TODO:
6077+
* email
6078+
* phone (cellular, landline)
6079+
* po-box
6080+
* street */
6081+
6082+
60136083
//█████ Number ████████████████████████████████████████████████████████████████
60146084
60156085
@@ -11806,60 +11876,6 @@ S.Word.parseAbbr = ({locale, abbrs, trigrams, context}) => abbr => {
1180611876
};
1180711877

1180811878

11809-
/* Rank the part of speech categories noun, verb and adj for a word. The score
of a category is the sum of the values its trigram map holds for the trigrams
of the word, multiplied by the size of that map. The result lists one
`Parser.Maybe` per category in descending order of score. `delta` is the lead
over the next lower ranked category and `0` for the last one. */

S.Word.parsePos = trigramsPerPos => word => {
  const triples = S.trigram(word),
    score = {noun: 0, verb: 0, adj: 0};

  for (const [pos, trigrams] of O.entries(trigramsPerPos)) {
    for (const triple of triples) {
      if (!(pos in score)) throw new Err(`unexpcted pos "${pos}"`);
      if (trigrams.has(triple)) score[pos] += trigrams.get(triple);
    }
  }

  // normalization

  for (const pos of ["noun", "verb", "adj"])
    score[pos] *= trigramsPerPos[pos].size;

  const ranked = Object.entries(score)
    .sort(([, confidence], [, confidence2]) => confidence2 - confidence);

  const last = ranked.length - 1;

  // TODO: incorporate context (pos order)

  return ranked.map(([pos, confidence], i) => Parser.Maybe({
    value: pos,
    kind: "pos",
    confidence,
    delta: i === last ? 0 : confidence - ranked[i + 1] [1],
  }));
};
11861-
11862-
1186311879
/* Split compound nouns, verbs, adjectives, and numerals using a corpus of
1186411880
well-known words. */
1186511881

@@ -13355,7 +13371,7 @@ Node.SQL = {};
1335513371
//█████ Types █████████████████████████████████████████████████████████████████
1335613372

1335713373

13358-
Node.SQL.sqlQuery = ({query, meta = null}) => ({
13374+
Node.SQL.sqlQuery = ({query, meta = {}}) => ({
1335913375
[$]: "SqlQuery",
1336013376
[$$]: "SqlQuery",
1336113377
query,

0 commit comments

Comments
 (0)