copy-from performance with lots of chunks #127

Open
byrney opened this issue Dec 13, 2021 · 15 comments


byrney commented Dec 13, 2021

We have a node process that generates a lot of data[1] and we need to insert it into postgres. Doing an insert per item was terrible (no surprise there); creating big inserts with many rows is better but still very bad. copy-from is better still, but not as much of an improvement as I thought it would be. None of them come close to piping the data into a psql child process.

I created a bench to show the problem here: byrney@1086f50#diff-1b32c28cb38c05480eccc1bd60ff97029b57a05c96718b96dad7e9d84894f549

Like our process, this calls stream.write(...) once per row and then pipes that to copy-from. Our real process sends much wider rows, but we see the same effect.

As a workaround we can pipe through a stream which batches up the chunks before sending them to copy-from. Making a very small number of calls to copy-from.write (at the cost of memory) helps a lot with speed.

A potential improvement to copy-from is in that branch too: https://github.com/byrney/node-pg-copy-streams/tree/private/copy-from-performance. It moves the batching into the copy-from itself.

It needs a bit of a tidy-up[2] but if you think the approach is sound and you're open to a PR, I can do that and submit it.

Footnotes

  1. This is a gross simplification but that's the gist of it

  2. There is still some debugging logic in there to count the number of calls

Collaborator

jeromew commented Dec 13, 2021

Hello thanks for the report. I need to spend some time on this.

Did you try to optimize the calls through the _writev mechanism? From my vague understanding, this is a use case where _writev would be useful to batch several rows, and that was why I added the _writev implementation in the first place.

I never used it myself, but according to https://stackoverflow.com/questions/41333617/correct-usage-of-writev-in-node-js, you would need to cork the stream, write your rows, and then uncork it.
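The cork/uncork pattern described in that answer can be sketched as follows. This is a minimal illustration using a stand-in Writable that only counts calls; the `sink` and `calls` names are made up for the example and this is not copy-from itself.

```javascript
const { Writable } = require('stream')

// Counters showing which hook Node ends up calling.
const calls = { write: 0, writev: 0, chunksInWritev: 0 }

// Hypothetical sink (an assumption for this sketch, not copy-from).
const sink = new Writable({
  write(chunk, encoding, callback) {
    calls.write += 1
    callback()
  },
  writev(chunks, callback) {
    // chunks is an array of { chunk, encoding } entries
    calls.writev += 1
    calls.chunksInWritev += chunks.length
    callback()
  },
})

sink.cork() // buffer the writes instead of dispatching them one by one
sink.write('1\tfoo\n')
sink.write('2\tbar\n')
sink.write('3\tbaz\n')
sink.uncork() // flush: the buffered chunks are handed to writev together

console.log(calls)
```

With the stream corked, the three rows should reach the sink through a single writev call rather than three write calls.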

Collaborator

jeromew commented Dec 13, 2021

Author

byrney commented Dec 13, 2021

I need to spend some time on this.

I appreciate it's not a simple change, thanks for looking.

Did you try to optimize the calls through the _writev mechanism? From my vague understanding, this is a use case where _writev would be useful to batch several rows, and that was why I added the _writev implementation in the first place.

I looked at when _writev was being called (using the code on master) and tried various values of highWaterMark. It only
gets called once when the stream gets uncorked in handleCopyResponse and thereafter individual calls to
write happen. I found the elastic search blog you reference and I've used writev before, but in
this case it only gets called once following the uncork.

That said, copy-from's writev still sends each chunk to the socket (connection.stream) one at a time. It's useful,
but sending down the socket is the bottleneck so it isn't having a big impact on speed. The batching logic
(in the private branch) combines all the buffered chunks into a single Buffer, so only one socket.write happens.
After adding that, writev became redundant (all writes are batched up anyway) so I removed it rather than
try to make it respect backpressure.
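The difference between forwarding each buffered chunk and concatenating them first can be sketched like this. It is only an illustration of the idea, not the code from the private branch: `fakeSocket` is a hypothetical stand-in for connection.stream that records the size of every write it receives, and the stream is corked by hand to force a writev call.

```javascript
const { Writable } = require('stream')

// Hypothetical stand-in for connection.stream: records each write's size.
const socketWrites = []
const fakeSocket = { write: (buf) => socketWrites.push(buf.length) }

const batching = new Writable({
  write(chunk, encoding, callback) {
    fakeSocket.write(chunk)
    callback()
  },
  writev(chunks, callback) {
    // Join all buffered chunks so the socket sees a single write.
    const joined = Buffer.concat(chunks.map((entry) => entry.chunk))
    fakeSocket.write(joined)
    callback()
  },
})

batching.cork()
for (let i = 0; i < 1000; i++) batching.write(`${i}\trow\n`)
batching.uncork()

console.log(`${socketWrites.length} socket write(s), ${socketWrites[0]} bytes`)
```

Here 1000 rows should end up in one socket write. A writev that looped over `chunks` and wrote each entry separately would still make 1000 socket writes.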

I never used it myself, but according to https://stackoverflow.com/questions/41333617/correct-usage-of-writev-in-node-js, you would need to cork the stream, write your rows, and then uncork it.

That's hard to make work from the application code when piping.
With a source.pipe(copyFromStream) the pipe calls uncork on the destination stream and there's no way to get inside the pipe to cork/uncork when you decide you want to send some data (except at the very end).

There does not seem to be a way to do it inside copy-from either. Once corked you don't get any write or writev calls. So, there's no place to uncork when you want to start sending except during _final.

Author

byrney commented Dec 13, 2021

Had another look at the blog post.

The reason writev gets called in that elastic search stream is because
write (and writev) waits until the query has been processed:

  async _write(body, enc, next) {
    try {
      await this.send() // I made this just sleep for 5s
      next()            // this gets called when es has finished the query
    } catch(err) {
      next(err)
    }
  }

because of that, the elastic search stream falls behind very quickly and
node starts calling writev in order to help it catch up.

In copy-from, we call this.connection.stream.write(...) and we don't wait for a response from the server. The destination doesn't fall behind, so node keeps calling write. Copy-from waits if there is backpressure from the socket, and at the end it waits for handleReadyForQuery to ensure everything has been sent. That seems like the right behaviour to me for this case.
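The effect described above can be reproduced with two stand-in sinks: one that completes every write immediately (like a destination that never falls behind) and one that holds the write callback until it is acknowledged (like the Elasticsearch stream waiting for its query). The `makeSink` helper and its `ack` function are hypothetical names for this sketch.

```javascript
const { Writable } = require('stream')

// Hypothetical helper: builds a sink that either completes writes at once
// or keeps the callback pending until ack() is called.
function makeSink(respondImmediately) {
  const stats = { write: 0, writev: 0 }
  let pending = null // callback of the in-flight write, if any
  const sink = new Writable({
    write(chunk, encoding, callback) {
      stats.write += 1
      if (respondImmediately) callback()
      else pending = callback
    },
    writev(chunks, callback) {
      stats.writev += 1
      if (respondImmediately) callback()
      else pending = callback
    },
  })
  // ack() simulates the destination finishing the in-flight write.
  const ack = () => {
    const cb = pending
    pending = null
    if (cb) cb()
  }
  return { sink, stats, ack }
}

const fast = makeSink(true)
for (let i = 0; i < 5; i++) fast.sink.write(`${i}\n`)

const slow = makeSink(false)
for (let i = 0; i < 5; i++) slow.sink.write(`${i}\n`)
slow.ack() // first write completes; the 4 buffered rows go to writev

console.log('fast:', fast.stats, 'slow:', slow.stats)
```

The fast sink should see five write calls and no writev, while the slow sink should see one write followed by one writev carrying the rows that queued up behind it.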

Collaborator

jeromew commented Dec 13, 2021

hum so this could be the explanation

  • there is no backpressure from the socket (throughput could be increased) so the writev mechanism is never called
  • and as a consequence the "1 row - 1 write - 1 send" pipeline has a high cycle cost, which can sometimes be lowered by forcing batches as in your branch

We never receive ACKs from postgres, so the elastic search solution for batching could not work here, except maybe if we were doing mini-COPY operations (some people in the past have thought about batching the COPYs). This does not sound easy, though, because we would always need to meet the row boundaries.

I'll look at your benchmarks to see if I can reproduce the problem on my laptop.
If the reasoning above is correct, a connection receiving backpressure would imply calls to _writev. I am not sure how to test that, or what the best solution is.

At this time I don't know what the best choice is between implementing the batching mechanism internally and documenting an official way to batch the rows externally for those who need to push it to eleven ;-) - https://www.youtube.com/watch?v=hW008FcKr3Q

Maybe batching externally could bring both optimizations (batching when backpressure is never met, and _writev when backpressure is met)

Could it be that you meet this problem because you have a very high quality postgres architecture that never triggers backpressure ?

Author

byrney commented Dec 13, 2021

at this time I don't know what is the best choice between implementing the batching mechanism internally or documenting an official way to batch the rows externally

Absolutely, your call.

If you want I'll default to batches of 1 (backward compatible) and add the tests to make sure it's working correctly when batch > 1. If not, well, it's been fun figuring it out.

Could it be that you meet this problem because you have a very high quality postgres architecture that never triggers backpressure ?

I am testing with a local postgres on a laptop, so it's unix domain socket rather than a tcp connection. I reckon the network latency is very low but the postgres is below par.

This is the same copy-from.js as on master:

Existing benchmark:

1086f50!node-pg-copy-streams/bench *> PGPORT=5432 PGDATABASE=postgres node copy-from.js
unix pipe into psql COPY x 1.32 ops/sec ±12.74% (12 runs sampled)
pipe into psql COPY x 1.33 ops/sec ±5.88% (11 runs sampled)
pipe into pg-copy-stream COPY x 1.72 ops/sec ±4.42% (13 runs sampled)

new benchmark:

1086f50!node-pg-copy-streams/bench *> PGPORT=5432 PGDATABASE=postgres node copy-from-pull.js
pipe into psql COPY x 0.18 ops/sec ±5.72% (5 runs sampled)
pipe into pg-copy-stream COPY x 0.04 ops/sec ±2.86% (5 runs sampled)

Using the batched one:

Existing benchmark:

private/copy-from-performance!node-pg-copy-streams/bench *> PGPORT=5432 PGDATABASE=postgres node copy-from.js
unix pipe into psql COPY x 1.53 ops/sec ±4.98% (12 runs sampled)
pipe into psql COPY x 1.47 ops/sec ±2.93% (12 runs sampled)
pipe into pg-copy-stream COPY x 1.79 ops/sec ±3.36% (13 runs sampled)

new benchmark:

private/copy-from-performance!node-pg-copy-streams/bench *> PGPORT=5432 PGDATABASE=postgres node copy-from-pull.js
pipe into psql COPY x 0.18 ops/sec ±8.89% (5 runs sampled)
pipe into pg-copy-stream COPY x 0.54 ops/sec ±3.03% (7 runs sampled)

From this run it would seem that batching improves the existing benchmark as well. Instead of 11 it's like we made 10 a bit louder[1].

Footnotes

  1. Those numbers move around a bit, so I wouldn't take that too seriously.

Collaborator

jeromew commented Dec 13, 2021

OK, I have to grasp the 2 scenarios you mention in your benchmark.

the initial benchmark (wrapping seq in a stream) was supposed to mimic a real world scenario using streams but you are maybe right that it does not mimic a scenario with an in-js-loop stream that never ends generating data. Is it the scenario you are having ? In my frame of mind, data usually comes from an external source (a db, a file, ..) and there are breathing rooms from time to time.

Do you know of a tool that would draw/debug the dynamics of a stream pipeline ? I have been looking for one for a long time it would be handy to pinpoint the bottlenecks.

I'll do some tests to try to understand what happens in the scenario of the new benchmark.

Author

byrney commented Dec 14, 2021

does not mimic a scenario with an in-js-loop stream that never ends generating data. Is it the scenario you are having ? In my frame of mind, data usually comes from an external source (a db, a file, ..) and there are breathing rooms from time to time.

The real code reads an mbtiles file (that's a sqlite database full of gzipped protocol buffers), passes each row through a gunzip stream, then we do some custom processing to generate the csv rows to pass to copy-from. The data does come from an external source but... each of the compressed pbfs generates multiple rows of csv, so it reads a lot less data than it sends.

The benchmark is a result of refining that process down so I could find the bottlenecks and then construct a test case we could focus on. It's not a precise repro of what we have, more an extreme example of the behaviour (what benchmark isn't?).

There are a bunch of factors, and cycle time is no doubt one of them, but I think the main one is the way copy-from writes to the socket. Sending many small chunks just has a high overhead:

If I write 1,000,000 characters to a socket we get:

> node socket-send.js 

Connected: 127.0.0.1 9999 Sending  1000000  in batches of  1
sending: 6981.958ms

Connected: 127.0.0.1 9999 Sending  1000000  in batches of  10
sending: 1003.448ms

Connected: 127.0.0.1 9999 Sending  1000000  in batches of  100
sending: 159.518ms

Connected: 127.0.0.1 9999 Sending  1000000  in batches of  1000
sending: 26.390ms

Connected: 127.0.0.1 9999 Sending  1000000  in batches of  10000
sending: 6.330ms

Connected: 127.0.0.1 9999 Sending  1000000  in batches of  100000
sending: 4.551ms

The top one will make 1,000,000 calls to socket.write, the last one will make 10 but they send the same amount of data in total. The test script is here: https://gist.github.com/byrney/f4df10bd85292368f4b7a91bc3705c8b
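The relationship between batch size and number of write calls can be sketched without a socket. This is not the gist's script: `sendInBatches` and `countBytes` are hypothetical names, and the write target simply counts bytes, so the sketch shows call counts only and says nothing about timings.

```javascript
// Hypothetical helper: pushes `total` bytes through `write` in slices of
// `batchSize` bytes and returns how many write calls that took.
function sendInBatches(write, total, batchSize) {
  const batch = Buffer.alloc(batchSize, 'x')
  let calls = 0
  for (let sent = 0; sent < total; sent += batchSize) {
    const remaining = total - sent
    // The last slice may be shorter than batchSize.
    write(remaining < batchSize ? batch.subarray(0, remaining) : batch)
    calls += 1
  }
  return calls
}

let bytes = 0
const countBytes = (buf) => {
  bytes += buf.length
}

for (const batchSize of [1000, 100000]) {
  bytes = 0
  const calls = sendInBatches(countBytes, 1000000, batchSize)
  console.log(`batch ${batchSize}: ${calls} calls, ${bytes} bytes`)
}
```

Swapping `countBytes` for a real `socket.write` would give something close to the experiment above, where the per-call overhead dominates at small batch sizes.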

If writev makes one call to socket.write for each row, it will still have the socket overhead. We don't see that problem in bench/copy-from.js because the reads from stdin will put lots of rows into each chunk.

ps. I have not found any easy way to view this stuff; I wish I had. The chrome profiler is just a mess of tiny calls. In the end I added instrumentation manually to count how many calls were being made to write, writev, flush etc.

Collaborator

jeromew commented Dec 16, 2021

I looked a bit into the benchmarks.
I agree that the old benchmarks are not correct for the use case where a user produces chunks row-by-row. As you noted, this use case is probably too heavy on the socket writes (too many small chunks)

I tested with the stream-chunker module and it seems to be more or less on par, timewise, with your implementation. The internals of this module could be improved because it does a systematic row-by-row buffer concat, whereas your implementation is lazier on the concat, which is better memory-wise and probably avoids memory fragmentation for large payloads.

I usually prefer stream composition over adding more features to an implementation, because I am not totally sure that the feature we would add would meet all use cases. For instance, some people could prefer some kind of time-based chunker: aggregate and produce size-based chunks, but still produce smaller chunks every ms-interval if needed. I am not sure that would make sense, because the data is not visible inside postgres before commit, but I have the impression that we have not yet seen all use cases and that it is a bit too early to integrate the chunking mechanism inside the copy-from implementation.

So at this stage I would probably document the 2 use cases on the README + give the name of a good module for chunk aggregation that can be used in the row-by-row use case. What is your opinion on this ?

During your tests, did you write a custom aggregator or did you use an existing module ?

Collaborator

jeromew commented Dec 16, 2021

Collaborator

jeromew commented Dec 16, 2021

pipe into psql COPY x 6.16 ops/sec ±15.45% (37 runs sampled)
pipe into pg-copy-stream COPY x 1.22 ops/sec ±5.62% (11 runs sampled)
pipe into pg-copy-stream COPY (batched version) x 37.35 ops/sec ±7.27% (60 runs sampled)
pipe stream-chunked data into pg-copy-stream COPY x 5.55 ops/sec ±4.26% (31 runs sampled)
pipe rebuffer data into pg-copy-stream COPY x 30.11 ops/sec ±10.92% (53 runs sampled)
pipe block-stream2 data into pg-copy-stream COPY x 39.06 ops/sec ±8.27% (64 runs sampled)
pipe stream-chunkify data into pg-copy-stream COPY x 37.27 ops/sec ±10.17% (63 runs sampled)

all libs setup with size = 512 * 1024.

stream-chunked does not behave well because of the chunk-by-chunk concat.
block-stream2 seems to have the best profile, not far from your batched version.

for a comparison of the 2 I can see that

the batched version does not try to force the exact size of the output buffer + is integrated in the overall Writable mechanism so it avoids the piping mechanism overhead.

block-stream2 maybe benefits from using the Buffer.concat(., totalLength) API (this could also be added to the batched version) but has to do a little buffer dance to extract fixed-size buffers, which has a cost. I am surprised that the piping mechanism overhead does not seem to have a noticeable impact.

I would have liked to compare to something using bl under the hood as it is supposed to be optimized for this kind of buffer list management but could not find a module that uses it for the purpose we need.

Collaborator

jeromew commented Dec 17, 2021

I did some more tests on this. It looks like I had lowered the number of generated lines, which gave misleading results on the benchmarks (only 9999 lines were generated). If I increase the number of lines to 99999, block-stream2 hits a problem. I did not try to understand why, but it is probably due to the buffer dance it does (too many calls to push).

The best results I have so far are with a custom chunker which is pretty close to what you integrated in your modified copy-from.js. On my machine the performance peaks at 128 * 1024.

const stream = require('stream')

// Buffers incoming chunks and emits one concatenated Buffer
// once at least `size` bytes have accumulated.
class MyChunker extends stream.Transform {
  constructor(options) {
    super(options)
    this.buf = []
    this.bufLength = 0
    this.size = (options && options.size) || 1024 * 128
  }
  _transform(chunk, encoding, callback) {
    this.buf.push(chunk)
    this.bufLength += Buffer.byteLength(chunk)

    // Concat lazily: only when the threshold is reached
    if (this.bufLength >= this.size) {
      const buf = Buffer.concat(this.buf, this.bufLength)
      this.buf = []
      this.bufLength = 0
      this.push(buf)
    }

    callback()
  }
  _flush(callback) {
    // Emit whatever is left when the source ends
    if (this.bufLength > 0) {
      const buf = Buffer.concat(this.buf, this.bufLength)
      this.buf = []
      this.bufLength = 0
      this.push(buf)
    }
    callback()
  }
}

Author

byrney commented Dec 17, 2021

Thanks for the thorough investigation!

I did write a batching transform initially, but it didn't concat the chunks so it didn't have much impact. That's when I started looking at the internals of copy-from and the changes in my branch were the result of that investigation.

If you prefer not to incorporate the chunking into copy-from (totally understandable), then I will do a chunker transform like yours. Anyone else with a similar problem might find this issue and get a good answer too. The only downside I can see is that it won't be incorporated into the copy-from benchmarks but that's a minor thing.

Thanks again.

Collaborator

jeromew commented Dec 17, 2021

I agree that there are advantages to have it incorporated into copy-from. It is not so easy for new users of the module to know if and when adding the interim chunker brings a benefit.

Do you have ideas about how we could ease things even more for the network layer ? Here we build chunks with a minimum size of 512 x 1024 or 128 x 1024 bytes, which seems to boost things up in your case. This is fast because we just concat the buffers without rebuffering to a specific size. Do you observe the ~6x speedup that we observe in the benchmarks during your real world scenario ?

But we could try to make things really easy on the network layer by exactly matching the size that the network layer expects for its optimum path. Is there such a thing and can we reach it through node.js ?

Collaborator

jeromew commented Dec 17, 2021

For future reference: in case we need an integrated implementation that exactly respects the chunk size, the stream-chunkify module seems to have an implementation with good performance (nearly on par with MyChunker above, and a lot better than rebuffer, block-stream2 and stream-chunked).

https://github.com/ItalyPaleAle/node-stream-chunkify/blob/master/index.js

The idea is to create buffers of the expected size as a placeholder for copies of the received chunked data. We could probably adapt it to integrate the 5 bytes copyData+length inside the requested size and thus have 1 call to socket.write instead of 2 per sent copyData.
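A rough sketch of that last idea: build the 5-byte CopyData header and the payload in one preallocated buffer, so a single socket.write could send the whole message. The `frameCopyData` helper is hypothetical and not part of pg-copy-streams; the layout follows the PostgreSQL protocol's CopyData message (a 'd' type byte, then an Int32 length that counts itself plus the payload).

```javascript
// Hypothetical helper (an assumption for this sketch): frames a list of
// Buffers as one CopyData message in a single allocation.
function frameCopyData(chunks) {
  const payloadLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0)
  const message = Buffer.allocUnsafe(5 + payloadLength)
  message.writeUInt8(0x64, 0) // 'd' = CopyData message type
  // The length field covers its own 4 bytes plus the payload, not the type byte.
  message.writeInt32BE(payloadLength + 4, 1)
  let offset = 5
  for (const chunk of chunks) {
    chunk.copy(message, offset)
    offset += chunk.length
  }
  return message
}

const message = frameCopyData([Buffer.from('1\tfoo\n'), Buffer.from('2\tbar\n')])
console.log(message.length, message.readInt32BE(1))
```

Copying the rows into a preallocated buffer trades one extra memory copy for fewer socket writes; whether that is a net win would need to be measured in the benchmarks.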
