Don't compact the receive buffer unless we've actually run out of space

Previously, we would compact the buffer (moving unread data to the
start of the buffer) as follows:

- after processing a message, if there are zero unread bytes, just reset
  the indices for first and last unread byte to zero
- else, if at least 1/8th of the buffer is used, copy remaining data to the beginning of the buffer

The second option is never actually necessary, as before inserting new data
into the array, we already check if there's enough free space, and
compact the buffer first if necessary. So we've been doing a lot of
copies that weren't actually needed. Let's not do that any more.
pull/36/head
Jesper Alf Dam 6 years ago committed by Lauri Kasanen
parent d4747a8c80
commit 09de4b8349

@ -27,7 +27,6 @@ export default class Websock {
this._rQi = 0; // Receive queue index this._rQi = 0; // Receive queue index
this._rQlen = 0; // Next write position in the receive queue this._rQlen = 0; // Next write position in the receive queue
this._rQbufferSize = 1024 * 1024 * 4; // Receive queue buffer size (4 MiB) this._rQbufferSize = 1024 * 1024 * 4; // Receive queue buffer size (4 MiB)
this._rQmax = this._rQbufferSize / 8;
// called in init: this._rQ = new Uint8Array(this._rQbufferSize); // called in init: this._rQ = new Uint8Array(this._rQbufferSize);
this._rQ = null; // Receive queue this._rQ = null; // Receive queue
@ -226,15 +225,15 @@ export default class Websock {
} }
_expand_compact_rQ(min_fit) { _expand_compact_rQ(min_fit) {
const resizeNeeded = min_fit || this.rQlen > this._rQbufferSize / 2; // if we're using less than 1/8th of the buffer even with the incoming bytes, compact in place
// instead of resizing
const required_buffer_size = (this._rQlen - this._rQi + min_fit) * 8;
const resizeNeeded = this._rQbufferSize < required_buffer_size;
if (resizeNeeded) { if (resizeNeeded) {
if (!min_fit) { // Make sure we always *at least* double the buffer size, and have at least space for 8x
// just double the size if we need to do compaction // the current amount of data
this._rQbufferSize *= 2; this._rQbufferSize = Math.max(this._rQbufferSize * 2, required_buffer_size);
} else {
// otherwise, make sure we satisy rQlen - rQi + min_fit < rQbufferSize / 8
this._rQbufferSize = (this.rQlen + min_fit) * 8;
}
} }
// we don't want to grow unboundedly // we don't want to grow unboundedly
@ -247,7 +246,6 @@ export default class Websock {
if (resizeNeeded) { if (resizeNeeded) {
const old_rQbuffer = this._rQ.buffer; const old_rQbuffer = this._rQ.buffer;
this._rQmax = this._rQbufferSize / 8;
this._rQ = new Uint8Array(this._rQbufferSize); this._rQ = new Uint8Array(this._rQbufferSize);
this._rQ.set(new Uint8Array(old_rQbuffer, this._rQi)); this._rQ.set(new Uint8Array(old_rQbuffer, this._rQi));
} else { } else {
@ -280,8 +278,6 @@ export default class Websock {
if (this._rQlen == this._rQi) { if (this._rQlen == this._rQi) {
this._rQlen = 0; this._rQlen = 0;
this._rQi = 0; this._rQi = 0;
} else if (this._rQlen > this._rQmax) {
this._expand_compact_rQ();
} }
} else { } else {
Log.Debug("Ignoring empty message"); Log.Debug("Ignoring empty message");

@ -384,26 +384,35 @@ describe('Websock', function () {
expect(sock._eventHandlers.message).not.to.have.been.called; expect(sock._eventHandlers.message).not.to.have.been.called;
}); });
it('should compact the receive queue', function () { it('should compact the receive queue when a message handler empties it', function () {
// NB(sross): while this is an internal implementation detail, it's important to sock._eventHandlers.message = () => { sock.rQi = sock._rQlen; };
// test, otherwise the receive queue could become very large very quickly
sock._rQ = new Uint8Array([0, 1, 2, 3, 4, 5, 0, 0, 0, 0]); sock._rQ = new Uint8Array([0, 1, 2, 3, 4, 5, 0, 0, 0, 0]);
sock._rQlen = 6; sock._rQlen = 6;
sock.rQi = 6; sock.rQi = 6;
sock._rQmax = 3;
const msg = { data: new Uint8Array([1, 2, 3]).buffer }; const msg = { data: new Uint8Array([1, 2, 3]).buffer };
sock._mode = 'binary'; sock._mode = 'binary';
sock._recv_message(msg); sock._recv_message(msg);
expect(sock._rQlen).to.equal(3); expect(sock._rQlen).to.equal(0);
expect(sock.rQi).to.equal(0); expect(sock.rQi).to.equal(0);
}); });
it('should automatically resize the receive queue if the incoming message is too large', function () { it('should compact the receive queue when we reach the end of the buffer', function () {
sock._rQ = new Uint8Array(20);
sock._rQbufferSize = 20;
sock._rQlen = 20;
sock.rQi = 10;
const msg = { data: new Uint8Array([1, 2]).buffer };
sock._mode = 'binary';
sock._recv_message(msg);
expect(sock._rQlen).to.equal(12);
expect(sock.rQi).to.equal(0);
});
it('should automatically resize the receive queue if the incoming message is larger than the buffer', function () {
sock._rQ = new Uint8Array(20); sock._rQ = new Uint8Array(20);
sock._rQlen = 0; sock._rQlen = 0;
sock.rQi = 0; sock.rQi = 0;
sock._rQbufferSize = 20; sock._rQbufferSize = 20;
sock._rQmax = 2;
const msg = { data: new Uint8Array(30).buffer }; const msg = { data: new Uint8Array(30).buffer };
sock._mode = 'binary'; sock._mode = 'binary';
sock._recv_message(msg); sock._recv_message(msg);
@ -411,6 +420,19 @@ describe('Websock', function () {
expect(sock.rQi).to.equal(0); expect(sock.rQi).to.equal(0);
expect(sock._rQ.length).to.equal(240); // keep the invariant that rQbufferSize / 8 >= rQlen expect(sock._rQ.length).to.equal(240); // keep the invariant that rQbufferSize / 8 >= rQlen
}); });
it('should automatically resize the receive queue if the incoming message is larger than 1/8th of the buffer and we reach the end of the buffer', function () {
sock._rQ = new Uint8Array(20);
sock._rQlen = 16;
sock.rQi = 16;
sock._rQbufferSize = 20;
const msg = { data: new Uint8Array(6).buffer };
sock._mode = 'binary';
sock._recv_message(msg);
expect(sock._rQlen).to.equal(6);
expect(sock.rQi).to.equal(0);
expect(sock._rQ.length).to.equal(48);
});
}); });
describe('Data encoding', function () { describe('Data encoding', function () {

Loading…
Cancel
Save