fix(core): memory leak in PeersService

This commit is contained in:
alina sireneva
2025-03-28 03:10:01 +03:00
parent 1c3cd2c6be
commit 812a191110
4 changed files with 64 additions and 23 deletions

View File

@@ -157,6 +157,7 @@ export class BaseTelegramClient implements ITelegramClient {
this._connected = false
await this.mt.close()
this.updates?.stopLoop()
await this.storage.close()
this._prepare.reset()
this._connect.reset()
}

View File

@@ -3,7 +3,7 @@ import type { ServiceOptions } from '../../../storage/service/base.js'
import type { IPeersRepository } from '../repository/peers.js'
import type { RefMessagesService } from './ref-messages.js'
import { LruMap } from '@fuman/utils'
import { asyncPool, LruMap, timers } from '@fuman/utils'
import Long from 'long'
import { BaseService } from '../../../storage/service/base.js'
import { longFromFastString, longToFastString } from '../../../utils/long-utils.js'
@@ -50,7 +50,7 @@ function getInputPeer(dto: IPeersRepository.PeerInfo): tl.TypeInputPeer {
export class PeersService extends BaseService {
private _cache: LruMap<number, CacheItem>
private _pendingWrites = new Map<number, IPeersRepository.PeerInfo>()
private _pendingWritesTimer: timers.Timer | null = null
constructor(
private options: PeersServiceOptions,
private _peers: IPeersRepository,
@@ -60,6 +60,24 @@ export class PeersService extends BaseService {
super(common)
this._cache = new LruMap(options.cacheSize ?? 100)
this._writePending = this._writePending.bind(this)
}
async close(): Promise<void> {
if (this._pendingWritesTimer) {
timers.clearTimeout(this._pendingWritesTimer)
this._pendingWritesTimer = null
}
await this._writePending()
}
private _writePendingLater() {
if (this._pendingWritesTimer) return
this._pendingWritesTimer = timers.setTimeout(
this._writePending,
this.options.updatesWriteInterval ?? 30_000,
)
}
async updatePeersFrom(obj: tl.TlObject | tl.TlObject[]): Promise<boolean> {
@@ -148,33 +166,13 @@ export class PeersService extends BaseService {
const cached = this._cache.get(peer.id)
if (cached && this.options.updatesWriteInterval !== 0) {
const oldAccessHash = (cached.peer as Extract<tl.TypeInputPeer, { accessHash?: unknown }>).accessHash
if (oldAccessHash?.eq(accessHash)) {
// when entity is cached and hash is the same, an update query is needed,
// since some field in the full entity might have changed, or the username/phone
//
// to avoid too many DB calls, and since these updates are pretty common,
// they are grouped and applied in batches no more than once every 30sec (or user-defined).
//
// until then, they are either served from in-memory cache,
// or an older version is fetched from DB
this._pendingWrites.set(peer.id, dto)
cached.complete = peer
return
}
}
let newComplete = peer
if ((peer as Extract<typeof peer, { min?: unknown }>).min) {
// we need to be careful with saving min peers,
// as we only need to update *some* fields of the `complete` object.
const existing = this._cache.get(peer.id)?.complete ?? await this.getCompleteById(peer.id)
const existing = cached?.complete ?? await this.getCompleteById(peer.id)
if (existing && !(existing as Extract<typeof existing, { min?: unknown }>).min) {
if (existing._ === 'channel' && peer._ === 'channel') {
// ref: https://corefork.telegram.org/constructor/channel
@@ -240,6 +238,27 @@ export class PeersService extends BaseService {
}
}
if (cached && this.options.updatesWriteInterval !== 0) {
const oldAccessHash = (cached.peer as Extract<tl.TypeInputPeer, { accessHash?: unknown }>).accessHash
if (oldAccessHash?.eq(accessHash)) {
// when entity is cached and hash is the same, an update query is needed,
// since some field in the full entity might have changed, or the username/phone
//
// to avoid too many DB calls, and since these updates are pretty common,
// they are grouped and applied in batches no more than once every 30sec (or user-defined).
//
// until then, they are either served from in-memory cache,
// or an older version is fetched from DB
this._pendingWrites.set(peer.id, dto)
this._writePendingLater()
cached.complete = newComplete
return
}
}
// entity is not cached in memory, or the access hash has changed
// we need to update it in the DB asap, and also update the in-memory cache
await this._peers.store(dto)
@@ -254,6 +273,19 @@ export class PeersService extends BaseService {
await this._refs.deleteByPeer(peer.id)
}
private async _writePending() {
try {
await asyncPool(this._pendingWrites.values(), async (dto) => {
await this._peers.store(dto)
await this._refs.deleteByPeer(dto.id)
})
this._pendingWrites.clear()
} catch (err) {
this._log.warn('failed to write pending updates: %e', err)
}
}
private _returnCaching(id: number, dto: IPeersRepository.PeerInfo) {
const peer = getInputPeer(dto)
const complete = this._deserializeTl(dto.complete)

View File

@@ -56,6 +56,10 @@ export class TelegramStorageManager {
)
}
async close(): Promise<void> {
await this.peers.close()
}
async clear(withAuthKeys = false): Promise<void> {
await this.provider.peers.deleteAll()
await this.provider.refMessages.deleteAll()

View File

@@ -67,6 +67,7 @@ class PeersServiceProxy implements PublicPart<PeersService> {
readonly getByUsername: PeersService['getByUsername']
readonly getCompleteById: PeersService['getCompleteById']
readonly getMinAccessHash: PeersService['getMinAccessHash']
readonly close: PeersService['close']
constructor(private _invoker: WorkerInvoker) {
const bind = this._invoker.makeBinder<PeersService>('storage-peers')
@@ -78,6 +79,7 @@ class PeersServiceProxy implements PublicPart<PeersService> {
this.getByUsername = bind('getByUsername')
this.getCompleteById = bind('getCompleteById')
this.getMinAccessHash = bind('getMinAccessHash')
this.close = bind('close')
}
}
@@ -86,6 +88,7 @@ export class TelegramStorageProxy implements PublicPart<TelegramStorageManager>
readonly peers: PeersServiceProxy
readonly clear: TelegramStorageManager['clear']
readonly close: TelegramStorageManager['close']
constructor(private _invoker: WorkerInvoker) {
const bind = this._invoker.makeBinder<TelegramStorageManager>('storage')
@@ -94,6 +97,7 @@ export class TelegramStorageProxy implements PublicPart<TelegramStorageManager>
this.peers = new PeersServiceProxy(this._invoker)
this.clear = bind('clear')
this.close = bind('close')
}
// todo - remove once we move these to updates manager