Mirror of https://github.com/lockin-bot/ProfileMatching.git
Synced 2026-01-12 09:44:30 +08:00
This module provides comprehensive Twitter-to-Telegram profile matching and verification using 10 different matching methods and LLM verification.

Features:
- 10 matching methods (phash, usernames, bio handles, URL resolution, fuzzy names)
- URL resolution integration for t.co → t.me links
- Async LLM verification with GPT-5-mini
- Interactive menu system with real-time stats
- Threaded candidate finding (~1.5 contacts/sec)
- Comprehensive documentation and guides

Key Components:
- find_twitter_candidates.py: Core matching logic (10 methods)
- find_twitter_candidates_threaded.py: Threaded implementation
- verify_twitter_matches_v2.py: LLM verification (V5 prompt)
- review_match_quality.py: Analysis and quality review
- main.py: Interactive menu system
- Complete documentation (README, CHANGELOG, QUICKSTART)

Performance:
- Candidate finding: ~16-18 hours for 43K contacts
- LLM verification: ~23 hours for 43K users
- Cost: ~$130 for full verification

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
407 lines
16 KiB
Python
Executable File
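Usage note (a minimal sketch inferred from the __main__ guard at the bottom of this file; the script takes no arguments and reads verification_v2_log.txt from the directory above the script):

    python3 review_match_quality.py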
#!/usr/bin/env python3
"""
Critical Match Quality Reviewer

Analyzes verification results with deep understanding of:
- Twitter/Telegram/crypto culture
- Common false positive patterns
- Company vs personal account indicators
- Context alignment
"""

import re
import json
from pathlib import Path
from typing import List, Dict, Tuple

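# Verification log to review; expected one directory above this script
# (assumed to be the output of the verify_twitter_matches_v2.py step).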
LOG_FILE = Path(__file__).parent.parent / 'verification_v2_log.txt'


class MatchReviewer:
    """Critical reviewer with crypto/web3 domain knowledge"""

    # Company/product/team indicators
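    # (Regex patterns, applied case-insensitively to the concatenated Twitter
    # bio and display name in is_company_account() below.)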
    COMPANY_INDICATORS = [
        r'\bis\s+(a|an)\s+\w+\s+(way|tool|platform|app|service|protocol)',
        r'official\s+(account|page|channel)',
        r'(team|official)\s*$',
        r'^(the|a)\s+\w+\s+(for|to)',
        r'brought to you by',
        r'hosted by',
        r'founded by',
        r'(community|project)\s+(account|page)',
        r'(dao|protocol|network)\s*$',
        r'building\s+(the|a)\s+',
    ]

    # Personal account indicators
    PERSONAL_INDICATORS = [
        r'(ceo|founder|co-founder|developer|builder|engineer|researcher)\s+at',
        r'working\s+(at|on|with)',
        r'^(i|i\'m|my)',
        r'(my|personal)\s+(views|opinions|thoughts)',
        r'\b(he/him|she/her|they/them)\b',
    ]

    # Crypto/Web3 keywords
    CRYPTO_KEYWORDS = [
        'web3', 'crypto', 'blockchain', 'defi', 'nft', 'dao', 'dapp',
        'ethereum', 'solana', 'bitcoin', 'polygon', 'base', 'arbitrum',
        'smart contract', 'token', 'wallet', 'metamask', 'coinbase',
        'yield', 'farming', 'staking', 'airdrop', 'whitelist', 'mint',
        'protocol', 'l1', 'l2', 'rollup', 'zk', 'evm'
    ]
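    # NOTE: has_crypto_context() matches these by simple substring containment,
    # so short keywords such as 'base', 'l2', or 'mint' can also hit inside
    # unrelated words.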

    def __init__(self):
        self.issues = []  # currently unused; issues are returned per review instead
        self.stats = {
            'total_matches': 0,
            'false_positives': 0,
            'questionable': 0,
            'good_matches': 0,
            'company_accounts': 0,
            'weak_evidence': 0,
            'context_mismatch': 0,
        }

    def parse_log(self) -> List[Dict]:
        """Parse the verification log into structured data"""
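        # Expected log layout (inferred from the regexes below): each section
        # starts with "TELEGRAM USER: <name> (ID: <id>)", followed by a Bio line,
        # a TWITTER CANDIDATES block of "[Candidate N]" entries (username,
        # display name, bio, location, followers, match method, baseline
        # confidence), and ends with "LLM RESPONSE:" followed by a JSON object
        # carrying a "candidates" list.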
        with open(LOG_FILE, 'r') as f:
            content = f.read()

        entries = []
        # Find all sections that start with TELEGRAM USER
        pattern = r'TELEGRAM USER: ([^\(]+) \(ID: (\d+)\)(.*?)(?=TELEGRAM USER:|$)'
        matches = re.finditer(pattern, content, re.DOTALL)

        for match in matches:
            tg_username = match.group(1).strip()
            tg_id = int(match.group(2))
            section = match.group(3)

            # Extract TG profile
            tg_bio_match = re.search(r'Bio: (.*?)(?:\n+TWITTER CANDIDATES)', section, re.DOTALL)
            tg_bio = tg_bio_match.group(1).strip() if tg_bio_match else ''

            # Extract username specificity
            spec_match = re.search(r'username has specificity score ([\d.]+)', section)
            specificity = float(spec_match.group(1)) if spec_match else 0.5

            # Extract candidates
            candidates = []
            candidate_blocks = re.findall(
                r'\[Candidate (\d+)\](.*?)(?=\[Candidate \d+\]|LLM RESPONSE:)',
                section,
                re.DOTALL
            )

            for idx, block in candidate_blocks:
                tw_username = re.search(r'Twitter Username: @(\S+)', block)
                tw_name = re.search(r'Twitter Display Name: (.+)', block)
                tw_bio = re.search(r'Twitter Bio: (.*?)(?=\nLocation:)', block, re.DOTALL)
                tw_followers = re.search(r'Followers: ([\d,]+)', block)
                match_method = re.search(r'Match Method: (\S+)', block)
                baseline_conf = re.search(r'Baseline Confidence: ([\d.]+)', block)

                candidates.append({
                    'index': int(idx),
                    'twitter_username': tw_username.group(1) if tw_username else '',
                    'twitter_name': tw_name.group(1).strip() if tw_name else '',
                    'twitter_bio': tw_bio.group(1).strip() if tw_bio else '',
                    'twitter_followers': tw_followers.group(1) if tw_followers else '0',
                    'match_method': match_method.group(1) if match_method else '',
                    'baseline_confidence': float(baseline_conf.group(1)) if baseline_conf else 0.0,
                })

            # Extract LLM response (handle multiline JSON with nested structures)
            llm_match = re.search(r'LLM RESPONSE:\s*-+\s*(\{.*)', section, re.DOTALL)
            if llm_match:
                try:
                    # Extract JSON - it should be everything after "LLM RESPONSE:" until end of section
                    json_text = llm_match.group(1)
                    # Find the JSON object (balanced braces)
                    brace_count = 0
                    json_end = 0
                    for i, char in enumerate(json_text):
                        if char == '{':
                            brace_count += 1
                        elif char == '}':
                            brace_count -= 1
                            if brace_count == 0:
                                json_end = i + 1
                                break

                    if json_end > 0:
                        json_str = json_text[:json_end]
                        llm_result = json.loads(json_str)
                    else:
                        llm_result = {'candidates': []}
                except Exception:
                    # Malformed JSON in the log: treat as "no approved candidates"
                    llm_result = {'candidates': []}
            else:
                llm_result = {'candidates': []}

            entries.append({
                'telegram_username': tg_username,
                'telegram_id': tg_id,
                'telegram_bio': tg_bio,
                'username_specificity': specificity,
                'candidates': candidates,
                'llm_results': llm_result.get('candidates', [])
            })

        return entries

    def is_company_account(self, bio: str, name: str) -> Tuple[bool, str]:
        """Detect if this is a company/product/team account"""
        text = (bio + ' ' + name).lower()

        for pattern in self.COMPANY_INDICATORS:
            if re.search(pattern, text, re.IGNORECASE):
                return True, f"Company pattern: '{pattern}'"

        # Check if name equals bio description
        if bio and len(bio.split()) < 20:
            # Short bio describing what something "is"
            if re.search(r'\bis\s+(a|an|the)\s+', bio, re.IGNORECASE):
                return True, "Bio describes a product/service"

        return False, ""

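    # NOTE: is_personal_account() and PERSONAL_INDICATORS are defined but not
    # currently called from review_match().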
    def is_personal_account(self, bio: str) -> bool:
        """Detect personal account indicators"""
        for pattern in self.PERSONAL_INDICATORS:
            if re.search(pattern, bio, re.IGNORECASE):
                return True
        return False

    def has_crypto_context(self, bio: str) -> Tuple[bool, List[str]]:
        """Check if bio has crypto/web3 context"""
        if not bio:
            return False, []

        bio_lower = bio.lower()
        found_keywords = []

        for keyword in self.CRYPTO_KEYWORDS:
            if keyword in bio_lower:
                found_keywords.append(keyword)

        return len(found_keywords) > 0, found_keywords

    def review_match(self, entry: Dict) -> Dict:
        """Critically review a single match"""
        issues = []
        severity = 'GOOD'
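        # Severity only escalates (GOOD -> QUESTIONABLE -> FALSE_POSITIVE);
        # once raised by a check it is never downgraded.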
        tg_username = entry['telegram_username']
        tg_bio = entry['telegram_bio']
        tg_has_crypto, tg_crypto_keywords = self.has_crypto_context(tg_bio)

        # Review each LLM-approved match (confidence >= 0.5)
        for llm_result in entry['llm_results']:
            confidence = llm_result.get('confidence', 0)
            if confidence < 0.5:
                continue

            self.stats['total_matches'] += 1
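            # The LLM reports candidate_index as 1-based; convert to a 0-based
            # list index (a missing index yields -1 and is skipped below).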
            candidate_idx = llm_result.get('candidate_index', 0) - 1

            if candidate_idx < 0 or candidate_idx >= len(entry['candidates']):
                continue

            candidate = entry['candidates'][candidate_idx]
            tw_username = candidate['twitter_username']
            tw_bio = candidate['twitter_bio']
            tw_name = candidate['twitter_name']
            match_method = candidate['match_method']

            # Check 1: Company account
            is_company, company_reason = self.is_company_account(tw_bio, tw_name)
            if is_company:
                issues.append({
                    'type': 'COMPANY_ACCOUNT',
                    'severity': 'HIGH',
                    'description': f"Twitter @{tw_username} appears to be a company/product account",
                    'evidence': company_reason,
                    'confidence': confidence
                })
                self.stats['company_accounts'] += 1
                severity = 'FALSE_POSITIVE'

            # Check 2: Context mismatch
            tw_has_crypto, tw_crypto_keywords = self.has_crypto_context(tw_bio)

            if tg_has_crypto and not tw_has_crypto:
                issues.append({
                    'type': 'CONTEXT_MISMATCH',
                    'severity': 'MEDIUM',
                    'description': "TG has crypto context but TW doesn't",
                    'evidence': f"TG keywords: {tg_crypto_keywords}, TW keywords: none",
                    'confidence': confidence
                })
                self.stats['context_mismatch'] += 1
                if severity == 'GOOD':
                    severity = 'QUESTIONABLE'

            # Check 3: Empty bio with no strong evidence
            if not tg_bio and not tw_bio and confidence > 0.8:
                issues.append({
                    'type': 'WEAK_EVIDENCE',
                    'severity': 'MEDIUM',
                    'description': f"High confidence ({confidence}) with both bios empty",
                    'evidence': "Only username match, no contextual verification",
                    'confidence': confidence
                })
                self.stats['weak_evidence'] += 1
                if severity == 'GOOD':
                    severity = 'QUESTIONABLE'

            # Check 4: Generic username with high confidence
            if entry['username_specificity'] < 0.6 and confidence > 0.85:
                issues.append({
                    'type': 'GENERIC_USERNAME',
                    'severity': 'LOW',
                    'description': f"Generic username ({entry['username_specificity']:.2f} specificity) with high confidence",
                    'evidence': f"Username: {tg_username}",
                    'confidence': confidence
                })
                if severity == 'GOOD':
                    severity = 'QUESTIONABLE'

            # Check 5: Twitter bio mentions other accounts
            if match_method == 'twitter_bio_has_telegram':
                # Check if the telegram username appears as @mention (not the account itself)
                mentions = re.findall(r'@(\w+)', tw_bio)
                if tg_username.lower() not in [m.lower() for m in mentions]:
                    # The username is embedded in another handle
                    issues.append({
                        'type': 'SUBSTRING_MATCH',
                        'severity': 'HIGH',
                        'description': "TG username found as substring in other accounts, not direct mention",
                        'evidence': f"TW bio: {tw_bio[:100]}",
                        'confidence': confidence
                    })
                    # The false positive is tallied once per entry in the severity
                    # count below; incrementing the counter here as well would
                    # double-count it.
                    severity = 'FALSE_POSITIVE'

        # Count severity
        if severity == 'FALSE_POSITIVE':
            self.stats['false_positives'] += 1
        elif severity == 'QUESTIONABLE':
            self.stats['questionable'] += 1
        else:
            self.stats['good_matches'] += 1

        return {
            'telegram_username': tg_username,
            'telegram_id': entry['telegram_id'],
            'severity': severity,
            'issues': issues,
            'entry': entry
        }

    def generate_report(self, reviews: List[Dict]):
        """Generate comprehensive review report"""
        print()
        print("=" * 100)
        print("🔍 MATCH QUALITY REVIEW REPORT")
        print("=" * 100)
        print()
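        # Note: total_matches counts individual LLM-approved matches, while the
        # good/questionable/false-positive tallies are per Telegram user, so the
        # percentages below are approximate.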
print("📊 STATISTICS:")
|
|
print(f" Total matches reviewed: {self.stats['total_matches']}")
|
|
print(f" ✅ Good matches: {self.stats['good_matches']} ({self.stats['good_matches']/max(self.stats['total_matches'],1)*100:.1f}%)")
|
|
print(f" ⚠️ Questionable: {self.stats['questionable']} ({self.stats['questionable']/max(self.stats['total_matches'],1)*100:.1f}%)")
|
|
print(f" ❌ False positives: {self.stats['false_positives']} ({self.stats['false_positives']/max(self.stats['total_matches'],1)*100:.1f}%)")
|
|
print()
|
|
|
|
print("🚨 ISSUE BREAKDOWN:")
|
|
print(f" Company accounts: {self.stats['company_accounts']}")
|
|
print(f" Context mismatches: {self.stats['context_mismatch']}")
|
|
print(f" Weak evidence: {self.stats['weak_evidence']}")
|
|
print()
|
|
|
|

        # Show false positives
        false_positives = [r for r in reviews if r['severity'] == 'FALSE_POSITIVE']
        if false_positives:
            print("=" * 100)
            print("❌ FALSE POSITIVES:")
            print("=" * 100)
            for review in false_positives[:10]:  # Show top 10
                print()
                print(f"TG @{review['telegram_username']} (ID: {review['telegram_id']})")
                print(f"TG Bio: {review['entry']['telegram_bio'][:100]}")
                for issue in review['issues']:
                    print(f"  ❌ [{issue['severity']}] {issue['type']}: {issue['description']}")
                    print(f"     Evidence: {issue['evidence'][:150]}")
                    print(f"     LLM Confidence: {issue['confidence']:.2f}")

        # Show questionable matches
        questionable = [r for r in reviews if r['severity'] == 'QUESTIONABLE']
        if questionable:
            print()
            print("=" * 100)
            print("⚠️  QUESTIONABLE MATCHES:")
            print("=" * 100)
            for review in questionable[:10]:  # Show top 10
                print()
                print(f"TG @{review['telegram_username']} (ID: {review['telegram_id']})")
                for issue in review['issues']:
                    print(f"  ⚠️  [{issue['severity']}] {issue['type']}: {issue['description']}")
                    print(f"     Evidence: {issue['evidence'][:150]}")
                    print(f"     LLM Confidence: {issue['confidence']:.2f}")

        print()
        print("=" * 100)
        print("💡 RECOMMENDATIONS:")
        print("=" * 100)
        print()

        if self.stats['company_accounts'] > 0:
            print("1. Add company account detection to prompt:")
            print("   - Check for product descriptions ('X is a platform for...')")
            print("   - Look for 'official', 'team', 'hosted by' patterns")
            print("   - Distinguish personal vs organizational accounts")
            print()

        if self.stats['context_mismatch'] > 0:
            print("2. Strengthen context matching:")
            print("   - Require crypto/web3 keywords in both profiles")
            print("   - Lower confidence when contexts don't align")
            print()

        if self.stats['weak_evidence'] > 0:
            print("3. Adjust confidence for weak evidence:")
            print("   - Cap confidence at 0.70 when both bios are empty")
            print("   - Require additional signals beyond username match")
            print()

        print("4. Fix 'twitter_bio_has_telegram' method:")
        print("   - Only match direct @mentions, not substrings in other handles")
        print("   - Example: @hipster should NOT match mentions of @HipsterHacker")
        print()

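# Entry point: parse the verification log, review only entries where the LLM
# returned at least one candidate, then print the report.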
def main():
    reviewer = MatchReviewer()

    print("📖 Parsing verification log...")
    entries = reviewer.parse_log()
    print(f"✅ Parsed {len(entries)} verification entries")
    print()

    print("🔍 Reviewing match quality...")
    reviews = []
    for entry in entries:
        if entry['llm_results']:  # Only review entries with matches
            review = reviewer.review_match(entry)
            reviews.append(review)
    print(f"✅ Reviewed {len(reviews)} matches")

    reviewer.generate_report(reviews)


if __name__ == "__main__":
    main()