ProfileMatching/find_twitter_candidates_threaded.py
Andrew Jiang 5319d4d868 Initial commit: Twitter-Telegram Profile Matching System
This module provides comprehensive Twitter-to-Telegram profile matching,
combining 10 different matching methods with LLM-based verification.

Features:
- 10 matching methods (phash, usernames, bio handles, URL resolution, fuzzy names)
- URL resolution integration for t.co → t.me links
- Async LLM verification with GPT-5-mini
- Interactive menu system with real-time stats
- Threaded candidate finding (~1.5 contacts/sec)
- Comprehensive documentation and guides

Key Components:
- find_twitter_candidates.py: Core matching logic (10 methods)
- find_twitter_candidates_threaded.py: Threaded implementation
- verify_twitter_matches_v2.py: LLM verification (V5 prompt)
- review_match_quality.py: Analysis and quality review
- main.py: Interactive menu system
- Complete documentation (README, CHANGELOG, QUICKSTART)

Performance:
- Candidate finding: ~16-18 hours for 43K contacts
- LLM verification: ~23 hours for 43K users
- Cost: ~$130 for full verification

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-04 22:56:25 -08:00


#!/usr/bin/env python3
"""
Threading-based Twitter-Telegram Candidate Finder
Uses threading instead of multiprocessing for better macOS compatibility
and efficient I/O-bound parallel processing.
"""
import sys
import io
import time
import threading
import argparse
import traceback
from pathlib import Path
from typing import List, Dict, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

import psycopg2
from psycopg2.extras import execute_values, Json

# Add parent directory to path so src/ modules are importable
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))

from models import Contact

# Import the matcher (and Twitter DB config) from the original single-process script
from find_twitter_candidates import TwitterMatcher, TWITTER_DB_CONFIG

# Database configuration
TELEGRAM_DB_URL = 'postgresql://andrewjiang@localhost:5432/telegram_contacts'

# Thread-local storage for per-thread database connections
thread_local = threading.local()


def get_thread_connections():
    """Get or create database connections (and a matcher) for this thread.

    psycopg2 connections can be shared between threads but serialize access,
    so giving each worker its own connections lets queries run in parallel.
    """
    if not hasattr(thread_local, 'twitter_conn'):
        thread_local.twitter_conn = psycopg2.connect(**TWITTER_DB_CONFIG)
        thread_local.telegram_conn = psycopg2.connect(
            dbname='telegram_contacts',
            user='andrewjiang',
            host='localhost',
            port=5432
        )
        thread_local.matcher = TwitterMatcher(
            thread_local.twitter_conn,
            thread_local.telegram_conn
        )
    return thread_local.twitter_conn, thread_local.telegram_conn, thread_local.matcher


def process_contact(contact: Contact) -> Tuple[bool, int, List[Dict], Dict]:
    """
    Process a single contact in a worker thread.
    Returns: (success, num_candidates, candidates, method_stats)
    """
    # Capture stderr to detect silent failures inside the matcher.
    # NOTE: sys.stderr is process-global, so with several worker threads the
    # captured output can interleave across contacts.
    old_stderr = sys.stderr
    sys.stderr = stderr_capture = io.StringIO()
    try:
        # Get thread-local database connections
        twitter_conn, telegram_conn, matcher = get_thread_connections()

        # Find candidates
        candidates = matcher.find_candidates_for_contact(contact)

        # Check if any warnings were logged
        stderr_content = stderr_capture.getvalue()
        if stderr_content:
            print(f" 🔍 Warnings for contact {contact.user_id}:")
            print(f" {stderr_content}")

        method_stats = {}
        if candidates:
            # Track per-method stats
            for cand in candidates:
                method = cand['match_method']
                method_stats[method] = method_stats.get(method, 0) + 1

            # Add contact info to each candidate for later insertion
            for cand in candidates:
                cand['telegram_user_id'] = contact.user_id
                cand['account_id'] = contact.account_id

        return True, len(candidates), candidates, method_stats
    except Exception as e:
        print(f" ⚠️ Error processing contact {contact.user_id}: {e}")
        print(f" Contact details: username={contact.username}, display_name={getattr(contact, 'display_name', None)}")
        # stderr is still redirected here, so send the traceback to stdout
        # instead of the soon-to-be-discarded StringIO buffer
        traceback.print_exc(file=sys.stdout)
        return False, 0, [], {}
    finally:
        sys.stderr = old_stderr


def save_candidates_batch(candidates: List[Dict], telegram_conn):
    """Save a batch of candidates to database"""
    if not candidates:
        return 0

    insert_data = []
    for cand in candidates:
        tw = cand['twitter_profile']
        match_signals = cand.get('match_details', {})
        insert_data.append((
            cand['account_id'],
            cand['telegram_user_id'],
            tw['id'],
            tw['username'],
            tw.get('name'),
            tw.get('description'),
            tw.get('location'),
            tw.get('verified', False),
            tw.get('is_blue_verified', False),
            tw.get('followers_count', 0),
            0,  # candidate_rank (will be set later if needed)
            cand['match_method'],
            cand['baseline_confidence'],
            Json(match_signals),
            True,   # needs_llm_review
            False,  # llm_processed
        ))

    with telegram_conn.cursor() as cur:
        execute_values(cur, """
            INSERT INTO twitter_match_candidates (
                account_id,
                telegram_user_id,
                twitter_id,
                twitter_username,
                twitter_name,
                twitter_bio,
                twitter_location,
                twitter_verified,
                twitter_blue_verified,
                twitter_followers_count,
                candidate_rank,
                match_method,
                baseline_confidence,
                match_signals,
                needs_llm_review,
                llm_processed
            ) VALUES %s
            ON CONFLICT (telegram_user_id, twitter_id) DO NOTHING
        """, insert_data, page_size=1000)
    telegram_conn.commit()
    return len(insert_data)
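
# Note: because of the ON CONFLICT ... DO NOTHING clause above, re-running
# the script is idempotent per (telegram_user_id, twitter_id) pair; the
# count returned is rows attempted, which can exceed rows actually inserted.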


def main():
    parser = argparse.ArgumentParser(description='Find Twitter candidates for Telegram contacts (threading-based)')
    parser.add_argument('--limit', type=int, help='Limit number of Telegram contacts to process')
    parser.add_argument('--test', action='store_true', help='Test mode: process first 100 contacts only')
    parser.add_argument('--workers', type=int, default=8,
                        help='Number of worker threads (default: 8)')
    parser.add_argument('--user-id-min', type=int, help='Minimum user_id to process (for parallel ranges)')
    parser.add_argument('--user-id-max', type=int, help='Maximum user_id to process (for parallel ranges)')
    parser.add_argument('--range-name', type=str, help='Name for this range (for logging)')
    args = parser.parse_args()

    num_workers = args.workers
    limit = args.limit
    user_id_min = args.user_id_min
    user_id_max = args.user_id_max
    range_name = args.range_name or "full"

    print("=" * 70)
    print(f"🔍 Twitter-Telegram Candidate Finder (THREADED) - Range: {range_name}")
    print("=" * 70)
    print()

    if args.test:
        limit = 100
        print("🧪 TEST MODE: Processing first 100 contacts only")
        print()
    elif limit:
        print(f"📊 LIMIT MODE: Processing first {limit:,} contacts")
        print()

    if user_id_min is not None and user_id_max is not None:
        print(f"📍 User ID Range: {user_id_min:,} to {user_id_max:,}")
        print()

    print(f"🧵 Worker threads: {num_workers}")
    print()
    # Load contacts using raw psycopg2
    print("📡 Loading Telegram contacts...")
    conn = psycopg2.connect(
        dbname='telegram_contacts',
        user='andrewjiang',
        host='localhost',
        port=5432
    )
    with conn.cursor() as cur:
        # First, get the set of already-processed contacts
        cur.execute("""
            SELECT DISTINCT telegram_user_id
            FROM twitter_match_candidates
        """)
        already_processed = set(row[0] for row in cur.fetchall())
        print(f"📋 Already processed: {len(already_processed):,} contacts (will skip)")
        print()

        query = """
            SELECT account_id, user_id, display_name, first_name, last_name, username, phone, bio, is_bot, is_deleted
            FROM contacts
            WHERE user_id > 0
              AND is_deleted = false
              AND is_bot = false
              AND (bio IS NOT NULL OR username IS NOT NULL)
        """
        # Add user_id range filter if specified (argparse coerces these
        # values to int, so interpolating them into the SQL is safe here)
        if user_id_min is not None:
            query += f" AND user_id >= {user_id_min}"
        if user_id_max is not None:
            query += f" AND user_id <= {user_id_max}"
        query += " ORDER BY user_id"
        if limit:
            query += f" LIMIT {limit}"

        cur.execute(query)
        rows = cur.fetchall()
    conn.close()

    # Convert to Contact objects, skipping already processed
    contacts = []
    skipped = 0
    for row in rows:
        user_id = row[1]
        # Skip if already processed
        if user_id in already_processed:
            skipped += 1
            continue
        contact = Contact(
            account_id=row[0],
            user_id=user_id,
            display_name=row[2],
            first_name=row[3],
            last_name=row[4],
            username=row[5],
            phone=row[6],
            bio=row[7],
            is_bot=row[8],
            is_deleted=row[9]
        )
        contacts.append(contact)

    total_contacts = len(contacts)
    print(f"✅ Found {total_contacts:,} NEW contacts to process (skipped {skipped:,} already done)")
    print()
print("🚀 Processing contacts with thread pool...")
print()
start_time = time.time()
# Stats tracking
total_processed = 0
total_with_candidates = 0
all_candidates = []
combined_method_stats = {}
total_saved = 0
# Database connection for incremental saves
telegram_conn = psycopg2.connect(
dbname='telegram_contacts',
user='andrewjiang',
host='localhost',
port=5432
)
# Process with ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=num_workers) as executor:
# Submit all tasks
future_to_contact = {
executor.submit(process_contact, contact): contact
for contact in contacts
}

        # Process results as they complete
        for i, future in enumerate(as_completed(future_to_contact), 1):
            try:
                success, num_candidates, candidates, method_stats = future.result()
                if success:
                    total_processed += 1
                    if candidates:
                        total_with_candidates += 1
                        all_candidates.extend(candidates)

                        # Update method stats
                        for method, count in method_stats.items():
                            combined_method_stats[method] = combined_method_stats.get(method, 0) + count

                # Save to database once 100+ candidates are buffered
                if len(all_candidates) >= 100:
                    saved = save_candidates_batch(all_candidates, telegram_conn)
                    total_saved += saved
                    all_candidates = []  # Clear buffer

                # Progress update every 10 contacts
                if i % 10 == 0 or i == 1:
                    elapsed = time.time() - start_time
                    rate = total_processed / elapsed if elapsed > 0 else 0
                    remaining = total_contacts - total_processed
                    eta_seconds = remaining / rate if rate > 0 else 0
                    eta_hours = eta_seconds / 3600
                    print(f" Progress: {total_processed}/{total_contacts} ({total_processed/total_contacts*100:.1f}%) | "
                          f"{total_saved + len(all_candidates):,} candidates (💾 {total_saved:,} saved) | "
                          f"Rate: {rate:.1f}/sec | "
                          f"ETA: {eta_hours:.1f}h")
            except Exception as e:
                print(f" ❌ Error processing future: {e}")
                traceback.print_exc()

    # Save any remaining candidates
    if all_candidates:
        saved = save_candidates_batch(all_candidates, telegram_conn)
        total_saved += saved
    telegram_conn.close()

    elapsed = time.time() - start_time
    print()
    print(f"⏱️ Processing completed in {elapsed:.1f} seconds ({elapsed/60:.1f} minutes)")
    print(f" Rate: {total_processed/elapsed:.1f} contacts/second")
    print()
    print(f"💾 Total saved: {total_saved:,} candidates")
    print()

    # Print stats
    print("=" * 70)
    print("✅ CANDIDATE FINDING COMPLETE")
    print("=" * 70)
    print()
    print("📊 Statistics:")
    print(f" Processed: {total_processed:,} contacts")
    if total_processed > 0:
        print(f" With candidates: {total_with_candidates:,} ({total_with_candidates/total_processed*100:.1f}%)")
    print(f" Total candidates: {total_saved:,}")
    if total_with_candidates > 0:
        print(f" Avg candidates per match: {total_saved/total_with_candidates:.1f}")
    print()
    print("📈 By method:")
    for method, count in sorted(combined_method_stats.items(), key=lambda x: x[1], reverse=True):
        print(f" {method}: {count:,}")
    print()
    return True


if __name__ == "__main__":
    try:
        success = main()
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\n\n❌ Interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        traceback.print_exc()
        sys.exit(1)
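
# A quick way to inspect saved results afterwards (illustrative SQL; the
# table and column names come from the INSERT in save_candidates_batch):
#
#   SELECT match_method, COUNT(*) AS n
#   FROM twitter_match_candidates
#   GROUP BY match_method
#   ORDER BY n DESC;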