commit 5319d4d868a150838e49b24d96be9d9b3e844438
Author: Andrew Jiang
Date:   Tue Nov 4 22:56:25 2025 -0800

    Initial commit: Twitter-Telegram Profile Matching System

    This module provides comprehensive Twitter-to-Telegram profile matching
    and verification using 10 different matching methods and LLM verification.

    Features:
    - 10 matching methods (phash, usernames, bio handles, URL resolution, fuzzy names)
    - URL resolution integration for t.co → t.me links
    - Async LLM verification with GPT-5-mini
    - Interactive menu system with real-time stats
    - Threaded candidate finding (~1.5 contacts/sec)
    - Comprehensive documentation and guides

    Key Components:
    - find_twitter_candidates.py: Core matching logic (10 methods)
    - find_twitter_candidates_threaded.py: Threaded implementation
    - verify_twitter_matches_v2.py: LLM verification (V6 prompt)
    - review_match_quality.py: Analysis and quality review
    - main.py: Interactive menu system
    - Complete documentation (README, CHANGELOG, QUICKSTART)

    Performance:
    - Candidate finding: ~16-18 hours for 43K contacts
    - LLM verification: ~23 hours for 43K users
    - Cost: ~$130 for full verification

    🤖 Generated with [Claude Code](https://claude.com/claude-code)

    Co-Authored-By: Claude

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..80e50b9
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,47 @@
+# Environment variables
+.env
+.env.local
+.env.*.local
+
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+env/
+venv/
+ENV/
+build/
+dist/
+*.egg-info/
+
+# IDE
+.vscode/
+.idea/
+*.swp
+*.swo
+*~
+
+# Logs
+*.log
+*.out
+*.pid
+
+# Database
+*.db
+*.sqlite
+*.sqlite3
+
+# Checkpoints and temp files
+*_checkpoint.json
+*.tmp
+*.temp
+
+# OS
+.DS_Store
+Thumbs.db
+
+# Test outputs
+test_output/
+*.test
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..1a01248
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,159 @@
+# ProfileMatching Changelog
+
+## 2025-11-04 - Initial Module Creation & URL Resolution Integration
+
+### Module Organization
+- Created dedicated `ProfileMatching/` folder within UnifiedContacts
+- Bundled all matching and verification scripts together
+- Added comprehensive interactive `main.py` with menu system
+- Added detailed `README.md` documentation
+
+### Major Enhancement: URL Resolution Integration
+
+**Problem**: Missing matches where Twitter usernames differ from Telegram usernames, even when the Twitter profile explicitly links to Telegram.
+
+**Solution**: Integrated `url_resolution_queue` table data into candidate finding.
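+
+In essence, Method 5b asks: which Twitter profiles have a bio URL that resolved to this contact's `t.me` handle? A condensed sketch of the lookup (the full method, `find_by_resolved_url()` below, adds DISTINCT, profile columns, ordering, and a LIMIT):
+
+```python
+import json
+import psycopg2
+
+def twitter_ids_for_handle(conn, telegram_username: str) -> list:
+    """Condensed sketch of the Method 5b query; telegram_handles is JSONB, so @> tests containment."""
+    with conn.cursor() as cur:
+        cur.execute("""
+            SELECT u.id, u.username, urq.original_url, urq.resolved_url
+            FROM url_resolution_queue urq
+            JOIN users u ON u.id::text = urq.twitter_id  -- cast: twitter_id stored as VARCHAR
+            WHERE urq.telegram_handles @> %s::jsonb
+        """, (json.dumps([telegram_username.lower()]),))
+        return cur.fetchall()
+```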
+
+**Implementation**:
+- Added new method `find_by_resolved_url()` to TwitterMatcher class (find_twitter_candidates.py:339-378)
+- Queries Twitter profiles where bio URLs (t.co/xyz) resolve to t.me/{username}
+- Integrated as Method 5b in candidate finding pipeline (find_twitter_candidates.py:554-574)
+- Baseline confidence: 0.95 (very high - explicit link in bio)
+
+**Impact**:
+- 140+ potential new matches identified
+- Captures matches like Twitter @Block_Flash → Telegram @bull_flash
+- Especially valuable when usernames differ but the user explicitly links profiles
+
+**Example Match**:
+```
+Twitter: @Block_Flash (ID: 63429948)
+Telegram: @bull_flash
+Method: twitter_bio_url_resolves_to_telegram
+Confidence: 0.95
+Resolved URL: https://t.me/bull_flash
+Original URL: https://t.co/dc3iztSG9B
+```
+
+### URL Resolution Data Source
+
+The `url_resolution_queue` table contains:
+- 16,133 Twitter profiles with resolved Telegram URLs
+- Shortened URLs from Twitter bios (t.co/xyz)
+- Resolved destinations (full URLs)
+- Extracted Telegram handles (JSONB array)
+
+### Files Modified
+
+1. **find_twitter_candidates.py**
+   - Added `find_by_resolved_url()` method
+   - Integrated into `find_candidates_for_contact()` as Method 5b
+   - Fixed type casting for twitter_id (VARCHAR) to users.id (BIGINT) join
+
+2. **main.py** (ProfileMatching module)
+   - Created comprehensive interactive menu
+   - Real-time statistics display
+   - Streamlined workflow for candidate finding and LLM verification
+
+3. **README.md**
+   - Complete documentation of all 10 matching methods
+   - Usage examples and performance metrics
+   - Configuration and troubleshooting guides
+
+4. **UnifiedContacts/main.py**
+   - Added option 15: "Open Profile Matching System (Interactive Menu)"
+   - Reorganized menu to separate ProfileMatching from individual Twitter steps
+
+### Current Statistics (as of 2025-11-04)
+
+**Candidates**:
+- Users with candidates: 38,121
+- Total candidates found: 253,117
+- Processed by LLM: 253,085
+- Pending verification: 32
+
+**Verified Matches**:
+- Users with matches: 25,662
+- Total matches: 36,147
+- Average confidence: 0.74
+- High confidence (90%+): 12,031
+- Medium confidence (80-89%): 1,452
+- Low confidence (70-79%): 7,505
+
+### LLM Verification (V6 Prompt)
+
+Current prompt improvements:
+- Upfront directive for comparative evaluation
+- Clear signal strength hierarchy (Very Strong, Strong Supporting, Weak, Red Flags)
+- Company vs personal account differentiation
+- Streamlined from ~135 to ~90 lines while being clearer
+- Emphasis on evaluating ALL candidates together
+
+### Performance Metrics
+
+**Candidate Finding (Threaded)**:
+- Speed: ~1.5 contacts/sec
+- Time for 43K contacts: ~16-18 hours
+- Workers: 8 (default)
+
+**LLM Verification (Async)**:
+- Speed: ~32 users/minute (100 concurrent requests)
+- Cost: ~$0.003 per user (GPT-5-mini)
+- Time for 43K users: ~23 hours
+
+### Module Structure
+
+```
+ProfileMatching/
+├── main.py                              # Interactive menu system
+├── README.md                            # Complete documentation
+├── CHANGELOG.md                         # This file
+├── find_twitter_candidates.py           # Core matching logic (10 methods)
+├── find_twitter_candidates_threaded.py  # Threaded implementation
+├── verify_twitter_matches_v2.py         # LLM verification (V6 prompt)
+├── review_match_quality.py              # Analysis tools
+└── setup_twitter_matching_schema.sql    # Database schema
+```
+
+### 10 Matching Methods Summary
+
+1. **Phash Match** (0.95/0.88) - Profile picture similarity
+2. **Exact Bio Handle** (0.95) - Twitter handle extracted from Telegram bio
+3. **Bio URL Resolution** (0.95) ⭐ NEW - Shortened URL resolves to Telegram
+4. **Twitter Bio Has Telegram** (0.92) - Twitter bio mentions Telegram username
+5. **Display Name Containment** (0.92) - TG name in TW name
+6. **Exact Username** (0.90) - Usernames match exactly
+7. **TG Username in Twitter Name** (0.88)
+8. **Twitter Username in TG Name** (0.86)
+9. **Fuzzy Name** (0.65-0.85) - Trigram similarity
+10. **Username Variation** (0.80) - Generated username variations
+
+### Testing
+
+All changes tested with:
+- Standalone method testing (find_by_resolved_url)
+- Full integration testing (find_candidates_for_contact)
+- Verified deduplication works correctly
+- Confirmed matches with different usernames are captured
+
+### Next Steps
+
+Potential future enhancements:
+- Add more matching methods (location, bio keywords, mutual connections)
+- Implement feedback loop for prompt improvement
+- Add manual review interface for borderline matches
+- Export matches to various formats
+- Additional URL resolution sources beyond Twitter bios
+
+### Migration Notes
+
+**For existing deployments**:
+1. No database schema changes required
+2. Existing `url_resolution_queue` table is used as-is
+3. Scripts in `scripts/` folder remain unchanged and functional
+4. New ProfileMatching module is additive, doesn't break existing workflows
+
+**To use new features**:
+1. Use ProfileMatching/main.py instead of individual scripts
+2. Or run scripts directly from ProfileMatching folder
+3. Or update import paths to use ProfileMatching module
diff --git a/QUICKSTART.md b/QUICKSTART.md
new file mode 100644
index 0000000..d586646
--- /dev/null
+++ b/QUICKSTART.md
@@ -0,0 +1,218 @@
+# ProfileMatching Quick Start Guide
+
+## 🚀 Fastest Way to Get Started
+
+### Option 1: Interactive Menu (RECOMMENDED)
+```bash
+cd /Users/andrewjiang/Bao/TimeToLockIn/Profile/UnifiedContacts/ProfileMatching
+python3.10 main.py
+```
+
+This gives you an interactive menu with:
+- Real-time statistics
+- Guided workflow
+- Easy access to all features
+
+### Option 2: Launch from Main UnifiedContacts Menu
+```bash
+cd /Users/andrewjiang/Bao/TimeToLockIn/Profile/UnifiedContacts
+python3.10 main.py
+# Select option 15: "Open Profile Matching System"
+```
+
+## 📋 Typical Workflow
+
+### Step 1: Find Candidates (First Time)
+If you haven't found candidates yet, run:
+
+```bash
+# From ProfileMatching folder
+python3.10 find_twitter_candidates_threaded.py --workers 8
+
+# Or use interactive menu: Option 1
+```
+
+**Expected time**: ~16-18 hours for all 43K contacts
+
+### Step 2: Verify with LLM
+After candidates are found, verify them:
+
+```bash
+# From ProfileMatching folder
+python3.10 verify_twitter_matches_v2.py --verbose --concurrent 100
+
+# Or use interactive menu: Option 3
+```
+
+**Expected time**: ~23 hours for all users
+**Cost**: ~$130 for all users (GPT-5-mini at $0.003/user)
+
+### Step 3: Review Results
+```bash
+# From ProfileMatching folder
+python3.10 review_match_quality.py
+
+# Or use interactive menu: Option 5
+```
+
+## 🧪 Test Mode (Recommended Before Full Run)
+
+Always test with a small batch first:
+
+```bash
+# Test with 50 users
+python3.10 verify_twitter_matches_v2.py --test --limit 50 --verbose --concurrent 10
+
+# Or use interactive menu: Option 4
+```
+
+This helps you:
+- Verify the system is working correctly
+- Check match quality before spending on a full run
+- Estimate costs and timing
+
+## 📊 Check Current Status
+
+At any time, you can check where you're at:
+
+```bash
+# Launch interactive menu and select Option 6: "Show statistics only"
+python3.10 main.py
+# Press 6, then 0 to exit
+```
+
+Or query directly:
+
+```bash
+psql -d telegram_contacts -U andrewjiang -c "
+SELECT
+    COUNT(DISTINCT telegram_user_id) as users_with_candidates,
+    COUNT(*) as total_candidates,
+    COUNT(*) FILTER (WHERE llm_processed = TRUE) as processed,
+    COUNT(*) FILTER (WHERE llm_processed = FALSE) as pending
+FROM twitter_match_candidates;
+"
+```
+
+## 🔄 Re-Running After Updates
+
+If you've updated the LLM prompt or matching logic:
+
+### Re-find Candidates (if matching logic changed)
+```bash
+# Delete old candidates
+psql -d telegram_contacts -U andrewjiang -c "TRUNCATE twitter_match_candidates CASCADE;"
+
+# Re-run candidate finding
+python3.10 find_twitter_candidates_threaded.py --workers 8
+```
+
+### Re-verify with New Prompt (if only prompt changed)
+```bash
+# Reset LLM processing flag
+psql -d telegram_contacts -U andrewjiang -c "UPDATE twitter_match_candidates SET llm_processed = FALSE;"
+
+# Delete old matches
+psql -d telegram_contacts -U andrewjiang -c "TRUNCATE twitter_telegram_matches;"
+
+# Re-run verification
+python3.10 verify_twitter_matches_v2.py --verbose --concurrent 100
+```
+
+## 🎯 Most Common Commands
+
+### Find candidates for first 1000 contacts (testing)
+```bash
+python3.10 find_twitter_candidates_threaded.py --limit 1000 --workers 8
+```
+
+### Verify matches for pending candidates
+```bash
+python3.10 verify_twitter_matches_v2.py --verbose --concurrent 100
+```
+
+### Check match quality distribution
+```bash
+python3.10 review_match_quality.py
+```
+
+### Export matches to CSV (coming soon)
+```bash
+# Will be added in future update
+```
+
+## 💡 Pro Tips
+
+1. **Always use threaded candidate finding** - It's 10-20x faster
+2. **Use high concurrency for verification** - 100-200 concurrent requests for optimal speed
+3. **Test first** - Always run with `--test --limit 50` before full runs
+4. **Monitor costs** - Check OpenAI dashboard during verification
+5. **Check the stats** - Use Option 6 in interactive menu to monitor progress
+
+## 🐛 Troubleshooting
+
+### "No candidates found"
+- Check if Twitter database has data: `psql -d twitter_data -c "SELECT COUNT(*) FROM users;"`
+- Check Telegram contacts: `psql -d telegram_contacts -c "SELECT COUNT(*) FROM contacts;"`
+
+### "LLM verification is slow"
+- Increase `--concurrent` parameter (try 150-200)
+- Check OpenAI rate limits in dashboard
+- Verify network connection
+
+### "Too many low-quality matches"
+- Review the V6 prompt in `verify_twitter_matches_v2.py`
+- Run `review_match_quality.py` to analyze
+- Consider adjusting confidence thresholds
+
+### "Missing obvious matches"
+- Check if candidate was found:
+  ```sql
+  SELECT * FROM twitter_match_candidates WHERE telegram_user_id = YOUR_USER_ID;
+  ```
+- If found but not verified, check `llm_verdict` field for reasoning
+- If not found at all, may need new matching method
+
+## 📚 More Information
+
+- See `README.md` for complete documentation
+- See `CHANGELOG.md` for recent updates
+- See individual script files for command-line options
+
+## 🆘 Need Help?
+
+Common issues and solutions:
+
+| Issue | Solution |
+|-------|----------|
+| Import errors | Make sure you're using python3.10 |
+| Database connection errors | Check PostgreSQL is running: `pg_isready` |
+| OpenAI API errors | Verify API key in `.env` file |
+| Out of memory | Reduce concurrent requests or use batching |
+
+## 🎓 Understanding the Output
+
+### Candidate Finding Output
+```
+Processing contact 1000/43000 (2.3%)
+Found 6 candidates for @username
+  • exact_username: @username (0.90)
+  • fuzzy_name: @similar_name (0.75)
+```
+
+### LLM Verification Output
+```
+[Progress] 500/1000 users (50.0%) | 125 matches | ~$1.50 | 25.0 users/min
+```
+
+### Match Quality Review
+```
+Total users with matches: 25,662
+Total matches: 36,147
+Average confidence: 0.74
+
+Confidence Distribution:
+  90%+: 12,031 matches (HIGH)
+  80-89%: 1,452 matches (MEDIUM)
+  70-79%: 7,505 matches (LOW)
+```
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..358eb55
--- /dev/null
+++ b/README.md
@@ -0,0 +1,207 @@
+# Twitter-Telegram Profile Matching System
+
+A comprehensive system for finding and verifying Twitter-Telegram profile matches using multiple matching methods and LLM-based verification.
+
+## Overview
+
+This system operates in two main steps:
+
+1. **Candidate Finding**: Discovers potential Twitter profiles that match Telegram contacts using 10 different matching methods
+2. **LLM Verification**: Uses GPT to evaluate candidates and assign confidence scores (0.70-1.0)
+
+## Quick Start
+
+```bash
+cd /Users/andrewjiang/Bao/TimeToLockIn/Profile/UnifiedContacts/ProfileMatching
+python3.10 main.py
+```
+
+## Matching Methods
+
+The system uses 10 different methods to find Twitter candidates:
+
+### High Confidence Methods (0.90-0.95)
+
+1. **Phash Match** (0.95 for exact, 0.88 for distance=1)
+   - Compares profile picture hashes
+   - Pre-computed in `telegram_twitter_phash_matches` table
+
+2. **Exact Bio Handle** (0.95)
+   - Extracts Twitter handles from Telegram bio
+   - Patterns: `@username`, `twitter.com/username`, `x.com/username`
+
+3. **Bio URL Resolution** (0.95) ⭐ NEW
+   - Twitter bio contains shortened URL (t.co/xyz) that resolves to `t.me/username`
+   - Queries `url_resolution_queue` table
+   - Captures matches even when usernames differ
+
+4. **Twitter Bio Has Telegram** (0.92)
+   - Reverse lookup: Twitter bio mentions Telegram username
+   - Searches for `@username`, `t.me/username`, `telegram.me/username`
+
+5. **Display Name Containment** (0.92)
+   - Telegram name contained within Twitter display name
+
+6. **Exact Username** (0.90)
+   - Telegram username exactly matches Twitter username
+
+### Medium Confidence Methods (0.80-0.88)
+
+7. **TG Username in Twitter Name** (0.88)
+8. **Twitter Username in TG Name** (0.86)
+9. **Fuzzy Name** (0.65-0.85)
+   - PostgreSQL trigram similarity with 0.65 threshold
+10. **Username Variation** (0.80)
+    - Generates variations (remove underscores, flip numbers, etc.); see the sketch below
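+
+The sketch below condenses the variation logic from `UsernameVariationGenerator.generate_variations()` in `find_twitter_candidates.py`; the full version also handles `0x` prefixes and single-character removals:
+
+```python
+import re
+
+def sketch_variations(telegram_username: str) -> list[str]:
+    """Generate candidate Twitter handles from a Telegram username (condensed)."""
+    base = telegram_username.lower()
+    variations = {base, base.replace('_', '')}
+
+    # Flip trailing numbers to the front: alice_69 -> 69alice, 69_alice
+    m = re.match(r'^([a-z_]+?)_?(\d+)$', base)
+    if m:
+        prefix, number = m.group(1).rstrip('_'), m.group(2)
+        variations.update({f"{number}{prefix}", f"{number}_{prefix}"})
+
+    # Keep only handles valid on Twitter (4-15 chars)
+    return [v for v in variations if 4 <= len(v.strip('_')) <= 15]
+
+print(sketch_variations("trader_69"))  # e.g. ['trader_69', 'trader69', '69trader', '69_trader']
+```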
+
+## LLM Verification
+
+The system uses GPT-5-mini with a sophisticated V6 prompt that:
+
+- Evaluates ALL candidates together (comparative evaluation)
+- Applies differential scoring (only one can be "most likely")
+- Distinguishes between personal and company accounts
+- Considers signal strength holistically
+- Only saves matches with 70%+ confidence
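+
+A minimal sketch of the async fan-out, assuming the OpenAI Python client; the actual prompt construction and response handling live in `verify_twitter_matches_v2.py` and are not reproduced here:
+
+```python
+import asyncio
+from openai import AsyncOpenAI
+
+client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
+
+async def verify_user(sem: asyncio.Semaphore, prompt: str) -> str:
+    """One call per Telegram user, with all of that user's candidates in one prompt."""
+    async with sem:
+        resp = await client.chat.completions.create(
+            model="gpt-5-mini",
+            messages=[{"role": "user", "content": prompt}],
+        )
+        return resp.choices[0].message.content
+
+async def verify_all(prompts: list[str], concurrent: int = 100) -> list[str]:
+    sem = asyncio.Semaphore(concurrent)  # mirrors the --concurrent flag
+    return await asyncio.gather(*(verify_user(sem, p) for p in prompts))
+```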
+
+## Files
+
+### Core Scripts
+
+- `main.py` - Interactive menu for running the system
+- `find_twitter_candidates.py` - Core matching logic (TwitterMatcher class)
+- `find_twitter_candidates_threaded.py` - Threaded implementation (RECOMMENDED)
+- `verify_twitter_matches_v2.py` - LLM verification with async (RECOMMENDED)
+- `review_match_quality.py` - Analyze match quality and statistics
+
+### Database Schema
+
+- `setup_twitter_matching_schema.sql` - Database tables and indexes
+
+## Database Tables
+
+### `twitter_match_candidates`
+Stores all potential matches found by the matching methods.
+
+**Key fields:**
+- `telegram_user_id` - Telegram contact user ID
+- `twitter_id` - Twitter profile ID
+- `match_method` - Which method found this candidate
+- `baseline_confidence` - Initial confidence (0.0-1.0)
+- `match_signals` - JSON with match details
+- `llm_processed` - Whether LLM has evaluated this candidate
+
+### `twitter_telegram_matches`
+Stores verified matches (70%+ confidence from LLM).
+
+**Key fields:**
+- `telegram_user_id` - Telegram contact
+- `twitter_id` - Matched Twitter profile
+- `final_confidence` - LLM-assigned confidence (0.70-1.0)
+- `llm_verdict` - LLM reasoning
+- `match_method` - Original matching method
+- `matched_at` - Timestamp
+
+### `url_resolution_queue`
+Maps shortened URLs in Twitter bios to resolved URLs (including Telegram links).
+
+**Key fields:**
+- `twitter_id` - Twitter profile ID
+- `original_url` - Shortened URL (e.g., t.co/abc)
+- `resolved_url` - Full URL (e.g., https://t.me/username)
+- `telegram_handles` - Extracted Telegram handles (JSONB array)
+
+## Usage Examples
+
+### Find Candidates for All Contacts (Threaded)
+```bash
+python3.10 find_twitter_candidates_threaded.py --workers 8
+```
+
+### Find Candidates for First 1000 Contacts
+```bash
+python3.10 find_twitter_candidates_threaded.py --limit 1000 --workers 8
+```
+
+### Verify Matches with LLM (100 concurrent requests)
+```bash
+python3.10 verify_twitter_matches_v2.py --verbose --concurrent 100
+```
+
+### Test Mode (50 users, 10 concurrent)
+```bash
+python3.10 verify_twitter_matches_v2.py --test --limit 50 --verbose --concurrent 10
+```
+
+### Review Match Quality
+```bash
+python3.10 review_match_quality.py
+```
+
+## Performance
+
+### Candidate Finding (Threaded)
+- **Speed**: ~1.5 contacts/sec
+- **Time for 43K contacts**: ~16-18 hours
+- **Workers**: 8 (default, configurable)
+
+### LLM Verification (Async)
+- **Speed**: ~32 users/minute with 100 concurrent requests
+- **Cost**: ~$0.003 per user (GPT-5-mini)
+- **Time for 43K users**: ~23 hours
+
+## Recent Improvements
+
+### V6 Prompt (Latest)
+- Upfront directive for comparative evaluation
+- Clear signal strength hierarchy
+- Company vs personal account differentiation
+- Streamlined from ~135 to ~90 lines while being clearer
+
+### URL Resolution Integration
+- Added Method 5b: Bio URL resolution
+- Captures 140+ additional matches
+- Especially valuable when usernames differ
+- 0.95 baseline confidence (very high)
+
+## Configuration
+
+Environment variables (in `/Users/andrewjiang/Bao/TimeToLockIn/Profile/.env`):
+```
+OPENAI_API_KEY=your_key_here
+OPENAI_MODEL=gpt-5-mini
+```
+
+Database connections:
+- `telegram_contacts` - Telegram contact data
+- `twitter_data` - Twitter profile data
+
+## Tips
+
+1. **Always run threaded candidate finding** - 10-20x faster than single-threaded
+2. **Use high concurrency for LLM verification** - 100+ concurrent requests for optimal speed
+3. **Monitor costs** - Check OpenAI usage during verification
+4. **Review match quality periodically** - Use `review_match_quality.py` to analyze results
+5. **Test first** - Use `--test --limit 50` flags before full runs
+
+## Troubleshooting
+
+### LLM verification is slow
+- Increase `--concurrent` parameter (try 100-200)
+- Check OpenAI rate limits (1,000 RPM for Tier 1)
+
+### Many low-quality matches
+- Review and adjust V6 prompt in `verify_twitter_matches_v2.py`
+- Check `review_match_quality.py` for insights
+
+### Missing obvious matches
+- Check if candidate was found: Query `twitter_match_candidates`
+- If not found, may need new matching method
+- If found but not verified, check LLM reasoning in `llm_verdict`
+
+## Future Enhancements
+
+- Add more matching methods (location, bio keywords, etc.)
+- Implement feedback loop for prompt improvement
+- Add manual review interface for borderline matches
+- Export matches to various formats
diff --git a/find_twitter_candidates.py b/find_twitter_candidates.py
new file mode 100755
index 0000000..cdaa2ce
--- /dev/null
+++ b/find_twitter_candidates.py
@@ -0,0 +1,851 @@
+#!/usr/bin/env python3
+"""
+Twitter-Telegram Candidate Finder
+Finds potential Twitter matches for Telegram contacts using:
+1. Handle extraction from bios
+2. Username variation generation
+3. Fuzzy name matching
+"""
+
+import sys
+import re
+import json
+from pathlib import Path
+from typing import List, Dict, Set, Tuple
+import psycopg2
+from psycopg2.extras import DictCursor, execute_values
+
+# Add parent directory to path
+sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
+
+from db_config import SessionLocal
+from models import Contact
+
+# Twitter database connection (adjust as needed)
+TWITTER_DB_CONFIG = {
+    'dbname': 'twitter_data',
+    'user': 'andrewjiang',  # Adjust to your setup
+    'host': 'localhost',
+    'port': 5432
+}
+
+
+class HandleExtractor:
+    """Extract Twitter handles from text"""
+
+    @staticmethod
+    def extract_handles(text: str) -> List[str]:
+        """Extract Twitter handles from bio/text"""
+        if not text:
+            return []
+
+        handles = set()
+
+        # Pattern 1: @username
+        pattern1 = r'@([a-zA-Z0-9_]{4,15})'
+        handles.update(re.findall(pattern1, text))
+
+        # Pattern 2: twitter.com/username or x.com/username
+        pattern2 = r'(?:twitter\.com|x\.com)/([a-zA-Z0-9_]{4,15})'
+        handles.update(re.findall(pattern2, text, re.IGNORECASE))
+
+        # Pattern 3: Clean standalone handles (risky, be conservative)
+        # Only if text is short and looks like a handle
+        if len(text) < 30 and text.count('@') == 1:
+            clean = text.strip('@').strip()
+            if re.match(r'^[a-zA-Z0-9_]{4,15}$', clean):
+                handles.add(clean)
+
+        return [h.lower() for h in handles if len(h) >= 4]
+
+
+class UsernameVariationGenerator:
+    """Generate Twitter handle variations from Telegram usernames"""
+
+    @staticmethod
+    def generate_variations(telegram_username: str) -> List[str]:
+        """
+        Generate possible Twitter handle variations
+
+        Examples:
+        - alice_0x → [alice_0x, 0xalice, 0x_alice, alice0x]
+        - trader_69 → [trader_69, 69trader, 69_trader, trader69]
+        """
+        if not telegram_username:
+            return []
+
+        variations = [telegram_username.lower()]
+
+        # Remove underscores
+ no_underscore = telegram_username.replace('_', '') + if no_underscore != telegram_username and len(no_underscore) >= 4: + variations.append(no_underscore.lower()) + + # Handle "0x" patterns (common in crypto) + if '0x' in telegram_username.lower(): + # If 0x at end, try moving to front + if telegram_username.lower().endswith('_0x'): + base = telegram_username[:-3] + variations.extend([ + f"0x{base}".lower(), + f"0x_{base}".lower() + ]) + elif telegram_username.lower().endswith('0x'): + base = telegram_username[:-2] + variations.extend([ + f"0x{base}".lower(), + f"0x_{base}".lower() + ]) + + # Try without underscore + no_under = telegram_username.replace('_', '') + if '0x' in no_under.lower() and no_under.lower() not in variations: + variations.append(no_under.lower()) + + # Handle trailing numbers (alice_69 โ†’ 69alice) + match = re.match(r'^([a-z_]+?)_?(\d+)$', telegram_username, re.IGNORECASE) + if match: + prefix, number = match.groups() + prefix = prefix.rstrip('_') + if len(f"{number}{prefix}") >= 4: + variations.extend([ + f"{number}{prefix}".lower(), + f"{number}_{prefix}".lower() + ]) + + # Handle leading numbers (69_alice โ†’ alice69) + match = re.match(r'^(\d+)_?([a-z_]+)$', telegram_username, re.IGNORECASE) + if match: + number, suffix = match.groups() + suffix = suffix.lstrip('_') + if len(f"{suffix}{number}") >= 4: + variations.extend([ + f"{suffix}{number}".lower(), + f"{suffix}_{number}".lower() + ]) + + # Single character removals (banteg โ†’ bantg, trader โ†’ trade) + # This catches shortened versions of usernames + base = telegram_username.lower() + if len(base) > 4: # Only if result would be at least 4 chars + for i in range(len(base)): + variation = base[:i] + base[i+1:] + if len(variation) >= 4: + variations.append(variation) + + # Deduplicate and validate (4-15 chars for Twitter) + valid = [] + for v in set(variations): + v_clean = v.strip('_') + if 4 <= len(v_clean) <= 15: + valid.append(v_clean) + + return list(set(valid)) + + +class TwitterMatcher: + """Find Twitter profiles matching Telegram contacts""" + + def __init__(self, twitter_conn, telegram_conn=None): + self.twitter_conn = twitter_conn + self.telegram_conn = telegram_conn # Needed for phash lookups + self.handle_extractor = HandleExtractor() + self.variation_generator = UsernameVariationGenerator() + + def find_by_handle(self, handle: str) -> Dict: + """Lookup Twitter profile by exact handle""" + with self.twitter_conn.cursor(cursor_factory=DictCursor) as cur: + cur.execute(""" + SELECT + id, + username, + name, + description, + location, + verified, + is_blue_verified, + followers_count, + following_count, + created_at + FROM public.users + WHERE LOWER(username) = %s + LIMIT 1 + """, (handle.lower(),)) + + result = cur.fetchone() + return dict(result) if result else None + + def find_by_fuzzy_name(self, telegram_name: str, limit=3) -> List[Dict]: + """Find Twitter profiles with similar names using fuzzy matching""" + if not telegram_name or len(telegram_name) < 3: + return [] + + with self.twitter_conn.cursor(cursor_factory=DictCursor) as cur: + # Use parameterized query with proper escaping for % operator + cur.execute(""" + SELECT + id, + username, + name, + description, + location, + verified, + is_blue_verified, + followers_count, + following_count, + created_at, + similarity(name, %(name)s) AS name_score + FROM public.users + WHERE name %% %(name)s -- %% for similarity operator (escaped in string) + AND similarity(name, %(name)s) > 0.65 -- Increased threshold from 0.5 to reduce noise + ORDER BY 
name_score DESC + LIMIT %(limit)s + """, {'name': telegram_name, 'limit': limit}) + + return [dict(row) for row in cur.fetchall()] + + def find_by_display_name_containment(self, telegram_name: str, limit=5) -> List[Dict]: + """Find Twitter profiles where TG display name is contained in TW display name""" + if not telegram_name or len(telegram_name) < 3: + return [] + + with self.twitter_conn.cursor(cursor_factory=DictCursor) as cur: + # Direct containment search (case-insensitive) + cur.execute(""" + SELECT + id, + username, + name, + description, + location, + verified, + is_blue_verified, + followers_count, + following_count, + created_at + FROM public.users + WHERE name ILIKE %s -- TG name contained in TW name + AND LENGTH(name) >= %s -- TW name must be at least as long as TG name + ORDER BY followers_count DESC -- Prioritize by follower count + LIMIT %s + """, (f'%{telegram_name}%', len(telegram_name), limit)) + + return [dict(row) for row in cur.fetchall()] + + def find_by_phash_match(self, telegram_user_id: int) -> List[Dict]: + """Find Twitter profiles with matching profile picture phash (distance 0-1 only)""" + if not self.telegram_conn: + return [] + + with self.telegram_conn.cursor(cursor_factory=DictCursor) as cur: + # Query pre-computed phash matches (distance 0-1 for high confidence) + cur.execute(""" + SELECT + m.twitter_user_id, + m.twitter_username, + m.hamming_distance, + m.telegram_phash, + m.twitter_phash + FROM telegram_twitter_phash_matches m + WHERE m.telegram_user_id = %s + AND m.hamming_distance <= 1 -- Only exact and distance-1 matches + ORDER BY m.hamming_distance ASC + LIMIT 5 + """, (telegram_user_id,)) + + phash_matches = cur.fetchall() + + # Commit to close transaction + self.telegram_conn.commit() + + if not phash_matches: + return [] + + # Fetch full Twitter profile data for matched users + twitter_ids = [m['twitter_user_id'] for m in phash_matches] + + with self.twitter_conn.cursor(cursor_factory=DictCursor) as cur: + cur.execute(""" + SELECT + id, + username, + name, + description, + location, + verified, + is_blue_verified, + followers_count, + following_count, + created_at + FROM public.users + WHERE id = ANY(%s) + """, (twitter_ids,)) + + twitter_profiles = [dict(row) for row in cur.fetchall()] + + # Enrich with phash match details + twitter_profile_map = {str(p['id']): p for p in twitter_profiles} + results = [] + + for match in phash_matches: + tw_id = match['twitter_user_id'] + if tw_id in twitter_profile_map: + profile = twitter_profile_map[tw_id].copy() + profile['phash_distance'] = match['hamming_distance'] + profile['telegram_phash'] = match['telegram_phash'] + profile['twitter_phash'] = match['twitter_phash'] + results.append(profile) + + return results + + def find_by_telegram_in_twitter_bio(self, telegram_username: str, limit=3) -> List[Dict]: + """Find Twitter profiles that mention this Telegram username in their bio (exact @mention only)""" + if not telegram_username or len(telegram_username) < 4: + return [] + + with self.twitter_conn.cursor(cursor_factory=DictCursor) as cur: + # Search for various Telegram handle patterns in Twitter bios + # Use regex with word boundaries to avoid substring matches in other handles + cur.execute(""" + SELECT + id, + username, + name, + description, + location, + verified, + is_blue_verified, + followers_count, + following_count, + created_at + FROM public.users + WHERE description IS NOT NULL + AND ( + description ~* %s -- @username with word boundary (not part of longer handle) + OR LOWER(description) 
LIKE %s -- t.me/username + OR LOWER(description) LIKE %s -- telegram.me/username + ) + ORDER BY followers_count DESC + LIMIT %s + """, ( + r'@' + telegram_username + r'(\s|$|[^a-zA-Z0-9_])', # Word boundary: space, end, or non-alphanumeric + f'%t.me/{telegram_username.lower()}%', + f'%telegram.me/{telegram_username.lower()}%', + limit + )) + + return [dict(row) for row in cur.fetchall()] + + def find_by_resolved_url(self, telegram_username: str) -> List[Dict]: + """Find Twitter profiles whose bio URL resolves to this Telegram username""" + if not telegram_username or len(telegram_username) < 4: + return [] + + with self.twitter_conn.cursor(cursor_factory=DictCursor) as cur: + # Query url_resolution_queue for Twitter profiles with resolved Telegram links + cur.execute(""" + SELECT DISTINCT + u.id, + u.username, + u.name, + u.description, + u.location, + u.verified, + u.is_blue_verified, + u.followers_count, + u.following_count, + u.created_at, + urq.resolved_url, + urq.original_url + FROM url_resolution_queue urq + JOIN public.users u ON u.id::text = urq.twitter_id + WHERE urq.telegram_handles IS NOT NULL + AND urq.telegram_handles @> %s::jsonb + ORDER BY u.followers_count DESC + LIMIT 3 + """, (f'["{telegram_username.lower()}"]',)) + + results = cur.fetchall() + + # Enrich with URL details + enriched = [] + for row in results: + profile = dict(row) + profile['resolved_telegram_url'] = profile.pop('resolved_url') + profile['original_url'] = profile.pop('original_url') + enriched.append(profile) + + return enriched + + def find_by_username_in_display_name(self, search_term: str, is_telegram: bool, limit: int = 5) -> List[Dict]: + """ + Find Twitter profiles where display name contains username pattern + + If is_telegram=True: Search for TG username in Twitter display names + If is_telegram=False: Search for Twitter username pattern in TG display names (reverse: find Twitter profiles whose username matches the TG display name) + """ + with self.twitter_conn.cursor(cursor_factory=DictCursor) as cur: + if is_telegram: + # TG username in Twitter display name + cur.execute(""" + SELECT + id, + username, + name, + description, + location, + verified, + is_blue_verified, + followers_count, + following_count, + created_at + FROM public.users + WHERE name IS NOT NULL + AND LOWER(name) LIKE %s + ORDER BY followers_count DESC + LIMIT %s + """, (f'%{search_term.lower()}%', limit)) + else: + # Twitter username in TG display name - search for Twitter profiles whose username appears in the search term + cur.execute(""" + SELECT + id, + username, + name, + description, + location, + verified, + is_blue_verified, + followers_count, + following_count, + created_at + FROM public.users + WHERE username IS NOT NULL + AND LOWER(%s) LIKE '%%' || LOWER(username) || '%%' + ORDER BY followers_count DESC + LIMIT %s + """, (search_term, limit)) + + return [dict(row) for row in cur.fetchall()] + + def get_display_name(self, contact: Contact) -> str: + """Get display name with fallback to first_name + last_name""" + if contact.display_name: + return contact.display_name + # Fallback to first_name + last_name + parts = [contact.first_name, contact.last_name] + return ' '.join(p for p in parts if p).strip() + + def find_candidates_for_contact(self, contact: Contact) -> List[Dict]: + """ + Find all Twitter candidates for a single Telegram contact + + Returns list of candidates with: + { + 'twitter_profile': {...}, + 'match_method': 'exact_bio_handle' | 'exact_username' | 'username_variation' | 'fuzzy_name', + 
'baseline_confidence': 0.0-1.0, + 'match_details': {...} + } + """ + candidates = [] + seen_twitter_ids = set() + + # Get display name with fallback + display_name = self.get_display_name(contact) + + # Method 1: Phash matching (profile picture similarity) + phash_matches = self.find_by_phash_match(contact.user_id) + for twitter_profile in phash_matches: + phash_distance = twitter_profile.pop('phash_distance') + telegram_phash = twitter_profile.pop('telegram_phash') + twitter_phash = twitter_profile.pop('twitter_phash') + + # Baseline confidence based on phash distance + if phash_distance == 0: + baseline_confidence = 0.95 # Exact match - VERY strong signal + elif phash_distance == 1: + baseline_confidence = 0.88 # 1-bit difference - strong signal + else: + continue # Skip distance > 1 + + candidates.append({ + 'twitter_profile': twitter_profile, + 'match_method': 'phash_match', + 'baseline_confidence': baseline_confidence, + 'match_details': { + 'phash_distance': phash_distance, + 'telegram_phash': telegram_phash, + 'twitter_phash': twitter_phash + } + }) + seen_twitter_ids.add(twitter_profile['id']) + + # Method 2: Extract handles from bio + if contact.bio: + bio_handles = self.handle_extractor.extract_handles(contact.bio) + for handle in bio_handles: + twitter_profile = self.find_by_handle(handle) + if twitter_profile and twitter_profile['id'] not in seen_twitter_ids: + candidates.append({ + 'twitter_profile': twitter_profile, + 'match_method': 'exact_bio_handle', + 'baseline_confidence': 0.95, + 'match_details': { + 'extracted_handle': handle, + 'from_bio': True + } + }) + seen_twitter_ids.add(twitter_profile['id']) + + # Method 3: Exact username match + if contact.username: + twitter_profile = self.find_by_handle(contact.username) + if twitter_profile and twitter_profile['id'] not in seen_twitter_ids: + candidates.append({ + 'twitter_profile': twitter_profile, + 'match_method': 'exact_username', + 'baseline_confidence': 0.90, + 'match_details': { + 'telegram_username': contact.username, + 'twitter_username': twitter_profile['username'] + } + }) + seen_twitter_ids.add(twitter_profile['id']) + + # Method 4: Username variations + if contact.username: + variations = self.variation_generator.generate_variations(contact.username) + for variation in variations: + if variation == contact.username.lower(): + continue # Already checked in Method 2 + + twitter_profile = self.find_by_handle(variation) + if twitter_profile and twitter_profile['id'] not in seen_twitter_ids: + candidates.append({ + 'twitter_profile': twitter_profile, + 'match_method': 'username_variation', + 'baseline_confidence': 0.80, + 'match_details': { + 'telegram_username': contact.username, + 'username_variation': variation, + 'twitter_username': twitter_profile['username'] + } + }) + seen_twitter_ids.add(twitter_profile['id']) + + # Method 5: Twitter bio contains Telegram username (reverse lookup) + if contact.username: + reverse_matches = self.find_by_telegram_in_twitter_bio(contact.username, limit=3) + for twitter_profile in reverse_matches: + if twitter_profile['id'] not in seen_twitter_ids: + candidates.append({ + 'twitter_profile': twitter_profile, + 'match_method': 'twitter_bio_has_telegram', + 'baseline_confidence': 0.92, # Very high confidence - they explicitly mention their Telegram + 'match_details': { + 'telegram_username': contact.username, + 'twitter_username': twitter_profile['username'], + 'found_in_twitter_bio': True + } + }) + seen_twitter_ids.add(twitter_profile['id']) + + # Method 5b: Twitter bio URL 
resolves to Telegram username (via url_resolution_queue) + if contact.username: + url_resolved_matches = self.find_by_resolved_url(contact.username) + for twitter_profile in url_resolved_matches: + if twitter_profile['id'] not in seen_twitter_ids: + resolved_url = twitter_profile.pop('resolved_telegram_url', None) + original_url = twitter_profile.pop('original_url', None) + + candidates.append({ + 'twitter_profile': twitter_profile, + 'match_method': 'twitter_bio_url_resolves_to_telegram', + 'baseline_confidence': 0.95, # VERY high confidence - explicit URL link in bio + 'match_details': { + 'telegram_username': contact.username, + 'twitter_username': twitter_profile['username'], + 'resolved_url': resolved_url, + 'original_url': original_url, + 'found_via_url_resolution': True + } + }) + seen_twitter_ids.add(twitter_profile['id']) + + # Method 6: Display name containment (TG name in TW name) + if display_name: + containment_matches = self.find_by_display_name_containment(display_name, limit=5) + for twitter_profile in containment_matches: + if twitter_profile['id'] not in seen_twitter_ids: + candidates.append({ + 'twitter_profile': twitter_profile, + 'match_method': 'display_name_containment', + 'baseline_confidence': 0.92, # High confidence for exact name containment + 'match_details': { + 'telegram_name': display_name, + 'twitter_name': twitter_profile['name'], + 'match_type': 'tg_name_contained_in_tw_name' + } + }) + seen_twitter_ids.add(twitter_profile['id']) + + # Method 7: Fuzzy name match (always run to find additional candidates) + if display_name: + fuzzy_matches = self.find_by_fuzzy_name(display_name, limit=5) + for i, twitter_profile in enumerate(fuzzy_matches): + name_score = twitter_profile.pop('name_score') + + # Calculate baseline confidence from name similarity + baseline_confidence = min(0.85, name_score) # Cap at 0.85 for fuzzy matches + + if twitter_profile['id'] not in seen_twitter_ids: + candidates.append({ + 'twitter_profile': twitter_profile, + 'match_method': 'fuzzy_name', + 'baseline_confidence': baseline_confidence, + 'match_details': { + 'telegram_name': display_name, + 'twitter_name': twitter_profile['name'], + 'fuzzy_score': name_score, + 'candidate_rank': i + 1 + } + }) + seen_twitter_ids.add(twitter_profile['id']) + + # Method 8: TG username in Twitter display name + if contact.username: + matches = self.find_by_username_in_display_name(contact.username, is_telegram=True, limit=3) + for twitter_profile in matches: + if twitter_profile['id'] not in seen_twitter_ids: + candidates.append({ + 'twitter_profile': twitter_profile, + 'match_method': 'tg_username_in_twitter_name', + 'baseline_confidence': 0.88, + 'match_details': { + 'telegram_username': contact.username, + 'twitter_name': twitter_profile['name'], + 'found_in_display_name': True + } + }) + seen_twitter_ids.add(twitter_profile['id']) + + # Method 9: Twitter username in TG display name + if display_name: + matches = self.find_by_username_in_display_name(display_name, is_telegram=False, limit=3) + for twitter_profile in matches: + if twitter_profile['id'] not in seen_twitter_ids: + candidates.append({ + 'twitter_profile': twitter_profile, + 'match_method': 'twitter_username_in_tg_name', + 'baseline_confidence': 0.86, + 'match_details': { + 'telegram_name': display_name, + 'twitter_username': twitter_profile['username'], + 'username_in_display_name': True + } + }) + seen_twitter_ids.add(twitter_profile['id']) + + return candidates + + +def save_candidates_to_db(telegram_user_id: int, account_id: int, 
candidates: List[Dict], telegram_db): + """Save candidates to twitter_match_candidates table""" + if not candidates: + return + + insert_data = [] + for cand in candidates: + tw = cand['twitter_profile'] + insert_data.append(( + account_id, # Add account_id + telegram_user_id, + tw['id'], + tw['username'], + tw['name'], + tw.get('description', ''), + tw.get('location'), + tw.get('verified', False), + tw.get('is_blue_verified', False), + tw.get('followers_count', 0), + cand.get('match_details', {}).get('candidate_rank', 1), + cand['match_method'], + cand['baseline_confidence'], + json.dumps(cand['match_details']) # Convert dict to JSON string + )) + + execute_values(telegram_db, """ + INSERT INTO twitter_match_candidates ( + account_id, + telegram_user_id, + twitter_id, + twitter_username, + twitter_name, + twitter_bio, + twitter_location, + twitter_verified, + twitter_blue_verified, + twitter_followers_count, + candidate_rank, + match_method, + baseline_confidence, + match_signals + ) VALUES %s + ON CONFLICT DO NOTHING + """, insert_data, template="""( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + )""") + + +def main(): + print() + print("=" * 70) + print("๐Ÿ” Twitter-Telegram Candidate Finder") + print("=" * 70) + print() + + # Check arguments + test_mode = '--test' in sys.argv + limit = 100 if test_mode else None + + # Check for --limit parameter + if '--limit' in sys.argv: + idx = sys.argv.index('--limit') + limit = int(sys.argv[idx + 1]) + print(f"๐Ÿ“Š LIMIT MODE: Processing first {limit:,} contacts") + print() + elif test_mode: + print("๐Ÿงช TEST MODE: Processing first 100 contacts only") + print() + + # Connect to databases + print("๐Ÿ“ก Connecting to databases...") + + # Connect SQLAlchemy to localhost (not RDS) + from sqlalchemy import create_engine + from sqlalchemy.orm import sessionmaker + localhost_engine = create_engine('postgresql://andrewjiang@localhost:5432/telegram_contacts') + LocalSession = sessionmaker(bind=localhost_engine) + telegram_db = LocalSession() + + try: + twitter_conn = psycopg2.connect(**TWITTER_DB_CONFIG) + twitter_conn.autocommit = False + except Exception as e: + print(f"โŒ Failed to connect to Twitter database: {e}") + print(f" Config: {TWITTER_DB_CONFIG}") + return False + + # Also need psycopg2 connection to telegram DB for writing + try: + telegram_conn = psycopg2.connect(dbname='telegram_contacts', user='andrewjiang', host='localhost', port=5432) + telegram_conn.autocommit = False + except Exception as e: + print(f"โŒ Failed to connect to Telegram database: {e}") + return False + + print("โœ… Connected to both databases") + print() + + try: + # Initialize matcher (pass both connections for phash lookup) + telegram_psycopg_conn = psycopg2.connect( + dbname='telegram_contacts', + user='andrewjiang', + host='localhost' + ) + matcher = TwitterMatcher(twitter_conn, telegram_psycopg_conn) + + # Get Telegram contacts with bios or usernames + print("๐Ÿ” Loading Telegram contacts...") + query = telegram_db.query(Contact).filter( + Contact.user_id > 0, # Exclude channels + Contact.is_deleted == False, + Contact.is_bot == False + ).filter( + (Contact.bio != None) | (Contact.username != None) + ).order_by(Contact.user_id) + + if limit: + query = query.limit(limit) + + contacts = query.all() + print(f"โœ… Found {len(contacts):,} contacts to process") + print() + + # Process each contact + stats = { + 'processed': 0, + 'with_candidates': 0, + 'total_candidates': 0, + 'by_method': {} + } + + print("๐Ÿš€ Finding Twitter candidates...") + print() + + 
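+        # Each contact runs through all matching methods; candidates are saved
+        # and committed per contact, so earlier work survives an interruption.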
for i, contact in enumerate(contacts, 1): + candidates = matcher.find_candidates_for_contact(contact) + + if candidates: + stats['with_candidates'] += 1 + stats['total_candidates'] += len(candidates) + + # Track by method + for cand in candidates: + method = cand['match_method'] + stats['by_method'][method] = stats['by_method'].get(method, 0) + 1 + + # Save to database + with telegram_conn.cursor() as cur: + save_candidates_to_db(contact.user_id, contact.account_id, candidates, cur) + telegram_conn.commit() + + stats['processed'] += 1 + + # Progress update every 10 contacts + if i % 10 == 0: + print(f" Processed {i:,}/{len(contacts):,} contacts... (candidates: {stats['with_candidates']:,}, total: {stats['total_candidates']:,})") + elif i == 1: + # Show first contact immediately + print(f" Processed 1/{len(contacts):,} contacts... (candidates: {stats['with_candidates']:,}, total: {stats['total_candidates']:,})") + + # Final stats + print() + print("=" * 70) + print("โœ… CANDIDATE FINDING COMPLETE") + print("=" * 70) + print() + print(f"๐Ÿ“Š Statistics:") + print(f" Processed: {stats['processed']:,} contacts") + print(f" With candidates: {stats['with_candidates']:,} ({stats['with_candidates']/stats['processed']*100:.1f}%)") + print(f" Total candidates: {stats['total_candidates']:,}") + print(f" Avg candidates per match: {stats['total_candidates']/max(stats['with_candidates'], 1):.1f}") + print() + print(f"๐Ÿ“ˆ By method:") + for method, count in sorted(stats['by_method'].items(), key=lambda x: -x[1]): + print(f" {method}: {count:,}") + print() + + return True + + except Exception as e: + print(f"โŒ Error: {e}") + import traceback + traceback.print_exc() + return False + + finally: + telegram_db.close() + twitter_conn.close() + telegram_conn.close() + + +if __name__ == "__main__": + try: + success = main() + sys.exit(0 if success else 1) + except KeyboardInterrupt: + print("\n\nโš ๏ธ Interrupted by user") + sys.exit(1) diff --git a/find_twitter_candidates_threaded.py b/find_twitter_candidates_threaded.py new file mode 100644 index 0000000..c426c18 --- /dev/null +++ b/find_twitter_candidates_threaded.py @@ -0,0 +1,389 @@ +#!/usr/bin/env python3 +""" +Threading-based Twitter-Telegram Candidate Finder + +Uses threading instead of multiprocessing for better macOS compatibility +and efficient I/O-bound parallel processing. 
+""" + +import sys +from pathlib import Path +import psycopg2 +from psycopg2.extras import execute_values, DictCursor +from concurrent.futures import ThreadPoolExecutor, as_completed +import argparse +from typing import List, Dict, Tuple +import time +import threading + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent / 'src')) + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from models import Contact + +# Import the matcher from the original script +from find_twitter_candidates import TwitterMatcher, TWITTER_DB_CONFIG + +# Database configuration +TELEGRAM_DB_URL = 'postgresql://andrewjiang@localhost:5432/telegram_contacts' + +# Thread-local storage for database connections +thread_local = threading.local() + + +def get_thread_connections(): + """Get or create database connections for this thread""" + if not hasattr(thread_local, 'twitter_conn'): + thread_local.twitter_conn = psycopg2.connect(**TWITTER_DB_CONFIG) + thread_local.telegram_conn = psycopg2.connect( + dbname='telegram_contacts', + user='andrewjiang', + host='localhost', + port=5432 + ) + thread_local.matcher = TwitterMatcher( + thread_local.twitter_conn, + thread_local.telegram_conn + ) + + return thread_local.twitter_conn, thread_local.telegram_conn, thread_local.matcher + + +def process_contact(contact: Contact) -> Tuple[bool, int, List[Dict], Dict]: + """ + Process a single contact in a worker thread. + + Returns: (success, num_candidates, candidates, method_stats) + """ + import sys + import io + + # Capture stderr to detect silent failures + old_stderr = sys.stderr + sys.stderr = stderr_capture = io.StringIO() + + try: + # Get thread-local database connections + twitter_conn, telegram_conn, matcher = get_thread_connections() + + # Find candidates + candidates = matcher.find_candidates_for_contact(contact) + + # Check if any warnings were logged + stderr_content = stderr_capture.getvalue() + if stderr_content: + print(f" ๐Ÿ” Warnings for contact {contact.user_id}:") + print(f" {stderr_content}") + + method_stats = {} + if candidates: + # Track method stats + for cand in candidates: + method = cand['match_method'] + method_stats[method] = method_stats.get(method, 0) + 1 + + # Add contact info to each candidate for later insertion + for cand in candidates: + cand['telegram_user_id'] = contact.user_id + cand['account_id'] = contact.account_id + + return True, len(candidates), candidates, method_stats + + except Exception as e: + print(f" โš ๏ธ Error processing contact {contact.user_id}: {e}") + print(f" Contact details: username={contact.username}, display_name={getattr(contact, 'display_name', None)}") + import traceback + traceback.print_exc() + return False, 0, [], {} + finally: + sys.stderr = old_stderr + + +def save_candidates_batch(candidates: List[Dict], telegram_conn): + """Save a batch of candidates to database""" + if not candidates: + return 0 + + insert_data = [] + for cand in candidates: + tw = cand['twitter_profile'] + match_signals = cand.get('match_details', {}) + + insert_data.append(( + cand['account_id'], + cand['telegram_user_id'], + tw['id'], + tw['username'], + tw.get('name'), + tw.get('description'), + tw.get('location'), + tw.get('verified', False), + tw.get('is_blue_verified', False), + tw.get('followers_count', 0), + 0, # candidate_rank (will be set later if needed) + cand['match_method'], + cand['baseline_confidence'], + psycopg2.extras.Json(match_signals), + True, # needs_llm_review + False, # llm_processed + )) + + with 
telegram_conn.cursor() as cur: + execute_values(cur, """ + INSERT INTO twitter_match_candidates ( + account_id, + telegram_user_id, + twitter_id, + twitter_username, + twitter_name, + twitter_bio, + twitter_location, + twitter_verified, + twitter_blue_verified, + twitter_followers_count, + candidate_rank, + match_method, + baseline_confidence, + match_signals, + needs_llm_review, + llm_processed + ) VALUES %s + ON CONFLICT (telegram_user_id, twitter_id) DO NOTHING + """, insert_data, page_size=1000) + + telegram_conn.commit() + + return len(insert_data) + + +def main(): + parser = argparse.ArgumentParser(description='Find Twitter candidates for Telegram contacts (threading-based)') + parser.add_argument('--limit', type=int, help='Limit number of Telegram contacts to process') + parser.add_argument('--test', action='store_true', help='Test mode: process first 100 contacts only') + parser.add_argument('--workers', type=int, default=8, + help='Number of worker threads (default: 8)') + parser.add_argument('--user-id-min', type=int, help='Minimum user_id to process (for parallel ranges)') + parser.add_argument('--user-id-max', type=int, help='Maximum user_id to process (for parallel ranges)') + parser.add_argument('--range-name', type=str, help='Name for this range (for logging)') + + args = parser.parse_args() + + num_workers = args.workers + limit = args.limit + user_id_min = args.user_id_min + user_id_max = args.user_id_max + range_name = args.range_name or "full" + + print("=" * 70) + print(f"๐Ÿ” Twitter-Telegram Candidate Finder (THREADED) - Range: {range_name}") + print("=" * 70) + print() + + if args.test: + limit = 100 + print("๐Ÿงช TEST MODE: Processing first 100 contacts only") + print() + elif limit: + print(f"๐Ÿ“Š LIMIT MODE: Processing first {limit:,} contacts") + print() + + if user_id_min is not None and user_id_max is not None: + print(f"๐Ÿ“ User ID Range: {user_id_min:,} to {user_id_max:,}") + print() + + print(f"๐Ÿงต Worker threads: {num_workers}") + print() + + # Load contacts using raw psycopg2 + print("๐Ÿ“ก Loading Telegram contacts...") + + conn = psycopg2.connect( + dbname='telegram_contacts', + user='andrewjiang', + host='localhost', + port=5432 + ) + + with conn.cursor() as cur: + # First, get list of already processed contacts + cur.execute(""" + SELECT DISTINCT telegram_user_id + FROM twitter_match_candidates + """) + already_processed = set(row[0] for row in cur.fetchall()) + + print(f"๐Ÿ“‹ Already processed: {len(already_processed):,} contacts (will skip)") + print() + + query = """ + SELECT account_id, user_id, display_name, first_name, last_name, username, phone, bio, is_bot, is_deleted + FROM contacts + WHERE user_id > 0 + AND is_deleted = false + AND is_bot = false + AND (bio IS NOT NULL OR username IS NOT NULL) + """ + + # Add user_id range filter if specified + if user_id_min is not None: + query += f" AND user_id >= {user_id_min}" + if user_id_max is not None: + query += f" AND user_id <= {user_id_max}" + + query += " ORDER BY user_id" + + if limit: + query += f" LIMIT {limit}" + + cur.execute(query) + rows = cur.fetchall() + + conn.close() + + # Convert to Contact objects, skipping already processed + contacts = [] + skipped = 0 + for row in rows: + user_id = row[1] + + # Skip if already processed + if user_id in already_processed: + skipped += 1 + continue + + contact = Contact( + account_id=row[0], + user_id=user_id, + display_name=row[2], + first_name=row[3], + last_name=row[4], + username=row[5], + phone=row[6], + bio=row[7], + is_bot=row[8], + 
is_deleted=row[9] + ) + contacts.append(contact) + + total_contacts = len(contacts) + print(f"โœ… Found {total_contacts:,} NEW contacts to process (skipped {skipped:,} already done)") + print() + + print("๐Ÿš€ Processing contacts with thread pool...") + print() + + start_time = time.time() + + # Stats tracking + total_processed = 0 + total_with_candidates = 0 + all_candidates = [] + combined_method_stats = {} + total_saved = 0 + + # Database connection for incremental saves + telegram_conn = psycopg2.connect( + dbname='telegram_contacts', + user='andrewjiang', + host='localhost', + port=5432 + ) + + # Process with ThreadPoolExecutor + with ThreadPoolExecutor(max_workers=num_workers) as executor: + # Submit all tasks + future_to_contact = { + executor.submit(process_contact, contact): contact + for contact in contacts + } + + # Process results as they complete + for i, future in enumerate(as_completed(future_to_contact), 1): + try: + success, num_candidates, candidates, method_stats = future.result() + + if success: + total_processed += 1 + + if candidates: + total_with_candidates += 1 + all_candidates.extend(candidates) + + # Update method stats + for method, count in method_stats.items(): + combined_method_stats[method] = combined_method_stats.get(method, 0) + count + + # Save to database every 100 contacts + if len(all_candidates) >= 100: + saved = save_candidates_batch(all_candidates, telegram_conn) + total_saved += saved + all_candidates = [] # Clear buffer + + # Progress update every 10 contacts + if i % 10 == 0 or i == 1: + elapsed = time.time() - start_time + rate = total_processed / elapsed if elapsed > 0 else 0 + remaining = total_contacts - total_processed + eta_seconds = remaining / rate if rate > 0 else 0 + eta_hours = eta_seconds / 3600 + + print(f" Progress: {total_processed}/{total_contacts} ({total_processed/total_contacts*100:.1f}%) | " + f"{total_saved + len(all_candidates):,} candidates (๐Ÿ’พ {total_saved:,} saved) | " + f"Rate: {rate:.1f}/sec | " + f"ETA: {eta_hours:.1f}h") + + except Exception as e: + print(f" โŒ Error processing future: {e}") + import traceback + traceback.print_exc() + + # Save any remaining candidates + if all_candidates: + saved = save_candidates_batch(all_candidates, telegram_conn) + total_saved += saved + + telegram_conn.close() + + elapsed = time.time() - start_time + + print() + print(f"โฑ๏ธ Processing completed in {elapsed:.1f} seconds ({elapsed/60:.1f} minutes)") + print(f" Rate: {total_processed/elapsed:.1f} contacts/second") + print() + print(f"๐Ÿ’พ Total saved: {total_saved:,} candidates") + print() + + # Print stats + print("=" * 70) + print("โœ… CANDIDATE FINDING COMPLETE") + print("=" * 70) + print() + print(f"๐Ÿ“Š Statistics:") + print(f" Processed: {total_processed:,} contacts") + print(f" With candidates: {total_with_candidates:,} ({total_with_candidates/total_processed*100:.1f}%)") + print(f" Total candidates: {total_saved:,}") + if total_with_candidates > 0: + print(f" Avg candidates per match: {total_saved/total_with_candidates:.1f}") + print() + print(f"๐Ÿ“ˆ By method:") + for method, count in sorted(combined_method_stats.items(), key=lambda x: x[1], reverse=True): + print(f" {method}: {count:,}") + print() + + return True + + +if __name__ == "__main__": + try: + success = main() + sys.exit(0 if success else 1) + except KeyboardInterrupt: + print("\n\nโŒ Interrupted by user") + sys.exit(1) + except Exception as e: + print(f"\nโŒ Error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) diff --git a/main.py 
b/main.py new file mode 100755 index 0000000..30f5483 --- /dev/null +++ b/main.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 +""" +Twitter-Telegram Profile Matching System +Main menu for finding candidates and verifying matches with LLM +""" + +import sys +import os +import subprocess +import psycopg2 + +# Add parent directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'src')) + +# Database configuration +DB_CONFIG = { + 'dbname': 'telegram_contacts', + 'user': 'andrewjiang', + 'host': 'localhost', + 'port': 5432 +} + +TWITTER_DB_CONFIG = { + 'dbname': 'twitter_data', + 'user': 'andrewjiang', + 'host': 'localhost', + 'port': 5432 +} + + +def get_stats(): + """Get current matching statistics""" + conn = psycopg2.connect(**DB_CONFIG) + cur = conn.cursor() + + stats = {} + + # Candidates stats + cur.execute(""" + SELECT + COUNT(DISTINCT telegram_user_id) as total_users, + COUNT(*) as total_candidates, + COUNT(*) FILTER (WHERE llm_processed = TRUE) as processed_candidates, + COUNT(*) FILTER (WHERE llm_processed = FALSE) as pending_candidates + FROM twitter_match_candidates + """) + row = cur.fetchone() + stats['total_users'] = row[0] + stats['total_candidates'] = row[1] + stats['processed_candidates'] = row[2] + stats['pending_candidates'] = row[3] + + # Matches stats + cur.execute(""" + SELECT + COUNT(*) as total_matches, + AVG(final_confidence) as avg_confidence, + COUNT(*) FILTER (WHERE final_confidence >= 0.90) as high_conf, + COUNT(*) FILTER (WHERE final_confidence >= 0.80 AND final_confidence < 0.90) as med_conf, + COUNT(*) FILTER (WHERE final_confidence >= 0.70 AND final_confidence < 0.80) as low_conf + FROM twitter_telegram_matches + """) + row = cur.fetchone() + stats['total_matches'] = row[0] + stats['avg_confidence'] = row[1] or 0 + stats['high_conf'] = row[2] + stats['med_conf'] = row[3] + stats['low_conf'] = row[4] + + # Users with matches + cur.execute(""" + SELECT COUNT(DISTINCT telegram_user_id) + FROM twitter_telegram_matches + """) + stats['users_with_matches'] = cur.fetchone()[0] + + cur.close() + conn.close() + + return stats + + +def print_header(): + """Print main header""" + print() + print("=" * 80) + print("๐Ÿ”— Twitter-Telegram Profile Matching System") + print("=" * 80) + print() + + +def print_stats(): + """Print current statistics""" + stats = get_stats() + + print("๐Ÿ“Š Current Statistics:") + print("-" * 80) + print(f"Candidates:") + print(f" โ€ข Users with candidates: {stats['total_users']:,}") + print(f" โ€ข Total candidates found: {stats['total_candidates']:,}") + print(f" โ€ข Processed by LLM: {stats['processed_candidates']:,}") + print(f" โ€ข Pending verification: {stats['pending_candidates']:,}") + print() + print(f"Verified Matches:") + print(f" โ€ข Users with matches: {stats['users_with_matches']:,}") + print(f" โ€ข Total matches: {stats['total_matches']:,}") + print(f" โ€ข Average confidence: {stats['avg_confidence']:.2f}") + print(f" โ€ข High confidence (90%+): {stats['high_conf']:,}") + print(f" โ€ข Medium confidence (80-89%): {stats['med_conf']:,}") + print(f" โ€ข Low confidence (70-79%): {stats['low_conf']:,}") + print("-" * 80) + print() + + +def run_script(script_name, *args): + """Run a Python script with arguments""" + script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), f"{script_name}.py") + cmd = ['python3.10', script_path] + list(args) + subprocess.run(cmd) + + +def main(): + while True: + print_header() 
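+ # Stats are re-queried from Postgres on every pass through this loop, so the menu always reflects candidate/match counts written by other running processes.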
+ print_stats() + + print("๐Ÿ“‹ Main Menu:") + print() + print("STEP 1: Find Candidates") + print(" 1. Find Twitter candidates (threaded, RECOMMENDED)") + print(" 2. Find Twitter candidates (single-threaded)") + print() + print("STEP 2: Verify with LLM") + print(" 3. Verify matches with LLM (async, RECOMMENDED)") + print(" 4. Verify matches with LLM (test mode - 50 users)") + print() + print("Analysis & Review") + print(" 5. Review match quality") + print(" 6. Show statistics only") + print() + print(" 0. Exit") + print() + + choice = input("๐Ÿ‘‰ Enter your choice: ").strip() + + if choice == '0': + print("\n๐Ÿ‘‹ Goodbye!\n") + break + + elif choice == '1': + # Find candidates (threaded) + print() + print("๐Ÿ” Finding Twitter candidates (threaded mode)...") + print() + limit_input = input("๐Ÿ‘‰ How many contacts? (press Enter for all): ").strip() + workers = input("๐Ÿ‘‰ Number of worker threads (default: 8): ").strip() or '8' + + if limit_input: + run_script('find_twitter_candidates_threaded', '--limit', limit_input, '--workers', workers) + else: + run_script('find_twitter_candidates_threaded', '--workers', workers) + + input("\nโœ… Press Enter to continue...") + + elif choice == '2': + # Find candidates (single-threaded) + print() + print("๐Ÿ” Finding Twitter candidates (single-threaded mode)...") + print() + limit_input = input("๐Ÿ‘‰ How many contacts? (press Enter for all): ").strip() + + if limit_input: + run_script('find_twitter_candidates', '--limit', limit_input) + else: + run_script('find_twitter_candidates') + + input("\nโœ… Press Enter to continue...") + + elif choice == '3': + # Verify with LLM (async) + print() + print("๐Ÿค– Verifying matches with LLM (async mode)...") + print() + concurrent = input("๐Ÿ‘‰ Concurrent requests (default: 100): ").strip() or '100' + + run_script('verify_twitter_matches_v2', '--verbose', '--concurrent', concurrent) + + input("\nโœ… Press Enter to continue...") + + elif choice == '4': + # Verify with LLM (test mode) + print() + print("๐Ÿงช Test mode: Verifying 50 users with LLM...") + print() + + run_script('verify_twitter_matches_v2', '--test', '--limit', '50', '--verbose', '--concurrent', '10') + + input("\nโœ… Press Enter to continue...") + + elif choice == '5': + # Review match quality + print() + print("๐Ÿ“Š Reviewing match quality...") + print() + + run_script('review_match_quality') + + input("\nโœ… Press Enter to continue...") + + elif choice == '6': + # Just show stats, loop back to menu + continue + + else: + print("\nโŒ Invalid choice. Please try again.\n") + input("Press Enter to continue...") + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\n๐Ÿ‘‹ Interrupted. 
Goodbye!\n") + sys.exit(0) diff --git a/review_match_quality.py b/review_match_quality.py new file mode 100755 index 0000000..3ee3b1a --- /dev/null +++ b/review_match_quality.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +""" +Critical Match Quality Reviewer + +Analyzes verification results with deep understanding of: +- Twitter/Telegram/crypto culture +- Common false positive patterns +- Company vs personal account indicators +- Context alignment +""" + +import re +import json +from pathlib import Path +from typing import List, Dict, Tuple + +LOG_FILE = Path(__file__).parent.parent / 'verification_v2_log.txt' + + +class MatchReviewer: + """Critical reviewer with crypto/web3 domain knowledge""" + + # Company/product/team indicators + COMPANY_INDICATORS = [ + r'\bis\s+(a|an)\s+\w+\s+(way|tool|platform|app|service|protocol)', + r'official\s+(account|page|channel)', + r'(team|official)\s*$', + r'^(the|a)\s+\w+\s+(for|to)', + r'brought to you by', + r'hosted by', + r'founded by', + r'(community|project)\s+(account|page)', + r'(dao|protocol|network)\s*$', + r'building\s+(the|a)\s+', + ] + + # Personal account indicators + PERSONAL_INDICATORS = [ + r'(ceo|founder|co-founder|developer|builder|engineer|researcher)\s+at', + r'working\s+(at|on|with)', + r'^(i|i\'m|my)', + r'(my|personal)\s+(views|opinions|thoughts)', + r'\b(he/him|she/her|they/them)\b', + ] + + # Crypto/Web3 keywords + CRYPTO_KEYWORDS = [ + 'web3', 'crypto', 'blockchain', 'defi', 'nft', 'dao', 'dapp', + 'ethereum', 'solana', 'bitcoin', 'polygon', 'base', 'arbitrum', + 'smart contract', 'token', 'wallet', 'metamask', 'coinbase', + 'yield', 'farming', 'staking', 'airdrop', 'whitelist', 'mint', + 'protocol', 'l1', 'l2', 'rollup', 'zk', 'evm' + ] + + def __init__(self): + self.issues = [] + self.stats = { + 'total_matches': 0, + 'false_positives': 0, + 'questionable': 0, + 'good_matches': 0, + 'company_accounts': 0, + 'weak_evidence': 0, + 'context_mismatch': 0, + } + + def parse_log(self) -> List[Dict]: + """Parse the verification log into structured data""" + with open(LOG_FILE, 'r') as f: + content = f.read() + + entries = [] + # Find all sections that start with TELEGRAM USER + pattern = r'TELEGRAM USER: ([^\(]+) \(ID: (\d+)\)(.*?)(?=TELEGRAM USER:|$)' + matches = re.finditer(pattern, content, re.DOTALL) + + for match in matches: + tg_username = match.group(1).strip() + tg_id = int(match.group(2)) + section = match.group(3) + + # Extract TG profile + tg_bio_match = re.search(r'Bio: (.*?)(?:\n+TWITTER CANDIDATES)', section, re.DOTALL) + tg_bio = tg_bio_match.group(1).strip() if tg_bio_match else '' + + # Extract username specificity + spec_match = re.search(r'username has specificity score ([\d.]+)', section) + specificity = float(spec_match.group(1)) if spec_match else 0.5 + + # Extract candidates + candidates = [] + candidate_blocks = re.findall( + r'\[Candidate (\d+)\](.*?)(?=\[Candidate \d+\]|LLM RESPONSE:)', + section, + re.DOTALL + ) + + for idx, block in candidate_blocks: + tw_username = re.search(r'Twitter Username: @(\S+)', block) + tw_name = re.search(r'Twitter Display Name: (.+)', block) + tw_bio = re.search(r'Twitter Bio: (.*?)(?=\nLocation:)', block, re.DOTALL) + tw_followers = re.search(r'Followers: ([\d,]+)', block) + match_method = re.search(r'Match Method: (\S+)', block) + baseline_conf = re.search(r'Baseline Confidence: ([\d.]+)', block) + + candidates.append({ + 'index': int(idx), + 'twitter_username': tw_username.group(1) if tw_username else '', + 'twitter_name': tw_name.group(1).strip() if tw_name else '', + 
'twitter_bio': tw_bio.group(1).strip() if tw_bio else '', + 'twitter_followers': tw_followers.group(1) if tw_followers else '0', + 'match_method': match_method.group(1) if match_method else '', + 'baseline_confidence': float(baseline_conf.group(1)) if baseline_conf else 0.0, + }) + + # Extract LLM response (handle multiline JSON with nested structures) + llm_match = re.search(r'LLM RESPONSE:\s*-+\s*(\{.*)', section, re.DOTALL) + if llm_match: + try: + # Extract JSON - it should be everything after "LLM RESPONSE:" until end of section + json_text = llm_match.group(1) + # Find the JSON object (balanced braces) + brace_count = 0 + json_end = 0 + for i, char in enumerate(json_text): + if char == '{': + brace_count += 1 + elif char == '}': + brace_count -= 1 + if brace_count == 0: + json_end = i + 1 + break + + if json_end > 0: + json_str = json_text[:json_end] + llm_result = json.loads(json_str) + else: + llm_result = {'candidates': []} + except Exception as e: + llm_result = {'candidates': []} + else: + llm_result = {'candidates': []} + + entries.append({ + 'telegram_username': tg_username, + 'telegram_id': tg_id, + 'telegram_bio': tg_bio, + 'username_specificity': specificity, + 'candidates': candidates, + 'llm_results': llm_result.get('candidates', []) + }) + + return entries + + def is_company_account(self, bio: str, name: str) -> Tuple[bool, str]: + """Detect if this is a company/product/team account""" + text = (bio + ' ' + name).lower() + + for pattern in self.COMPANY_INDICATORS: + if re.search(pattern, text, re.IGNORECASE): + return True, f"Company pattern: '{pattern}'" + + # Check if name equals bio description + if bio and len(bio.split()) < 20: + # Short bio describing what something "is" + if re.search(r'\bis\s+(a|an|the)\s+', bio, re.IGNORECASE): + return True, "Bio describes a product/service" + + return False, "" + + def is_personal_account(self, bio: str) -> bool: + """Detect personal account indicators""" + for pattern in self.PERSONAL_INDICATORS: + if re.search(pattern, bio, re.IGNORECASE): + return True + return False + + def has_crypto_context(self, bio: str) -> Tuple[bool, List[str]]: + """Check if bio has crypto/web3 context""" + if not bio: + return False, [] + + bio_lower = bio.lower() + found_keywords = [] + + for keyword in self.CRYPTO_KEYWORDS: + if keyword in bio_lower: + found_keywords.append(keyword) + + return len(found_keywords) > 0, found_keywords + + def review_match(self, entry: Dict) -> Dict: + """Critically review a single match""" + issues = [] + severity = 'GOOD' + + tg_username = entry['telegram_username'] + tg_bio = entry['telegram_bio'] + tg_has_crypto, tg_crypto_keywords = self.has_crypto_context(tg_bio) + + # Review each LLM-approved match (confidence >= 0.5) + for llm_result in entry['llm_results']: + confidence = llm_result.get('confidence', 0) + if confidence < 0.5: + continue + + self.stats['total_matches'] += 1 + candidate_idx = llm_result.get('candidate_index', 0) - 1 + + if candidate_idx < 0 or candidate_idx >= len(entry['candidates']): + continue + + candidate = entry['candidates'][candidate_idx] + tw_username = candidate['twitter_username'] + tw_bio = candidate['twitter_bio'] + tw_name = candidate['twitter_name'] + match_method = candidate['match_method'] + + # Check 1: Company account + is_company, company_reason = self.is_company_account(tw_bio, tw_name) + if is_company: + issues.append({ + 'type': 'COMPANY_ACCOUNT', + 'severity': 'HIGH', + 'description': f"Twitter @{tw_username} appears to be a company/product account", + 'evidence': 
company_reason, + 'confidence': confidence + }) + self.stats['company_accounts'] += 1 + severity = 'FALSE_POSITIVE' + + # Check 2: Context mismatch + tw_has_crypto, tw_crypto_keywords = self.has_crypto_context(tw_bio) + + if tg_has_crypto and not tw_has_crypto: + issues.append({ + 'type': 'CONTEXT_MISMATCH', + 'severity': 'MEDIUM', + 'description': f"TG has crypto context but TW doesn't", + 'evidence': f"TG keywords: {tg_crypto_keywords}, TW keywords: none", + 'confidence': confidence + }) + self.stats['context_mismatch'] += 1 + if severity == 'GOOD': + severity = 'QUESTIONABLE' + + # Check 3: Empty bio with no strong evidence + if not tg_bio and not tw_bio and confidence > 0.8: + issues.append({ + 'type': 'WEAK_EVIDENCE', + 'severity': 'MEDIUM', + 'description': f"High confidence ({confidence}) with both bios empty", + 'evidence': f"Only username match, no contextual verification", + 'confidence': confidence + }) + self.stats['weak_evidence'] += 1 + if severity == 'GOOD': + severity = 'QUESTIONABLE' + + # Check 4: Generic username with high confidence + if entry['username_specificity'] < 0.6 and confidence > 0.85: + issues.append({ + 'type': 'GENERIC_USERNAME', + 'severity': 'LOW', + 'description': f"Generic username ({entry['username_specificity']:.2f} specificity) with high confidence", + 'evidence': f"Username: {tg_username}", + 'confidence': confidence + }) + if severity == 'GOOD': + severity = 'QUESTIONABLE' + + # Check 5: Twitter bio mentions other accounts + if match_method == 'twitter_bio_has_telegram': + # Check if the telegram username appears as @mention (not the account itself) + mentions = re.findall(r'@(\w+)', tw_bio) + if tg_username.lower() not in [m.lower() for m in mentions]: + # The username is embedded in another handle + issues.append({ + 'type': 'SUBSTRING_MATCH', + 'severity': 'HIGH', + 'description': f"TG username found as substring in other accounts, not direct mention", + 'evidence': f"TW bio: {tw_bio[:100]}", + 'confidence': confidence + }) + severity = 'FALSE_POSITIVE' # counted once in the severity tally below (a second increment here would double-count) + + # Count severity + if severity == 'FALSE_POSITIVE': + self.stats['false_positives'] += 1 + elif severity == 'QUESTIONABLE': + self.stats['questionable'] += 1 + else: + self.stats['good_matches'] += 1 + + return { + 'telegram_username': tg_username, + 'telegram_id': entry['telegram_id'], + 'severity': severity, + 'issues': issues, + 'entry': entry + } + + def generate_report(self, reviews: List[Dict]): + """Generate comprehensive review report""" + print() + print("=" * 100) + print("๐Ÿ” MATCH QUALITY REVIEW REPORT") + print("=" * 100) + print() + + print("๐Ÿ“Š STATISTICS:") + print(f" Total matches reviewed: {self.stats['total_matches']}") + print(f" โœ… Good matches: {self.stats['good_matches']} ({self.stats['good_matches']/max(self.stats['total_matches'],1)*100:.1f}%)") + print(f" โš ๏ธ Questionable: {self.stats['questionable']} ({self.stats['questionable']/max(self.stats['total_matches'],1)*100:.1f}%)") + print(f" โŒ False positives: {self.stats['false_positives']} ({self.stats['false_positives']/max(self.stats['total_matches'],1)*100:.1f}%)") + print() + + print("๐Ÿšจ ISSUE BREAKDOWN:") + print(f" Company accounts: {self.stats['company_accounts']}") + print(f" Context mismatches: {self.stats['context_mismatch']}") + print(f" Weak evidence: {self.stats['weak_evidence']}") + print() + + # Show false positives + false_positives = [r for r in reviews if r['severity'] == 'FALSE_POSITIVE'] + if false_positives: + print("=" * 100) + print("โŒ FALSE 
POSITIVES:") + print("=" * 100) + for review in false_positives[:10]: # Show top 10 + print() + print(f"TG @{review['telegram_username']} (ID: {review['telegram_id']})") + print(f"TG Bio: {review['entry']['telegram_bio'][:100]}") + for issue in review['issues']: + print(f" โŒ [{issue['severity']}] {issue['type']}: {issue['description']}") + print(f" Evidence: {issue['evidence'][:150]}") + print(f" LLM Confidence: {issue['confidence']:.2f}") + + # Show questionable matches + questionable = [r for r in reviews if r['severity'] == 'QUESTIONABLE'] + if questionable: + print() + print("=" * 100) + print("โš ๏ธ QUESTIONABLE MATCHES:") + print("=" * 100) + for review in questionable[:10]: # Show top 10 + print() + print(f"TG @{review['telegram_username']} (ID: {review['telegram_id']})") + for issue in review['issues']: + print(f" โš ๏ธ [{issue['severity']}] {issue['type']}: {issue['description']}") + print(f" Evidence: {issue['evidence'][:150]}") + print(f" LLM Confidence: {issue['confidence']:.2f}") + + print() + print("=" * 100) + print("๐Ÿ’ก RECOMMENDATIONS:") + print("=" * 100) + print() + + if self.stats['company_accounts'] > 0: + print("1. Add company account detection to prompt:") + print(" - Check for product descriptions ('X is a platform for...')") + print(" - Look for 'official', 'team', 'hosted by' patterns") + print(" - Distinguish personal vs organizational accounts") + print() + + if self.stats['context_mismatch'] > 0: + print("2. Strengthen context matching:") + print(" - Require crypto/web3 keywords in both profiles") + print(" - Lower confidence when contexts don't align") + print() + + if self.stats['weak_evidence'] > 0: + print("3. Adjust confidence for weak evidence:") + print(" - Cap confidence at 0.70 when both bios are empty") + print(" - Require additional signals beyond username match") + print() + + print("4. 
Fix 'twitter_bio_has_telegram' method:") + print(" - Only match direct @mentions, not substrings in other handles") + print(" - Example: @hipster should NOT match mentions of @HipsterHacker") + print() + + +def main(): + reviewer = MatchReviewer() + + print("๐Ÿ“– Parsing verification log...") + entries = reviewer.parse_log() + print(f"โœ… Parsed {len(entries)} verification entries") + print() + + print("๐Ÿ” Reviewing match quality...") + reviews = [] + for entry in entries: + if entry['llm_results']: # Only review entries with matches + review = reviewer.review_match(entry) + reviews.append(review) + print(f"โœ… Reviewed {len(reviews)} entries with matches") + + reviewer.generate_report(reviews) + + +if __name__ == "__main__": + main() diff --git a/setup_twitter_matching_schema.sql b/setup_twitter_matching_schema.sql new file mode 100644 index 0000000..ff709c4 --- /dev/null +++ b/setup_twitter_matching_schema.sql @@ -0,0 +1,101 @@ +-- Twitter-Telegram Matching Schema +-- Run this against your telegram_contacts database + +-- Table: twitter_telegram_matches +-- Stores confirmed matches between Telegram and Twitter profiles +CREATE TABLE IF NOT EXISTS twitter_telegram_matches ( + id SERIAL PRIMARY KEY, + + -- Telegram side + account_id TEXT, -- source Telegram account (the verifier inserts this; adjust type to mirror contacts.account_id) + telegram_user_id BIGINT NOT NULL REFERENCES contacts(user_id), + telegram_username VARCHAR(255), + telegram_name VARCHAR(255), + telegram_bio TEXT, + + -- Twitter side + twitter_id VARCHAR(50) NOT NULL, + twitter_username VARCHAR(100) NOT NULL, + twitter_name VARCHAR(200), + twitter_bio TEXT, + twitter_location VARCHAR(200), + twitter_verified BOOLEAN, + twitter_blue_verified BOOLEAN, + twitter_followers_count INTEGER, + + -- Matching metadata + match_method VARCHAR(50) NOT NULL, -- e.g. 'exact_bio_handle', 'exact_username', 'username_variation', 'fuzzy_name' + baseline_confidence FLOAT NOT NULL, -- Confidence before LLM (0-1) + llm_verdict VARCHAR(20) NOT NULL, -- 'CONFIDENT', 'MODERATE', 'UNSURE' (as written by verify_twitter_matches_v2.py) + final_confidence FLOAT NOT NULL CHECK (final_confidence BETWEEN 0 AND 1), + + -- Match details (JSON for flexibility) + match_details JSONB, -- {extracted_handles: [...], username_variation: 'xxx', fuzzy_score: 0.85} + + -- LLM metadata + llm_tokens_used INTEGER, + llm_cost FLOAT, + + -- Audit trail + matched_at TIMESTAMP DEFAULT NOW(), + needs_manual_review BOOLEAN DEFAULT FALSE, + verified_manually BOOLEAN DEFAULT FALSE, + manual_review_notes TEXT, + + UNIQUE(telegram_user_id, twitter_id) +); + +-- Indexes for performance +CREATE INDEX IF NOT EXISTS idx_ttm_telegram_user ON twitter_telegram_matches(telegram_user_id); +CREATE INDEX IF NOT EXISTS idx_ttm_twitter_id ON twitter_telegram_matches(twitter_id); +CREATE INDEX IF NOT EXISTS idx_ttm_twitter_username ON twitter_telegram_matches(LOWER(twitter_username)); +CREATE INDEX IF NOT EXISTS idx_ttm_confidence ON twitter_telegram_matches(final_confidence DESC); +CREATE INDEX IF NOT EXISTS idx_ttm_verdict ON twitter_telegram_matches(llm_verdict); +CREATE INDEX IF NOT EXISTS idx_ttm_method ON twitter_telegram_matches(match_method); +CREATE INDEX IF NOT EXISTS idx_ttm_needs_review ON twitter_telegram_matches(needs_manual_review) WHERE needs_manual_review = TRUE; + +-- Table: twitter_match_candidates (temporary staging) +-- Stores potential matches before LLM verification +CREATE TABLE IF NOT EXISTS twitter_match_candidates ( + id SERIAL PRIMARY KEY, + account_id TEXT, -- source Telegram account (the candidate finder inserts this; adjust type to mirror contacts.account_id) + + telegram_user_id BIGINT NOT NULL REFERENCES contacts(user_id), + + -- Twitter candidate info + twitter_id VARCHAR(50) NOT NULL, + twitter_username VARCHAR(100) NOT NULL, + twitter_name VARCHAR(200), 
twitter_bio TEXT, + twitter_location VARCHAR(200), + twitter_verified BOOLEAN, + twitter_blue_verified BOOLEAN, + twitter_followers_count INTEGER, + + -- Candidate scoring + candidate_rank INTEGER, -- 1 = best match, 2 = second best, etc. + match_method VARCHAR(50), + baseline_confidence FLOAT, + match_signals JSONB, -- {'handle_match': true, 'fuzzy_score': 0.85, ...} + + -- LLM processing status + needs_llm_review BOOLEAN DEFAULT TRUE, + llm_processed BOOLEAN DEFAULT FALSE, + llm_verdict VARCHAR(20), + final_confidence FLOAT, + + created_at TIMESTAMP DEFAULT NOW(), + + -- Required by the candidate finder's ON CONFLICT (telegram_user_id, twitter_id) DO NOTHING + UNIQUE(telegram_user_id, twitter_id) +); + +CREATE INDEX IF NOT EXISTS idx_tmc_telegram_user ON twitter_match_candidates(telegram_user_id); +CREATE INDEX IF NOT EXISTS idx_tmc_twitter_id ON twitter_match_candidates(twitter_id); +CREATE INDEX IF NOT EXISTS idx_tmc_needs_review ON twitter_match_candidates(llm_processed, needs_llm_review) + WHERE needs_llm_review = TRUE AND llm_processed = FALSE; +CREATE INDEX IF NOT EXISTS idx_tmc_rank ON twitter_match_candidates(telegram_user_id, candidate_rank); + +-- Grant permissions (adjust as needed for your setup) +-- GRANT ALL PRIVILEGES ON twitter_telegram_matches TO your_user; +-- GRANT ALL PRIVILEGES ON twitter_match_candidates TO your_user; +-- GRANT USAGE, SELECT ON SEQUENCE twitter_telegram_matches_id_seq TO your_user; +-- GRANT USAGE, SELECT ON SEQUENCE twitter_match_candidates_id_seq TO your_user; + +COMMENT ON TABLE twitter_telegram_matches IS 'Confirmed matches between Telegram and Twitter profiles'; +COMMENT ON TABLE twitter_match_candidates IS 'Temporary staging for potential matches awaiting LLM verification'; diff --git a/verify_twitter_matches_v2.py b/verify_twitter_matches_v2.py new file mode 100755 index 0000000..66e4edc --- /dev/null +++ b/verify_twitter_matches_v2.py @@ -0,0 +1,791 @@ +#!/usr/bin/env python3 +""" +Twitter-Telegram Match Verifier V2 (Confidence-Based with Batch Evaluation) +Uses LLM with confidence scoring (0-1) and evaluates all candidates for a TG user together +""" + +import sys +import asyncio +import json +from pathlib import Path +from typing import List, Dict +import psycopg2 +from psycopg2.extras import DictCursor, RealDictCursor +import openai +from datetime import datetime +import re + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent / 'src')) + +from db_config import SessionLocal +from models import Contact + +# Twitter database connection +TWITTER_DB_CONFIG = { + 'dbname': 'twitter_data', + 'user': 'andrewjiang', + 'host': 'localhost', + 'port': 5432 +} + +# Checkpoint file +CHECKPOINT_FILE = Path(__file__).parent.parent / 'llm_verification_v2_checkpoint.json' + + +class CheckpointManager: + """Manage checkpoints for resumable verification""" + + def __init__(self, checkpoint_file): + self.checkpoint_file = checkpoint_file + self.data = self.load() + + def load(self): + if self.checkpoint_file.exists(): + with open(self.checkpoint_file, 'r') as f: + return json.load(f) + return { + 'last_processed_telegram_id': None, + 'processed_count': 0, + 'total_matches_saved': 0, + 'total_cost': 0.0, + 'total_tokens': 0, + 'started_at': None + } + + def save(self): + self.data['last_updated_at'] = datetime.now().isoformat() + with open(self.checkpoint_file, 'w') as f: + json.dump(self.data, f, indent=2) + + def update(self, telegram_user_id, matches_saved, tokens_used, cost): + self.data['last_processed_telegram_id'] = telegram_user_id + self.data['processed_count'] += 1 + self.data['total_matches_saved'] += matches_saved + self.data['total_tokens'] += 
tokens_used + self.data['total_cost'] += cost + + # Save every 20 telegram users + if self.data['processed_count'] % 20 == 0: + self.save() + + +def calculate_username_specificity(username: str) -> float: + """ + Calculate how specific/unique a username is (0-1) + + Generic usernames get lower scores, unique ones get higher scores + """ + if not username: + return 0.5 + + username_lower = username.lower() + + # Very generic patterns + generic_patterns = [ + r'^admin\d*$', r'^user\d+$', r'^crypto\d*$', r'^nft\d*$', + r'^web3\d*$', r'^trader\d*$', r'^dev\d*$', r'^official\d*$', + r'^team\d*$', r'^support\d*$', r'^info\d*$' + ] + + for pattern in generic_patterns: + if re.match(pattern, username_lower): + return 0.3 + + # Length-based scoring + length = len(username) + if length < 4: + return 0.4 + elif length < 6: + return 0.6 + elif length < 8: + return 0.75 + else: + return 0.9 + + +class LLMVerifier: + """Use GPT-5 Mini for confidence-based verification""" + + def __init__(self): + self.client = openai.AsyncOpenAI() + self.model = "gpt-5-mini" # GPT-5 Mini - balanced performance and cost + + # Cost calculation for gpt-5-mini + self.input_cost_per_1m = 0.25 # $0.25 per 1M input tokens + self.output_cost_per_1m = 2.00 # $2.00 per 1M output tokens + + def build_batch_prompt(self, telegram_profile: Dict, candidates: List[Dict]) -> str: + """Build LLM prompt for batch evaluation of all candidates""" + + # Format Telegram profile ('bio' can be present but None, so coalesce) + tg_bio = telegram_profile.get('bio') or 'none' + if tg_bio and len(tg_bio) > 300: + tg_bio = tg_bio[:300] + '...' + + # Format chat context + chat_context = telegram_profile.get('chat_context', {}) + chat_info = "" + if chat_context.get('chat_titles'): + chat_info = f"\nGroup Chats: {chat_context['chat_titles']}" + if chat_context.get('is_crypto_focused'): + chat_info += " [CRYPTO-FOCUSED GROUPS]" + + prompt = f"""TELEGRAM PROFILE: +Username: @{telegram_profile.get('username') or 'none'} +Display Name: {telegram_profile.get('name') or 'none'} (first_name + last_name combined) +Bio: {tg_bio}{chat_info} + +TWITTER CANDIDATES (evaluate all together): +""" + + for i, candidate in enumerate(candidates, 1): + tw_bio = candidate.get('twitter_bio') or 'none' + if tw_bio and len(tw_bio) > 250: + tw_bio = tw_bio[:250] + '...' 
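+ # Truncating bios (300 chars for the TG profile above, 250 per Twitter candidate) bounds the per-user prompt size, which helps keep verification near the ~$0.003/user estimate.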
+ + # Check if phash match info exists + phash_info = "" + if candidate['match_method'] == 'phash_match': + phash_distance = candidate.get('match_signals', {}).get('phash_distance', 'unknown') + phash_info = f"\nโญ Phash Match: distance={phash_distance} (identical profile pictures!)" + + prompt += f""" +[Candidate {i}] +Twitter Username: @{candidate['twitter_username']} +Twitter Display Name: {candidate.get('twitter_name', 'Unknown')} +Twitter Bio: {tw_bio} +Location: {candidate.get('twitter_location') or 'none'} +Verified: {candidate.get('twitter_verified', False)} (Blue: {candidate.get('twitter_blue_verified', False)}) +Followers: {candidate.get('twitter_followers_count', 0):,} +Match Method: {candidate['match_method']}{phash_info} +Baseline Confidence: {candidate.get('baseline_confidence', 0):.2f} +""" + + return prompt + + async def verify_batch(self, telegram_profile: Dict, candidates: List[Dict], semaphore, log_file=None) -> Dict: + """Verify all candidates for a single Telegram user""" + + async with semaphore: + prompt = self.build_batch_prompt(telegram_profile, candidates) + + if log_file: + log_file.write(f"\n{'=' * 100}\n") + log_file.write(f"TELEGRAM USER: {telegram_profile.get('username', 'N/A')} (ID: {telegram_profile['user_id']})\n") + log_file.write(f"{'=' * 100}\n\n") + + system_prompt = """You are an expert at determining if two social media profiles belong to the same person. + +# TASK +Determine confidence (0.0-1.0) that each Twitter candidate is the same person as the Telegram profile. + +**CRITICAL: Evaluate ALL candidates together, not in isolation. Compare them against each other to identify which has the STRONGEST evidence.** + +# SIGNAL STRENGTH GUIDE + +Evaluate the FULL CONTEXT of all available signals together. Individual signals can be strong or weak, but the overall picture matters most. 
+ +## VERY STRONG SIGNALS (Can individually suggest high confidence) +- **Explicit bio mention**: TG bio says "x.com/username" or "Follow me @username" + - โš ๏ธ EXCEPTION: If that account is clearly a company/project (not personal), this is NOT definitive + - Example: TG bio "x.com/gems_gun" but @gems_gun is company account โ†’ Look for personal account like @lucahl0 "Building @gems_gun" +- **Unique username exact match**: Unusual/long username (like @kupermind, @schellinger_k) that matches exactly + - Generic usernames (@mike, @crypto123) don't qualify as "unique" + +## STRONG SUPPORTING SIGNALS (Good indicators when combined) +Each of these helps build confidence, especially when multiple align: +- Full name match (after normalization: remove .eth, emojis, separators) +- Same profile picture (phash match) +- Aligned bio themes/context (both in crypto, both mention same projects/interests) +- Very similar username (not exact, but close: @kevin vs @k_kevin) + +## WEAK SIGNALS (Need multiple strong signals to be meaningful) +- Generic name match only (Alex, Mike, David, John, Baz) +- Same general field but no specifics +- Partial username similarity with generic name + +## RED FLAGS (Lower confidence significantly) +- Context mismatch: TG is crypto/tech, TW is chef/athlete/journalist +- Company account when looking for personal profile +- Famous person/celebrity (unless clear evidence it's actually them) + +# CONFIDENCE BANDS + +## 0.90-1.0: NEARLY CERTAIN +Very strong signal (bio mention of personal account OR unique username match) + supporting signals align +Examples: +- TG bio: "https://x.com/kupermind" + TW @kupermind personal account โ†’ 0.97 +- TG @olliten + TW @olliten + same name + same pic โ†’ 0.98 + +## 0.70-0.89: LIKELY +Multiple strong supporting signals converge, or one very strong signal with some gap +Examples: +- Very similar username + name match + context: @schellinger โ†’ @k_schellinger "Kevin Schellinger" โ†’ 0.85 +- Exact username on moderately unique name: @alexcrypto + "Alex Smith" crypto โ†’ 0.78 + +## 0.40-0.69: POSSIBLE +Some evidence but significant uncertainty +- Generic name + same field but no username/pic match +- Weak username similarity with generic name +- Profile pic match but name is very generic + +## 0.10-0.39: UNLIKELY +Minimal evidence or contradictions +- Only generic name match (David, Alex, Mike) +- Context mismatch (crypto person vs chef) + +## 0.0-0.09: EXTREMELY UNLIKELY +No meaningful evidence or clear contradiction + +# COMPARATIVE EVALUATION PROCESS + +**Step 1: Review ALL candidates together** +Don't score each in isolation. Look at the full set to understand which has the strongest evidence. + +**Step 2: Identify the strongest signals present** +- Is there a bio mention? (Check if it's personal vs company account!) +- Is there a unique username match? +- Do multiple supporting signals converge for one candidate? + +**Step 3: Apply differential scoring** +- The candidate with STRONGEST evidence should get meaningfully higher score +- If Candidate A has unique username + name match, and Candidate B only has generic name โ†’ A gets 0.85+, B gets 0.40 max +- If ALL candidates only have weak signals (generic name only) โ†’ ALL score 0.20-0.40 + +**Step 4: Sanity checks** +- Could this evidence match thousands of people? โ†’ Lower confidence +- Is there a context mismatch? โ†’ Max 0.50 +- Is this a company account when we need personal? 
โ†’ Not the right match + +**Key principle: Only ONE candidate can be "most likely" - differentiate clearly between them.** + +# TECHNICAL NOTES + +**Name Normalization**: Before comparing, remove .eth/.ton/.sol suffixes, emojis, "| company" separators, and ignore capitalization + +**Profile Picture (phash)**: Phash match alone โ†’ MAX 0.70 (supporting signal). Use to break ties or add confidence to other signals. + +# OUTPUT FORMAT +Return ONLY valid JSON (no markdown, no explanation): +{ + "candidates": [ + { + "candidate_index": 1, + "confidence": 0.85, + "reasoning": "Brief explanation" + }, + ... + ] +}""" + + try: + if log_file: + log_file.write("SYSTEM PROMPT:\n") + log_file.write("-" * 100 + "\n") + log_file.write(system_prompt + "\n\n") + log_file.write("USER PROMPT:\n") + log_file.write("-" * 100 + "\n") + log_file.write(prompt + "\n\n") + log_file.flush() + + response = await self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt} + ], + response_format={"type": "json_object"} + ) + + content = response.choices[0].message.content.strip() + + if log_file: + log_file.write("LLM RESPONSE:\n") + log_file.write("-" * 100 + "\n") + log_file.write(content + "\n\n") + log_file.flush() + + # Parse JSON + try: + result = json.loads(content) + except json.JSONDecodeError: + print(f" โš ๏ธ Failed to parse JSON response") + return { + 'success': False, + 'error': 'json_parse_error', + 'tokens_used': response.usage.total_tokens, + 'cost': self.calculate_cost(response.usage) + } + + tokens_used = response.usage.total_tokens + cost = self.calculate_cost(response.usage) + + return { + 'success': True, + 'results': result.get('candidates', []), + 'tokens_used': tokens_used, + 'cost': cost, + 'error': None + } + + except Exception as e: + print(f" โš ๏ธ LLM error: {str(e)[:100]}") + return { + 'success': False, + 'error': str(e), + 'tokens_used': 0, + 'cost': 0.0 + } + + def calculate_cost(self, usage) -> float: + """Calculate cost for this API call""" + input_cost = (usage.prompt_tokens / 1_000_000) * self.input_cost_per_1m + output_cost = (usage.completion_tokens / 1_000_000) * self.output_cost_per_1m + return input_cost + output_cost + + +def get_telegram_users_with_candidates(telegram_conn, checkpoint_manager, limit=None): + """Get list of telegram_user_ids that have unprocessed candidates""" + with telegram_conn.cursor() as cur: + query = """ + SELECT DISTINCT telegram_user_id + FROM twitter_match_candidates + WHERE needs_llm_review = TRUE + AND llm_processed = FALSE + """ + + if checkpoint_manager.data['last_processed_telegram_id']: + query += f" AND telegram_user_id > {checkpoint_manager.data['last_processed_telegram_id']}" + + query += " ORDER BY telegram_user_id" + + if limit: + query += f" LIMIT {limit}" + + cur.execute(query) + return [row[0] for row in cur.fetchall()] + + +def get_candidates_for_telegram_user(telegram_user_id: int, telegram_conn): + """Get all candidates for a specific Telegram user""" + with telegram_conn.cursor(cursor_factory=RealDictCursor) as cur: + cur.execute(""" + SELECT * + FROM twitter_match_candidates + WHERE telegram_user_id = %s + AND needs_llm_review = TRUE + AND llm_processed = FALSE + ORDER BY baseline_confidence DESC + """, (telegram_user_id,)) + return [dict(row) for row in cur.fetchall()] + + +def get_user_chat_context(user_id: int, telegram_conn) -> Dict: + """Get chat participation context for a user""" + with 
telegram_conn.cursor(cursor_factory=RealDictCursor) as cur: + cur.execute(""" + SELECT + STRING_AGG(DISTINCT c.title, ' | ') FILTER (WHERE c.title IS NOT NULL) as chat_titles, + COUNT(DISTINCT cp.chat_id) as chat_count + FROM chat_participants cp + JOIN chats c ON cp.chat_id = c.chat_id + WHERE cp.user_id = %s + AND c.title IS NOT NULL + AND c.chat_type != 'private' + """, (user_id,)) + result = cur.fetchone() + + if result and result['chat_titles']: + # Check if chats indicate crypto/web3 interest + chat_titles_lower = result['chat_titles'].lower() + crypto_keywords = ['crypto', 'bitcoin', 'eth', 'defi', 'nft', 'dao', 'web3', 'blockchain', + 'solana', 'near', 'avalanche', 'polygon', 'base', 'arbitrum', 'optimism', + 'cosmos', 'builders', 'degen', 'lobster'] + is_crypto_focused = any(keyword in chat_titles_lower for keyword in crypto_keywords) + + return { + 'chat_titles': result['chat_titles'], + 'chat_count': result['chat_count'], + 'is_crypto_focused': is_crypto_focused + } + + return {'chat_titles': None, 'chat_count': 0, 'is_crypto_focused': False} + + +async def verify_telegram_user(telegram_user_id: int, verifier: LLMVerifier, checkpoint_manager: CheckpointManager, + telegram_db, telegram_conn, log_file=None) -> Dict: + """Verify all candidates for a single Telegram user""" + + # Get Telegram profile + telegram_profile = telegram_db.query(Contact).filter( + Contact.user_id == telegram_user_id + ).first() + + if not telegram_profile: + return {'matches_saved': 0, 'tokens': 0, 'cost': 0} + + # Construct display name from first_name + last_name + display_name = (telegram_profile.first_name or '') + (' ' + telegram_profile.last_name if telegram_profile.last_name else '') + display_name = display_name.strip() or None + + # Get chat participation context + chat_context = get_user_chat_context(telegram_user_id, telegram_conn) + + telegram_dict = { + 'user_id': telegram_profile.user_id, + 'account_id': telegram_profile.account_id, + 'username': telegram_profile.username, + 'name': display_name, + 'bio': telegram_profile.bio, + 'chat_context': chat_context + } + + # Get all candidates + candidates = get_candidates_for_telegram_user(telegram_user_id, telegram_conn) + + if not candidates: + return {'matches_saved': 0, 'tokens': 0, 'cost': 0} + + # Verify with LLM + semaphore = asyncio.Semaphore(1) # One at a time for now + llm_result = await verifier.verify_batch(telegram_dict, candidates, semaphore, log_file) + + if not llm_result['success']: + # Mark as processed even on error so we don't retry infinitely + mark_candidates_processed([c['id'] for c in candidates], telegram_conn) + return { + 'matches_saved': 0, + 'tokens': llm_result['tokens_used'], + 'cost': llm_result['cost'] + } + + # Save matches + matches_saved = save_verified_matches( + telegram_dict, + candidates, + llm_result['results'], + telegram_conn + ) + + # Mark candidates as processed + mark_candidates_processed([c['id'] for c in candidates], telegram_conn) + + return { + 'matches_saved': matches_saved, + 'tokens': llm_result['tokens_used'], + 'cost': llm_result['cost'] + } + + +def save_verified_matches(telegram_profile: Dict, candidates: List[Dict], llm_results: List[Dict], telegram_conn): + """Save verified matches with confidence scores""" + + matches_to_save = [] + + # CRITICAL: Post-process to fix generic name false positives + # These keys are always present but may be None; coalesce so the len() checks below are safe + tg_name = telegram_profile.get('name') or '' + tg_username = telegram_profile.get('username') or '' + tg_bio = telegram_profile.get('bio') or '' + + # Check if profile has generic characteristics + 
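+ # e.g. name="Alex", username="alex123", bio="" -> generic (could plausibly match thousands of accounts) + # name="Kupermind", bio="Building @gems_gun" -> specific enough to leave the LLM's score uncapped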
has_generic_name = len(tg_name) <= 7 # Short display name + has_generic_username = len(tg_username) <= 8 # Short username + has_empty_bio = not tg_bio or len(tg_bio) <= 20 + + is_generic_profile = has_empty_bio and (has_generic_name or has_generic_username) + + for llm_result in llm_results: + candidate_idx = llm_result.get('candidate_index', 0) - 1 # Convert to 0-indexed + + if candidate_idx < 0 or candidate_idx >= len(candidates): + continue + + confidence = llm_result.get('confidence', 0) + reasoning = llm_result.get('reasoning', '') + + # Only save if confidence >= 0.5 (moderate or higher) + if confidence < 0.5: + continue + + candidate = candidates[candidate_idx] + + # CRITICAL FIX: Cap confidence for generic profiles with weak match methods + # Weak match methods are those based purely on name/username containment + weak_match_methods = [ + 'display_name_containment', + 'fuzzy_name', + 'tg_username_in_twitter_name', + 'twitter_username_in_tg_name' + ] + + if is_generic_profile and candidate['match_method'] in weak_match_methods: + # Cap at 0.70 unless it's a strong signal (phash, exact_username, exact_bio_handle) + if confidence > 0.70: + confidence = 0.70 + reasoning += " [Confidence capped at 0.70: generic profile + weak match method]" + + match_details = { + 'match_method': candidate['match_method'], + 'baseline_confidence': candidate['baseline_confidence'], + 'llm_confidence': confidence, + 'llm_reasoning': reasoning + } + + matches_to_save.append(( + telegram_profile['account_id'], + telegram_profile['user_id'], + telegram_profile.get('username'), + telegram_profile.get('name'), + telegram_profile.get('bio'), + candidate['twitter_id'], + candidate['twitter_username'], + candidate.get('twitter_name'), + candidate.get('twitter_bio'), + candidate.get('twitter_location'), + candidate.get('twitter_verified', False), + candidate.get('twitter_blue_verified', False), + candidate.get('twitter_followers_count', 0), + candidate['match_method'], + candidate['baseline_confidence'], + 'CONFIDENT' if confidence >= 0.8 else 'MODERATE' if confidence >= 0.6 else 'UNSURE', + confidence, + json.dumps(match_details), + llm_result.get('tokens_used', 0), + 0, # cost will be aggregated at the batch level + confidence < 0.75 # needs_manual_review + )) + + if matches_to_save: + with telegram_conn.cursor() as cur: + cur.executemany(""" + INSERT INTO twitter_telegram_matches ( + account_id, + telegram_user_id, + telegram_username, + telegram_name, + telegram_bio, + twitter_id, + twitter_username, + twitter_name, + twitter_bio, + twitter_location, + twitter_verified, + twitter_blue_verified, + twitter_followers_count, + match_method, + baseline_confidence, + llm_verdict, + final_confidence, + match_details, + llm_tokens_used, + llm_cost, + needs_manual_review + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (telegram_user_id, twitter_id) DO UPDATE SET + llm_verdict = EXCLUDED.llm_verdict, + final_confidence = EXCLUDED.final_confidence, + matched_at = NOW() + """, matches_to_save) + telegram_conn.commit() + + return len(matches_to_save) + + +def mark_candidates_processed(candidate_ids: List[int], telegram_conn): + """Mark candidates as LLM processed""" + with telegram_conn.cursor() as cur: + cur.execute(""" + UPDATE twitter_match_candidates + SET llm_processed = TRUE + WHERE id = ANY(%s) + """, (candidate_ids,)) + telegram_conn.commit() + + +async def main(): + print() + print("=" * 70) + print("๐Ÿค– Twitter-Telegram Match Verifier V2 
(Confidence-Based)") + print("=" * 70) + print() + + # Check arguments + test_mode = '--test' in sys.argv + verbose = '--verbose' in sys.argv + limit = 100 if test_mode else None + + # Check for concurrency argument + concurrent_requests = 10 # Default + for i, arg in enumerate(sys.argv): + if arg == '--concurrent' and i + 1 < len(sys.argv): + try: + concurrent_requests = int(sys.argv[i + 1]) + except ValueError: + pass + + # Setup verbose logging + log_file = None + if verbose: + log_file = open(Path(__file__).parent.parent / 'verification_v2_log.txt', 'w') + log_file.write("=" * 100 + "\n") + log_file.write("VERIFICATION V2 LOG\n") + log_file.write("=" * 100 + "\n\n") + + if test_mode: + print("๐Ÿงช TEST MODE: Processing first 100 Telegram users only") + print() + + # Initialize checkpoint + checkpoint_manager = CheckpointManager(CHECKPOINT_FILE) + + if checkpoint_manager.data['last_processed_telegram_id']: + print(f"๐Ÿ“ Resuming from telegram_user_id {checkpoint_manager.data['last_processed_telegram_id']}") + print(f" Already processed: {checkpoint_manager.data['processed_count']:,}") + print(f" Cost so far: ${checkpoint_manager.data['total_cost']:.4f}") + print() + else: + checkpoint_manager.data['started_at'] = datetime.now().isoformat() + + # Connect to databases + print("๐Ÿ“ก Connecting to databases...") + telegram_db = SessionLocal() + + try: + telegram_conn = psycopg2.connect(dbname='telegram_contacts', user='andrewjiang', host='localhost', port=5432) + telegram_conn.autocommit = False + except Exception as e: + print(f"โŒ Failed to connect to Telegram database: {e}") + return False + + print("โœ… Connected") + print() + + try: + # Load telegram users needing verification + print("๐Ÿ” Loading telegram users with candidates...") + telegram_user_ids = get_telegram_users_with_candidates(telegram_conn, checkpoint_manager, limit) + + if not telegram_user_ids: + print("โœ… No users to verify!") + return True + + print(f"โœ… Found {len(telegram_user_ids):,} telegram users to verify") + print() + + # Estimate cost (rough) + estimated_cost = len(telegram_user_ids) * 0.003 # ~$0.003 per user + print(f"๐Ÿ’ฐ Estimated cost: ${estimated_cost:.4f}") + print() + + # Initialize verifier + verifier = LLMVerifier() + + print("๐Ÿš€ Starting LLM verification...") + print(f"โšก Concurrent requests: {concurrent_requests}") + print() + + # Configuration for parallel processing + CONCURRENT_REQUESTS = concurrent_requests # Process N users at a time + BATCH_SIZE = 50 # Save checkpoint every 50 users + + # Process users in parallel batches + total_users = len(telegram_user_ids) + processed_count = 0 + + for batch_start in range(0, total_users, BATCH_SIZE): + batch_end = min(batch_start + BATCH_SIZE, total_users) + batch = telegram_user_ids[batch_start:batch_end] + + print(f"๐Ÿ“ฆ Processing batch {batch_start//BATCH_SIZE + 1}/{(total_users + BATCH_SIZE - 1)//BATCH_SIZE} ({len(batch)} users)...") + + # Create tasks for concurrent processing + tasks = [] + for telegram_user_id in batch: + task = verify_telegram_user( + telegram_user_id, + verifier, + checkpoint_manager, + telegram_db, + telegram_conn, + log_file + ) + tasks.append((telegram_user_id, task)) + + # Process batch concurrently with limit + semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS) + + async def process_with_semaphore(user_id, task): + async with semaphore: + return user_id, await task + + results = await asyncio.gather( + *[process_with_semaphore(user_id, task) for user_id, task in tasks], + return_exceptions=True + ) + + # Process 
results and update checkpoints + for i, result in enumerate(results): + processed_count += 1 + user_id = batch[i] + + if isinstance(result, Exception): + print(f"[{processed_count}/{total_users}] โŒ User {user_id} failed: {result}") + continue + + user_id_result, verification_result = result + print(f"[{processed_count}/{total_users}] โœ… User {user_id_result}: {verification_result['matches_saved']} matches | ${verification_result['cost']:.4f}") + + # Update checkpoint + checkpoint_manager.update( + user_id_result, + verification_result['matches_saved'], + verification_result['tokens'], + verification_result['cost'] + ) + + print(f" Batch complete. Total processed: {processed_count}/{total_users}") + print() + + # Final stats + print() + print("=" * 70) + print("โœ… VERIFICATION COMPLETE") + print("=" * 70) + print() + print(f"๐Ÿ“Š Statistics:") + print(f" Processed: {checkpoint_manager.data['processed_count']:,} telegram users") + print(f" ๐Ÿ’พ Saved matches: {checkpoint_manager.data['total_matches_saved']:,}") + print() + print(f"๐Ÿ’ฐ Cost:") + print(f" Total tokens: {checkpoint_manager.data['total_tokens']:,}") + print(f" Total cost: ${checkpoint_manager.data['total_cost']:.4f}") + print() + + # Clean up checkpoint after a fully successful run + run_completed = True + CHECKPOINT_FILE.unlink(missing_ok=True) + + return True + + except Exception as e: + print(f"โŒ Error: {e}") + import traceback + traceback.print_exc() + return False + + finally: + if log_file: + log_file.close() + telegram_db.close() + telegram_conn.close() + if not run_completed: + checkpoint_manager.save() # don't recreate the checkpoint file after a clean run + + +if __name__ == "__main__": + try: + success = asyncio.run(main()) + sys.exit(0 if success else 1) + except KeyboardInterrupt: + print("\n\nโš ๏ธ Interrupted by user") + print("๐Ÿ’พ Progress saved - you can resume by running this script again") + sys.exit(1)
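A note on the V2 prompt's name-normalization rule (strip .eth/.ton/.sol suffixes, emojis, and "| company" separators, then ignore capitalization): that normalization is performed by the LLM rather than in code. A minimal Python sketch below pins down the intended equivalence; `normalize_name` is illustrative only and not part of the commit.

```python
import re

def normalize_name(name: str) -> str:
    """Illustrative sketch of the name normalization the V2 prompt describes."""
    name = name.split('|')[0]                                           # drop "| company" tails
    name = re.sub(r'\.(eth|ton|sol)\b', '', name, flags=re.IGNORECASE)  # drop web3 suffixes
    name = re.sub(r'[^\w\s]', '', name)                                 # crude emoji/symbol strip
    return re.sub(r'\s+', ' ', name).strip().lower()                    # collapse spaces, ignore case

# "Kevin Schellinger.eth | gems" and "kevin schellinger" now compare equal:
assert normalize_name("Kevin Schellinger.eth | gems") == "kevin schellinger"
```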