ProfileMatching/verify_twitter_matches_v2.py
Andrew Jiang 5319d4d868 Initial commit: Twitter-Telegram Profile Matching System
This module provides comprehensive Twitter-to-Telegram profile matching
and verification using 10 different matching methods and LLM verification.

Features:
- 10 matching methods (phash, usernames, bio handles, URL resolution, fuzzy names)
- URL resolution integration for t.co → t.me links
- Async LLM verification with GPT-5-mini
- Interactive menu system with real-time stats
- Threaded candidate finding (~1.5 contacts/sec)
- Comprehensive documentation and guides

Key Components:
- find_twitter_candidates.py: Core matching logic (10 methods)
- find_twitter_candidates_threaded.py: Threaded implementation
- verify_twitter_matches_v2.py: LLM verification (V5 prompt)
- review_match_quality.py: Analysis and quality review
- main.py: Interactive menu system
- Complete documentation (README, CHANGELOG, QUICKSTART)

Performance:
- Candidate finding: ~16-18 hours for 43K contacts
- LLM verification: ~23 hours for 43K users
- Cost: ~$130 for full verification
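  (≈43K users × the ~$0.003/user estimate used by the script)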

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-04 22:56:25 -08:00


#!/usr/bin/env python3
"""
Twitter-Telegram Match Verifier V2 (Confidence-Based with Batch Evaluation)
Uses LLM with confidence scoring (0-1) and evaluates all candidates for a TG user together
"""
import sys
import asyncio
import json
from pathlib import Path
from typing import List, Dict
import psycopg2
from psycopg2.extras import RealDictCursor
import openai
from datetime import datetime
import re
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
from db_config import SessionLocal
from models import Contact
# Twitter database connection (not referenced in this module; candidates are read
# from twitter_match_candidates in the telegram_contacts database instead)
TWITTER_DB_CONFIG = {
    'dbname': 'twitter_data',
    'user': 'andrewjiang',
    'host': 'localhost',
    'port': 5432
}
# Checkpoint file
CHECKPOINT_FILE = Path(__file__).parent.parent / 'llm_verification_v2_checkpoint.json'
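
# The checkpoint is a plain JSON file; a sketch of its layout (values illustrative):
# {
#     "last_processed_telegram_id": 123456789,
#     "processed_count": 1200,
#     "total_matches_saved": 480,
#     "total_cost": 3.6142,
#     "total_tokens": 2400000,
#     "started_at": "2025-11-04T22:00:00",
#     "last_updated_at": "2025-11-04T23:30:00"
# }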

class CheckpointManager:
    """Manage checkpoints for resumable verification"""

    def __init__(self, checkpoint_file):
        self.checkpoint_file = checkpoint_file
        self.data = self.load()

    def load(self):
        if self.checkpoint_file.exists():
            with open(self.checkpoint_file, 'r') as f:
                return json.load(f)
        return {
            'last_processed_telegram_id': None,
            'processed_count': 0,
            'total_matches_saved': 0,
            'total_cost': 0.0,
            'total_tokens': 0,
            'started_at': None
        }

    def save(self):
        self.data['last_updated_at'] = datetime.now().isoformat()
        with open(self.checkpoint_file, 'w') as f:
            json.dump(self.data, f, indent=2)

    def update(self, telegram_user_id, matches_saved, tokens_used, cost):
        self.data['last_processed_telegram_id'] = telegram_user_id
        self.data['processed_count'] += 1
        self.data['total_matches_saved'] += matches_saved
        self.data['total_tokens'] += tokens_used
        self.data['total_cost'] += cost
        # Save every 20 telegram users
        if self.data['processed_count'] % 20 == 0:
            self.save()
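
# Minimal usage sketch for CheckpointManager (values illustrative):
#   cm = CheckpointManager(CHECKPOINT_FILE)
#   cm.update(telegram_user_id=123456789, matches_saved=2, tokens_used=1800, cost=0.0011)
#   cm.save()  # update() also auto-saves every 20 processed users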

def calculate_username_specificity(username: str) -> float:
    """
    Calculate how specific/unique a username is (0-1)
    Generic usernames get lower scores, unique ones get higher scores
    """
    if not username:
        return 0.5
    username_lower = username.lower()
    # Very generic patterns
    generic_patterns = [
        r'^admin\d*$', r'^user\d+$', r'^crypto\d*$', r'^nft\d*$',
        r'^web3\d*$', r'^trader\d*$', r'^dev\d*$', r'^official\d*$',
        r'^team\d*$', r'^support\d*$', r'^info\d*$'
    ]
    for pattern in generic_patterns:
        if re.match(pattern, username_lower):
            return 0.3
    # Length-based scoring
    length = len(username)
    if length < 4:
        return 0.4
    elif length < 6:
        return 0.6
    elif length < 8:
        return 0.75
    else:
        return 0.9
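
# Illustrative scores (this helper is not called in this module; it presumably
# mirrors the scoring used by the candidate-finding scripts):
#   calculate_username_specificity("admin42")        -> 0.3   generic pattern
#   calculate_username_specificity("bob")            -> 0.4   under 4 chars
#   calculate_username_specificity("mike1")          -> 0.6   under 6 chars
#   calculate_username_specificity("schellinger_k")  -> 0.9   8+ chars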

class LLMVerifier:
    """Use GPT-5 Mini for confidence-based verification"""

    def __init__(self):
        self.client = openai.AsyncOpenAI()
        self.model = "gpt-5-mini"  # GPT-5 Mini - balanced performance and cost
        # Cost calculation for gpt-5-mini
        self.input_cost_per_1m = 0.25  # $0.25 per 1M input tokens
        self.output_cost_per_1m = 2.00  # $2.00 per 1M output tokens
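        # Example: a call that uses 2,000 input and 300 output tokens costs
        # 2000/1e6 * $0.25 + 300/1e6 * $2.00 ≈ $0.0011 (see calculate_cost below).
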
    def build_batch_prompt(self, telegram_profile: Dict, candidates: List[Dict]) -> str:
        """Build LLM prompt for batch evaluation of all candidates"""
        # Format Telegram profile (`or 'none'` guards against None bios)
        tg_bio = telegram_profile.get('bio') or 'none'
        if len(tg_bio) > 300:
            tg_bio = tg_bio[:300] + '...'
        # Format chat context
        chat_context = telegram_profile.get('chat_context', {})
        chat_info = ""
        if chat_context.get('chat_titles'):
            chat_info = f"\nGroup Chats: {chat_context['chat_titles']}"
        if chat_context.get('is_crypto_focused'):
            chat_info += " [CRYPTO-FOCUSED GROUPS]"
        prompt = f"""TELEGRAM PROFILE:
Username: @{telegram_profile.get('username') or 'none'}
Display Name: {telegram_profile.get('name') or 'none'} (first_name + last_name combined)
Bio: {tg_bio}{chat_info}
TWITTER CANDIDATES (evaluate all together):
"""
        for i, candidate in enumerate(candidates, 1):
            tw_bio = candidate.get('twitter_bio') or 'none'
            if len(tw_bio) > 250:
                tw_bio = tw_bio[:250] + '...'
            # Check if phash match info exists
            phash_info = ""
            if candidate['match_method'] == 'phash_match':
                phash_distance = (candidate.get('match_signals') or {}).get('phash_distance', 'unknown')
                phash_info = f"\n⭐ Phash Match: distance={phash_distance} (identical profile pictures!)"
            prompt += f"""
[Candidate {i}]
Twitter Username: @{candidate['twitter_username']}
Twitter Display Name: {candidate.get('twitter_name') or 'Unknown'}
Twitter Bio: {tw_bio}
Location: {candidate.get('twitter_location') or 'none'}
Verified: {candidate.get('twitter_verified', False)} (Blue: {candidate.get('twitter_blue_verified', False)})
Followers: {candidate.get('twitter_followers_count') or 0:,}
Match Method: {candidate['match_method']}{phash_info}
Baseline Confidence: {candidate.get('baseline_confidence') or 0:.2f}
"""
        return prompt
    async def verify_batch(self, telegram_profile: Dict, candidates: List[Dict], semaphore, log_file=None) -> Dict:
        """Verify all candidates for a single Telegram user"""
        async with semaphore:
            prompt = self.build_batch_prompt(telegram_profile, candidates)
            if log_file:
                log_file.write(f"\n{'=' * 100}\n")
                log_file.write(f"TELEGRAM USER: {telegram_profile.get('username', 'N/A')} (ID: {telegram_profile['user_id']})\n")
                log_file.write(f"{'=' * 100}\n\n")
            system_prompt = """You are an expert at determining if two social media profiles belong to the same person.

# TASK
Determine confidence (0.0-1.0) that each Twitter candidate is the same person as the Telegram profile.
**CRITICAL: Evaluate ALL candidates together, not in isolation. Compare them against each other to identify which has the STRONGEST evidence.**

# SIGNAL STRENGTH GUIDE
Evaluate the FULL CONTEXT of all available signals together. Individual signals can be strong or weak, but the overall picture matters most.

## VERY STRONG SIGNALS (Can individually suggest high confidence)
- **Explicit bio mention**: TG bio says "x.com/username" or "Follow me @username"
  - ⚠️ EXCEPTION: If that account is clearly a company/project (not personal), this is NOT definitive
  - Example: TG bio "x.com/gems_gun" but @gems_gun is company account → Look for personal account like @lucahl0 "Building @gems_gun"
- **Unique username exact match**: Unusual/long username (like @kupermind, @schellinger_k) that matches exactly
  - Generic usernames (@mike, @crypto123) don't qualify as "unique"

## STRONG SUPPORTING SIGNALS (Good indicators when combined)
Each of these helps build confidence, especially when multiple align:
- Full name match (after normalization: remove .eth, emojis, separators)
- Same profile picture (phash match)
- Aligned bio themes/context (both in crypto, both mention same projects/interests)
- Very similar username (not exact, but close: @kevin vs @k_kevin)

## WEAK SIGNALS (Need multiple strong signals to be meaningful)
- Generic name match only (Alex, Mike, David, John, Baz)
- Same general field but no specifics
- Partial username similarity with generic name

## RED FLAGS (Lower confidence significantly)
- Context mismatch: TG is crypto/tech, TW is chef/athlete/journalist
- Company account when looking for personal profile
- Famous person/celebrity (unless clear evidence it's actually them)

# CONFIDENCE BANDS

## 0.90-1.0: NEARLY CERTAIN
Very strong signal (bio mention of personal account OR unique username match) + supporting signals align
Examples:
- TG bio: "https://x.com/kupermind" + TW @kupermind personal account → 0.97
- TG @olliten + TW @olliten + same name + same pic → 0.98

## 0.70-0.89: LIKELY
Multiple strong supporting signals converge, or one very strong signal with some gap
Examples:
- Very similar username + name match + context: @schellinger → @k_schellinger "Kevin Schellinger" → 0.85
- Exact username on moderately unique name: @alexcrypto + "Alex Smith" crypto → 0.78

## 0.40-0.69: POSSIBLE
Some evidence but significant uncertainty
- Generic name + same field but no username/pic match
- Weak username similarity with generic name
- Profile pic match but name is very generic

## 0.10-0.39: UNLIKELY
Minimal evidence or contradictions
- Only generic name match (David, Alex, Mike)
- Context mismatch (crypto person vs chef)

## 0.0-0.09: EXTREMELY UNLIKELY
No meaningful evidence or clear contradiction

# COMPARATIVE EVALUATION PROCESS
**Step 1: Review ALL candidates together**
Don't score each in isolation. Look at the full set to understand which has the strongest evidence.
**Step 2: Identify the strongest signals present**
- Is there a bio mention? (Check if it's personal vs company account!)
- Is there a unique username match?
- Do multiple supporting signals converge for one candidate?
**Step 3: Apply differential scoring**
- The candidate with STRONGEST evidence should get meaningfully higher score
- If Candidate A has unique username + name match, and Candidate B only has generic name → A gets 0.85+, B gets 0.40 max
- If ALL candidates only have weak signals (generic name only) → ALL score 0.20-0.40
**Step 4: Sanity checks**
- Could this evidence match thousands of people? → Lower confidence
- Is there a context mismatch? → Max 0.50
- Is this a company account when we need personal? → Not the right match
**Key principle: Only ONE candidate can be "most likely" - differentiate clearly between them.**

# TECHNICAL NOTES
**Name Normalization**: Before comparing, remove .eth/.ton/.sol suffixes, emojis, "| company" separators, and ignore capitalization
**Profile Picture (phash)**: Phash match alone → MAX 0.70 (supporting signal). Use to break ties or add confidence to other signals.

# OUTPUT FORMAT
Return ONLY valid JSON (no markdown, no explanation):
{
  "candidates": [
    {
      "candidate_index": 1,
      "confidence": 0.85,
      "reasoning": "Brief explanation"
    },
    ...
  ]
}"""
            try:
                if log_file:
                    log_file.write("SYSTEM PROMPT:\n")
                    log_file.write("-" * 100 + "\n")
                    log_file.write(system_prompt + "\n\n")
                    log_file.write("USER PROMPT:\n")
                    log_file.write("-" * 100 + "\n")
                    log_file.write(prompt + "\n\n")
                    log_file.flush()
                response = await self.client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": prompt}
                    ],
                    response_format={"type": "json_object"}
                )
                content = response.choices[0].message.content.strip()
                if log_file:
                    log_file.write("LLM RESPONSE:\n")
                    log_file.write("-" * 100 + "\n")
                    log_file.write(content + "\n\n")
                    log_file.flush()
                # Parse JSON
                try:
                    result = json.loads(content)
                except json.JSONDecodeError:
                    print("  ⚠️ Failed to parse JSON response")
                    return {
                        'success': False,
                        'error': 'json_parse_error',
                        'tokens_used': response.usage.total_tokens,
                        'cost': self.calculate_cost(response.usage)
                    }
                tokens_used = response.usage.total_tokens
                cost = self.calculate_cost(response.usage)
                return {
                    'success': True,
                    'results': result.get('candidates', []),
                    'tokens_used': tokens_used,
                    'cost': cost,
                    'error': None
                }
            except Exception as e:
                print(f"  ⚠️ LLM error: {str(e)[:100]}")
                return {
                    'success': False,
                    'error': str(e),
                    'tokens_used': 0,
                    'cost': 0.0
                }

    def calculate_cost(self, usage) -> float:
        """Calculate cost for this API call"""
        input_cost = (usage.prompt_tokens / 1_000_000) * self.input_cost_per_1m
        output_cost = (usage.completion_tokens / 1_000_000) * self.output_cost_per_1m
        return input_cost + output_cost
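
# Shape of a successful verify_batch() result (values illustrative):
# {'success': True,
#  'results': [{'candidate_index': 1, 'confidence': 0.85, 'reasoning': '...'}],
#  'tokens_used': 2300, 'cost': 0.0012, 'error': None}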

def get_telegram_users_with_candidates(telegram_conn, checkpoint_manager, limit=None):
    """Get list of telegram_user_ids that have unprocessed candidates"""
    with telegram_conn.cursor() as cur:
        query = """
            SELECT DISTINCT telegram_user_id
            FROM twitter_match_candidates
            WHERE needs_llm_review = TRUE
              AND llm_processed = FALSE
        """
        if checkpoint_manager.data['last_processed_telegram_id']:
            # f-string interpolation is acceptable here: the value is an internal
            # integer ID from our own checkpoint file, not user input
            query += f" AND telegram_user_id > {checkpoint_manager.data['last_processed_telegram_id']}"
        query += " ORDER BY telegram_user_id"
        if limit:
            query += f" LIMIT {limit}"
        cur.execute(query)
        return [row[0] for row in cur.fetchall()]

def get_candidates_for_telegram_user(telegram_user_id: int, telegram_conn):
    """Get all candidates for a specific Telegram user"""
    with telegram_conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT *
            FROM twitter_match_candidates
            WHERE telegram_user_id = %s
              AND needs_llm_review = TRUE
              AND llm_processed = FALSE
            ORDER BY baseline_confidence DESC
        """, (telegram_user_id,))
        return [dict(row) for row in cur.fetchall()]

def get_user_chat_context(user_id: int, telegram_conn) -> Dict:
    """Get chat participation context for a user"""
    with telegram_conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT
                STRING_AGG(DISTINCT c.title, ' | ') FILTER (WHERE c.title IS NOT NULL) AS chat_titles,
                COUNT(DISTINCT cp.chat_id) AS chat_count
            FROM chat_participants cp
            JOIN chats c ON cp.chat_id = c.chat_id
            WHERE cp.user_id = %s
              AND c.title IS NOT NULL
              AND c.chat_type != 'private'
        """, (user_id,))
        result = cur.fetchone()
        if result and result['chat_titles']:
            # Check if chats indicate crypto/web3 interest
            chat_titles_lower = result['chat_titles'].lower()
            crypto_keywords = ['crypto', 'bitcoin', 'eth', 'defi', 'nft', 'dao', 'web3', 'blockchain',
                               'solana', 'near', 'avalanche', 'polygon', 'base', 'arbitrum', 'optimism',
                               'cosmos', 'builders', 'degen', 'lobster']
            is_crypto_focused = any(keyword in chat_titles_lower for keyword in crypto_keywords)
            return {
                'chat_titles': result['chat_titles'],
                'chat_count': result['chat_count'],
                'is_crypto_focused': is_crypto_focused
            }
        return {'chat_titles': None, 'chat_count': 0, 'is_crypto_focused': False}
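
# Example return value (chat titles illustrative):
# {'chat_titles': 'Base Builders | NFT Collectors', 'chat_count': 2,
#  'is_crypto_focused': True}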

async def verify_telegram_user(telegram_user_id: int, verifier: LLMVerifier, checkpoint_manager: CheckpointManager,
                               telegram_db, telegram_conn, log_file=None) -> Dict:
    """Verify all candidates for a single Telegram user"""
    # Get Telegram profile
    telegram_profile = telegram_db.query(Contact).filter(
        Contact.user_id == telegram_user_id
    ).first()
    if not telegram_profile:
        return {'matches_saved': 0, 'tokens': 0, 'cost': 0}
    # Construct display name from first_name + last_name
    display_name = (telegram_profile.first_name or '') + (' ' + telegram_profile.last_name if telegram_profile.last_name else '')
    display_name = display_name.strip() or None
    # Get chat participation context
    chat_context = get_user_chat_context(telegram_user_id, telegram_conn)
    telegram_dict = {
        'user_id': telegram_profile.user_id,
        'account_id': telegram_profile.account_id,
        'username': telegram_profile.username,
        'name': display_name,
        'bio': telegram_profile.bio,
        'chat_context': chat_context
    }
    # Get all candidates
    candidates = get_candidates_for_telegram_user(telegram_user_id, telegram_conn)
    if not candidates:
        return {'matches_saved': 0, 'tokens': 0, 'cost': 0}
    # Verify with LLM. Note: this per-call semaphore does not throttle across
    # users; cross-user concurrency is limited by the semaphore created in main().
    semaphore = asyncio.Semaphore(1)
    llm_result = await verifier.verify_batch(telegram_dict, candidates, semaphore, log_file)
    if not llm_result['success']:
        # Mark as processed even on error so we don't retry infinitely
        mark_candidates_processed([c['id'] for c in candidates], telegram_conn)
        return {
            'matches_saved': 0,
            'tokens': llm_result['tokens_used'],
            'cost': llm_result['cost']
        }
    # Save matches
    matches_saved = save_verified_matches(
        telegram_dict,
        candidates,
        llm_result['results'],
        telegram_conn
    )
    # Mark candidates as processed
    mark_candidates_processed([c['id'] for c in candidates], telegram_conn)
    return {
        'matches_saved': matches_saved,
        'tokens': llm_result['tokens_used'],
        'cost': llm_result['cost']
    }

def save_verified_matches(telegram_profile: Dict, candidates: List[Dict], llm_results: List[Dict], telegram_conn):
    """Save verified matches with confidence scores"""
    matches_to_save = []
    # CRITICAL: Post-process to fix generic name false positives
    # (`or ''` guards against None values, which would break len() below)
    tg_name = telegram_profile.get('name') or ''
    tg_username = telegram_profile.get('username') or ''
    tg_bio = telegram_profile.get('bio') or ''
    # Check if profile has generic characteristics
    has_generic_name = len(tg_name) <= 7  # Short display name
    has_generic_username = len(tg_username) <= 8  # Short username
    has_empty_bio = not tg_bio or len(tg_bio) <= 20
    is_generic_profile = has_empty_bio and (has_generic_name or has_generic_username)
    for llm_result in llm_results:
        candidate_idx = llm_result.get('candidate_index', 0) - 1  # Convert to 0-indexed
        if candidate_idx < 0 or candidate_idx >= len(candidates):
            continue
        confidence = llm_result.get('confidence', 0)
        reasoning = llm_result.get('reasoning', '')
        # Only save if confidence >= 0.5 (moderate or higher)
        if confidence < 0.5:
            continue
        candidate = candidates[candidate_idx]
        # CRITICAL FIX: Cap confidence for generic profiles with weak match methods
        # Weak match methods are those based purely on name/username containment
        weak_match_methods = [
            'display_name_containment',
            'fuzzy_name',
            'tg_username_in_twitter_name',
            'twitter_username_in_tg_name'
        ]
        if is_generic_profile and candidate['match_method'] in weak_match_methods:
            # Cap at 0.70 unless it's a strong signal (phash, exact_username, exact_bio_handle)
            if confidence > 0.70:
                confidence = 0.70
                reasoning += " [Confidence capped at 0.70: generic profile + weak match method]"
        match_details = {
            'match_method': candidate['match_method'],
            'baseline_confidence': candidate['baseline_confidence'],
            'llm_confidence': confidence,
            'llm_reasoning': reasoning
        }
        matches_to_save.append((
            telegram_profile['account_id'],
            telegram_profile['user_id'],
            telegram_profile.get('username'),
            telegram_profile.get('name'),
            telegram_profile.get('bio'),
            candidate['twitter_id'],
            candidate['twitter_username'],
            candidate.get('twitter_name'),
            candidate.get('twitter_bio'),
            candidate.get('twitter_location'),
            candidate.get('twitter_verified', False),
            candidate.get('twitter_blue_verified', False),
            candidate.get('twitter_followers_count', 0),
            candidate['match_method'],
            candidate['baseline_confidence'],
            'CONFIDENT' if confidence >= 0.8 else 'MODERATE' if confidence >= 0.6 else 'UNSURE',  # llm_verdict
            confidence,  # final_confidence
            json.dumps(match_details),
            llm_result.get('tokens_used', 0),
            0,  # cost will be aggregated at the batch level
            confidence < 0.75  # needs_manual_review
        ))
    if matches_to_save:
        with telegram_conn.cursor() as cur:
            cur.executemany("""
                INSERT INTO twitter_telegram_matches (
                    account_id,
                    telegram_user_id,
                    telegram_username,
                    telegram_name,
                    telegram_bio,
                    twitter_id,
                    twitter_username,
                    twitter_name,
                    twitter_bio,
                    twitter_location,
                    twitter_verified,
                    twitter_blue_verified,
                    twitter_followers_count,
                    match_method,
                    baseline_confidence,
                    llm_verdict,
                    final_confidence,
                    match_details,
                    llm_tokens_used,
                    llm_cost,
                    needs_manual_review
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (telegram_user_id, twitter_id) DO UPDATE SET
                    llm_verdict = EXCLUDED.llm_verdict,
                    final_confidence = EXCLUDED.final_confidence,
                    matched_at = NOW()
            """, matches_to_save)
        telegram_conn.commit()
    return len(matches_to_save)

def mark_candidates_processed(candidate_ids: List[int], telegram_conn):
    """Mark candidates as LLM processed"""
    with telegram_conn.cursor() as cur:
        cur.execute("""
            UPDATE twitter_match_candidates
            SET llm_processed = TRUE
            WHERE id = ANY(%s)
        """, (candidate_ids,))
    telegram_conn.commit()

async def main():
    print()
    print("=" * 70)
    print("🤖 Twitter-Telegram Match Verifier V2 (Confidence-Based)")
    print("=" * 70)
    print()
    # Check arguments
    test_mode = '--test' in sys.argv
    verbose = '--verbose' in sys.argv
    limit = 100 if test_mode else None
    # Check for concurrency argument
    concurrent_requests = 10  # Default
    for i, arg in enumerate(sys.argv):
        if arg == '--concurrent' and i + 1 < len(sys.argv):
            try:
                concurrent_requests = int(sys.argv[i + 1])
            except ValueError:
                pass
    # Setup verbose logging
    log_file = None
    if verbose:
        log_file = open(Path(__file__).parent.parent / 'verification_v2_log.txt', 'w')
        log_file.write("=" * 100 + "\n")
        log_file.write("VERIFICATION V2 LOG\n")
        log_file.write("=" * 100 + "\n\n")
    if test_mode:
        print("🧪 TEST MODE: Processing first 100 Telegram users only")
        print()
    # Initialize checkpoint
    checkpoint_manager = CheckpointManager(CHECKPOINT_FILE)
    if checkpoint_manager.data['last_processed_telegram_id']:
        print(f"📍 Resuming from telegram_user_id {checkpoint_manager.data['last_processed_telegram_id']}")
        print(f"   Already processed: {checkpoint_manager.data['processed_count']:,}")
        print(f"   Cost so far: ${checkpoint_manager.data['total_cost']:.4f}")
        print()
    else:
        checkpoint_manager.data['started_at'] = datetime.now().isoformat()
    # Connect to databases
    print("📡 Connecting to databases...")
    telegram_db = SessionLocal()
    try:
        telegram_conn = psycopg2.connect(dbname='telegram_contacts', user='andrewjiang', host='localhost', port=5432)
        telegram_conn.autocommit = False
    except Exception as e:
        print(f"❌ Failed to connect to Telegram database: {e}")
        return False
    print("✅ Connected")
    print()
    try:
        # Load telegram users needing verification
        print("🔍 Loading telegram users with candidates...")
        telegram_user_ids = get_telegram_users_with_candidates(telegram_conn, checkpoint_manager, limit)
        if not telegram_user_ids:
            print("✅ No users to verify!")
            return True
        print(f"✅ Found {len(telegram_user_ids):,} telegram users to verify")
        print()
        # Estimate cost (rough)
        estimated_cost = len(telegram_user_ids) * 0.003  # ~$0.003 per user
        print(f"💰 Estimated cost: ${estimated_cost:.4f}")
        print()
        # Initialize verifier
        verifier = LLMVerifier()
        print("🚀 Starting LLM verification...")
        print(f"⚡ Concurrent requests: {concurrent_requests}")
        print()
        # Configuration for parallel processing
        CONCURRENT_REQUESTS = concurrent_requests  # Process N users at a time
        BATCH_SIZE = 50  # Users per gather() batch (checkpoint itself saves every 20)
        # Process users in parallel batches
        total_users = len(telegram_user_ids)
        processed_count = 0
        for batch_start in range(0, total_users, BATCH_SIZE):
            batch_end = min(batch_start + BATCH_SIZE, total_users)
            batch = telegram_user_ids[batch_start:batch_end]
            print(f"📦 Processing batch {batch_start//BATCH_SIZE + 1}/{(total_users + BATCH_SIZE - 1)//BATCH_SIZE} ({len(batch)} users)...")
            # Create coroutines for concurrent processing
            tasks = []
            for telegram_user_id in batch:
                task = verify_telegram_user(
                    telegram_user_id,
                    verifier,
                    checkpoint_manager,
                    telegram_db,
                    telegram_conn,
                    log_file
                )
                tasks.append((telegram_user_id, task))
            # Process batch concurrently with limit
            semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)

            async def process_with_semaphore(user_id, task):
                async with semaphore:
                    return user_id, await task

            results = await asyncio.gather(
                *[process_with_semaphore(user_id, task) for user_id, task in tasks],
                return_exceptions=True
            )
            # Process results and update checkpoints
            for i, result in enumerate(results):
                processed_count += 1
                user_id = batch[i]
                if isinstance(result, Exception):
                    print(f"[{processed_count}/{total_users}] ❌ User {user_id} failed: {result}")
                    continue
                user_id_result, verification_result = result
                print(f"[{processed_count}/{total_users}] ✅ User {user_id_result}: {verification_result['matches_saved']} matches | ${verification_result['cost']:.4f}")
                # Update checkpoint
                checkpoint_manager.update(
                    user_id_result,
                    verification_result['matches_saved'],
                    verification_result['tokens'],
                    verification_result['cost']
                )
            print(f"   Batch complete. Total processed: {processed_count}/{total_users}")
            print()
        # Final stats
        print()
        print("=" * 70)
        print("✅ VERIFICATION COMPLETE")
        print("=" * 70)
        print()
        print("📊 Statistics:")
        print(f"   Processed: {checkpoint_manager.data['processed_count']:,} telegram users")
        print(f"   💾 Saved matches: {checkpoint_manager.data['total_matches_saved']:,}")
        print()
        print("💰 Cost:")
        print(f"   Total tokens: {checkpoint_manager.data['total_tokens']:,}")
        print(f"   Total cost: ${checkpoint_manager.data['total_cost']:.4f}")
        print()
        # Clean up checkpoint
        CHECKPOINT_FILE.unlink(missing_ok=True)
        return True
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        if log_file:
            log_file.close()
        telegram_db.close()
        telegram_conn.close()
        checkpoint_manager.save()
if __name__ == "__main__":
try:
success = asyncio.run(main())
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n\n⚠️ Interrupted by user")
print("💾 Progress saved - you can resume by running this script again")
sys.exit(1)