
Pg queryNodeStream ignoring batchSize and highWaterMark #292

Open
droganov opened this issue May 24, 2023 · 2 comments


@droganov

droganov commented May 24, 2023

In:

const query = sql`
        SELECT * FROM procedure(${collection}, ${channel}, ${merged})
`
const stream = db.queryNodeStream(query, { batchSize: 10, highWaterMark: 16 * 1024 })

for await (const chunk of stream)....

I expect chunk to be an array of 10 items, but instead each chunk is a single record as a plain object rather than an array.

@ForbesLindesay
Owner

Yes, this is the intended behaviour. batchSize and highWaterMark control the behaviour of the underlying database queries, so tuning them can improve throughput/performance, but the "chunks" returned in the stream are still individual records. If you need to batch them, you can write a separate utility that pulls a set number of items from the stream and emits each group as an array. Something like:

async function* batchStream<T>(source: AsyncIterable<T>, batchSize: number): AsyncGenerator<T[]> {
  let batch: T[] = []
  for await (const record of source) {
    batch.push(record)
    if (batch.length === batchSize) {
      // Emit a full batch and start collecting the next one
      yield batch
      batch = []
    }
  }
  // Emit any remaining records as a final, possibly smaller batch
  if (batch.length) yield batch
}

const query = sql`
        SELECT * FROM procedure(${collection}, ${channel}, ${merged})
`
const stream = db.queryNodeStream(query, { batchSize: 10, highWaterMark: 16 * 1024 })

for await (const chunk of batchStream(stream, 10))....

P.S. batchSize is how many records to fetch from the database at a time, and highWaterMark is how many records the stream is allowed to buffer before applying back pressure (if you are consuming items more slowly than they are being returned from the database). 16 * 1024 would be a very large number of records to have as your high water mark if your batch size is only 10.
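For reference, here is a minimal sketch of what better-matched values might look like, reusing the batchStream helper above. The specific numbers are illustrative assumptions, not recommendations:

// A sketch only; the batchSize/highWaterMark values here are assumed examples.
const stream = db.queryNodeStream(query, {
  batchSize: 100,      // fetch 100 records per round trip to the database
  highWaterMark: 200,  // buffer at most ~2 fetches before applying back pressure
})

// Group the per-record stream into arrays of up to 100 records
for await (const chunk of batchStream(stream, 100)) {
  // chunk is an array of up to 100 records
}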

@droganov
Author

droganov commented Jul 5, 2023

Hello @ForbesLindesay
Thank you for the update. Yes, that's how I solved it.
