mirror of
https://github.com/lockin-bot/ProfileMatching.git
synced 2026-01-12 09:44:30 +08:00
This module provides comprehensive Twitter-to-Telegram profile matching and verification using 10 different matching methods and LLM verification. Features: - 10 matching methods (phash, usernames, bio handles, URL resolution, fuzzy names) - URL resolution integration for t.co → t.me links - Async LLM verification with GPT-5-mini - Interactive menu system with real-time stats - Threaded candidate finding (~1.5 contacts/sec) - Comprehensive documentation and guides Key Components: - find_twitter_candidates.py: Core matching logic (10 methods) - find_twitter_candidates_threaded.py: Threaded implementation - verify_twitter_matches_v2.py: LLM verification (V5 prompt) - review_match_quality.py: Analysis and quality review - main.py: Interactive menu system - Complete documentation (README, CHANGELOG, QUICKSTART) Performance: - Candidate finding: ~16-18 hours for 43K contacts - LLM verification: ~23 hours for 43K users - Cost: ~$130 for full verification 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
390 lines
12 KiB
Python
390 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Threading-based Twitter-Telegram Candidate Finder
|
|
|
|
Uses threading instead of multiprocessing for better macOS compatibility
|
|
and efficient I/O-bound parallel processing.
|
|
"""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
import psycopg2
|
|
from psycopg2.extras import execute_values, DictCursor
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import argparse
|
|
from typing import List, Dict, Tuple
|
|
import time
|
|
import threading
|
|
|
|
# Add parent directory to path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
|
|
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from models import Contact
|
|
|
|
# Import the matcher from the original script
|
|
from find_twitter_candidates import TwitterMatcher, TWITTER_DB_CONFIG
|
|
|
|
# Database configuration
# NOTE(review): TELEGRAM_DB_URL appears unused in this module — every
# connection below is opened with explicit psycopg2 keyword arguments
# instead. Confirm no external importer relies on it before removing.
TELEGRAM_DB_URL = 'postgresql://andrewjiang@localhost:5432/telegram_contacts'

# Thread-local storage for database connections: each worker thread lazily
# creates and caches its own psycopg2 connections and TwitterMatcher
# (see get_thread_connections), so no connection is shared across threads.
thread_local = threading.local()
|
|
|
|
|
|
def get_thread_connections():
    """Return (twitter_conn, telegram_conn, matcher) bound to the current thread.

    The psycopg2 connections and the TwitterMatcher are created lazily the
    first time a worker thread calls this, then cached in thread-local
    storage so every thread keeps its own database sessions.
    """
    if getattr(thread_local, 'twitter_conn', None) is None:
        twitter = psycopg2.connect(**TWITTER_DB_CONFIG)
        telegram = psycopg2.connect(
            dbname='telegram_contacts',
            user='andrewjiang',
            host='localhost',
            port=5432,
        )
        thread_local.twitter_conn = twitter
        thread_local.telegram_conn = telegram
        thread_local.matcher = TwitterMatcher(twitter, telegram)

    return (
        thread_local.twitter_conn,
        thread_local.telegram_conn,
        thread_local.matcher,
    )
|
|
|
|
|
|
def process_contact(contact: Contact) -> Tuple[bool, int, List[Dict], Dict]:
    """
    Process a single contact in a worker thread.

    Finds Twitter candidate profiles for the contact via the thread-local
    TwitterMatcher, tags each candidate dict with the contact's
    telegram_user_id/account_id for later insertion, and tallies per-method
    match counts.

    Returns: (success, num_candidates, candidates, method_stats)
        success        -- False only if an exception was raised
        num_candidates -- len(candidates)
        candidates     -- candidate dicts (empty list on failure)
        method_stats   -- {match_method: count} for this contact
    """
    import sys
    import io

    # Capture stderr to detect silent failures
    # NOTE(review): sys.stderr is process-global, not per-thread. With several
    # worker threads swapping it concurrently, captures can interleave and the
    # restore in `finally` may reinstall another thread's StringIO instead of
    # the real stderr — confirm whether this capture is still needed.
    old_stderr = sys.stderr
    sys.stderr = stderr_capture = io.StringIO()

    try:
        # Get thread-local database connections
        twitter_conn, telegram_conn, matcher = get_thread_connections()

        # Find candidates
        candidates = matcher.find_candidates_for_contact(contact)

        # Check if any warnings were logged
        stderr_content = stderr_capture.getvalue()
        if stderr_content:
            print(f" 🔍 Warnings for contact {contact.user_id}:")
            print(f" {stderr_content}")

        method_stats = {}
        if candidates:
            # Track method stats
            for cand in candidates:
                method = cand['match_method']
                method_stats[method] = method_stats.get(method, 0) + 1

            # Add contact info to each candidate for later insertion
            for cand in candidates:
                cand['telegram_user_id'] = contact.user_id
                cand['account_id'] = contact.account_id

        return True, len(candidates), candidates, method_stats

    except Exception as e:
        # Errors are reported on stdout because stderr is captured here.
        print(f" ⚠️ Error processing contact {contact.user_id}: {e}")
        print(f" Contact details: username={contact.username}, display_name={getattr(contact, 'display_name', None)}")
        import traceback
        # NOTE(review): print_exc() writes to the still-captured stderr, so the
        # traceback is discarded with the StringIO — consider
        # traceback.print_exc(file=sys.stdout) if the trace should be visible.
        traceback.print_exc()
        return False, 0, [], {}
    finally:
        # Always restore the saved stderr, even on error.
        sys.stderr = old_stderr
|
|
|
|
|
|
def save_candidates_batch(candidates: List[Dict], telegram_conn):
    """Bulk-insert a batch of candidate dicts into twitter_match_candidates.

    Rows that collide on (telegram_user_id, twitter_id) are silently skipped
    via ON CONFLICT DO NOTHING. Returns the number of rows submitted (0 for
    an empty batch).
    """
    if not candidates:
        return 0

    def as_row(cand):
        # Flatten one candidate dict into the column tuple the INSERT expects.
        tw = cand['twitter_profile']
        return (
            cand['account_id'],
            cand['telegram_user_id'],
            tw['id'],
            tw['username'],
            tw.get('name'),
            tw.get('description'),
            tw.get('location'),
            tw.get('verified', False),
            tw.get('is_blue_verified', False),
            tw.get('followers_count', 0),
            0,  # candidate_rank (will be set later if needed)
            cand['match_method'],
            cand['baseline_confidence'],
            psycopg2.extras.Json(cand.get('match_details', {})),
            True,   # needs_llm_review
            False,  # llm_processed
        )

    insert_data = [as_row(cand) for cand in candidates]

    with telegram_conn.cursor() as cur:
        execute_values(cur, """
            INSERT INTO twitter_match_candidates (
                account_id,
                telegram_user_id,
                twitter_id,
                twitter_username,
                twitter_name,
                twitter_bio,
                twitter_location,
                twitter_verified,
                twitter_blue_verified,
                twitter_followers_count,
                candidate_rank,
                match_method,
                baseline_confidence,
                match_signals,
                needs_llm_review,
                llm_processed
            ) VALUES %s
            ON CONFLICT (telegram_user_id, twitter_id) DO NOTHING
        """, insert_data, page_size=1000)

    telegram_conn.commit()

    return len(insert_data)
|
|
|
|
|
|
def main():
    """Entry point: find Twitter candidates for Telegram contacts with threads.

    Workflow:
      1. Parse CLI args (limit / test mode / worker count / optional
         user_id range for running parallel shards).
      2. Load not-yet-processed contacts from the telegram_contacts DB.
      3. Fan candidate matching out across a ThreadPoolExecutor, flushing
         results to the database in batches and printing progress/ETA.
      4. Print summary statistics.

    Returns True on completion (the __main__ guard maps this to exit 0).
    """
    parser = argparse.ArgumentParser(description='Find Twitter candidates for Telegram contacts (threading-based)')
    parser.add_argument('--limit', type=int, help='Limit number of Telegram contacts to process')
    parser.add_argument('--test', action='store_true', help='Test mode: process first 100 contacts only')
    parser.add_argument('--workers', type=int, default=8,
                        help='Number of worker threads (default: 8)')
    parser.add_argument('--user-id-min', type=int, help='Minimum user_id to process (for parallel ranges)')
    parser.add_argument('--user-id-max', type=int, help='Maximum user_id to process (for parallel ranges)')
    parser.add_argument('--range-name', type=str, help='Name for this range (for logging)')

    args = parser.parse_args()

    num_workers = args.workers
    limit = args.limit
    user_id_min = args.user_id_min
    user_id_max = args.user_id_max
    range_name = args.range_name or "full"

    print("=" * 70)
    print(f"🔍 Twitter-Telegram Candidate Finder (THREADED) - Range: {range_name}")
    print("=" * 70)
    print()

    if args.test:
        limit = 100
        print("🧪 TEST MODE: Processing first 100 contacts only")
        print()
    elif limit:
        print(f"📊 LIMIT MODE: Processing first {limit:,} contacts")
        print()

    if user_id_min is not None and user_id_max is not None:
        print(f"📍 User ID Range: {user_id_min:,} to {user_id_max:,}")
        print()

    print(f"🧵 Worker threads: {num_workers}")
    print()

    # Load contacts using raw psycopg2 (faster than going through the ORM).
    print("📡 Loading Telegram contacts...")

    conn = psycopg2.connect(
        dbname='telegram_contacts',
        user='andrewjiang',
        host='localhost',
        port=5432
    )

    with conn.cursor() as cur:
        # First, get the set of contacts that already have candidate rows so
        # re-runs are resumable.
        cur.execute("""
            SELECT DISTINCT telegram_user_id
            FROM twitter_match_candidates
        """)
        already_processed = set(row[0] for row in cur.fetchall())

        print(f"📋 Already processed: {len(already_processed):,} contacts (will skip)")
        print()

        query = """
            SELECT account_id, user_id, display_name, first_name, last_name, username, phone, bio, is_bot, is_deleted
            FROM contacts
            WHERE user_id > 0
            AND is_deleted = false
            AND is_bot = false
            AND (bio IS NOT NULL OR username IS NOT NULL)
        """
        params = []

        # Bind the range/limit values as parameters instead of f-string
        # interpolation. argparse guarantees ints here, but parameterized
        # queries are the safe default.
        if user_id_min is not None:
            query += " AND user_id >= %s"
            params.append(user_id_min)
        if user_id_max is not None:
            query += " AND user_id <= %s"
            params.append(user_id_max)

        query += " ORDER BY user_id"

        if limit:
            query += " LIMIT %s"
            params.append(limit)

        cur.execute(query, params)
        rows = cur.fetchall()

    conn.close()

    # Convert to Contact objects, skipping already processed
    contacts = []
    skipped = 0
    for row in rows:
        user_id = row[1]

        # Skip if already processed
        if user_id in already_processed:
            skipped += 1
            continue

        contact = Contact(
            account_id=row[0],
            user_id=user_id,
            display_name=row[2],
            first_name=row[3],
            last_name=row[4],
            username=row[5],
            phone=row[6],
            bio=row[7],
            is_bot=row[8],
            is_deleted=row[9]
        )
        contacts.append(contact)

    total_contacts = len(contacts)
    print(f"✅ Found {total_contacts:,} NEW contacts to process (skipped {skipped:,} already done)")
    print()

    print("🚀 Processing contacts with thread pool...")
    print()

    start_time = time.time()

    # Stats tracking
    total_processed = 0
    total_with_candidates = 0
    all_candidates = []          # buffer of candidates awaiting a DB flush
    combined_method_stats = {}
    total_saved = 0

    # Dedicated connection for incremental saves (workers have their own).
    telegram_conn = psycopg2.connect(
        dbname='telegram_contacts',
        user='andrewjiang',
        host='localhost',
        port=5432
    )

    # Process with ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # Submit all tasks
        future_to_contact = {
            executor.submit(process_contact, contact): contact
            for contact in contacts
        }

        # Process results as they complete
        for i, future in enumerate(as_completed(future_to_contact), 1):
            try:
                success, num_candidates, candidates, method_stats = future.result()

                if success:
                    total_processed += 1

                if candidates:
                    total_with_candidates += 1
                    all_candidates.extend(candidates)

                    # Update method stats
                    for method, count in method_stats.items():
                        combined_method_stats[method] = combined_method_stats.get(method, 0) + count

                # Flush to the database once at least 100 candidates have
                # accumulated (not per-contact — candidates arrive in bursts).
                if len(all_candidates) >= 100:
                    saved = save_candidates_batch(all_candidates, telegram_conn)
                    total_saved += saved
                    all_candidates = []  # Clear buffer

                # Progress update every 10 contacts
                if i % 10 == 0 or i == 1:
                    elapsed = time.time() - start_time
                    rate = total_processed / elapsed if elapsed > 0 else 0
                    remaining = total_contacts - total_processed
                    eta_seconds = remaining / rate if rate > 0 else 0
                    eta_hours = eta_seconds / 3600
                    pct = total_processed / total_contacts * 100 if total_contacts else 100.0

                    print(f" Progress: {total_processed}/{total_contacts} ({pct:.1f}%) | "
                          f"{total_saved + len(all_candidates):,} candidates (💾 {total_saved:,} saved) | "
                          f"Rate: {rate:.1f}/sec | "
                          f"ETA: {eta_hours:.1f}h")

            except Exception as e:
                print(f" ❌ Error processing future: {e}")
                import traceback
                traceback.print_exc()

    # Save any remaining candidates
    if all_candidates:
        saved = save_candidates_batch(all_candidates, telegram_conn)
        total_saved += saved

    telegram_conn.close()

    elapsed = time.time() - start_time

    print()
    print(f"⏱️ Processing completed in {elapsed:.1f} seconds ({elapsed/60:.1f} minutes)")
    # Guard the rate calculation: elapsed is effectively never 0, but be safe.
    rate = total_processed / elapsed if elapsed > 0 else 0.0
    print(f" Rate: {rate:.1f} contacts/second")
    print()
    print(f"💾 Total saved: {total_saved:,} candidates")
    print()

    # Print stats
    print("=" * 70)
    print("✅ CANDIDATE FINDING COMPLETE")
    print("=" * 70)
    print()
    print(f"📊 Statistics:")
    print(f" Processed: {total_processed:,} contacts")
    if total_processed > 0:
        print(f" With candidates: {total_with_candidates:,} ({total_with_candidates/total_processed*100:.1f}%)")
    else:
        # Previously this division ran unconditionally and raised
        # ZeroDivisionError when every contact had already been processed.
        print(f" With candidates: {total_with_candidates:,}")
    print(f" Total candidates: {total_saved:,}")
    if total_with_candidates > 0:
        print(f" Avg candidates per match: {total_saved/total_with_candidates:.1f}")
    print()
    print(f"📈 By method:")
    for method, count in sorted(combined_method_stats.items(), key=lambda x: x[1], reverse=True):
        print(f" {method}: {count:,}")
    print()

    return True
|
|
|
|
|
|
if __name__ == "__main__":
    try:
        # main() returns True on success; map that onto the process exit code.
        sys.exit(0 if main() else 1)
    except KeyboardInterrupt:
        print("\n\n❌ Interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
|