|
|
|
@ -448,7 +448,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
|
|
|
|
|
// configured value for inflow, that will be updated when we send a
|
|
|
|
|
// WINDOW_UPDATE shortly after sending SETTINGS.
|
|
|
|
|
sc.flow.add(initialWindowSize)
|
|
|
|
|
sc.inflow.add(initialWindowSize)
|
|
|
|
|
sc.inflow.init(initialWindowSize)
|
|
|
|
|
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
|
|
|
|
|
sc.hpackEncoder.SetMaxDynamicTableSizeLimit(s.maxEncoderHeaderTableSize())
|
|
|
|
|
|
|
|
|
@ -563,8 +563,8 @@ type serverConn struct {
|
|
|
|
|
wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
|
|
|
|
|
bodyReadCh chan bodyReadMsg // from handlers -> serve
|
|
|
|
|
serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
|
|
|
|
|
flow flow // conn-wide (not stream-specific) outbound flow control
|
|
|
|
|
inflow flow // conn-wide inbound flow control
|
|
|
|
|
flow outflow // conn-wide (not stream-specific) outbound flow control
|
|
|
|
|
inflow inflow // conn-wide inbound flow control
|
|
|
|
|
tlsState *tls.ConnectionState // shared by all handlers, like net/http
|
|
|
|
|
remoteAddrStr string
|
|
|
|
|
writeSched WriteScheduler
|
|
|
|
@ -643,8 +643,8 @@ type stream struct {
|
|
|
|
|
// owned by serverConn's serve loop:
|
|
|
|
|
bodyBytes int64 // body bytes seen so far
|
|
|
|
|
declBodyBytes int64 // or -1 if undeclared
|
|
|
|
|
flow flow // limits writing from Handler to client
|
|
|
|
|
inflow flow // what the client is allowed to POST/etc to us
|
|
|
|
|
flow outflow // limits writing from Handler to client
|
|
|
|
|
inflow inflow // what the client is allowed to POST/etc to us
|
|
|
|
|
state streamState
|
|
|
|
|
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
|
|
|
|
|
gotTrailerHeader bool // HEADER frame for trailers was seen
|
|
|
|
@ -1503,7 +1503,7 @@ func (sc *serverConn) processFrame(f Frame) error {
|
|
|
|
|
if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || f.Header().StreamID > sc.maxClientStreamID) {
|
|
|
|
|
|
|
|
|
|
if f, ok := f.(*DataFrame); ok {
|
|
|
|
|
if sc.inflow.available() < int32(f.Length) {
|
|
|
|
|
if !sc.inflow.take(f.Length) {
|
|
|
|
|
return sc.countError("data_flow", streamError(f.Header().StreamID, ErrCodeFlowControl))
|
|
|
|
|
}
|
|
|
|
|
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
|
|
|
|
@ -1775,14 +1775,9 @@ func (sc *serverConn) processData(f *DataFrame) error {
|
|
|
|
|
// But still enforce their connection-level flow control,
|
|
|
|
|
// and return any flow control bytes since we're not going
|
|
|
|
|
// to consume them.
|
|
|
|
|
if sc.inflow.available() < int32(f.Length) {
|
|
|
|
|
if !sc.inflow.take(f.Length) {
|
|
|
|
|
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
|
|
|
|
|
}
|
|
|
|
|
// Deduct the flow control from inflow, since we're
|
|
|
|
|
// going to immediately add it back in
|
|
|
|
|
// sendWindowUpdate, which also schedules sending the
|
|
|
|
|
// frames.
|
|
|
|
|
sc.inflow.take(int32(f.Length))
|
|
|
|
|
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
|
|
|
|
|
|
|
|
|
|
if st != nil && st.resetQueued {
|
|
|
|
@ -1797,10 +1792,9 @@ func (sc *serverConn) processData(f *DataFrame) error {
|
|
|
|
|
|
|
|
|
|
// Sender sending more than they'd declared?
|
|
|
|
|
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
|
|
|
|
|
if sc.inflow.available() < int32(f.Length) {
|
|
|
|
|
if !sc.inflow.take(f.Length) {
|
|
|
|
|
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
|
|
|
|
|
}
|
|
|
|
|
sc.inflow.take(int32(f.Length))
|
|
|
|
|
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
|
|
|
|
|
|
|
|
|
|
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
|
|
|
|
@ -1811,10 +1805,9 @@ func (sc *serverConn) processData(f *DataFrame) error {
|
|
|
|
|
}
|
|
|
|
|
if f.Length > 0 {
|
|
|
|
|
// Check whether the client has flow control quota.
|
|
|
|
|
if st.inflow.available() < int32(f.Length) {
|
|
|
|
|
if !takeInflows(&sc.inflow, &st.inflow, f.Length) {
|
|
|
|
|
return sc.countError("flow_on_data_length", streamError(id, ErrCodeFlowControl))
|
|
|
|
|
}
|
|
|
|
|
st.inflow.take(int32(f.Length))
|
|
|
|
|
|
|
|
|
|
if len(data) > 0 {
|
|
|
|
|
wrote, err := st.body.Write(data)
|
|
|
|
@ -1830,11 +1823,13 @@ func (sc *serverConn) processData(f *DataFrame) error {
|
|
|
|
|
|
|
|
|
|
// Return any padded flow control now, since we won't
|
|
|
|
|
// refund it later on body reads.
|
|
|
|
|
if pad := int32(f.Length) - int32(len(data)); pad > 0 {
|
|
|
|
|
// Call sendWindowUpdate even if there is no padding,
|
|
|
|
|
// to return buffered flow control credit if the sent
|
|
|
|
|
// window has shrunk.
|
|
|
|
|
pad := int32(f.Length) - int32(len(data))
|
|
|
|
|
sc.sendWindowUpdate32(nil, pad)
|
|
|
|
|
sc.sendWindowUpdate32(st, pad)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if f.StreamEnded() {
|
|
|
|
|
st.endStream()
|
|
|
|
|
}
|
|
|
|
@ -2105,8 +2100,7 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
|
|
|
|
|
st.cw.Init()
|
|
|
|
|
st.flow.conn = &sc.flow // link to conn-level counter
|
|
|
|
|
st.flow.add(sc.initialStreamSendWindowSize)
|
|
|
|
|
st.inflow.conn = &sc.inflow // link to conn-level counter
|
|
|
|
|
st.inflow.add(sc.srv.initialStreamRecvWindowSize())
|
|
|
|
|
st.inflow.init(sc.srv.initialStreamRecvWindowSize())
|
|
|
|
|
if sc.hs.WriteTimeout != 0 {
|
|
|
|
|
st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
|
|
|
|
|
}
|
|
|
|
@ -2388,47 +2382,28 @@ func (sc *serverConn) noteBodyRead(st *stream, n int) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// st may be nil for conn-level
|
|
|
|
|
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
|
|
|
|
|
sc.serveG.check()
|
|
|
|
|
// "The legal range for the increment to the flow control
|
|
|
|
|
// window is 1 to 2^31-1 (2,147,483,647) octets."
|
|
|
|
|
// A Go Read call on 64-bit machines could in theory read
|
|
|
|
|
// a larger Read than this. Very unlikely, but we handle it here
|
|
|
|
|
// rather than elsewhere for now.
|
|
|
|
|
const maxUint31 = 1<<31 - 1
|
|
|
|
|
for n > maxUint31 {
|
|
|
|
|
sc.sendWindowUpdate32(st, maxUint31)
|
|
|
|
|
n -= maxUint31
|
|
|
|
|
}
|
|
|
|
|
sc.sendWindowUpdate32(st, int32(n))
|
|
|
|
|
func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
|
|
|
|
|
sc.sendWindowUpdate(st, int(n))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// st may be nil for conn-level
|
|
|
|
|
func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
|
|
|
|
|
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
|
|
|
|
|
sc.serveG.check()
|
|
|
|
|
if n == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if n < 0 {
|
|
|
|
|
panic("negative update")
|
|
|
|
|
}
|
|
|
|
|
var streamID uint32
|
|
|
|
|
if st != nil {
|
|
|
|
|
var send int32
|
|
|
|
|
if st == nil {
|
|
|
|
|
send = sc.inflow.add(n)
|
|
|
|
|
} else {
|
|
|
|
|
streamID = st.id
|
|
|
|
|
send = st.inflow.add(n)
|
|
|
|
|
}
|
|
|
|
|
if send == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
sc.writeFrame(FrameWriteRequest{
|
|
|
|
|
write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
|
|
|
|
|
write: writeWindowUpdate{streamID: streamID, n: uint32(send)},
|
|
|
|
|
stream: st,
|
|
|
|
|
})
|
|
|
|
|
var ok bool
|
|
|
|
|
if st == nil {
|
|
|
|
|
ok = sc.inflow.add(n)
|
|
|
|
|
} else {
|
|
|
|
|
ok = st.inflow.add(n)
|
|
|
|
|
}
|
|
|
|
|
if !ok {
|
|
|
|
|
panic("internal error; sent too many window updates without decrements?")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// requestBody is the Handler's Request.Body type.
|
|
|
|
|