forked from microsoft/azurelinux
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[AUTO-CHERRYPICK] [3.0] Fix CVE-2023-39325 and CVE-2023-44487 for con…
…tainerized-data-importer - branch 3.0-dev (microsoft#12085) Co-authored-by: Henry Li <[email protected]>
- Loading branch information
1 parent
7cef86d
commit b06765c
Showing
3 changed files
with
381 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
diff --git a/vendor/golang.org/x/net/http2/server.go b/vendor/golang.org/x/net/http2/server.go | ||
index 8cb14f3..6000140 100644 | ||
--- a/vendor/golang.org/x/net/http2/server.go | ||
+++ b/vendor/golang.org/x/net/http2/server.go | ||
@@ -581,9 +581,11 @@ type serverConn struct { | ||
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client | ||
curClientStreams uint32 // number of open streams initiated by the client | ||
curPushedStreams uint32 // number of open streams initiated by server push | ||
+ curHandlers uint32 // number of running handler goroutines | ||
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests | ||
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes | ||
streams map[uint32]*stream | ||
+ unstartedHandlers []unstartedHandler | ||
initialStreamSendWindowSize int32 | ||
maxFrameSize int32 | ||
peerMaxHeaderListSize uint32 // zero means unknown (default) | ||
@@ -981,6 +983,8 @@ func (sc *serverConn) serve() { | ||
return | ||
case gracefulShutdownMsg: | ||
sc.startGracefulShutdownInternal() | ||
+ case handlerDoneMsg: | ||
+ sc.handlerDone() | ||
default: | ||
panic("unknown timer") | ||
} | ||
@@ -1028,6 +1032,7 @@ var ( | ||
idleTimerMsg = new(serverMessage) | ||
shutdownTimerMsg = new(serverMessage) | ||
gracefulShutdownMsg = new(serverMessage) | ||
+ handlerDoneMsg = new(serverMessage) | ||
) | ||
|
||
func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) } | ||
@@ -2022,8 +2027,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error { | ||
} | ||
} | ||
|
||
- go sc.runHandler(rw, req, handler) | ||
- return nil | ||
+ return sc.scheduleHandler(id, rw, req, handler) | ||
} | ||
|
||
func (sc *serverConn) upgradeRequest(req *http.Request) { | ||
@@ -2043,6 +2047,10 @@ func (sc *serverConn) upgradeRequest(req *http.Request) { | ||
sc.conn.SetReadDeadline(time.Time{}) | ||
} | ||
|
||
+ // This is the first request on the connection, | ||
+ // so start the handler directly rather than going | ||
+ // through scheduleHandler. | ||
+ sc.curHandlers++ | ||
go sc.runHandler(rw, req, sc.handler.ServeHTTP) | ||
} | ||
|
||
@@ -2283,8 +2291,62 @@ func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *response | ||
return &responseWriter{rws: rws} | ||
} | ||
|
||
+type unstartedHandler struct { | ||
+ streamID uint32 | ||
+ rw *responseWriter | ||
+ req *http.Request | ||
+ handler func(http.ResponseWriter, *http.Request) | ||
+} | ||
+ | ||
+// scheduleHandler starts a handler goroutine, | ||
+// or schedules one to start as soon as an existing handler finishes. | ||
+func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) error { | ||
+ sc.serveG.check() | ||
+ maxHandlers := sc.advMaxStreams | ||
+ if sc.curHandlers < maxHandlers { | ||
+ sc.curHandlers++ | ||
+ go sc.runHandler(rw, req, handler) | ||
+ return nil | ||
+ } | ||
+ if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) { | ||
+ return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm)) | ||
+ } | ||
+ sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{ | ||
+ streamID: streamID, | ||
+ rw: rw, | ||
+ req: req, | ||
+ handler: handler, | ||
+ }) | ||
+ return nil | ||
+} | ||
+ | ||
+func (sc *serverConn) handlerDone() { | ||
+ sc.serveG.check() | ||
+ sc.curHandlers-- | ||
+ i := 0 | ||
+ maxHandlers := sc.advMaxStreams | ||
+ for ; i < len(sc.unstartedHandlers); i++ { | ||
+ u := sc.unstartedHandlers[i] | ||
+ if sc.streams[u.streamID] == nil { | ||
+ // This stream was reset before its goroutine had a chance to start. | ||
+ continue | ||
+ } | ||
+ if sc.curHandlers >= maxHandlers { | ||
+ break | ||
+ } | ||
+ sc.curHandlers++ | ||
+ go sc.runHandler(u.rw, u.req, u.handler) | ||
+ sc.unstartedHandlers[i] = unstartedHandler{} // don't retain references | ||
+ } | ||
+ sc.unstartedHandlers = sc.unstartedHandlers[i:] | ||
+ if len(sc.unstartedHandlers) == 0 { | ||
+ sc.unstartedHandlers = nil | ||
+ } | ||
+} | ||
+ | ||
// Run on its own goroutine. | ||
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) { | ||
+ defer sc.sendServeMsg(handlerDoneMsg) | ||
didPanic := true | ||
defer func() { | ||
rw.rws.stream.cancelCtx() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,258 @@ | ||
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go | ||
index 3dd1564..9d9a3fd 100644 | ||
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go | ||
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go | ||
@@ -165,15 +165,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, | ||
ID: http2.SettingMaxFrameSize, | ||
Val: http2MaxFrameLen, | ||
}} | ||
- // TODO(zhaoq): Have a better way to signal "no limit" because 0 is | ||
- // permitted in the HTTP2 spec. | ||
- maxStreams := config.MaxStreams | ||
- if maxStreams == 0 { | ||
- maxStreams = math.MaxUint32 | ||
- } else { | ||
+ if config.MaxStreams != math.MaxUint32 { | ||
isettings = append(isettings, http2.Setting{ | ||
ID: http2.SettingMaxConcurrentStreams, | ||
- Val: maxStreams, | ||
+ Val: config.MaxStreams, | ||
}) | ||
} | ||
dynamicWindow := true | ||
@@ -252,7 +247,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, | ||
framer: framer, | ||
readerDone: make(chan struct{}), | ||
writerDone: make(chan struct{}), | ||
- maxStreams: maxStreams, | ||
+ maxStreams: config.MaxStreams, | ||
inTapHandle: config.InTapHandle, | ||
fc: &trInFlow{limit: uint32(icwz)}, | ||
state: reachable, | ||
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go | ||
index f4dde72..98839ad 100644 | ||
--- a/vendor/google.golang.org/grpc/server.go | ||
+++ b/vendor/google.golang.org/grpc/server.go | ||
@@ -43,7 +43,6 @@ import ( | ||
"google.golang.org/grpc/internal" | ||
"google.golang.org/grpc/internal/binarylog" | ||
"google.golang.org/grpc/internal/channelz" | ||
- "google.golang.org/grpc/internal/grpcrand" | ||
"google.golang.org/grpc/internal/grpcsync" | ||
"google.golang.org/grpc/internal/transport" | ||
"google.golang.org/grpc/keepalive" | ||
@@ -74,10 +73,10 @@ func init() { | ||
srv.drainServerTransports(addr) | ||
} | ||
internal.AddGlobalServerOptions = func(opt ...ServerOption) { | ||
- extraServerOptions = append(extraServerOptions, opt...) | ||
+ globalServerOptions = append(globalServerOptions, opt...) | ||
} | ||
internal.ClearGlobalServerOptions = func() { | ||
- extraServerOptions = nil | ||
+ globalServerOptions = nil | ||
} | ||
internal.BinaryLogger = binaryLogger | ||
internal.JoinServerOptions = newJoinServerOption | ||
@@ -115,12 +114,6 @@ type serviceInfo struct { | ||
mdata interface{} | ||
} | ||
|
||
-type serverWorkerData struct { | ||
- st transport.ServerTransport | ||
- wg *sync.WaitGroup | ||
- stream *transport.Stream | ||
-} | ||
- | ||
// Server is a gRPC server to serve RPC requests. | ||
type Server struct { | ||
opts serverOptions | ||
@@ -145,7 +138,7 @@ type Server struct { | ||
channelzID *channelz.Identifier | ||
czData *channelzData | ||
|
||
- serverWorkerChannels []chan *serverWorkerData | ||
+ serverWorkerChannel chan func() | ||
} | ||
|
||
type serverOptions struct { | ||
@@ -177,13 +170,14 @@ type serverOptions struct { | ||
} | ||
|
||
var defaultServerOptions = serverOptions{ | ||
+ maxConcurrentStreams: math.MaxUint32, | ||
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, | ||
maxSendMessageSize: defaultServerMaxSendMessageSize, | ||
connectionTimeout: 120 * time.Second, | ||
writeBufferSize: defaultWriteBufSize, | ||
readBufferSize: defaultReadBufSize, | ||
} | ||
-var extraServerOptions []ServerOption | ||
+var globalServerOptions []ServerOption | ||
|
||
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. | ||
type ServerOption interface { | ||
@@ -387,6 +381,9 @@ func MaxSendMsgSize(m int) ServerOption { | ||
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number | ||
// of concurrent streams to each ServerTransport. | ||
func MaxConcurrentStreams(n uint32) ServerOption { | ||
+ if n == 0 { | ||
+ n = math.MaxUint32 | ||
+ } | ||
return newFuncServerOption(func(o *serverOptions) { | ||
o.maxConcurrentStreams = n | ||
}) | ||
@@ -565,42 +562,35 @@ const serverWorkerResetThreshold = 1 << 16 | ||
// re-allocations (see the runtime.morestack problem [1]). | ||
// | ||
// [1] https://github.com/golang/go/issues/18138 | ||
-func (s *Server) serverWorker(ch chan *serverWorkerData) { | ||
- // To make sure all server workers don't reset at the same time, choose a | ||
- // random number of iterations before resetting. | ||
- threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold) | ||
- for completed := 0; completed < threshold; completed++ { | ||
- data, ok := <-ch | ||
+func (s *Server) serverWorker() { | ||
+ for completed := 0; completed < serverWorkerResetThreshold; completed++ { | ||
+ f, ok := <-s.serverWorkerChannel | ||
if !ok { | ||
return | ||
} | ||
- s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) | ||
- data.wg.Done() | ||
+ f() | ||
} | ||
- go s.serverWorker(ch) | ||
+ go s.serverWorker() | ||
} | ||
|
||
// initServerWorkers creates worker goroutines and channels to process incoming | ||
// connections to reduce the time spent overall on runtime.morestack. | ||
func (s *Server) initServerWorkers() { | ||
- s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers) | ||
+ s.serverWorkerChannel = make(chan func()) | ||
for i := uint32(0); i < s.opts.numServerWorkers; i++ { | ||
- s.serverWorkerChannels[i] = make(chan *serverWorkerData) | ||
- go s.serverWorker(s.serverWorkerChannels[i]) | ||
+ go s.serverWorker() | ||
} | ||
} | ||
|
||
func (s *Server) stopServerWorkers() { | ||
- for i := uint32(0); i < s.opts.numServerWorkers; i++ { | ||
- close(s.serverWorkerChannels[i]) | ||
- } | ||
+ close(s.serverWorkerChannel) | ||
} | ||
|
||
// NewServer creates a gRPC server which has no service registered and has not | ||
// started to accept requests yet. | ||
func NewServer(opt ...ServerOption) *Server { | ||
opts := defaultServerOptions | ||
- for _, o := range extraServerOptions { | ||
+ for _, o := range globalServerOptions { | ||
o.apply(&opts) | ||
} | ||
for _, o := range opt { | ||
@@ -945,25 +935,26 @@ func (s *Server) serveStreams(st transport.ServerTransport) { | ||
defer st.Close() | ||
var wg sync.WaitGroup | ||
|
||
- var roundRobinCounter uint32 | ||
+ streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams) | ||
st.HandleStreams(func(stream *transport.Stream) { | ||
wg.Add(1) | ||
+ | ||
+ streamQuota.acquire() | ||
+ f := func() { | ||
+ defer streamQuota.release() | ||
+ defer wg.Done() | ||
+ s.handleStream(st, stream, s.traceInfo(st, stream)) | ||
+ } | ||
+ | ||
if s.opts.numServerWorkers > 0 { | ||
- data := &serverWorkerData{st: st, wg: &wg, stream: stream} | ||
select { | ||
- case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data: | ||
+ case s.serverWorkerChannel <- f: | ||
+ return | ||
default: | ||
// If all stream workers are busy, fallback to the default code path. | ||
- go func() { | ||
- s.handleStream(st, stream, s.traceInfo(st, stream)) | ||
- wg.Done() | ||
- }() | ||
} | ||
} else { | ||
- go func() { | ||
- defer wg.Done() | ||
- s.handleStream(st, stream, s.traceInfo(st, stream)) | ||
- }() | ||
+ go f() | ||
} | ||
}, func(ctx context.Context, method string) context.Context { | ||
if !EnableTracing { | ||
@@ -1978,3 +1969,34 @@ type channelzServer struct { | ||
func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { | ||
return c.s.channelzMetric() | ||
} | ||
+ | ||
+// atomicSemaphore implements a blocking, counting semaphore. acquire should be | ||
+// called synchronously; release may be called asynchronously. | ||
+type atomicSemaphore struct { | ||
+ n atomic.Int64 | ||
+ wait chan struct{} | ||
+} | ||
+ | ||
+func (q *atomicSemaphore) acquire() { | ||
+ if q.n.Add(-1) < 0 { | ||
+ // We ran out of quota. Block until a release happens. | ||
+ <-q.wait | ||
+ } | ||
+} | ||
+ | ||
+func (q *atomicSemaphore) release() { | ||
+ // N.B. the "<= 0" check below should allow for this to work with multiple | ||
+ // concurrent calls to acquire, but also note that with synchronous calls to | ||
+ // acquire, as our system does, n will never be less than -1. There are | ||
+ // fairness issues (queuing) to consider if this was to be generalized. | ||
+ if q.n.Add(1) <= 0 { | ||
+ // An acquire was waiting on us. Unblock it. | ||
+ q.wait <- struct{}{} | ||
+ } | ||
+} | ||
+ | ||
+func newHandlerQuota(n uint32) *atomicSemaphore { | ||
+ a := &atomicSemaphore{wait: make(chan struct{}, 1)} | ||
+ a.n.Store(int64(n)) | ||
+ return a | ||
+} | ||
\ No newline at end of file | ||
diff --git a/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go b/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go | ||
index d738725..3674914 100644 | ||
--- a/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go | ||
+++ b/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go | ||
@@ -126,14 +126,17 @@ type rudimentaryErrorBackoff struct { | ||
// OnError will block if it is called more often than the embedded period time. | ||
// This will prevent overly tight hot error loops. | ||
func (r *rudimentaryErrorBackoff) OnError(error) { | ||
+ now := time.Now() // start the timer before acquiring the lock | ||
r.lastErrorTimeLock.Lock() | ||
- defer r.lastErrorTimeLock.Unlock() | ||
- d := time.Since(r.lastErrorTime) | ||
- if d < r.minPeriod { | ||
- // If the time moves backwards for any reason, do nothing | ||
- time.Sleep(r.minPeriod - d) | ||
- } | ||
+ d := now.Sub(r.lastErrorTime) | ||
r.lastErrorTime = time.Now() | ||
+ r.lastErrorTimeLock.Unlock() | ||
+ | ||
+ // Do not sleep with the lock held because that causes all callers of HandleError to block. | ||
+ // We only want the current goroutine to block. | ||
+ // A negative or zero duration causes time.Sleep to return immediately. | ||
+ // If the time moves backwards for any reason, do nothing. | ||
+ time.Sleep(r.minPeriod - d) | ||
} | ||
|
||
// GetCaller returns the caller of the function that calls it. |
Oops, something went wrong.