Mirror of https://github.com/lockin-bot/ProfileMatching.git (synced 2026-01-12 09:44:30 +08:00)
This module provides comprehensive Twitter-to-Telegram profile matching and verification using 10 different matching methods and LLM verification.

Features:
- 10 matching methods (phash, usernames, bio handles, URL resolution, fuzzy names)
- URL resolution integration for t.co → t.me links
- Async LLM verification with GPT-5-mini
- Interactive menu system with real-time stats
- Threaded candidate finding (~1.5 contacts/sec)
- Comprehensive documentation and guides

Key Components:
- find_twitter_candidates.py: Core matching logic (10 methods)
- find_twitter_candidates_threaded.py: Threaded implementation
- verify_twitter_matches_v2.py: LLM verification (V5 prompt)
- review_match_quality.py: Analysis and quality review
- main.py: Interactive menu system
- Complete documentation (README, CHANGELOG, QUICKSTART)

Performance:
- Candidate finding: ~16-18 hours for 43K contacts
- LLM verification: ~23 hours for 43K users
- Cost: ~$130 for full verification

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
792 lines
29 KiB
Python
Executable File
#!/usr/bin/env python3
"""
Twitter-Telegram Match Verifier V2 (Confidence-Based with Batch Evaluation)

Uses an LLM with confidence scoring (0-1) and evaluates all candidates for a
Telegram user together.
"""
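# Usage (flags parsed in main() below):
#   python verify_twitter_matches_v2.py                  # full run, resumes from checkpoint
#   python verify_twitter_matches_v2.py --test           # first 100 Telegram users only
#   python verify_twitter_matches_v2.py --verbose        # log prompts/responses to verification_v2_log.txt
#   python verify_twitter_matches_v2.py --concurrent 20  # allow up to 20 LLM calls in flight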
import sys
import asyncio
import json
from pathlib import Path
from typing import List, Dict
import psycopg2
from psycopg2.extras import RealDictCursor
import openai
from datetime import datetime
import re

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))

from db_config import SessionLocal
from models import Contact

# Twitter database connection (defined here but not referenced in this script)
TWITTER_DB_CONFIG = {
    'dbname': 'twitter_data',
    'user': 'andrewjiang',
    'host': 'localhost',
    'port': 5432
}

# Checkpoint file
CHECKPOINT_FILE = Path(__file__).parent.parent / 'llm_verification_v2_checkpoint.json'

class CheckpointManager:
    """Manage checkpoints for resumable verification"""

    def __init__(self, checkpoint_file):
        self.checkpoint_file = checkpoint_file
        self.data = self.load()

    def load(self):
        if self.checkpoint_file.exists():
            with open(self.checkpoint_file, 'r') as f:
                return json.load(f)
        return {
            'last_processed_telegram_id': None,
            'processed_count': 0,
            'total_matches_saved': 0,
            'total_cost': 0.0,
            'total_tokens': 0,
            'started_at': None
        }

    def save(self):
        self.data['last_updated_at'] = datetime.now().isoformat()
        with open(self.checkpoint_file, 'w') as f:
            json.dump(self.data, f, indent=2)

    def update(self, telegram_user_id, matches_saved, tokens_used, cost):
        self.data['last_processed_telegram_id'] = telegram_user_id
        self.data['processed_count'] += 1
        self.data['total_matches_saved'] += matches_saved
        self.data['total_tokens'] += tokens_used
        self.data['total_cost'] += cost

        # Save every 20 telegram users
        if self.data['processed_count'] % 20 == 0:
            self.save()

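# Illustrative checkpoint file contents (field names come from load()/save()
# above; the values here are made up):
# {
#   "last_processed_telegram_id": 123456789,
#   "processed_count": 1200,
#   "total_matches_saved": 450,
#   "total_cost": 3.61,
#   "total_tokens": 2150000,
#   "started_at": "2026-01-10T12:00:00",
#   "last_updated_at": "2026-01-10T14:30:00"
# }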
def calculate_username_specificity(username: str) -> float:
    """
    Calculate how specific/unique a username is (0-1)

    Generic usernames get lower scores, unique ones get higher scores
    """
    if not username:
        return 0.5

    username_lower = username.lower()

    # Very generic patterns
    generic_patterns = [
        r'^admin\d*$', r'^user\d+$', r'^crypto\d*$', r'^nft\d*$',
        r'^web3\d*$', r'^trader\d*$', r'^dev\d*$', r'^official\d*$',
        r'^team\d*$', r'^support\d*$', r'^info\d*$'
    ]

    for pattern in generic_patterns:
        if re.match(pattern, username_lower):
            return 0.3

    # Length-based scoring
    length = len(username)
    if length < 4:
        return 0.4
    elif length < 6:
        return 0.6
    elif length < 8:
        return 0.75
    else:
        return 0.9

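# Expected outputs of calculate_username_specificity, derived from the rules above:
#   calculate_username_specificity('admin42')    -> 0.3   (generic pattern)
#   calculate_username_specificity('abc')        -> 0.4   (shorter than 4 chars)
#   calculate_username_specificity('kevin')      -> 0.6   (5 chars)
#   calculate_username_specificity('kupermind')  -> 0.9   (8+ chars)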
class LLMVerifier:
    """Use GPT-5 Mini for confidence-based verification"""

    def __init__(self):
        self.client = openai.AsyncOpenAI()
        self.model = "gpt-5-mini"  # GPT-5 Mini - balanced performance and cost

        # Cost calculation for gpt-5-mini
        self.input_cost_per_1m = 0.25  # $0.25 per 1M input tokens
        self.output_cost_per_1m = 2.00  # $2.00 per 1M output tokens

    def build_batch_prompt(self, telegram_profile: Dict, candidates: List[Dict]) -> str:
        """Build LLM prompt for batch evaluation of all candidates"""

        # Format Telegram profile
        tg_bio = telegram_profile.get('bio', 'none')
        if tg_bio and len(tg_bio) > 300:
            tg_bio = tg_bio[:300] + '...'

        # Format chat context
        chat_context = telegram_profile.get('chat_context', {})
        chat_info = ""
        if chat_context.get('chat_titles'):
            chat_info = f"\nGroup Chats: {chat_context['chat_titles']}"
            if chat_context.get('is_crypto_focused'):
                chat_info += " [CRYPTO-FOCUSED GROUPS]"

        prompt = f"""TELEGRAM PROFILE:
Username: @{telegram_profile.get('username') or 'none'}
Display Name: {telegram_profile.get('name', 'none')} (first_name + last_name combined)
Bio: {tg_bio}{chat_info}

TWITTER CANDIDATES (evaluate all together):
"""

        for i, candidate in enumerate(candidates, 1):
            tw_bio = candidate.get('twitter_bio', 'none')
            if tw_bio and len(tw_bio) > 250:
                tw_bio = tw_bio[:250] + '...'

            # Check if phash match info exists (guard against a NULL match_signals column)
            phash_info = ""
            if candidate['match_method'] == 'phash_match':
                phash_distance = (candidate.get('match_signals') or {}).get('phash_distance', 'unknown')
                phash_info = f"\n⭐ Phash Match: distance={phash_distance} (identical profile pictures!)"

            prompt += f"""
[Candidate {i}]
Twitter Username: @{candidate['twitter_username']}
Twitter Display Name: {candidate.get('twitter_name', 'Unknown')}
Twitter Bio: {tw_bio}
Location: {candidate.get('twitter_location') or 'none'}
Verified: {candidate.get('twitter_verified', False)} (Blue: {candidate.get('twitter_blue_verified', False)})
Followers: {candidate.get('twitter_followers_count', 0):,}
Match Method: {candidate['match_method']}{phash_info}
Baseline Confidence: {candidate.get('baseline_confidence', 0):.2f}
"""

        return prompt

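    # Illustrative shape of a rendered user prompt (all values made up):
    #
    #   TELEGRAM PROFILE:
    #   Username: @kupermind
    #   Display Name: Kuper Mind (first_name + last_name combined)
    #   Bio: Building things
    #   Group Chats: Solana Builders [CRYPTO-FOCUSED GROUPS]
    #
    #   TWITTER CANDIDATES (evaluate all together):
    #
    #   [Candidate 1]
    #   Twitter Username: @kupermind
    #   Twitter Display Name: Kuper Mind
    #   ...
    #   Baseline Confidence: 0.90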
    async def verify_batch(self, telegram_profile: Dict, candidates: List[Dict], semaphore, log_file=None) -> Dict:
        """Verify all candidates for a single Telegram user"""

        async with semaphore:
            prompt = self.build_batch_prompt(telegram_profile, candidates)

            if log_file:
                log_file.write(f"\n{'=' * 100}\n")
                log_file.write(f"TELEGRAM USER: {telegram_profile.get('username') or 'N/A'} (ID: {telegram_profile['user_id']})\n")
                log_file.write(f"{'=' * 100}\n\n")

            system_prompt = """You are an expert at determining if two social media profiles belong to the same person.

# TASK
Determine confidence (0.0-1.0) that each Twitter candidate is the same person as the Telegram profile.

**CRITICAL: Evaluate ALL candidates together, not in isolation. Compare them against each other to identify which has the STRONGEST evidence.**

# SIGNAL STRENGTH GUIDE

Evaluate the FULL CONTEXT of all available signals together. Individual signals can be strong or weak, but the overall picture matters most.

## VERY STRONG SIGNALS (Can individually suggest high confidence)
- **Explicit bio mention**: TG bio says "x.com/username" or "Follow me @username"
  - ⚠️ EXCEPTION: If that account is clearly a company/project (not personal), this is NOT definitive
  - Example: TG bio "x.com/gems_gun" but @gems_gun is a company account → Look for a personal account like @lucahl0 "Building @gems_gun"
- **Unique username exact match**: Unusual/long username (like @kupermind, @schellinger_k) that matches exactly
  - Generic usernames (@mike, @crypto123) don't qualify as "unique"

## STRONG SUPPORTING SIGNALS (Good indicators when combined)
Each of these helps build confidence, especially when multiple align:
- Full name match (after normalization: remove .eth, emojis, separators)
- Same profile picture (phash match)
- Aligned bio themes/context (both in crypto, both mention same projects/interests)
- Very similar username (not exact, but close: @kevin vs @k_kevin)

## WEAK SIGNALS (Need multiple strong signals to be meaningful)
- Generic name match only (Alex, Mike, David, John, Baz)
- Same general field but no specifics
- Partial username similarity with generic name

## RED FLAGS (Lower confidence significantly)
- Context mismatch: TG is crypto/tech, TW is chef/athlete/journalist
- Company account when looking for personal profile
- Famous person/celebrity (unless clear evidence it's actually them)

# CONFIDENCE BANDS

## 0.90-1.0: NEARLY CERTAIN
Very strong signal (bio mention of personal account OR unique username match) + supporting signals align
Examples:
- TG bio: "https://x.com/kupermind" + TW @kupermind personal account → 0.97
- TG @olliten + TW @olliten + same name + same pic → 0.98

## 0.70-0.89: LIKELY
Multiple strong supporting signals converge, or one very strong signal with some gap
Examples:
- Very similar username + name match + context: @schellinger → @k_schellinger "Kevin Schellinger" → 0.85
- Exact username on moderately unique name: @alexcrypto + "Alex Smith" crypto → 0.78

## 0.40-0.69: POSSIBLE
Some evidence but significant uncertainty
- Generic name + same field but no username/pic match
- Weak username similarity with generic name
- Profile pic match but name is very generic

## 0.10-0.39: UNLIKELY
Minimal evidence or contradictions
- Only generic name match (David, Alex, Mike)
- Context mismatch (crypto person vs chef)

## 0.0-0.09: EXTREMELY UNLIKELY
No meaningful evidence or clear contradiction

# COMPARATIVE EVALUATION PROCESS

**Step 1: Review ALL candidates together**
Don't score each in isolation. Look at the full set to understand which has the strongest evidence.

**Step 2: Identify the strongest signals present**
- Is there a bio mention? (Check if it's a personal vs company account!)
- Is there a unique username match?
- Do multiple supporting signals converge for one candidate?

**Step 3: Apply differential scoring**
- The candidate with the STRONGEST evidence should get a meaningfully higher score
- If Candidate A has unique username + name match, and Candidate B only has a generic name → A gets 0.85+, B gets 0.40 max
- If ALL candidates have only weak signals (generic name only) → ALL score 0.20-0.40

**Step 4: Sanity checks**
- Could this evidence match thousands of people? → Lower confidence
- Is there a context mismatch? → Max 0.50
- Is this a company account when we need a personal one? → Not the right match

**Key principle: Only ONE candidate can be "most likely" - differentiate clearly between them.**

# TECHNICAL NOTES

**Name Normalization**: Before comparing, remove .eth/.ton/.sol suffixes, emojis, "| company" separators, and ignore capitalization

**Profile Picture (phash)**: Phash match alone → MAX 0.70 (supporting signal). Use to break ties or add confidence to other signals.

# OUTPUT FORMAT
Return ONLY valid JSON (no markdown, no explanation):
{
  "candidates": [
    {
      "candidate_index": 1,
      "confidence": 0.85,
      "reasoning": "Brief explanation"
    },
    ...
  ]
}"""

            try:
                if log_file:
                    log_file.write("SYSTEM PROMPT:\n")
                    log_file.write("-" * 100 + "\n")
                    log_file.write(system_prompt + "\n\n")
                    log_file.write("USER PROMPT:\n")
                    log_file.write("-" * 100 + "\n")
                    log_file.write(prompt + "\n\n")
                    log_file.flush()

                response = await self.client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": prompt}
                    ],
                    response_format={"type": "json_object"}
                )

                content = response.choices[0].message.content.strip()

                if log_file:
                    log_file.write("LLM RESPONSE:\n")
                    log_file.write("-" * 100 + "\n")
                    log_file.write(content + "\n\n")
                    log_file.flush()

                # Parse JSON
                try:
                    result = json.loads(content)
                except json.JSONDecodeError:
                    print("   ⚠️ Failed to parse JSON response")
                    return {
                        'success': False,
                        'error': 'json_parse_error',
                        'tokens_used': response.usage.total_tokens,
                        'cost': self.calculate_cost(response.usage)
                    }

                tokens_used = response.usage.total_tokens
                cost = self.calculate_cost(response.usage)

                return {
                    'success': True,
                    'results': result.get('candidates', []),
                    'tokens_used': tokens_used,
                    'cost': cost,
                    'error': None
                }

            except Exception as e:
                print(f"   ⚠️ LLM error: {str(e)[:100]}")
                return {
                    'success': False,
                    'error': str(e),
                    'tokens_used': 0,
                    'cost': 0.0
                }

    def calculate_cost(self, usage) -> float:
        """Calculate cost for this API call"""
        input_cost = (usage.prompt_tokens / 1_000_000) * self.input_cost_per_1m
        output_cost = (usage.completion_tokens / 1_000_000) * self.output_cost_per_1m
        return input_cost + output_cost
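    # Worked example (illustrative token counts): a call with 2,000 prompt tokens
    # and 300 completion tokens costs
    #   (2000 / 1_000_000) * 0.25 + (300 / 1_000_000) * 2.00 = $0.0005 + $0.0006 = $0.0011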
def get_telegram_users_with_candidates(telegram_conn, checkpoint_manager, limit=None):
    """Get list of telegram_user_ids that have unprocessed candidates"""
    with telegram_conn.cursor() as cur:
        query = """
            SELECT DISTINCT telegram_user_id
            FROM twitter_match_candidates
            WHERE needs_llm_review = TRUE
              AND llm_processed = FALSE
        """
        params = []

        # Resume after the last checkpointed user; bind values as parameters
        # instead of interpolating them into the SQL string
        if checkpoint_manager.data['last_processed_telegram_id']:
            query += " AND telegram_user_id > %s"
            params.append(checkpoint_manager.data['last_processed_telegram_id'])

        query += " ORDER BY telegram_user_id"

        if limit:
            query += " LIMIT %s"
            params.append(limit)

        cur.execute(query, params)
        return [row[0] for row in cur.fetchall()]

def get_candidates_for_telegram_user(telegram_user_id: int, telegram_conn):
    """Get all candidates for a specific Telegram user"""
    with telegram_conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT *
            FROM twitter_match_candidates
            WHERE telegram_user_id = %s
              AND needs_llm_review = TRUE
              AND llm_processed = FALSE
            ORDER BY baseline_confidence DESC
        """, (telegram_user_id,))
        return [dict(row) for row in cur.fetchall()]

def get_user_chat_context(user_id: int, telegram_conn) -> Dict:
    """Get chat participation context for a user"""
    with telegram_conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT
                STRING_AGG(DISTINCT c.title, ' | ') FILTER (WHERE c.title IS NOT NULL) as chat_titles,
                COUNT(DISTINCT cp.chat_id) as chat_count
            FROM chat_participants cp
            JOIN chats c ON cp.chat_id = c.chat_id
            WHERE cp.user_id = %s
              AND c.title IS NOT NULL
              AND c.chat_type != 'private'
        """, (user_id,))
        result = cur.fetchone()

        if result and result['chat_titles']:
            # Check if chats indicate crypto/web3 interest
            chat_titles_lower = result['chat_titles'].lower()
            crypto_keywords = ['crypto', 'bitcoin', 'eth', 'defi', 'nft', 'dao', 'web3', 'blockchain',
                               'solana', 'near', 'avalanche', 'polygon', 'base', 'arbitrum', 'optimism',
                               'cosmos', 'builders', 'degen', 'lobster']
            is_crypto_focused = any(keyword in chat_titles_lower for keyword in crypto_keywords)

            return {
                'chat_titles': result['chat_titles'],
                'chat_count': result['chat_count'],
                'is_crypto_focused': is_crypto_focused
            }

        return {'chat_titles': None, 'chat_count': 0, 'is_crypto_focused': False}

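# Example return value for a user in two crypto groups (illustrative titles):
#   {'chat_titles': 'Solana Builders | NFT Degens', 'chat_count': 2, 'is_crypto_focused': True}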
async def verify_telegram_user(telegram_user_id: int, verifier: LLMVerifier, checkpoint_manager: CheckpointManager,
                               telegram_db, telegram_conn, log_file=None) -> Dict:
    """Verify all candidates for a single Telegram user"""

    # Get Telegram profile
    telegram_profile = telegram_db.query(Contact).filter(
        Contact.user_id == telegram_user_id
    ).first()

    if not telegram_profile:
        return {'matches_saved': 0, 'tokens': 0, 'cost': 0}

    # Construct display name from first_name + last_name
    display_name = (telegram_profile.first_name or '') + (' ' + telegram_profile.last_name if telegram_profile.last_name else '')
    display_name = display_name.strip() or None

    # Get chat participation context
    chat_context = get_user_chat_context(telegram_user_id, telegram_conn)

    telegram_dict = {
        'user_id': telegram_profile.user_id,
        'account_id': telegram_profile.account_id,
        'username': telegram_profile.username,
        'name': display_name,
        'bio': telegram_profile.bio,
        'chat_context': chat_context
    }

    # Get all candidates
    candidates = get_candidates_for_telegram_user(telegram_user_id, telegram_conn)

    if not candidates:
        return {'matches_saved': 0, 'tokens': 0, 'cost': 0}

    # Verify with LLM
    semaphore = asyncio.Semaphore(1)  # One at a time for now
    llm_result = await verifier.verify_batch(telegram_dict, candidates, semaphore, log_file)

    if not llm_result['success']:
        # Mark as processed even on error so we don't retry infinitely
        mark_candidates_processed([c['id'] for c in candidates], telegram_conn)
        return {
            'matches_saved': 0,
            'tokens': llm_result['tokens_used'],
            'cost': llm_result['cost']
        }

    # Save matches
    matches_saved = save_verified_matches(
        telegram_dict,
        candidates,
        llm_result['results'],
        telegram_conn
    )

    # Mark candidates as processed
    mark_candidates_processed([c['id'] for c in candidates], telegram_conn)

    return {
        'matches_saved': matches_saved,
        'tokens': llm_result['tokens_used'],
        'cost': llm_result['cost']
    }

def save_verified_matches(telegram_profile: Dict, candidates: List[Dict], llm_results: List[Dict], telegram_conn):
    """Save verified matches with confidence scores"""

    matches_to_save = []

    # CRITICAL: Post-process to fix generic-name false positives.
    # These fields may be None, so coalesce to '' before taking len().
    tg_name = telegram_profile.get('name') or ''
    tg_username = telegram_profile.get('username') or ''
    tg_bio = telegram_profile.get('bio') or ''

    # Check if profile has generic characteristics
    has_generic_name = len(tg_name) <= 7  # Short display name
    has_generic_username = len(tg_username) <= 8  # Short username
    has_empty_bio = not tg_bio or len(tg_bio) <= 20

    is_generic_profile = has_empty_bio and (has_generic_name or has_generic_username)

    for llm_result in llm_results:
        candidate_idx = llm_result.get('candidate_index', 0) - 1  # Convert to 0-indexed

        if candidate_idx < 0 or candidate_idx >= len(candidates):
            continue

        confidence = llm_result.get('confidence', 0)
        reasoning = llm_result.get('reasoning', '')

        # Only save if confidence >= 0.5 (moderate or higher)
        if confidence < 0.5:
            continue

        candidate = candidates[candidate_idx]

        # CRITICAL FIX: Cap confidence for generic profiles with weak match methods.
        # Weak match methods are those based purely on name/username containment.
        weak_match_methods = [
            'display_name_containment',
            'fuzzy_name',
            'tg_username_in_twitter_name',
            'twitter_username_in_tg_name'
        ]

        if is_generic_profile and candidate['match_method'] in weak_match_methods:
            # Cap at 0.70 unless it's a strong signal (phash, exact_username, exact_bio_handle)
            if confidence > 0.70:
                confidence = 0.70
                reasoning += " [Confidence capped at 0.70: generic profile + weak match method]"

        match_details = {
            'match_method': candidate['match_method'],
            'baseline_confidence': candidate['baseline_confidence'],
            'llm_confidence': confidence,
            'llm_reasoning': reasoning
        }

        matches_to_save.append((
            telegram_profile['account_id'],
            telegram_profile['user_id'],
            telegram_profile.get('username'),
            telegram_profile.get('name'),
            telegram_profile.get('bio'),
            candidate['twitter_id'],
            candidate['twitter_username'],
            candidate.get('twitter_name'),
            candidate.get('twitter_bio'),
            candidate.get('twitter_location'),
            candidate.get('twitter_verified', False),
            candidate.get('twitter_blue_verified', False),
            candidate.get('twitter_followers_count', 0),
            candidate['match_method'],
            candidate['baseline_confidence'],
            'CONFIDENT' if confidence >= 0.8 else 'MODERATE' if confidence >= 0.6 else 'UNSURE',
            confidence,
            json.dumps(match_details),
            llm_result.get('tokens_used', 0),
            0,  # cost will be aggregated at the batch level
            confidence < 0.75  # needs_manual_review
        ))

    if matches_to_save:
        with telegram_conn.cursor() as cur:
            cur.executemany("""
                INSERT INTO twitter_telegram_matches (
                    account_id, telegram_user_id, telegram_username, telegram_name, telegram_bio,
                    twitter_id, twitter_username, twitter_name, twitter_bio, twitter_location,
                    twitter_verified, twitter_blue_verified, twitter_followers_count,
                    match_method, baseline_confidence, llm_verdict, final_confidence,
                    match_details, llm_tokens_used, llm_cost, needs_manual_review
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (telegram_user_id, twitter_id) DO UPDATE SET
                    llm_verdict = EXCLUDED.llm_verdict,
                    final_confidence = EXCLUDED.final_confidence,
                    matched_at = NOW()
            """, matches_to_save)
        telegram_conn.commit()

    return len(matches_to_save)

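# Example of the generic-profile cap above (illustrative): a Telegram user named
# "Alex" with no bio, matched via 'fuzzy_name' at LLM confidence 0.85, is saved
# with final_confidence 0.70 (verdict MODERATE) and needs_manual_review = True.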
def mark_candidates_processed(candidate_ids: List[int], telegram_conn):
    """Mark candidates as LLM processed"""
    with telegram_conn.cursor() as cur:
        cur.execute("""
            UPDATE twitter_match_candidates
            SET llm_processed = TRUE
            WHERE id = ANY(%s)
        """, (candidate_ids,))
    telegram_conn.commit()

async def main():
    print()
    print("=" * 70)
    print("🤖 Twitter-Telegram Match Verifier V2 (Confidence-Based)")
    print("=" * 70)
    print()

    # Check arguments
    test_mode = '--test' in sys.argv
    verbose = '--verbose' in sys.argv
    limit = 100 if test_mode else None

    # Check for concurrency argument
    concurrent_requests = 10  # Default
    for i, arg in enumerate(sys.argv):
        if arg == '--concurrent' and i + 1 < len(sys.argv):
            try:
                concurrent_requests = int(sys.argv[i + 1])
            except ValueError:
                pass

    # Setup verbose logging
    log_file = None
    if verbose:
        log_file = open(Path(__file__).parent.parent / 'verification_v2_log.txt', 'w')
        log_file.write("=" * 100 + "\n")
        log_file.write("VERIFICATION V2 LOG\n")
        log_file.write("=" * 100 + "\n\n")

    if test_mode:
        print("🧪 TEST MODE: Processing first 100 Telegram users only")
        print()

    # Initialize checkpoint
    checkpoint_manager = CheckpointManager(CHECKPOINT_FILE)

    if checkpoint_manager.data['last_processed_telegram_id']:
        print(f"📍 Resuming from telegram_user_id {checkpoint_manager.data['last_processed_telegram_id']}")
        print(f"   Already processed: {checkpoint_manager.data['processed_count']:,}")
        print(f"   Cost so far: ${checkpoint_manager.data['total_cost']:.4f}")
        print()
    else:
        checkpoint_manager.data['started_at'] = datetime.now().isoformat()

    # Connect to databases
    print("📡 Connecting to databases...")
    telegram_db = SessionLocal()

    try:
        telegram_conn = psycopg2.connect(dbname='telegram_contacts', user='andrewjiang', host='localhost', port=5432)
        telegram_conn.autocommit = False
    except Exception as e:
        print(f"❌ Failed to connect to Telegram database: {e}")
        telegram_db.close()
        return False

    print("✅ Connected")
    print()

    completed = False
    try:
        # Load telegram users needing verification
        print("🔍 Loading telegram users with candidates...")
        telegram_user_ids = get_telegram_users_with_candidates(telegram_conn, checkpoint_manager, limit)

        if not telegram_user_ids:
            print("✅ No users to verify!")
            completed = True
            return True

        print(f"✅ Found {len(telegram_user_ids):,} telegram users to verify")
        print()

        # Estimate cost (rough)
        estimated_cost = len(telegram_user_ids) * 0.003  # ~$0.003 per user
        print(f"💰 Estimated cost: ${estimated_cost:.4f}")
        print()

        # Initialize verifier
        verifier = LLMVerifier()

        print("🚀 Starting LLM verification...")
        print(f"⚡ Concurrent requests: {concurrent_requests}")
        print()

        # Configuration for parallel processing
        CONCURRENT_REQUESTS = concurrent_requests  # Process N users at a time
        BATCH_SIZE = 50  # Save checkpoint every 50 users

        # Process users in parallel batches
        total_users = len(telegram_user_ids)
        processed_count = 0

        for batch_start in range(0, total_users, BATCH_SIZE):
            batch_end = min(batch_start + BATCH_SIZE, total_users)
            batch = telegram_user_ids[batch_start:batch_end]

            print(f"📦 Processing batch {batch_start//BATCH_SIZE + 1}/{(total_users + BATCH_SIZE - 1)//BATCH_SIZE} ({len(batch)} users)...")

            # Create coroutines for concurrent processing
            tasks = []
            for telegram_user_id in batch:
                task = verify_telegram_user(
                    telegram_user_id,
                    verifier,
                    checkpoint_manager,
                    telegram_db,
                    telegram_conn,
                    log_file
                )
                tasks.append((telegram_user_id, task))

            # Process batch concurrently with limit
            semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)

            async def process_with_semaphore(user_id, task):
                async with semaphore:
                    return user_id, await task

            results = await asyncio.gather(
                *[process_with_semaphore(user_id, task) for user_id, task in tasks],
                return_exceptions=True
            )

            # Process results and update checkpoints (gather preserves input order)
            for i, result in enumerate(results):
                processed_count += 1
                user_id = batch[i]

                if isinstance(result, Exception):
                    print(f"[{processed_count}/{total_users}] ❌ User {user_id} failed: {result}")
                    continue

                user_id_result, verification_result = result
                print(f"[{processed_count}/{total_users}] ✅ User {user_id_result}: {verification_result['matches_saved']} matches | ${verification_result['cost']:.4f}")

                # Update checkpoint
                checkpoint_manager.update(
                    user_id_result,
                    verification_result['matches_saved'],
                    verification_result['tokens'],
                    verification_result['cost']
                )

            print(f"   Batch complete. Total processed: {processed_count}/{total_users}")
            print()

        # Final stats
        print()
        print("=" * 70)
        print("✅ VERIFICATION COMPLETE")
        print("=" * 70)
        print()
        print("📊 Statistics:")
        print(f"   Processed: {checkpoint_manager.data['processed_count']:,} telegram users")
        print(f"   💾 Saved matches: {checkpoint_manager.data['total_matches_saved']:,}")
        print()
        print("💰 Cost:")
        print(f"   Total tokens: {checkpoint_manager.data['total_tokens']:,}")
        print(f"   Total cost: ${checkpoint_manager.data['total_cost']:.4f}")
        print()

        # Clean up checkpoint now that the run is complete
        CHECKPOINT_FILE.unlink(missing_ok=True)
        completed = True

        return True

    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False

    finally:
        if log_file:
            log_file.close()
        telegram_db.close()
        telegram_conn.close()
        if not completed:
            # Persist progress for resume; skip after a clean finish so the
            # just-deleted checkpoint file isn't recreated
            checkpoint_manager.save()

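# Exit status: 0 when main() returns True, 1 on failure or Ctrl-C. Progress is
# checkpointed along the way, so an interrupted run can simply be re-run to resume.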
if __name__ == "__main__":
    try:
        success = asyncio.run(main())
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
        print("💾 Progress saved - you can resume by running this script again")
        sys.exit(1)