From 4d2dbe68fa244ea100356ca1f104f7fb1c6ee2ae Mon Sep 17 00:00:00 2001 From: Brandon Simmons Date: Tue, 17 Mar 2020 21:31:22 -0400 Subject: [PATCH] Use AssertNF for leak prophylaxis when doing mutable writes. Closes #3937 (#4097) Writing to a mutable var is a particularly potent source of leaks since it mostly defeats GHC's analysis. Here we add assertions to all mutable writes, and fix a couple spots where we wrote some thunks to a mutable var (compiled with -O2). Some of these thunks were probably benign, but others looked like they might be retaining big args. Didn't do much analysis, just fixed. Actually pretty happy with how easy this was to use and as a diagnostic, once I sorted out some issues. We should consider using it elsewhere, and maybe extending so that we can use it with tests, enable when `-fenable-assertions` etc. Relates #3388 Also simplified codepaths that use `AcceptWith`, which has unnecessary `Maybe` fields. --- CHANGELOG.md | 1 + server/cabal.project.freeze | 1 + server/graphql-engine.cabal | 3 ++ server/src-lib/Hasura/App.hs | 8 ++++ server/src-lib/Hasura/Events/Lib.hs | 2 +- .../Hasura/GraphQL/Execute/LiveQuery/Poll.hs | 6 ++- .../Hasura/GraphQL/Execute/LiveQuery/State.hs | 14 +++++-- .../Hasura/GraphQL/Execute/LiveQuery/TMap.hs | 2 +- .../Hasura/GraphQL/Transport/WebSocket.hs | 39 ++++++++++++------- .../GraphQL/Transport/WebSocket/Server.hs | 34 +++++++++------- server/src-lib/Hasura/Server/Auth/JWT.hs | 7 +++- server/src-lib/Hasura/Server/SchemaUpdate.hs | 2 + 12 files changed, 79 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bc68fb0..9173e762 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,4 +83,5 @@ when there were previously no events to process. Potential space leak fixed. 
(#3839) - console: track runtime errors (#4083) - auto-include `__typename` field in custom types' objects (fix #4063) +- squash some potential space leaks (#3937) - docs: bump MarupSafe version (#4102) diff --git a/server/cabal.project.freeze b/server/cabal.project.freeze index 3f90eae4..1180cb26 100644 --- a/server/cabal.project.freeze +++ b/server/cabal.project.freeze @@ -132,6 +132,7 @@ constraints: any.Cabal ==2.4.0.1, any.generic-arbitrary ==0.1.0, any.ghc-boot-th ==8.6.5, any.ghc-prim ==0.5.3, + any.ghc-heap-view ==0.6.0, any.happy ==1.19.12, happy +small_base, any.hashable ==1.2.7.0, diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal index 825018ed..3bd17823 100644 --- a/server/graphql-engine.cabal +++ b/server/graphql-engine.cabal @@ -192,6 +192,9 @@ library -- testing , QuickCheck , generic-arbitrary + -- 0.6.1 is supposedly not okay for ghc 8.6: + -- https://github.com/nomeata/ghc-heap-view/issues/27 + , ghc-heap-view == 0.6.0 , directory diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index c210d94c..2b863047 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -9,6 +9,7 @@ import Control.Monad.STM (atomically) import Control.Monad.Trans.Control (MonadBaseControl (..)) import Data.Aeson ((.=)) import Data.Time.Clock (UTCTime, getCurrentTime) +import GHC.AssertNF import Options.Applicative import System.Environment (getEnvironment, lookupEnv) import System.Exit (exitFailure) @@ -203,6 +204,13 @@ runHGEServer -- ^ start time -> m () runHGEServer ServeOptions{..} InitCtx{..} initTime = do + -- Comment this to enable expensive assertions from "GHC.AssertNF". These will log lines to + -- STDOUT containing "not in normal form". In the future we could try to integrate this into + -- our tests. For now this is a development tool. + -- + -- NOTE: be sure to compile WITHOUT code coverage, for this to work properly. 
+ liftIO disableAssertNF + let sqlGenCtx = SQLGenCtx soStringifyNum Loggers loggerCtx logger _ = _icLoggers diff --git a/server/src-lib/Hasura/Events/Lib.hs b/server/src-lib/Hasura/Events/Lib.hs index d87f4546..67ae0398 100644 --- a/server/src-lib/Hasura/Events/Lib.hs +++ b/server/src-lib/Hasura/Events/Lib.hs @@ -215,7 +215,7 @@ processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} = atomically $ do -- block until < HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE threads: capacity <- readTVar _eeCtxEventThreadsCapacity check $ capacity > 0 - writeTVar _eeCtxEventThreadsCapacity (capacity - 1) + writeTVar _eeCtxEventThreadsCapacity $! (capacity - 1) -- since there is some capacity in our worker threads, we can launch another: let restoreCapacity = liftIO $ atomically $ modifyTVar' _eeCtxEventThreadsCapacity (+ 1) diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs index 0ebcba0b..1a179cd4 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs @@ -50,6 +50,7 @@ import qualified StmContainers.Map as STMMap import qualified System.Metrics.Distribution as Metrics import Data.List.Split (chunksOf) +import GHC.AssertNF import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap @@ -186,12 +187,13 @@ pushResultToCohort -> LiveQueryMetadata -> CohortSnapshot -> IO () -pushResultToCohort result respHashM (LiveQueryMetadata dTime) cohortSnapshot = do +pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot = do prevRespHashM <- STM.readTVarIO respRef -- write to the current websockets if needed sinks <- if isExecError result || respHashM /= prevRespHashM then do + $assertNFHere respHashM -- so we don't write thunks to mutable vars STM.atomically $ STM.writeTVar respRef respHashM return (newSinks <> curSinks) else @@ -375,4 +377,4 @@ pollQuery metrics batchSize pgExecCtx pgQuery handler = 
-- from Postgres strictly and (2) even if we didn’t, hashing will have to force the -- whole thing anyway. respHash = mkRespHash (encJToBS result) - in (GQSuccess result, Just respHash, actionMeta,) <$> Map.lookup respId cohortSnapshotMap + in (GQSuccess result, Just $! respHash, actionMeta,) <$> Map.lookup respId cohortSnapshotMap diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs index 67e96836..73d4717d 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs @@ -21,6 +21,7 @@ import qualified StmContainers.Map as STMMap import Control.Concurrent.Extended (sleep, forkImmortal) import Control.Exception (mask_) import Data.String +import GHC.AssertNF import qualified Hasura.Logging as L import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap @@ -73,6 +74,8 @@ addLiveQuery logger lqState plan onResultAction = do responseId <- newCohortId sinkId <- newSinkId + $assertNFHere subscriber -- so we don't write thunks to mutable vars + -- a handler is returned only when it is newly created handlerM <- STM.atomically $ do handlerM <- STMMap.lookup handlerId lqMap @@ -84,7 +87,7 @@ addLiveQuery logger lqState plan onResultAction = do Nothing -> addToPoller sinkId responseId handler return Nothing Nothing -> do - poller <- newPoller + !poller <- newPoller addToPoller sinkId responseId poller STMMap.insert poller handlerId lqMap return $ Just poller @@ -96,7 +99,9 @@ addLiveQuery logger lqState plan onResultAction = do threadRef <- forkImmortal ("pollQuery."<>show sinkId) logger $ forever $ do pollQuery metrics batchSize pgExecCtx query handler sleep $ unRefetchInterval refetchInterval - STM.atomically $ STM.putTMVar (_pIOState handler) (PollerIOState threadRef metrics) + let !pState = PollerIOState threadRef metrics + $assertNFHere pState -- so we don't write thunks to mutable vars + STM.atomically $ STM.putTMVar 
(_pIOState handler) pState pure $ LiveQueryId handlerId cohortKey sinkId where @@ -106,11 +111,12 @@ addLiveQuery logger lqState plan onResultAction = do handlerId = PollerKey role query + !subscriber = Subscriber alias onResultAction addToCohort sinkId handlerC = - TMap.insert (Subscriber alias onResultAction) sinkId $ _cNewSubscribers handlerC + TMap.insert subscriber sinkId $ _cNewSubscribers handlerC addToPoller sinkId responseId handler = do - newCohort <- Cohort responseId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new + !newCohort <- Cohort responseId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new addToCohort sinkId newCohort TMap.insert newCohort cohortKey $ _pCohorts handler diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs index af2a93fb..942c134c 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs @@ -33,7 +33,7 @@ lookup :: (Eq k, Hashable k) => k -> TMap k v -> STM (Maybe v) lookup k = fmap (Map.lookup k) . readTVar . 
unTMap insert :: (Eq k, Hashable k) => v -> k -> TMap k v -> STM () -insert v k mapTv = modifyTVar' (unTMap mapTv) $ Map.insert k v +insert !v k mapTv = modifyTVar' (unTMap mapTv) $ Map.insert k v delete :: (Eq k, Hashable k) => k -> TMap k v -> STM () delete k mapTv = modifyTVar' (unTMap mapTv) $ Map.delete k diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs index 6b7ef5f8..a8032286 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs @@ -35,6 +35,7 @@ import qualified StmContainers.Map as STMMap import Control.Concurrent.Extended (sleep) import Control.Exception.Lifted import Data.String +import GHC.AssertNF import qualified ListT import Hasura.EncJSON @@ -79,10 +80,10 @@ data ErrRespType data WSConnState -- headers from the client for websockets = CSNotInitialised !WsHeaders - | CSInitError Text + | CSInitError !Text -- headers from the client (in conn params) to forward to the remote schema -- and JWT expiry time if any - | CSInitialised UserInfo (Maybe TC.UTCTime) [H.Header] + | CSInitialised !UserInfo !(Maybe TC.UTCTime) ![H.Header] data WSConnData = WSConnData @@ -108,9 +109,9 @@ sendMsgWithMetadata wsConn msg (LQ.LiveQueryMetadata execTime) = liftIO $ WS.sendMsg wsConn $ WS.WSQueueResponse bs wsInfo where bs = encodeServerMsg msg - wsInfo = Just $ WS.WSEventInfo - { WS._wseiQueryExecutionTime = Just $ realToFrac execTime - , WS._wseiResponseSize = Just $ BL.length bs + wsInfo = Just $! WS.WSEventInfo + { WS._wseiQueryExecutionTime = Just $! realToFrac execTime + , WS._wseiResponseSize = Just $! 
BL.length bs } data OpDetail @@ -232,8 +233,7 @@ onConn (L.Logger logger) corsPolicy wsId requestHead = do <*> pure errType let acceptRequest = WS.defaultAcceptRequest { WS.acceptSubprotocol = Just "graphql-ws"} - return $ Right $ WS.AcceptWith connData acceptRequest - (Just keepAliveAction) (Just jwtExpiryHandler) + return $ Right $ WS.AcceptWith connData acceptRequest keepAliveAction jwtExpiryHandler reject qErr = do logger $ mkWsErrorLog Nothing (WsConnInfo wsId Nothing Nothing) (ERejected qErr) @@ -333,10 +333,13 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do L.unLogger logger $ QueryLog query Nothing reqId -- NOTE!: we mask async exceptions higher in the call stack, but it's -- crucial we don't lose lqId after addLiveQuery returns successfully. - lqId <- liftIO $ LQ.addLiveQuery logger lqMap lqOp liveQOnChange + !lqId <- liftIO $ LQ.addLiveQuery logger lqMap lqOp liveQOnChange + let !opName = _grOperationName q + liftIO $ $assertNFHere $! (lqId, opName) -- so we don't write thunks to mutable vars + liftIO $ STM.atomically $ -- NOTE: see crucial `lookup` check above, ensuring this doesn't clobber: - STMMap.insert (lqId, _grOperationName q) opId opMap + STMMap.insert (lqId, opName) opId opMap logOpEv ODStarted (Just reqId) where @@ -534,14 +537,20 @@ onConnInit logger manager wsConn authMode connParamsM = do res <- resolveUserInfo logger manager headers authMode case res of Left e -> do - liftIO $ STM.atomically $ STM.writeTVar (_wscUser $ WS.getData wsConn) $ - CSInitError $ qeError e + let !initErr = CSInitError $ qeError e + liftIO $ do + $assertNFHere initErr -- so we don't write thunks to mutable vars + STM.atomically $ STM.writeTVar (_wscUser $ WS.getData wsConn) initErr + let connErr = ConnErrMsg $ qeError e logWSEvent logger wsConn $ EConnErr connErr sendMsg wsConn $ SMConnErr connErr Right (userInfo, expTimeM) -> do - liftIO $ STM.atomically $ STM.writeTVar (_wscUser $ WS.getData wsConn) $ - CSInitialised userInfo expTimeM 
paramHeaders + let !csInit = CSInitialised userInfo expTimeM paramHeaders + liftIO $ do + $assertNFHere csInit -- so we don't write thunks to mutable vars + STM.atomically $ STM.writeTVar (_wscUser $ WS.getData wsConn) csInit + sendMsg wsConn SMConnAck -- TODO: send it periodically? Why doesn't apollo's protocol use -- ping/pong frames of websocket spec? @@ -603,8 +612,8 @@ createWSServerApp -> WSServerEnv -> WS.PendingConnection -> m () -- ^ aka generalized 'WS.ServerApp' -createWSServerApp authMode serverEnv = - WS.createServerApp (_wseServer serverEnv) handlers +createWSServerApp authMode serverEnv = \ !pendingConn -> + WS.createServerApp (_wseServer serverEnv) handlers pendingConn where handlers = WS.WSHandlers diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs index 54224ca8..f8847492 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs @@ -41,6 +41,7 @@ import qualified Data.UUID.V4 as UUID import Data.Word (Word16) import GHC.Int (Int64) import Hasura.Prelude +import GHC.AssertNF import qualified ListT import qualified Network.WebSockets as WS import qualified StmContainers.Map as STMMap @@ -141,7 +142,9 @@ closeConnWithCode wsConn code bs = do -- writes to a queue instead of the raw connection -- so that sendMsg doesn't block sendMsg :: WSConn a -> WSQueueResponse -> IO () -sendMsg wsConn = STM.atomically . 
STM.writeTQueue (_wcSendQ wsConn) +sendMsg wsConn = \ !resp -> do + $assertNFHere resp -- so we don't write thunks to mutable vars + STM.atomically $ STM.writeTQueue (_wcSendQ wsConn) resp type ConnMap a = STMMap.Map WSId (WSConn a) @@ -193,8 +196,8 @@ data AcceptWith a = AcceptWith { _awData :: !a , _awReq :: !WS.AcceptRequest - , _awKeepAlive :: !(Maybe (WSConn a -> IO ())) - , _awOnJwtExpiry :: !(Maybe (WSConn a -> IO ())) + , _awKeepAlive :: !(WSConn a -> IO ()) + , _awOnJwtExpiry :: !(WSConn a -> IO ()) } type OnConnH m a = WSId -> WS.RequestHead -> m (Either WS.RejectRequest (AcceptWith a)) @@ -216,7 +219,8 @@ createServerApp -- aka WS.ServerApp -> WS.PendingConnection -> m () -createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers pendingConn = do +{-# INLINE createServerApp #-} +createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers !pendingConn = do wsId <- WSId <$> liftIO UUID.nextRandom writeLog $ WSLog wsId EConnectionRequest Nothing status <- liftIO $ STM.readTVarIO serverStatus @@ -247,11 +251,17 @@ createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers pe liftIO $ WS.rejectRequestWith pendingConn rejectRequest writeLog $ WSLog wsId ERejected Nothing - onAccept wsId (AcceptWith a acceptWithParams keepAliveM onJwtExpiryM) = do + onAccept wsId (AcceptWith a acceptWithParams keepAlive onJwtExpiry) = do conn <- liftIO $ WS.acceptRequestWith pendingConn acceptWithParams writeLog $ WSLog wsId EAccepted Nothing sendQ <- liftIO STM.newTQueueIO - let wsConn = WSConn wsId logger conn sendQ a + let !wsConn = WSConn wsId logger conn sendQ a + -- TODO there are many thunks here. Difficult to trace how much is retained, and + -- how much of that would be shared anyway. + -- Requires a fork of 'wai-websockets' and 'websockets', it looks like. + -- Adding `package` stanzas with -XStrict -XStrictData for those two packages + -- helped, cutting the number of thunks approximately in half. 
+ liftIO $ $assertNFHere wsConn -- so we don't write thunks to mutable vars let whenAcceptingInsertConn = liftIO $ STM.atomically $ do status <- STM.readTVar serverStatus @@ -284,21 +294,15 @@ createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers pe liftIO $ WS.sendTextData conn msg writeLog $ WSLog wsId (EMessageSent $ TBS.fromLBS msg) wsInfo - let withAsyncM mAction cont = case mAction of - Nothing -> cont Nothing - Just action -> LA.withAsync (liftIO $ action wsConn) $ - \actRef -> cont $ Just actRef - -- withAsync lets us be very sure that if e.g. an async exception is raised while we're -- forking that the threads we launched will be cleaned up. See also below. LA.withAsync rcv $ \rcvRef -> do LA.withAsync send $ \sendRef -> do - withAsyncM keepAliveM $ \keepAliveRefM -> do - withAsyncM onJwtExpiryM $ \onJwtExpiryRefM -> do + LA.withAsync (liftIO $ keepAlive wsConn) $ \keepAliveRef -> do + LA.withAsync (liftIO $ onJwtExpiry wsConn) $ \onJwtExpiryRef -> do -- terminates on WS.ConnectionException and JWT expiry - let waitOnRefs = catMaybes [keepAliveRefM, onJwtExpiryRefM] - <> [rcvRef, sendRef] + let waitOnRefs = [keepAliveRef, onJwtExpiryRef, rcvRef, sendRef] -- withAnyCancel re-raises exceptions from forkedThreads, and is guarenteed to cancel in -- case of async exceptions raised while blocking here: try (LA.waitAnyCancel waitOnRefs) >>= \case diff --git a/server/src-lib/Hasura/Server/Auth/JWT.hs b/server/src-lib/Hasura/Server/Auth/JWT.hs index feebc9dc..f035187b 100644 --- a/server/src-lib/Hasura/Server/Auth/JWT.hs +++ b/server/src-lib/Hasura/Server/Auth/JWT.hs @@ -20,6 +20,7 @@ import Data.Parser.CacheControl (parseMaxAge) import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime) import Data.Time.Format (defaultTimeLocale, parseTimeM) +import GHC.AssertNF import Network.URI (URI) import Hasura.HTTP @@ -148,8 +149,10 @@ updateJwkRef (Logger logger) manager url jwkRef = do logAndThrow err let parseErr e = 
JFEJwkParseError (T.pack e) $ "Error parsing JWK from url: " <> urlT - jwkset <- either (logAndThrow . parseErr) return $ J.eitherDecode respBody - liftIO $ writeIORef jwkRef jwkset + !jwkset <- either (logAndThrow . parseErr) return $ J.eitherDecode' respBody + liftIO $ do + $assertNFHere jwkset -- so we don't write thunks to mutable vars + writeIORef jwkRef jwkset -- first check for Cache-Control header to get max-age, if not found, look for Expires header let cacheHeader = resp ^? Wreq.responseHeader "Cache-Control" diff --git a/server/src-lib/Hasura/Server/SchemaUpdate.hs b/server/src-lib/Hasura/Server/SchemaUpdate.hs index af064df8..4d9f2959 100644 --- a/server/src-lib/Hasura/Server/SchemaUpdate.hs +++ b/server/src-lib/Hasura/Server/SchemaUpdate.hs @@ -17,6 +17,7 @@ import Data.Aeson import Data.Aeson.Casing import Data.Aeson.TH import Data.IORef +import GHC.AssertNF import qualified Control.Concurrent.Extended as C import qualified Control.Concurrent.STM as STM @@ -159,6 +160,7 @@ listener sqlGenCtx pool logger httpMgr updateEventRef Left e -> logError logger threadType $ TEJsonParse $ T.pack e Right payload -> do logInfo logger threadType $ object ["received_event" .= payload] + $assertNFHere payload -- so we don't write thunks to mutable vars -- Push a notify event to Queue STM.atomically $ STM.writeTVar updateEventRef $ Just payload