diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bc68fb0..9173e762 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,4 +83,5 @@ when there were previously no events to process. Potential space leak fixed. (#3839) - console: track runtime errors (#4083) - auto-include `__typename` field in custom types' objects (fix #4063) +- squash some potential space leaks (#3937) - docs: bump MarupSafe version (#4102) diff --git a/server/cabal.project.freeze b/server/cabal.project.freeze index 3f90eae4..1180cb26 100644 --- a/server/cabal.project.freeze +++ b/server/cabal.project.freeze @@ -132,6 +132,7 @@ constraints: any.Cabal ==2.4.0.1, any.generic-arbitrary ==0.1.0, any.ghc-boot-th ==8.6.5, any.ghc-prim ==0.5.3, + any.ghc-heap-view ==0.6.0, any.happy ==1.19.12, happy +small_base, any.hashable ==1.2.7.0, diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal index 825018ed..3bd17823 100644 --- a/server/graphql-engine.cabal +++ b/server/graphql-engine.cabal @@ -192,6 +192,9 @@ library -- testing , QuickCheck , generic-arbitrary + -- 0.6.1 is supposedly not okay for ghc 8.6: + -- https://github.com/nomeata/ghc-heap-view/issues/27 + , ghc-heap-view == 0.6.0 , directory diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index c210d94c..2b863047 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -9,6 +9,7 @@ import Control.Monad.STM (atomically) import Control.Monad.Trans.Control (MonadBaseControl (..)) import Data.Aeson ((.=)) import Data.Time.Clock (UTCTime, getCurrentTime) +import GHC.AssertNF import Options.Applicative import System.Environment (getEnvironment, lookupEnv) import System.Exit (exitFailure) @@ -203,6 +204,13 @@ runHGEServer -- ^ start time -> m () runHGEServer ServeOptions{..} InitCtx{..} initTime = do + -- Comment this to enable expensive assertions from "GHC.AssertNF". These will log lines to + -- STDOUT containing "not in normal form". In the future we could try to integrate this into + -- our tests. For now this is a development tool. + -- + -- NOTE: be sure to compile WITHOUT code coverage, for this to work properly. + liftIO disableAssertNF + let sqlGenCtx = SQLGenCtx soStringifyNum Loggers loggerCtx logger _ = _icLoggers diff --git a/server/src-lib/Hasura/Events/Lib.hs b/server/src-lib/Hasura/Events/Lib.hs index d87f4546..67ae0398 100644 --- a/server/src-lib/Hasura/Events/Lib.hs +++ b/server/src-lib/Hasura/Events/Lib.hs @@ -215,7 +215,7 @@ processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} = atomically $ do -- block until < HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE threads: capacity <- readTVar _eeCtxEventThreadsCapacity check $ capacity > 0 - writeTVar _eeCtxEventThreadsCapacity (capacity - 1) + writeTVar _eeCtxEventThreadsCapacity $! (capacity - 1) -- since there is some capacity in our worker threads, we can launch another: let restoreCapacity = liftIO $ atomically $ modifyTVar' _eeCtxEventThreadsCapacity (+ 1) diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs index 0ebcba0b..1a179cd4 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs @@ -50,6 +50,7 @@ import qualified StmContainers.Map as STMMap import qualified System.Metrics.Distribution as Metrics import Data.List.Split (chunksOf) +import GHC.AssertNF import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap @@ -186,12 +187,13 @@ pushResultToCohort -> LiveQueryMetadata -> CohortSnapshot -> IO () -pushResultToCohort result respHashM (LiveQueryMetadata dTime) cohortSnapshot = do +pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot = do prevRespHashM <- STM.readTVarIO respRef -- write to the current websockets if needed sinks <- if isExecError result || respHashM /= prevRespHashM then do + $assertNFHere respHashM -- so we don't write thunks to mutable vars STM.atomically $ STM.writeTVar respRef respHashM return (newSinks <> curSinks) else @@ -375,4 +377,4 @@ pollQuery metrics batchSize pgExecCtx pgQuery handler = -- from Postgres strictly and (2) even if we didn’t, hashing will have to force the -- whole thing anyway. respHash = mkRespHash (encJToBS result) - in (GQSuccess result, Just respHash, actionMeta,) <$> Map.lookup respId cohortSnapshotMap + in (GQSuccess result, Just $! respHash, actionMeta,) <$> Map.lookup respId cohortSnapshotMap diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs index 67e96836..73d4717d 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs @@ -21,6 +21,7 @@ import qualified StmContainers.Map as STMMap import Control.Concurrent.Extended (sleep, forkImmortal) import Control.Exception (mask_) import Data.String +import GHC.AssertNF import qualified Hasura.Logging as L import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap @@ -73,6 +74,8 @@ addLiveQuery logger lqState plan onResultAction = do responseId <- newCohortId sinkId <- newSinkId + $assertNFHere subscriber -- so we don't write thunks to mutable vars + -- a handler is returned only when it is newly created handlerM <- STM.atomically $ do handlerM <- STMMap.lookup handlerId lqMap @@ -84,7 +87,7 @@ addLiveQuery logger lqState plan onResultAction = do Nothing -> addToPoller sinkId responseId handler return Nothing Nothing -> do - poller <- newPoller + !poller <- newPoller addToPoller sinkId responseId poller STMMap.insert poller handlerId lqMap return $ Just poller @@ -96,7 +99,9 @@ addLiveQuery logger lqState plan onResultAction = do threadRef <- forkImmortal ("pollQuery."<>show sinkId) logger $ forever $ do pollQuery metrics batchSize pgExecCtx query handler sleep $ unRefetchInterval refetchInterval - STM.atomically $ STM.putTMVar (_pIOState handler) (PollerIOState threadRef metrics) + let !pState = PollerIOState threadRef metrics + $assertNFHere pState -- so we don't write thunks to mutable vars + STM.atomically $ STM.putTMVar (_pIOState handler) pState pure $ LiveQueryId handlerId cohortKey sinkId where @@ -106,11 +111,12 @@ addLiveQuery logger lqState plan onResultAction = do handlerId = PollerKey role query + !subscriber = Subscriber alias onResultAction addToCohort sinkId handlerC = - TMap.insert (Subscriber alias onResultAction) sinkId $ _cNewSubscribers handlerC + TMap.insert subscriber sinkId $ _cNewSubscribers handlerC addToPoller sinkId responseId handler = do - newCohort <- Cohort responseId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new + !newCohort <- Cohort responseId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new addToCohort sinkId newCohort TMap.insert newCohort cohortKey $ _pCohorts handler diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs index af2a93fb..942c134c 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs @@ -33,7 +33,7 @@ lookup :: (Eq k, Hashable k) => k -> TMap k v -> STM (Maybe v) lookup k = fmap (Map.lookup k) . readTVar . unTMap insert :: (Eq k, Hashable k) => v -> k -> TMap k v -> STM () -insert v k mapTv = modifyTVar' (unTMap mapTv) $ Map.insert k v +insert !v k mapTv = modifyTVar' (unTMap mapTv) $ Map.insert k v delete :: (Eq k, Hashable k) => k -> TMap k v -> STM () delete k mapTv = modifyTVar' (unTMap mapTv) $ Map.delete k diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs index 6b7ef5f8..a8032286 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs @@ -35,6 +35,7 @@ import qualified StmContainers.Map as STMMap import Control.Concurrent.Extended (sleep) import Control.Exception.Lifted import Data.String +import GHC.AssertNF import qualified ListT import Hasura.EncJSON @@ -79,10 +80,10 @@ data ErrRespType data WSConnState -- headers from the client for websockets = CSNotInitialised !WsHeaders - | CSInitError Text + | CSInitError !Text -- headers from the client (in conn params) to forward to the remote schema -- and JWT expiry time if any - | CSInitialised UserInfo (Maybe TC.UTCTime) [H.Header] + | CSInitialised !UserInfo !(Maybe TC.UTCTime) ![H.Header] data WSConnData = WSConnData @@ -108,9 +109,9 @@ sendMsgWithMetadata wsConn msg (LQ.LiveQueryMetadata execTime) = liftIO $ WS.sendMsg wsConn $ WS.WSQueueResponse bs wsInfo where bs = encodeServerMsg msg - wsInfo = Just $ WS.WSEventInfo - { WS._wseiQueryExecutionTime = Just $ realToFrac execTime - , WS._wseiResponseSize = Just $ BL.length bs + wsInfo = Just $! WS.WSEventInfo + { WS._wseiQueryExecutionTime = Just $! realToFrac execTime + , WS._wseiResponseSize = Just $! BL.length bs } data OpDetail @@ -232,8 +233,7 @@ onConn (L.Logger logger) corsPolicy wsId requestHead = do <*> pure errType let acceptRequest = WS.defaultAcceptRequest { WS.acceptSubprotocol = Just "graphql-ws"} - return $ Right $ WS.AcceptWith connData acceptRequest - (Just keepAliveAction) (Just jwtExpiryHandler) + return $ Right $ WS.AcceptWith connData acceptRequest keepAliveAction jwtExpiryHandler reject qErr = do logger $ mkWsErrorLog Nothing (WsConnInfo wsId Nothing Nothing) (ERejected qErr) @@ -333,10 +333,13 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do L.unLogger logger $ QueryLog query Nothing reqId -- NOTE!: we mask async exceptions higher in the call stack, but it's -- crucial we don't lose lqId after addLiveQuery returns successfully. - lqId <- liftIO $ LQ.addLiveQuery logger lqMap lqOp liveQOnChange + !lqId <- liftIO $ LQ.addLiveQuery logger lqMap lqOp liveQOnChange + let !opName = _grOperationName q + liftIO $ $assertNFHere $! (lqId, opName) -- so we don't write thunks to mutable vars + liftIO $ STM.atomically $ -- NOTE: see crucial `lookup` check above, ensuring this doesn't clobber: - STMMap.insert (lqId, _grOperationName q) opId opMap + STMMap.insert (lqId, opName) opId opMap logOpEv ODStarted (Just reqId) where @@ -534,14 +537,20 @@ onConnInit logger manager wsConn authMode connParamsM = do res <- resolveUserInfo logger manager headers authMode case res of Left e -> do - liftIO $ STM.atomically $ STM.writeTVar (_wscUser $ WS.getData wsConn) $ - CSInitError $ qeError e + let !initErr = CSInitError $ qeError e + liftIO $ do + $assertNFHere initErr -- so we don't write thunks to mutable vars + STM.atomically $ STM.writeTVar (_wscUser $ WS.getData wsConn) initErr + let connErr = ConnErrMsg $ qeError e logWSEvent logger wsConn $ EConnErr connErr sendMsg wsConn $ SMConnErr connErr Right (userInfo, expTimeM) -> do - liftIO $ STM.atomically $ STM.writeTVar (_wscUser $ WS.getData wsConn) $ - CSInitialised userInfo expTimeM paramHeaders + let !csInit = CSInitialised userInfo expTimeM paramHeaders + liftIO $ do + $assertNFHere csInit -- so we don't write thunks to mutable vars + STM.atomically $ STM.writeTVar (_wscUser $ WS.getData wsConn) csInit + sendMsg wsConn SMConnAck -- TODO: send it periodically? Why doesn't apollo's protocol use -- ping/pong frames of websocket spec? @@ -603,8 +612,8 @@ createWSServerApp -> WSServerEnv -> WS.PendingConnection -> m () -- ^ aka generalized 'WS.ServerApp' -createWSServerApp authMode serverEnv = - WS.createServerApp (_wseServer serverEnv) handlers +createWSServerApp authMode serverEnv = \ !pendingConn -> + WS.createServerApp (_wseServer serverEnv) handlers pendingConn where handlers = WS.WSHandlers diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs index 54224ca8..f8847492 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs @@ -41,6 +41,7 @@ import qualified Data.UUID.V4 as UUID import Data.Word (Word16) import GHC.Int (Int64) import Hasura.Prelude +import GHC.AssertNF import qualified ListT import qualified Network.WebSockets as WS import qualified StmContainers.Map as STMMap @@ -141,7 +142,9 @@ closeConnWithCode wsConn code bs = do -- writes to a queue instead of the raw connection -- so that sendMsg doesn't block sendMsg :: WSConn a -> WSQueueResponse -> IO () -sendMsg wsConn = STM.atomically . STM.writeTQueue (_wcSendQ wsConn) +sendMsg wsConn = \ !resp -> do + $assertNFHere resp -- so we don't write thunks to mutable vars + STM.atomically $ STM.writeTQueue (_wcSendQ wsConn) resp type ConnMap a = STMMap.Map WSId (WSConn a) @@ -193,8 +196,8 @@ data AcceptWith a = AcceptWith { _awData :: !a , _awReq :: !WS.AcceptRequest - , _awKeepAlive :: !(Maybe (WSConn a -> IO ())) - , _awOnJwtExpiry :: !(Maybe (WSConn a -> IO ())) + , _awKeepAlive :: !(WSConn a -> IO ()) + , _awOnJwtExpiry :: !(WSConn a -> IO ()) } type OnConnH m a = WSId -> WS.RequestHead -> m (Either WS.RejectRequest (AcceptWith a)) @@ -216,7 +219,8 @@ createServerApp -- aka WS.ServerApp -> WS.PendingConnection -> m () -createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers pendingConn = do +{-# INLINE createServerApp #-} +createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers !pendingConn = do wsId <- WSId <$> liftIO UUID.nextRandom writeLog $ WSLog wsId EConnectionRequest Nothing status <- liftIO $ STM.readTVarIO serverStatus @@ -247,11 +251,17 @@ createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers pe liftIO $ WS.rejectRequestWith pendingConn rejectRequest writeLog $ WSLog wsId ERejected Nothing - onAccept wsId (AcceptWith a acceptWithParams keepAliveM onJwtExpiryM) = do + onAccept wsId (AcceptWith a acceptWithParams keepAlive onJwtExpiry) = do conn <- liftIO $ WS.acceptRequestWith pendingConn acceptWithParams writeLog $ WSLog wsId EAccepted Nothing sendQ <- liftIO STM.newTQueueIO - let wsConn = WSConn wsId logger conn sendQ a + let !wsConn = WSConn wsId logger conn sendQ a + -- TODO there are many thunks here. Difficult to trace how much is retained, and + -- how much of that would be shared anyway. + -- Requires a fork of 'wai-websockets' and 'websockets', it looks like. + -- Adding `package` stanzas with -Xstrict -XStrictData for those two packages + -- helped, cutting the number of thunks approximately in half. + liftIO $ $assertNFHere wsConn -- so we don't write thunks to mutable vars let whenAcceptingInsertConn = liftIO $ STM.atomically $ do status <- STM.readTVar serverStatus @@ -284,21 +294,15 @@ createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers pe liftIO $ WS.sendTextData conn msg writeLog $ WSLog wsId (EMessageSent $ TBS.fromLBS msg) wsInfo - let withAsyncM mAction cont = case mAction of - Nothing -> cont Nothing - Just action -> LA.withAsync (liftIO $ action wsConn) $ - \actRef -> cont $ Just actRef - -- withAsync lets us be very sure that if e.g. an async exception is raised while we're -- forking that the threads we launched will be cleaned up. See also below. LA.withAsync rcv $ \rcvRef -> do LA.withAsync send $ \sendRef -> do - withAsyncM keepAliveM $ \keepAliveRefM -> do - withAsyncM onJwtExpiryM $ \onJwtExpiryRefM -> do + LA.withAsync (liftIO $ keepAlive wsConn) $ \keepAliveRef -> do + LA.withAsync (liftIO $ onJwtExpiry wsConn) $ \onJwtExpiryRef -> do -- terminates on WS.ConnectionException and JWT expiry - let waitOnRefs = catMaybes [keepAliveRefM, onJwtExpiryRefM] - <> [rcvRef, sendRef] + let waitOnRefs = [keepAliveRef, onJwtExpiryRef, rcvRef, sendRef] -- withAnyCancel re-raises exceptions from forkedThreads, and is guarenteed to cancel in -- case of async exceptions raised while blocking here: try (LA.waitAnyCancel waitOnRefs) >>= \case diff --git a/server/src-lib/Hasura/Server/Auth/JWT.hs b/server/src-lib/Hasura/Server/Auth/JWT.hs index feebc9dc..f035187b 100644 --- a/server/src-lib/Hasura/Server/Auth/JWT.hs +++ b/server/src-lib/Hasura/Server/Auth/JWT.hs @@ -20,6 +20,7 @@ import Data.Parser.CacheControl (parseMaxAge) import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime) import Data.Time.Format (defaultTimeLocale, parseTimeM) +import GHC.AssertNF import Network.URI (URI) import Hasura.HTTP @@ -148,8 +149,10 @@ updateJwkRef (Logger logger) manager url jwkRef = do logAndThrow err let parseErr e = JFEJwkParseError (T.pack e) $ "Error parsing JWK from url: " <> urlT - jwkset <- either (logAndThrow . parseErr) return $ J.eitherDecode respBody - liftIO $ writeIORef jwkRef jwkset + !jwkset <- either (logAndThrow . parseErr) return $ J.eitherDecode' respBody + liftIO $ do + $assertNFHere jwkset -- so we don't write thunks to mutable vars + writeIORef jwkRef jwkset -- first check for Cache-Control header to get max-age, if not found, look for Expires header let cacheHeader = resp ^? Wreq.responseHeader "Cache-Control" diff --git a/server/src-lib/Hasura/Server/SchemaUpdate.hs b/server/src-lib/Hasura/Server/SchemaUpdate.hs index af064df8..4d9f2959 100644 --- a/server/src-lib/Hasura/Server/SchemaUpdate.hs +++ b/server/src-lib/Hasura/Server/SchemaUpdate.hs @@ -17,6 +17,7 @@ import Data.Aeson import Data.Aeson.Casing import Data.Aeson.TH import Data.IORef +import GHC.AssertNF import qualified Control.Concurrent.Extended as C import qualified Control.Concurrent.STM as STM @@ -159,6 +160,7 @@ listener sqlGenCtx pool logger httpMgr updateEventRef Left e -> logError logger threadType $ TEJsonParse $ T.pack e Right payload -> do logInfo logger threadType $ object ["received_event" .= payload] + $assertNFHere payload -- so we don't write thunks to mutable vars -- Push a notify event to Queue STM.atomically $ STM.writeTVar updateEventRef $ Just payload