ProfileMatching/review_match_quality.py
Andrew Jiang 5319d4d868 Initial commit: Twitter-Telegram Profile Matching System
This system matches Twitter profiles to Telegram contacts and verifies the results,
combining 10 matching methods with LLM-based verification.

Features:
- 10 matching methods (phash, usernames, bio handles, URL resolution, fuzzy names)
- URL resolution integration for t.co → t.me links
- Async LLM verification with GPT-5-mini
- Interactive menu system with real-time stats
- Threaded candidate finding (~1.5 contacts/sec)
- Comprehensive documentation and guides

Key Components:
- find_twitter_candidates.py: Core matching logic (10 methods)
- find_twitter_candidates_threaded.py: Threaded implementation
- verify_twitter_matches_v2.py: LLM verification (V5 prompt)
- review_match_quality.py: Analysis and quality review
- main.py: Interactive menu system
- Complete documentation (README, CHANGELOG, QUICKSTART)

Performance:
- Candidate finding: ~16-18 hours for 43K contacts
- LLM verification: ~23 hours for 43K users
- Cost: ~$130 for full verification
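Back-of-envelope per-user figures implied by the totals above (taking the 43K counts as given): LLM verification works out to roughly 23 h / 43,000 ≈ 1.9 s per user, and cost to roughly $130 / 43,000 ≈ $0.003 per user.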

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-04 22:56:25 -08:00


#!/usr/bin/env python3
"""
Critical Match Quality Reviewer
Analyzes verification results with deep understanding of:
- Twitter/Telegram/crypto culture
- Common false positive patterns
- Company vs personal account indicators
- Context alignment
"""
import re
import json
from pathlib import Path
from typing import List, Dict, Tuple
LOG_FILE = Path(__file__).parent.parent / 'verification_v2_log.txt'
class MatchReviewer:
"""Critical reviewer with crypto/web3 domain knowledge"""
# Company/product/team indicators
COMPANY_INDICATORS = [
r'\bis\s+(a|an)\s+\w+\s+(way|tool|platform|app|service|protocol)',
r'official\s+(account|page|channel)',
r'(team|official)\s*$',
r'^(the|a)\s+\w+\s+(for|to)',
r'brought to you by',
r'hosted by',
r'founded by',
r'(community|project)\s+(account|page)',
r'(dao|protocol|network)\s*$',
r'building\s+(the|a)\s+',
]
# Personal account indicators
PERSONAL_INDICATORS = [
r'(ceo|founder|co-founder|developer|builder|engineer|researcher)\s+at',
r'working\s+(at|on|with)',
r'^(i|i\'m|my)',
r'(my|personal)\s+(views|opinions|thoughts)',
r'\b(he/him|she/her|they/them)\b',
]
# Crypto/Web3 keywords
CRYPTO_KEYWORDS = [
'web3', 'crypto', 'blockchain', 'defi', 'nft', 'dao', 'dapp',
'ethereum', 'solana', 'bitcoin', 'polygon', 'base', 'arbitrum',
'smart contract', 'token', 'wallet', 'metamask', 'coinbase',
'yield', 'farming', 'staking', 'airdrop', 'whitelist', 'mint',
'protocol', 'l1', 'l2', 'rollup', 'zk', 'evm'
]
def __init__(self):
self.issues = []
self.stats = {
'total_matches': 0,
'false_positives': 0,
'questionable': 0,
'good_matches': 0,
'company_accounts': 0,
'weak_evidence': 0,
'context_mismatch': 0,
}
def parse_log(self) -> List[Dict]:
"""Parse the verification log into structured data"""
with open(LOG_FILE, 'r', encoding='utf-8') as f:
content = f.read()
entries = []
# Find all sections that start with TELEGRAM USER
pattern = r'TELEGRAM USER: ([^\(]+) \(ID: (\d+)\)(.*?)(?=TELEGRAM USER:|$)'
matches = re.finditer(pattern, content, re.DOTALL)
for match in matches:
tg_username = match.group(1).strip()
tg_id = int(match.group(2))
section = match.group(3)
# Extract TG profile
tg_bio_match = re.search(r'Bio: (.*?)(?:\n+TWITTER CANDIDATES)', section, re.DOTALL)
tg_bio = tg_bio_match.group(1).strip() if tg_bio_match else ''
# Extract username specificity
spec_match = re.search(r'username has specificity score ([\d.]+)', section)
specificity = float(spec_match.group(1)) if spec_match else 0.5
# Extract candidates
candidates = []
candidate_blocks = re.findall(
r'\[Candidate (\d+)\](.*?)(?=\[Candidate \d+\]|LLM RESPONSE:)',
section,
re.DOTALL
)
for idx, block in candidate_blocks:
tw_username = re.search(r'Twitter Username: @(\S+)', block)
tw_name = re.search(r'Twitter Display Name: (.+)', block)
tw_bio = re.search(r'Twitter Bio: (.*?)(?=\nLocation:)', block, re.DOTALL)
tw_followers = re.search(r'Followers: ([\d,]+)', block)
match_method = re.search(r'Match Method: (\S+)', block)
baseline_conf = re.search(r'Baseline Confidence: ([\d.]+)', block)
candidates.append({
'index': int(idx),
'twitter_username': tw_username.group(1) if tw_username else '',
'twitter_name': tw_name.group(1).strip() if tw_name else '',
'twitter_bio': tw_bio.group(1).strip() if tw_bio else '',
'twitter_followers': tw_followers.group(1) if tw_followers else '0',
'match_method': match_method.group(1) if match_method else '',
'baseline_confidence': float(baseline_conf.group(1)) if baseline_conf else 0.0,
})
# Extract LLM response (handle multiline JSON with nested structures)
llm_match = re.search(r'LLM RESPONSE:\s*-+\s*(\{.*)', section, re.DOTALL)
if llm_match:
try:
# Extract JSON - it should be everything after "LLM RESPONSE:" until end of section
json_text = llm_match.group(1)
# Find the JSON object (balanced braces)
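# (the captured text may include trailing log lines after the JSON, so scan for the
# matching closing brace instead of calling json.loads on the whole tail)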
brace_count = 0
json_end = 0
for i, char in enumerate(json_text):
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
if brace_count == 0:
json_end = i + 1
break
if json_end > 0:
json_str = json_text[:json_end]
llm_result = json.loads(json_str)
else:
llm_result = {'candidates': []}
except Exception:
llm_result = {'candidates': []}
else:
llm_result = {'candidates': []}
entries.append({
'telegram_username': tg_username,
'telegram_id': tg_id,
'telegram_bio': tg_bio,
'username_specificity': specificity,
'candidates': candidates,
'llm_results': llm_result.get('candidates', [])
})
return entries
def is_company_account(self, bio: str, name: str) -> Tuple[bool, str]:
"""Detect if this is a company/product/team account"""
text = (bio + ' ' + name).lower()
for pattern in self.COMPANY_INDICATORS:
if re.search(pattern, text, re.IGNORECASE):
return True, f"Company pattern: '{pattern}'"
# Fallback: a short bio phrased as 'X is a/an/the ...' typically describes a product or service
if bio and len(bio.split()) < 20:
if re.search(r'\bis\s+(a|an|the)\s+', bio, re.IGNORECASE):
return True, "Bio describes a product/service"
return False, ""
def is_personal_account(self, bio: str) -> bool:
"""Detect personal account indicators"""
for pattern in self.PERSONAL_INDICATORS:
if re.search(pattern, bio, re.IGNORECASE):
return True
return False
def has_crypto_context(self, bio: str) -> Tuple[bool, List[str]]:
"""Check if bio has crypto/web3 context"""
if not bio:
return False, []
bio_lower = bio.lower()
found_keywords = []
for keyword in self.CRYPTO_KEYWORDS:
if keyword in bio_lower:
found_keywords.append(keyword)
return len(found_keywords) > 0, found_keywords
def review_match(self, entry: Dict) -> Dict:
"""Critically review a single match"""
issues = []
severity = 'GOOD'
tg_username = entry['telegram_username']
tg_bio = entry['telegram_bio']
tg_has_crypto, tg_crypto_keywords = self.has_crypto_context(tg_bio)
# Review each LLM-approved match (confidence >= 0.5)
for llm_result in entry['llm_results']:
confidence = llm_result.get('confidence', 0)
if confidence < 0.5:
continue
self.stats['total_matches'] += 1
candidate_idx = llm_result.get('candidate_index', 0) - 1
if candidate_idx < 0 or candidate_idx >= len(entry['candidates']):
continue
candidate = entry['candidates'][candidate_idx]
tw_username = candidate['twitter_username']
tw_bio = candidate['twitter_bio']
tw_name = candidate['twitter_name']
match_method = candidate['match_method']
# Check 1: Company account
is_company, company_reason = self.is_company_account(tw_bio, tw_name)
if is_company:
issues.append({
'type': 'COMPANY_ACCOUNT',
'severity': 'HIGH',
'description': f"Twitter @{tw_username} appears to be a company/product account",
'evidence': company_reason,
'confidence': confidence
})
self.stats['company_accounts'] += 1
severity = 'FALSE_POSITIVE'
# Check 2: Context mismatch
tw_has_crypto, tw_crypto_keywords = self.has_crypto_context(tw_bio)
if tg_has_crypto and not tw_has_crypto:
issues.append({
'type': 'CONTEXT_MISMATCH',
'severity': 'MEDIUM',
'description': f"TG has crypto context but TW doesn't",
'evidence': f"TG keywords: {tg_crypto_keywords}, TW keywords: none",
'confidence': confidence
})
self.stats['context_mismatch'] += 1
if severity == 'GOOD':
severity = 'QUESTIONABLE'
# Check 3: Empty bio with no strong evidence
if not tg_bio and not tw_bio and confidence > 0.8:
issues.append({
'type': 'WEAK_EVIDENCE',
'severity': 'MEDIUM',
'description': f"High confidence ({confidence}) with both bios empty",
'evidence': f"Only username match, no contextual verification",
'confidence': confidence
})
self.stats['weak_evidence'] += 1
if severity == 'GOOD':
severity = 'QUESTIONABLE'
# Check 4: Generic username with high confidence
if entry['username_specificity'] < 0.6 and confidence > 0.85:
issues.append({
'type': 'GENERIC_USERNAME',
'severity': 'LOW',
'description': f"Generic username ({entry['username_specificity']:.2f} specificity) with high confidence",
'evidence': f"Username: {tg_username}",
'confidence': confidence
})
if severity == 'GOOD':
severity = 'QUESTIONABLE'
# Check 5: Twitter bio mentions other accounts
if match_method == 'twitter_bio_has_telegram':
# Check if the telegram username appears as @mention (not the account itself)
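# e.g. TG @hipster appearing only inside @HipsterHacker in the Twitter bio is a
# substring hit, not a direct mention (see the recommendations in the report)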
mentions = re.findall(r'@(\w+)', tw_bio)
if tg_username.lower() not in [m.lower() for m in mentions]:
# The username is embedded in another handle
issues.append({
'type': 'SUBSTRING_MATCH',
'severity': 'HIGH',
'description': f"TG username found as substring in other accounts, not direct mention",
'evidence': f"TW bio: {tw_bio[:100]}",
'confidence': confidence
})
severity = 'FALSE_POSITIVE'  # counted once in the severity tally below; no direct increment here to avoid double-counting
# Count severity
if severity == 'FALSE_POSITIVE':
self.stats['false_positives'] += 1
elif severity == 'QUESTIONABLE':
self.stats['questionable'] += 1
else:
self.stats['good_matches'] += 1
return {
'telegram_username': tg_username,
'telegram_id': entry['telegram_id'],
'severity': severity,
'issues': issues,
'entry': entry
}
def generate_report(self, reviews: List[Dict]):
"""Generate comprehensive review report"""
print()
print("=" * 100)
print("🔍 MATCH QUALITY REVIEW REPORT")
print("=" * 100)
print()
print("📊 STATISTICS:")
print(f" Total matches reviewed: {self.stats['total_matches']}")
print(f" ✅ Good matches: {self.stats['good_matches']} ({self.stats['good_matches']/max(self.stats['total_matches'],1)*100:.1f}%)")
print(f" ⚠️ Questionable: {self.stats['questionable']} ({self.stats['questionable']/max(self.stats['total_matches'],1)*100:.1f}%)")
print(f" ❌ False positives: {self.stats['false_positives']} ({self.stats['false_positives']/max(self.stats['total_matches'],1)*100:.1f}%)")
print()
print("🚨 ISSUE BREAKDOWN:")
print(f" Company accounts: {self.stats['company_accounts']}")
print(f" Context mismatches: {self.stats['context_mismatch']}")
print(f" Weak evidence: {self.stats['weak_evidence']}")
print()
# Show false positives
false_positives = [r for r in reviews if r['severity'] == 'FALSE_POSITIVE']
if false_positives:
print("=" * 100)
print("❌ FALSE POSITIVES:")
print("=" * 100)
for review in false_positives[:10]: # Show first 10
print()
print(f"TG @{review['telegram_username']} (ID: {review['telegram_id']})")
print(f"TG Bio: {review['entry']['telegram_bio'][:100]}")
for issue in review['issues']:
print(f" ❌ [{issue['severity']}] {issue['type']}: {issue['description']}")
print(f" Evidence: {issue['evidence'][:150]}")
print(f" LLM Confidence: {issue['confidence']:.2f}")
# Show questionable matches
questionable = [r for r in reviews if r['severity'] == 'QUESTIONABLE']
if questionable:
print()
print("=" * 100)
print("⚠️ QUESTIONABLE MATCHES:")
print("=" * 100)
for review in questionable[:10]: # Show first 10
print()
print(f"TG @{review['telegram_username']} (ID: {review['telegram_id']})")
for issue in review['issues']:
print(f" ⚠️ [{issue['severity']}] {issue['type']}: {issue['description']}")
print(f" Evidence: {issue['evidence'][:150]}")
print(f" LLM Confidence: {issue['confidence']:.2f}")
print()
print("=" * 100)
print("💡 RECOMMENDATIONS:")
print("=" * 100)
print()
if self.stats['company_accounts'] > 0:
print("1. Add company account detection to prompt:")
print(" - Check for product descriptions ('X is a platform for...')")
print(" - Look for 'official', 'team', 'hosted by' patterns")
print(" - Distinguish personal vs organizational accounts")
print()
if self.stats['context_mismatch'] > 0:
print("2. Strengthen context matching:")
print(" - Require crypto/web3 keywords in both profiles")
print(" - Lower confidence when contexts don't align")
print()
if self.stats['weak_evidence'] > 0:
print("3. Adjust confidence for weak evidence:")
print(" - Cap confidence at 0.70 when both bios are empty")
print(" - Require additional signals beyond username match")
print()
print("4. Fix 'twitter_bio_has_telegram' method:")
print(" - Only match direct @mentions, not substrings in other handles")
print(" - Example: @hipster should NOT match mentions of @HipsterHacker")
print()
def main():
reviewer = MatchReviewer()
print("📖 Parsing verification log...")
entries = reviewer.parse_log()
print(f"✅ Parsed {len(entries)} verification entries")
print()
print("🔍 Reviewing match quality...")
reviews = []
for entry in entries:
if entry['llm_results']: # Only review entries with matches
review = reviewer.review_match(entry)
reviews.append(review)
print(f"✅ Reviewed {len(reviews)} matches")
reviewer.generate_report(reviews)
if __name__ == "__main__":
main()