mirror of
https://github.com/zhigang1992/graphql-engine.git
synced 2026-06-15 18:17:53 +08:00
Writing to a mutable var is a particularly potent source of leaks since it mostly defeats GHC's analysis. Here we add assertions to all mutable writes, and fix a couple spots where we wrote some thunks to a mutable var (compiled with -O2). Some of these thunks were probably benign, but others looked like they might be retaining big args. Didn't do much analysis, just fixed. Actually pretty happy with how easy this was to use and as a diagnostic, once I sorted out some issues. We should consider using it elsewhere, and maybe extending so that we can use it with tests, enable when `-fenable-assertions` etc. Relates #3388 Also simplified codepaths that use `AcceptWith`, which has unnecessary `Maybe` fields.
381 lines
14 KiB
Haskell
381 lines
14 KiB
Haskell
-- | Multiplexed live query poller threads; see "Hasura.GraphQL.Execute.LiveQuery" for details.
|
||
module Hasura.GraphQL.Execute.LiveQuery.Poll (
|
||
-- * Pollers
|
||
Poller(..)
|
||
, PollerIOState(..)
|
||
, pollQuery
|
||
|
||
, PollerKey(..)
|
||
, PollerMap
|
||
, dumpPollerMap
|
||
|
||
, RefetchMetrics
|
||
, initRefetchMetrics
|
||
|
||
-- * Cohorts
|
||
, Cohort(..)
|
||
, CohortId
|
||
, newCohortId
|
||
, CohortVariables
|
||
, CohortKey
|
||
, CohortMap
|
||
|
||
-- * Subscribers
|
||
, Subscriber(..)
|
||
, SubscriberId
|
||
, newSinkId
|
||
, SubscriberMap
|
||
, OnChange
|
||
, LGQResponse
|
||
, LiveQueryResponse(..)
|
||
, LiveQueryMetadata(..)
|
||
) where
|
||
|
||
import Hasura.Prelude
|
||
|
||
import qualified Control.Concurrent.Async as A
|
||
import qualified Control.Concurrent.STM as STM
|
||
import qualified Control.Immortal as Immortal
|
||
import qualified Crypto.Hash as CH
|
||
import qualified Data.Aeson.Extended as J
|
||
import qualified Data.ByteString as BS
|
||
import qualified Data.ByteString.Lazy as BL
|
||
import qualified Data.HashMap.Strict as Map
|
||
import qualified Data.Time.Clock as Clock
|
||
import qualified Data.UUID as UUID
|
||
import qualified Data.UUID.V4 as UUID
|
||
import qualified Language.GraphQL.Draft.Syntax as G
|
||
import qualified ListT
|
||
import qualified StmContainers.Map as STMMap
|
||
import qualified System.Metrics.Distribution as Metrics
|
||
|
||
import Data.List.Split (chunksOf)
|
||
import GHC.AssertNF
|
||
|
||
import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap
|
||
|
||
import Hasura.Db
|
||
import Hasura.EncJSON
|
||
import Hasura.GraphQL.Execute.LiveQuery.Options
|
||
import Hasura.GraphQL.Execute.LiveQuery.Plan
|
||
import Hasura.GraphQL.Transport.HTTP.Protocol
|
||
import Hasura.RQL.Types
|
||
|
||
-- -------------------------------------------------------------------------------------------------
|
||
-- Subscribers
|
||
|
||
-- | A single listener attached to a cohort: the root alias its payload is wrapped under, plus
-- the callback that receives every response pushed to it.
data Subscriber = Subscriber
  { _sRootAlias :: !G.Alias
    -- ^ alias used as the key when wrapping the raw response for this subscriber
  , _sOnChangeCallback :: !OnChange
    -- ^ action run with each response pushed to this subscriber
  }
|
||
|
||
-- | Metadata accompanying a live query @onChange@ notification; it exists so that extra
-- analytics data can be attached to a pushed result.
data LiveQueryMetadata = LiveQueryMetadata
  { _lqmExecutionTime :: !Clock.DiffTime
    -- ^ Time spent waiting on the generated query to execute on postgres or the remote.
  }
|
||
|
||
-- | What a 'Subscriber' actually receives on success: the encoded payload together with the
-- time the underlying query took.
data LiveQueryResponse = LiveQueryResponse
  { _lqrPayload :: !BL.ByteString
    -- ^ the encoded response body
  , _lqrExecutionTime :: !Clock.DiffTime
    -- ^ execution time reported alongside the payload
  }
|
||
|
||
-- | The outcome of a live query poll: a 'GQResult' carrying a 'LiveQueryResponse'.
type LGQResponse = GQResult LiveQueryResponse
|
||
|
||
-- | Callback run with each 'LGQResponse' that is pushed to a 'Subscriber'.
type OnChange = LGQResponse -> IO ()
|
||
|
||
-- | Identifier of a 'Subscriber'; a thin wrapper around a UUID.
newtype SubscriberId = SubscriberId
  { _unSinkId :: UUID.UUID
  } deriving (Show, Eq, Hashable, J.ToJSON)
|
||
|
||
-- | Generate a fresh 'SubscriberId' from a newly drawn random UUID.
newSinkId :: IO SubscriberId
newSinkId = fmap SubscriberId UUID.nextRandom
|
||
|
||
-- | A collection of 'Subscriber's keyed by their 'SubscriberId'.
type SubscriberMap = TMap.TMap SubscriberId Subscriber
|
||
|
||
-- -------------------------------------------------------------------------------------------------
|
||
-- Cohorts
|
||
|
||
-- | A batch of 'Subscriber's that listen to the same query /and/ share identical session and
-- query variables. Every result pushed to a 'Cohort' is forwarded to each of its 'Subscriber's.
--
-- In SQL terms, a 'Cohort' is one row of the laterally-joined @_subs@ table, and hence one row
-- of the query result.
--
-- See also 'CohortMap'.
data Cohort = Cohort
  { _cCohortId :: !CohortId
    -- ^ unique identifier naming this cohort inside the generated query
  , _cPreviousResponse :: !(STM.TVar (Maybe ResponseHash))
    -- ^ hash of the previous query result (if there was one); it decides whether an updated
    -- result has to be pushed to the subscribers
  , _cExistingSubscribers :: !SubscriberMap
    -- ^ subscribers that have already received a result; they are notified again iff the
    -- response changes
  , _cNewSubscribers :: !SubscriberMap
    -- ^ subscribers that have not received anything yet; they are notified regardless of
    -- whether the result changed and are then merged into the map of existing subscribers
  }
|
||
|
||
{- Note [Blake2b faster than SHA-256]
|
||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||
At the time of writing, from https://blake2.net, it is stated,
|
||
"BLAKE2 is a cryptographic hash function faster than MD5, SHA-1, SHA-2, and SHA-3,
|
||
yet is at least as secure as the latest standard SHA-3".
|
||
-}
|
||
|
||
-- | A digest of a query result, letting us detect changes without retaining the whole result in
-- memory. A cryptographic hash makes collisions practically impossible: with 256 bits, even if a
-- subscription changed once per second for an entire year, the probability of a hash collision
-- would be ~4.294417×10^-63. See Note [Blake2b faster than SHA-256].
newtype ResponseHash = ResponseHash
  { unResponseHash :: CH.Digest CH.Blake2b_256
  } deriving (Show, Eq)
|
||
|
||
-- | Serialised as the JSON encoding of the digest's 'show' output.
instance J.ToJSON ResponseHash where
  toJSON (ResponseHash digest) = J.toJSON (show digest)
|
||
|
||
-- | Hash a strict response body into a 'ResponseHash'.
mkRespHash :: BS.ByteString -> ResponseHash
mkRespHash responseBytes = ResponseHash (CH.hash responseBytes)
|
||
|
||
-- | The key deciding whether two 'Subscriber's share a 'Cohort' (given that they already
-- qualify for the same 'Poller'). Do not confuse it with 'CohortId', which is a purely synthetic
-- key whose only job is to identify the cohort in the generated SQL query.
type CohortKey = CohortVariables
|
||
-- | The 'Cohort's of a poller, keyed by 'CohortKey'.
--
-- Invariant (maintained in 'removeLiveQuery'): no 'Cohort' stored here has zero total
-- (existing + new) subscribers.
type CohortMap = TMap.TMap CohortKey Cohort
|
||
|
||
-- | Render a 'CohortMap' as JSON. The list of cohorts is read in one transaction, and each
-- cohort is then inspected in a transaction of its own, so the output is not one atomic
-- snapshot of the whole map.
dumpCohortMap :: CohortMap -> IO J.Value
dumpCohortMap cohortMap = do
  entries <- STM.atomically (TMap.toList cohortMap)
  J.toJSON <$> mapM dumpEntry entries
  where
    -- one JSON object per (variables, cohort) pair
    dumpEntry (variableValues, cohort) = do
      cohortJ <- STM.atomically (dumpCohort cohort)
      return $ J.object
        [ "variables" J..= variableValues
        , "cohort" J..= cohortJ
        ]

    -- read the cohort's mutable parts in a single transaction
    dumpCohort (Cohort respId respTV curOps newOps) = do
      prevResHash <- STM.readTVar respTV
      curOpIds <- fmap fst <$> TMap.toList curOps
      newOpIds <- fmap fst <$> TMap.toList newOps
      return $ J.object
        [ "resp_id" J..= respId
        , "current_ops" J..= curOpIds
        , "new_ops" J..= newOpIds
        , "previous_result_hash" J..= prevResHash
        ]
|
||
|
||
-- | A point-in-time view of a 'Cohort': the subscriber maps are flattened into plain lists,
-- while the previous-response reference is still the shared 'STM.TVar'.
data CohortSnapshot = CohortSnapshot
  { _csVariables :: !CohortVariables
    -- ^ the variables (cohort key) this snapshot was taken for
  , _csPreviousResponse :: !(STM.TVar (Maybe ResponseHash))
    -- ^ reference holding the hash of the last pushed response
  , _csExistingSubscribers :: ![Subscriber]
    -- ^ subscribers that were in the existing map when the snapshot was taken
  , _csNewSubscribers :: ![Subscriber]
    -- ^ subscribers that were in the new map when the snapshot was taken
  }
|
||
|
||
-- | Deliver one result to the subscribers captured in a 'CohortSnapshot'.
--
-- New subscribers are always notified. Existing subscribers are notified only when the result
-- is an execution error or its hash differs from the stored one; in that case the stored hash
-- is overwritten first.
pushResultToCohort
  :: GQResult EncJSON
  -- ^ a response that still needs to be wrapped with each 'Subscriber's root 'G.Alias'
  -> Maybe ResponseHash
  -- ^ hash of the response, if there is one
  -> LiveQueryMetadata
  -> CohortSnapshot
  -> IO ()
pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot = do
  previousHashM <- STM.readTVarIO responseRef
  let mustNotifyExisting = isExecError result || respHashM /= previousHashM
  -- write to the current websockets if needed
  recipients <-
    if mustNotifyExisting
      then do
        $assertNFHere respHashM  -- so we don't write thunks to mutable vars
        STM.atomically $ STM.writeTVar responseRef respHashM
        return (newSubscribers <> existingSubscribers)
      else return newSubscribers
  A.mapConcurrently_ notify recipients
  where
    responseRef = _csPreviousResponse cohortSnapshot
    existingSubscribers = _csExistingSubscribers cohortSnapshot
    newSubscribers = _csNewSubscribers cohortSnapshot

    -- wrap the raw response under the subscriber's alias and hand it to the callback
    notify (Subscriber alias onChange) =
      onChange (wrapWithAlias (G.unName (G.unAlias alias)) <$> result)

    wrapWithAlias aliasText response = LiveQueryResponse
      { _lqrPayload = encJToLBS $ encJFromAssocList [(aliasText, response)]
      , _lqrExecutionTime = dTime
      }
|
||
|
||
-- -------------------------------------------------------------------------------------------------
|
||
-- Pollers
|
||
|
||
-- | A unique, multiplexed query. Every 'Poller' owns a polling thread that periodically polls
-- Postgres and pushes the results to each of its listening 'Cohort's.
--
-- In SQL, a 'Poller' corresponds to a single, multiplexed query, though in practice 'Poller's
-- with large numbers of 'Cohort's are batched into multiple concurrent queries for performance
-- reasons.
data Poller = Poller
  { _pCohorts :: !CohortMap
    -- ^ the cohorts currently listening on this poller
  , _pIOState :: !(STM.TMVar PollerIOState)
    -- ^ Kept in a separate 'STM.TMVar' because 'Poller' values must be constructible in
    -- 'STM.STM': the insertion into the 'PollerMap' has to be atomic so that a race cannot
    -- create two pollers for the same query. Spawning the worker thread or creating the metrics
    -- store is not possible in 'STM.STM', so this is filled in only once we are certain no
    -- duplicate will be created.
    --
    -- This var is "write once", moving monotonically from empty to full.
    -- TODO this could probably be tightened up to something like :: STM PollerIOState
  }
|
||
-- | The parts of a 'Poller' that can only be created in 'IO'.
data PollerIOState = PollerIOState
  { _pThread :: !Immortal.Thread
    -- ^ handle on the poller's worker thread; it can be used to 'Immortal.stop' the thread once
    -- all of the poller's cohorts stop listening
  , _pMetrics :: !RefetchMetrics
    -- ^ timing distributions recorded for this poller
  }
|
||
|
||
-- | Timing distributions for the phases of a poll, as recorded by 'pollQuery'.
data RefetchMetrics = RefetchMetrics
  { _rmSnapshot :: !Metrics.Distribution
    -- ^ time taken to snapshot the cohorts
  , _rmPush :: !Metrics.Distribution
    -- ^ time taken to push results to cohorts
  , _rmQuery :: !Metrics.Distribution
    -- ^ time taken to run the multiplexed query
  , _rmTotal :: !Metrics.Distribution
    -- ^ time taken by a whole poll iteration
  }
|
||
|
||
-- | Allocate a fresh 'Metrics.Distribution' for every field of 'RefetchMetrics'.
initRefetchMetrics :: IO RefetchMetrics
initRefetchMetrics = do
  -- created in the constructor's field order: snapshot, push, query, total
  snapshotDist <- Metrics.new
  pushDist <- Metrics.new
  queryDist <- Metrics.new
  totalDist <- Metrics.new
  return RefetchMetrics
    { _rmSnapshot = snapshotDist
    , _rmPush = pushDist
    , _rmQuery = queryDist
    , _rmTotal = totalDist
    }
|
||
|
||
-- | Identifies a 'Poller' within the 'PollerMap': the role together with the multiplexed query.
--
-- The operation name is not needed here, because a subscription only ever has a single top
-- level field.
data PollerKey = PollerKey
  { _lgRole :: !RoleName
  , _lgQuery :: !MultiplexedQuery
  } deriving (Show, Eq, Generic)
|
||
|
||
-- | No methods are given, so the class defaults are used; presumably these are the
-- 'Generic'-based ones, since 'PollerKey' derives 'Generic'.
instance Hashable PollerKey
|
||
|
||
-- | Encoded as an object with the fields @role@ and @query@.
instance J.ToJSON PollerKey where
  toJSON pollerKey = J.object
    [ "role" J..= _lgRole pollerKey
    , "query" J..= _lgQuery pollerKey
    ]
|
||
|
||
-- | All 'Poller's, keyed by their 'PollerKey'.
type PollerMap = STMMap.Map PollerKey Poller
|
||
|
||
-- | Render the 'PollerMap' as a JSON array with one object per poller: role, worker thread id,
-- multiplexed query and refetch metrics. Cohort details are included only when the first
-- argument is 'True'; otherwise the @cohorts@ field is null.
--
-- The IO state of each poller is obtained with 'STM.readTMVar', which waits until that state
-- has been filled in.
dumpPollerMap :: Bool -> PollerMap -> IO J.Value
dumpPollerMap extended lqMap = do
  pollers <- STM.atomically $ ListT.toList $ STMMap.listT lqMap
  J.toJSON <$> mapM dumpPoller pollers
  where
    dumpPoller (PollerKey role query, Poller cohortsMap ioState) = do
      PollerIOState threadId metrics <- STM.atomically $ STM.readTMVar ioState
      metricsJ <- dumpRefetchMetrics metrics
      cohortsJ <-
        if extended
          then Just <$> dumpCohortMap cohortsMap
          else return Nothing
      return $ J.object
        [ "role" J..= role
        , "thread_id" J..= show (Immortal.threadId threadId)
        , "multiplexed_query" J..= query
        , "cohorts" J..= cohortsJ
        , "metrics" J..= metricsJ
        ]

    -- read the four distributions (snapshot, query, push, total) and summarise each
    dumpRefetchMetrics metrics = do
      snapshotS <- Metrics.read (_rmSnapshot metrics)
      queryS <- Metrics.read (_rmQuery metrics)
      pushS <- Metrics.read (_rmPush metrics)
      totalS <- Metrics.read (_rmTotal metrics)
      return $ J.object
        [ "snapshot" J..= dumpStats snapshotS
        , "query" J..= dumpStats queryS
        , "push" J..= dumpStats pushS
        , "total" J..= dumpStats totalS
        ]

    -- summary statistics of a single distribution
    dumpStats stats = J.object
      [ "mean" J..= Metrics.mean stats
      , "variance" J..= Metrics.variance stats
      , "count" J..= Metrics.count stats
      , "min" J..= Metrics.min stats
      , "max" J..= Metrics.max stats
      ]
|
||
|
||
-- | Where the magic happens: the top-level action run periodically by each active 'Poller'.
--
-- One iteration snapshots every cohort, splits the cohort variables into batches of the
-- configured size, runs the multiplexed query for each batch concurrently, and pushes each
-- result to its cohort. The duration of each phase is added to the given 'RefetchMetrics'.
--
-- This needs to be async exception safe.
pollQuery
  :: RefetchMetrics
  -> BatchSize
  -> PGExecCtx
  -> MultiplexedQuery
  -> Poller
  -> IO ()
pollQuery metrics batchSize pgExecCtx pgQuery handler =
  void . timing _rmTotal $ do
    (_, (snapshots, batches)) <- timing _rmSnapshot $ do
      -- get a snapshot of all the cohorts
      -- this need not be done in a transaction
      cohorts <- STM.atomically (TMap.toList cohortMap)
      snapshots <- Map.fromList <$> mapM (STM.atomically . getCohortSnapshot) cohorts
      return (snapshots, chunksOf (unBatchSize batchSize) (getQueryVars snapshots))

    let runBatch queryVars = do
          (elapsed, queryResult) <- timing _rmQuery $
            runExceptT $ runLazyTx' pgExecCtx $ executeMultiplexedQuery pgQuery queryVars
          let lqMeta = LiveQueryMetadata (fromUnits elapsed)
              operations = getCohortOperations snapshots lqMeta queryResult
          -- concurrently push each unique result
          void . timing _rmPush $
            A.mapConcurrently_ pushOperation operations

    A.mapConcurrently_ runBatch batches
  where
    cohortMap = _pCohorts handler

    -- run an action, add its duration to the selected distribution, and return both
    timing :: (RefetchMetrics -> Metrics.Distribution) -> IO b -> IO (DiffTime, b)
    timing selectDist action = do
      timed@(elapsed, _) <- withElapsedTime action
      Metrics.add (selectDist metrics) (realToFrac elapsed)
      return timed

    pushOperation (response, respHashM, lqMeta, snapshot) =
      pushResultToCohort response respHashM lqMeta snapshot

    -- Capture the cohort's subscribers, then move every new subscriber into the existing map
    -- and empty the new map. The snapshot still lists them as new.
    getCohortSnapshot (cohortVars, Cohort cohortId respRef existingTM pendingTM) = do
      existing <- TMap.toList existingTM
      pending <- TMap.toList pendingTM
      forM_ pending $ \(subscriberId, subscriber) ->
        TMap.insert subscriber subscriberId existingTM
      TMap.reset pendingTM
      return (cohortId, CohortSnapshot cohortVars respRef (map snd existing) (map snd pending))

    getQueryVars snapshots = Map.toList (_csVariables <$> snapshots)

    -- a failed query yields the same error (with no hash) for every cohort in the snapshot map
    getCohortOperations snapshots actionMeta (Left err) =
      -- TODO: this is internal error
      let errResponse = GQExecError [encodeGQErr False err]
      in map (\(_, snapshot) -> (errResponse, Nothing, actionMeta, snapshot)) (Map.toList snapshots)
    -- a successful query yields one operation per response whose cohort id is in the snapshot
    -- map; responses with an unknown id are dropped
    getCohortOperations snapshots actionMeta (Right responses) =
      mapMaybe toOperation responses
      where
        toOperation (respId, result) =
          -- TODO: change it to use bytestrings directly
          let -- No reason to use lazy bytestrings here, since (1) we fetch the entire result
              -- set from Postgres strictly and (2) even if we didn't, hashing will have to
              -- force the whole thing anyway.
              respHash = mkRespHash (encJToBS result)
          in fmap (\snapshot -> (GQSuccess result, Just $! respHash, actionMeta, snapshot))
                  (Map.lookup respId snapshots)
|