#!/usr/bin/env python3
"""
Critical Match Quality Reviewer

Analyzes verification results with deep understanding of:
- Twitter/Telegram/crypto culture
- Common false positive patterns
- Company vs personal account indicators
- Context alignment
"""

import re
import json
from pathlib import Path
from typing import List, Dict, Tuple

LOG_FILE = Path(__file__).parent.parent / 'verification_v2_log.txt'


class MatchReviewer:
    """Critical reviewer with crypto/web3 domain knowledge"""

    # Company/product/team indicators
    COMPANY_INDICATORS = [
        r'\bis\s+(a|an)\s+\w+\s+(way|tool|platform|app|service|protocol)',
        r'official\s+(account|page|channel)',
        r'(team|official)\s*$',
        r'^(the|a)\s+\w+\s+(for|to)',
        r'brought to you by',
        r'hosted by',
        r'founded by',
        r'(community|project)\s+(account|page)',
        r'(dao|protocol|network)\s*$',
        r'building\s+(the|a)\s+',
    ]

    # Personal account indicators
    PERSONAL_INDICATORS = [
        r'(ceo|founder|co-founder|developer|builder|engineer|researcher)\s+at',
        r'working\s+(at|on|with)',
        r'^(i|i\'m|my)',
        r'(my|personal)\s+(views|opinions|thoughts)',
        r'\b(he/him|she/her|they/them)\b',
    ]

    # Crypto/Web3 keywords
    CRYPTO_KEYWORDS = [
        'web3', 'crypto', 'blockchain', 'defi', 'nft', 'dao', 'dapp',
        'ethereum', 'solana', 'bitcoin', 'polygon', 'base', 'arbitrum',
        'smart contract', 'token', 'wallet', 'metamask', 'coinbase',
        'yield', 'farming', 'staking', 'airdrop', 'whitelist', 'mint',
        'protocol', 'l1', 'l2', 'rollup', 'zk', 'evm'
    ]

    def __init__(self):
        self.issues = []
        self.stats = {
            'total_matches': 0,
            'false_positives': 0,
            'questionable': 0,
            'good_matches': 0,
            'company_accounts': 0,
            'weak_evidence': 0,
            'context_mismatch': 0,
        }

    def parse_log(self) -> List[Dict]:
        """Parse the verification log into structured data"""
        with open(LOG_FILE, 'r') as f:
            content = f.read()

        entries = []

        # Find all sections that start with TELEGRAM USER
        pattern = r'TELEGRAM USER: ([^\(]+) \(ID: (\d+)\)(.*?)(?=TELEGRAM USER:|$)'
        matches = re.finditer(pattern, content, re.DOTALL)

        for match in matches:
            tg_username = match.group(1).strip()
            tg_id = int(match.group(2))
            section = match.group(3)

            # Extract TG profile
            tg_bio_match = re.search(r'Bio: (.*?)(?:\n+TWITTER CANDIDATES)', section, re.DOTALL)
            tg_bio = tg_bio_match.group(1).strip() if tg_bio_match else ''

            # Extract username specificity
            spec_match = re.search(r'username has specificity score ([\d.]+)', section)
            specificity = float(spec_match.group(1)) if spec_match else 0.5

            # Extract candidates
            candidates = []
            candidate_blocks = re.findall(
                r'\[Candidate (\d+)\](.*?)(?=\[Candidate \d+\]|LLM RESPONSE:)',
                section, re.DOTALL
            )

            for idx, block in candidate_blocks:
                tw_username = re.search(r'Twitter Username: @(\S+)', block)
                tw_name = re.search(r'Twitter Display Name: (.+)', block)
                tw_bio = re.search(r'Twitter Bio: (.*?)(?=\nLocation:)', block, re.DOTALL)
                tw_followers = re.search(r'Followers: ([\d,]+)', block)
                match_method = re.search(r'Match Method: (\S+)', block)
                baseline_conf = re.search(r'Baseline Confidence: ([\d.]+)', block)

                candidates.append({
                    'index': int(idx),
                    'twitter_username': tw_username.group(1) if tw_username else '',
                    'twitter_name': tw_name.group(1).strip() if tw_name else '',
                    'twitter_bio': tw_bio.group(1).strip() if tw_bio else '',
                    'twitter_followers': tw_followers.group(1) if tw_followers else '0',
                    'match_method': match_method.group(1) if match_method else '',
                    'baseline_confidence': float(baseline_conf.group(1)) if baseline_conf else 0.0,
                })
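            # Sketch of the logged LLM response this parser assumes (inferred
            # from how llm_results is consumed in review_match; field names
            # beyond "candidates", "candidate_index" and "confidence" are not
            # guaranteed by this script):
            #   {"candidates": [{"candidate_index": 1, "confidence": 0.85, ...}]}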
            # Extract LLM response (handle multiline JSON with nested structures)
            llm_match = re.search(r'LLM RESPONSE:\s*-+\s*(\{.*)', section, re.DOTALL)
            if llm_match:
                try:
                    # Extract JSON - it should be everything after "LLM RESPONSE:" until end of section
                    json_text = llm_match.group(1)

                    # Find the JSON object (balanced braces)
                    brace_count = 0
                    json_end = 0
                    for i, char in enumerate(json_text):
                        if char == '{':
                            brace_count += 1
                        elif char == '}':
                            brace_count -= 1
                            if brace_count == 0:
                                json_end = i + 1
                                break

                    if json_end > 0:
                        json_str = json_text[:json_end]
                        llm_result = json.loads(json_str)
                    else:
                        llm_result = {'candidates': []}
                except Exception:
                    llm_result = {'candidates': []}
            else:
                llm_result = {'candidates': []}

            entries.append({
                'telegram_username': tg_username,
                'telegram_id': tg_id,
                'telegram_bio': tg_bio,
                'username_specificity': specificity,
                'candidates': candidates,
                'llm_results': llm_result.get('candidates', [])
            })

        return entries

    def is_company_account(self, bio: str, name: str) -> Tuple[bool, str]:
        """Detect if this is a company/product/team account"""
        text = (bio + ' ' + name).lower()

        for pattern in self.COMPANY_INDICATORS:
            if re.search(pattern, text, re.IGNORECASE):
                return True, f"Company pattern: '{pattern}'"

        # Check if name equals bio description
        if bio and len(bio.split()) < 20:
            # Short bio describing what something "is"
            if re.search(r'\bis\s+(a|an|the)\s+', bio, re.IGNORECASE):
                return True, "Bio describes a product/service"

        return False, ""

    def is_personal_account(self, bio: str) -> bool:
        """Detect personal account indicators"""
        for pattern in self.PERSONAL_INDICATORS:
            if re.search(pattern, bio, re.IGNORECASE):
                return True
        return False

    def has_crypto_context(self, bio: str) -> Tuple[bool, List[str]]:
        """Check if bio has crypto/web3 context"""
        if not bio:
            return False, []

        bio_lower = bio.lower()
        found_keywords = []
        for keyword in self.CRYPTO_KEYWORDS:
            if keyword in bio_lower:
                found_keywords.append(keyword)

        return len(found_keywords) > 0, found_keywords
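    # Illustrative example of the helper above, assuming the default
    # CRYPTO_KEYWORDS list: has_crypto_context("Building on Ethereum L2 rollups")
    # returns (True, ['ethereum', 'l2', 'rollup']) via case-insensitive substring
    # matching, while an empty bio returns (False, []).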
    def review_match(self, entry: Dict) -> Dict:
        """Critically review a single match"""
        issues = []
        severity = 'GOOD'

        tg_username = entry['telegram_username']
        tg_bio = entry['telegram_bio']
        tg_has_crypto, tg_crypto_keywords = self.has_crypto_context(tg_bio)

        # Review each LLM-approved match (confidence >= 0.5)
        for llm_result in entry['llm_results']:
            confidence = llm_result.get('confidence', 0)
            if confidence < 0.5:
                continue

            self.stats['total_matches'] += 1

            candidate_idx = llm_result.get('candidate_index', 0) - 1
            if candidate_idx < 0 or candidate_idx >= len(entry['candidates']):
                continue

            candidate = entry['candidates'][candidate_idx]
            tw_username = candidate['twitter_username']
            tw_bio = candidate['twitter_bio']
            tw_name = candidate['twitter_name']
            match_method = candidate['match_method']

            # Check 1: Company account
            is_company, company_reason = self.is_company_account(tw_bio, tw_name)
            if is_company:
                issues.append({
                    'type': 'COMPANY_ACCOUNT',
                    'severity': 'HIGH',
                    'description': f"Twitter @{tw_username} appears to be a company/product account",
                    'evidence': company_reason,
                    'confidence': confidence
                })
                self.stats['company_accounts'] += 1
                severity = 'FALSE_POSITIVE'

            # Check 2: Context mismatch
            tw_has_crypto, tw_crypto_keywords = self.has_crypto_context(tw_bio)
            if tg_has_crypto and not tw_has_crypto:
                issues.append({
                    'type': 'CONTEXT_MISMATCH',
                    'severity': 'MEDIUM',
                    'description': "TG has crypto context but TW doesn't",
                    'evidence': f"TG keywords: {tg_crypto_keywords}, TW keywords: none",
                    'confidence': confidence
                })
                self.stats['context_mismatch'] += 1
                if severity == 'GOOD':
                    severity = 'QUESTIONABLE'

            # Check 3: Empty bio with no strong evidence
            if not tg_bio and not tw_bio and confidence > 0.8:
                issues.append({
                    'type': 'WEAK_EVIDENCE',
                    'severity': 'MEDIUM',
                    'description': f"High confidence ({confidence}) with both bios empty",
                    'evidence': "Only username match, no contextual verification",
                    'confidence': confidence
                })
                self.stats['weak_evidence'] += 1
                if severity == 'GOOD':
                    severity = 'QUESTIONABLE'

            # Check 4: Generic username with high confidence
            if entry['username_specificity'] < 0.6 and confidence > 0.85:
                issues.append({
                    'type': 'GENERIC_USERNAME',
                    'severity': 'LOW',
                    'description': f"Generic username ({entry['username_specificity']:.2f} specificity) with high confidence",
                    'evidence': f"Username: {tg_username}",
                    'confidence': confidence
                })
                if severity == 'GOOD':
                    severity = 'QUESTIONABLE'

            # Check 5: Twitter bio mentions other accounts
            if match_method == 'twitter_bio_has_telegram':
                # Check if the telegram username appears as @mention (not the account itself)
                mentions = re.findall(r'@(\w+)', tw_bio)
                if tg_username.lower() not in [m.lower() for m in mentions]:
                    # The username is embedded in another handle
                    issues.append({
                        'type': 'SUBSTRING_MATCH',
                        'severity': 'HIGH',
                        'description': "TG username found as substring in other accounts, not direct mention",
                        'evidence': f"TW bio: {tw_bio[:100]}",
                        'confidence': confidence
                    })
                    severity = 'FALSE_POSITIVE'

        # Count severity (tallied once per entry)
        if severity == 'FALSE_POSITIVE':
            self.stats['false_positives'] += 1
        elif severity == 'QUESTIONABLE':
            self.stats['questionable'] += 1
        else:
            self.stats['good_matches'] += 1

        return {
            'telegram_username': tg_username,
            'telegram_id': entry['telegram_id'],
            'severity': severity,
            'issues': issues,
            'entry': entry
        }

    def generate_report(self, reviews: List[Dict]):
        """Generate comprehensive review report"""
        print()
        print("=" * 100)
        print("🔍 MATCH QUALITY REVIEW REPORT")
        print("=" * 100)
        print()

        print("📊 STATISTICS:")
        print(f"  Total matches reviewed: {self.stats['total_matches']}")
        print(f"  ✅ Good matches: {self.stats['good_matches']} ({self.stats['good_matches']/max(self.stats['total_matches'],1)*100:.1f}%)")
        print(f"  ⚠️  Questionable: {self.stats['questionable']} ({self.stats['questionable']/max(self.stats['total_matches'],1)*100:.1f}%)")
        print(f"  ❌ False positives: {self.stats['false_positives']} ({self.stats['false_positives']/max(self.stats['total_matches'],1)*100:.1f}%)")
        print()

        print("🚨 ISSUE BREAKDOWN:")
        print(f"  Company accounts: {self.stats['company_accounts']}")
        print(f"  Context mismatches: {self.stats['context_mismatch']}")
        print(f"  Weak evidence: {self.stats['weak_evidence']}")
        print()

        # Show false positives
        false_positives = [r for r in reviews if r['severity'] == 'FALSE_POSITIVE']
        if false_positives:
            print("=" * 100)
            print("❌ FALSE POSITIVES:")
            print("=" * 100)
            for review in false_positives[:10]:  # Show top 10
                print()
                print(f"TG @{review['telegram_username']} (ID: {review['telegram_id']})")
                print(f"TG Bio: {review['entry']['telegram_bio'][:100]}")
                for issue in review['issues']:
                    print(f"  ❌ [{issue['severity']}] {issue['type']}: {issue['description']}")
                    print(f"     Evidence: {issue['evidence'][:150]}")
                    print(f"     LLM Confidence: {issue['confidence']:.2f}")

        # Show questionable matches
        questionable = [r for r in reviews if r['severity'] == 'QUESTIONABLE']
        if questionable:
            print()
            print("=" * 100)
            print("⚠️  QUESTIONABLE MATCHES:")
            print("=" * 100)
            for review in questionable[:10]:  # Show top 10
                print()
                print(f"TG @{review['telegram_username']} (ID: {review['telegram_id']})")
                for issue in review['issues']:
                    print(f"  ⚠️  [{issue['severity']}] {issue['type']}: {issue['description']}")
                    print(f"     Evidence: {issue['evidence'][:150]}")
                    print(f"     LLM Confidence: {issue['confidence']:.2f}")

        print()
        print("=" * 100)
        print("💡 RECOMMENDATIONS:")
        print("=" * 100)
        print()

        if self.stats['company_accounts'] > 0:
            print("1. Add company account detection to prompt:")
            print("   - Check for product descriptions ('X is a platform for...')")
            print("   - Look for 'official', 'team', 'hosted by' patterns")
            print("   - Distinguish personal vs organizational accounts")
            print()

        if self.stats['context_mismatch'] > 0:
            print("2. Strengthen context matching:")
            print("   - Require crypto/web3 keywords in both profiles")
            print("   - Lower confidence when contexts don't align")
            print()

        if self.stats['weak_evidence'] > 0:
            print("3. Adjust confidence for weak evidence:")
            print("   - Cap confidence at 0.70 when both bios are empty")
            print("   - Require additional signals beyond username match")
            print()

        print("4. Fix 'twitter_bio_has_telegram' method:")
        print("   - Only match direct @mentions, not substrings in other handles")
        print("   - Example: @hipster should NOT match mentions of @HipsterHacker")
        print()


def main():
    reviewer = MatchReviewer()

    print("📖 Parsing verification log...")
    entries = reviewer.parse_log()
    print(f"✅ Parsed {len(entries)} verification entries")
    print()

    print("🔍 Reviewing match quality...")
    reviews = []
    for entry in entries:
        if entry['llm_results']:  # Only review entries with matches
            review = reviewer.review_match(entry)
            reviews.append(review)

    print(f"✅ Reviewed {len(reviews)} matches")

    reviewer.generate_report(reviews)


if __name__ == "__main__":
    main()
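# A minimal sketch of reusing MatchReviewer programmatically instead of via
# main(), e.g. to persist per-entry verdicts; the output path
# 'match_review.json' is hypothetical:
#
#   reviewer = MatchReviewer()
#   reviews = [reviewer.review_match(e) for e in reviewer.parse_log()
#              if e['llm_results']]
#   Path('match_review.json').write_text(json.dumps(
#       [{'telegram_username': r['telegram_username'],
#         'severity': r['severity'],
#         'issues': r['issues']} for r in reviews], indent=2))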