Implementing Smart Search and Favorites in Python/Streamlit

Traditional search is transactional: the user types a query, the system returns matches. Predictive search is conversational: the system understands intent, remembers context, and surfaces content the user didn't know to ask for. This guide walks through building a production-ready predictive search system in Python and Streamlit, with a focus on media applications where content discovery is as important as content consumption.

Architecture: The Three-Layer Search System

┌───────────────────────────────────────────┐
│        Layer 1: Signal Collection         │
│  • Search queries & clicks                │
│  • Watch history & dwell time             │
│  • Explicit favorites & ratings           │
│  • Time-of-day & device context           │
└───────────────────────────────────────────┘
                  │
┌─────────────────▼─────────────────────────┐
│         Layer 2: Intent Modeling          │
│  • Query embedding (sentence-transformers)│
│  • User embedding (behavior aggregation)  │
│  • Context embedding (time, device)       │
│  • Similarity scoring (cosine/FAISS)      │
└───────────────────────────────────────────┘
                  │
┌─────────────────▼─────────────────────────┐
│         Layer 3: Result Assembly          │
│  • Multi-source fusion (TV/Radio/Music)   │
│  • Diversity injection (MMR algorithm)    │
│  • Real-time personalization              │
│  • Explanation generation                 │
└───────────────────────────────────────────┘

Implementation: Core Search Engine

# predictive_search.py
"""Production-ready predictive search for media applications."""

import json
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer


@dataclass
class MediaItem:
    """A searchable catalog entry (TV programme, radio show, music stream, or podcast)."""
    id: str  # unique catalog id; used as the key in the engine's item store
    title: str
    description: str
    category: str  # e.g. 'News', 'Documentary' -- drives category affinity and trending filters
    tags: List[str]  # free-form keywords; concatenated into the text that gets embedded
    duration: int  # length in minutes (rendered as "{duration}min" in the UI)
    type: str  # 'tv', 'radio', 'music', 'podcast'
    embedding: Optional[np.ndarray] = None  # L2-normalized text embedding, populated by add_items()


@dataclass
class SearchResult:
    """One ranked search hit: the matched item plus how and why it matched."""
    item: MediaItem
    score: float  # cosine-similarity-derived relevance; may be boosted during result fusion
    match_type: str  # 'title', 'semantic', 'behavioral', 'trending' -- fusion also sets 'hybrid'
    explanation: str  # human-readable reason shown to the user; filled in after ranking


class PredictiveSearchEngine:
    """Hybrid search combining lexical, semantic, and behavioral signals.

    Items are embedded with a sentence-transformers model and stored in a
    FAISS inner-product index over L2-normalized vectors, so raw index
    scores are cosine similarities.  Per-user interaction history
    (queries, clicks, dwell times, favorites) personalizes ranking.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.encoder = SentenceTransformer(model_name)
        self.index = None  # FAISS index; created lazily on the first add_items() call
        self.items: Dict[str, MediaItem] = {}
        # Row i of the FAISS index holds the embedding of item id
        # self._row_ids[i].  (The original resolved hits with
        # list(self.items.values())[idx] -- O(n) per hit and broken if dict
        # order ever diverges from index insertion order.)
        self._row_ids: List[str] = []
        self.dimension = self.encoder.get_sentence_embedding_dimension()

        # Per-user behavior storage; defaultdict so an unknown user gets a
        # fresh, fully-keyed history dict on first touch.
        self.user_history: Dict[str, Dict] = defaultdict(self._empty_history)

    @staticmethod
    def _empty_history() -> Dict:
        """Schema for one user's interaction history."""
        return {
            'queries': [],
            'clicks': [],
            'dwell_times': {},
            'favorites': set(),
            'category_affinity': defaultdict(float)
        }

    def add_items(self, items: List[MediaItem]):
        """Embed and index media items for search.

        Safe to call repeatedly: new items are appended to the existing
        index.  (The original rebuilt the index from only the latest
        batch, silently orphaning everything indexed before.)
        """
        if not items:
            return  # nothing to encode; avoids calling encode([])

        texts = [
            f"{item.title}. {item.description}. {' '.join(item.tags)}"
            for item in items
        ]
        embeddings = self.encoder.encode(texts, convert_to_numpy=True)
        # Normalize so inner product == cosine similarity.
        faiss.normalize_L2(embeddings)

        if self.index is None:
            self.index = faiss.IndexFlatIP(self.dimension)
        self.index.add(embeddings)

        for i, item in enumerate(items):
            item.embedding = embeddings[i]
            self.items[item.id] = item
            self._row_ids.append(item.id)

    def search(self, query: str, user_id: Optional[str] = None,
               top_k: int = 10) -> List[SearchResult]:
        """Execute predictive search with personalization.

        Pipeline: semantic retrieval -> behavioral fusion (when the user
        has history) -> MMR diversity re-ranking -> explanation text.
        The query is logged for future behavioral learning.
        """
        if self.index is None:
            return []  # nothing indexed yet (original crashed on None.search)

        # 1. Semantic retrieval; over-fetch so fusion/re-ranking has slack.
        candidates = self._semantic_search(query, top_k * 2)

        # 2. Behavioral boost when we already know this user.
        if user_id and user_id in self.user_history:
            behavioral = self._behavioral_search(user_id, top_k)
            candidates = self._fuse_results(candidates, behavioral)

        # 3. Diversity re-ranking (MMR).
        final_results = self._mmr_rerank(candidates, top_k, lambda_param=0.5)

        # 4. Explanations for the surviving results only.
        for result in final_results:
            result.explanation = self._generate_explanation(result, user_id)

        # Log the query for future learning.  (Autocomplete is available
        # via _generate_suggestions(); the original computed it here and
        # discarded the result, so the dead call was removed.)
        if user_id:
            self.user_history[user_id]['queries'].append({
                'query': query,
                'timestamp': time.time(),
                'results': [r.item.id for r in final_results[:5]]
            })

        return final_results

    def _semantic_search(self, query: str, top_k: int) -> List[SearchResult]:
        """Vector similarity search via FAISS; scores are cosine similarities."""
        query_embedding = self.encoder.encode([query], convert_to_numpy=True)
        faiss.normalize_L2(query_embedding)
        scores, indices = self.index.search(query_embedding, top_k)
        return self._rows_to_results(scores[0], indices[0], 'semantic')

    def _rows_to_results(self, scores, indices, match_type: str) -> List[SearchResult]:
        """Map raw FAISS (score, row) pairs back to SearchResult objects."""
        results = []
        for score, idx in zip(scores, indices):
            if idx == -1:  # FAISS pads with -1 when fewer than k rows exist
                continue
            results.append(SearchResult(
                item=self.items[self._row_ids[idx]],
                score=float(score),
                match_type=match_type,
                explanation=''
            ))
        return results

    def _behavioral_search(self, user_id: str, top_k: int) -> List[SearchResult]:
        """Recommend items near the user's taste vector.

        The taste vector is the mean embedding of the user's favorites,
        falling back to dwell-time-weighted recent clicks.  (The original
        returned nothing whenever the user had no clicks, even if they had
        favorites -- the primary signal.)
        """
        history = self.user_history[user_id]

        profile_embeddings = [
            self.items[item_id].embedding
            for item_id in history['favorites']
            if item_id in self.items and self.items[item_id].embedding is not None
        ]

        if not profile_embeddings:
            # Fall back to recent clicks, weighted by dwell time in minutes.
            for click in history['clicks'][-10:]:
                item = self.items.get(click['id'])
                if item is not None and item.embedding is not None:
                    weight = click.get('dwell_time', 60) / 60
                    profile_embeddings.append(item.embedding * weight)

        if not profile_embeddings:
            return []

        user_vec = np.mean(profile_embeddings, axis=0).reshape(1, -1)
        faiss.normalize_L2(user_vec)
        scores, indices = self.index.search(user_vec, top_k)

        results = []
        for r in self._rows_to_results(scores[0], indices[0], 'behavioral'):
            if r.item.id in history['favorites']:
                continue  # don't recommend what's already favorited
            r.score *= 0.9  # weight slightly below pure semantic matches
            results.append(r)
        return results

    def _fuse_results(self, semantic: List[SearchResult],
                      behavioral: List[SearchResult]) -> List[SearchResult]:
        """Merge and deduplicate result lists, boosting items found by both."""
        fused: Dict[str, SearchResult] = {r.item.id: r for r in semantic}

        for r in behavioral:
            existing = fused.get(r.item.id)
            if existing is None:
                fused[r.item.id] = r
            else:
                # Agreement between retrievers is strong evidence:
                # keep the better score and boost it.
                existing.score = max(existing.score, r.score) * 1.1
                existing.match_type = 'hybrid'

        return sorted(fused.values(), key=lambda r: r.score, reverse=True)

    def _mmr_rerank(self, results: List[SearchResult], top_k: int,
                    lambda_param: float = 0.5) -> List[SearchResult]:
        """Maximal Marginal Relevance re-ranking for diversity.

        lambda_param=1.0 is pure relevance, 0.0 pure diversity.  The
        original seeded the running best MMR score with -1; since both the
        relevance and similarity terms are cosine-based (range [-1, 1]),
        every candidate could score <= -1, leaving `best` as None and
        crashing on remaining.remove(best).  -inf avoids that.
        """
        selected: List[SearchResult] = []
        remaining = results.copy()

        while remaining and len(selected) < top_k:
            if not selected:
                # First pick: pure relevance.
                best = max(remaining, key=lambda r: r.score)
            else:
                best, best_mmr = None, float('-inf')
                for candidate in remaining:
                    # Redundancy = max similarity to anything already chosen.
                    redundancy = max(
                        self._embedding_similarity(candidate.item, s.item)
                        for s in selected
                    )
                    mmr = lambda_param * candidate.score - (1 - lambda_param) * redundancy
                    if mmr > best_mmr:
                        best_mmr, best = mmr, candidate

            selected.append(best)
            remaining.remove(best)

        return selected

    def _embedding_similarity(self, a: MediaItem, b: MediaItem) -> float:
        """Cosine similarity between two item embeddings (they are L2-normalized)."""
        if a.embedding is None or b.embedding is None:
            return 0.0
        return float(np.dot(a.embedding, b.embedding))

    def _generate_suggestions(self, query: str, user_id: Optional[str]) -> List[str]:
        """Suggest autocompletions from the user's own recent queries.

        Returns up to five of the user's past queries (last 50 considered)
        that extend the current prefix, most frequent first.
        """
        if not user_id or user_id not in self.user_history:
            return []

        prefix = query.lower()
        matches = [
            past['query']
            for past in self.user_history[user_id]['queries'][-50:]
            if past['query'].lower().startswith(prefix)
            and past['query'].lower() != prefix
        ]
        return [q for q, _ in Counter(matches).most_common(5)]

    def _generate_explanation(self, result: SearchResult, user_id: Optional[str]) -> str:
        """Generate a human-readable reason why this result appeared."""
        explanations = {
            'semantic': "Matches your search for related content",
            'behavioral': f"Based on your interest in {result.item.category}",
            'hybrid': "Matches your search and viewing preferences",
            'trending': f"Popular in {result.item.category} right now"
        }
        base = explanations.get(result.match_type, "Recommended for you")

        # Membership check first so we don't create a history entry for an
        # unknown user just by reading the defaultdict.
        if (user_id and user_id in self.user_history
                and result.item.id in self.user_history[user_id]['favorites']):
            base += " (in your favorites)"

        return base

    def record_click(self, user_id: str, item_id: str, dwell_time: int):
        """Record a user interaction for behavioral learning.

        dwell_time is in seconds of engagement; it also accumulates into a
        per-category affinity score.
        """
        history = self.user_history[user_id]
        history['clicks'].append({
            'id': item_id,
            'timestamp': time.time(),
            'dwell_time': dwell_time
        })
        history['dwell_times'][item_id] = dwell_time

        item = self.items.get(item_id)
        if item is not None:
            history['category_affinity'][item.category] += dwell_time

    def toggle_favorite(self, user_id: str, item_id: str) -> bool:
        """Add or remove a favorite.  Returns True iff the item is now a favorite."""
        favorites = self.user_history[user_id]['favorites']

        if item_id in favorites:
            favorites.discard(item_id)
            return False
        favorites.add(item_id)
        return True

    def get_favorites(self, user_id: str) -> List[MediaItem]:
        """Return the user's favorite items (skipping ids no longer in the catalog)."""
        if user_id not in self.user_history:
            return []

        return [
            self.items[fid]
            for fid in self.user_history[user_id]['favorites']
            if fid in self.items
        ]

    def get_trending(self, category: Optional[str] = None,
                     hours: int = 24) -> List[MediaItem]:
        """Top items by aggregate dwell time over the last `hours` hours.

        Unknown item ids are skipped before ranking so the [:20] cut is
        never wasted on items that can't be returned.
        """
        cutoff = time.time() - hours * 3600
        item_scores: Dict[str, float] = defaultdict(float)

        for history in self.user_history.values():
            for click in history['clicks']:
                if click['timestamp'] <= cutoff:
                    continue
                item = self.items.get(click['id'])
                if item is None:
                    continue
                if category is None or item.category == category:
                    item_scores[click['id']] += click.get('dwell_time', 60)

        ranked = sorted(item_scores.items(), key=lambda kv: kv[1], reverse=True)
        return [self.items[iid] for iid, _ in ranked[:20]]

    def save_state(self, path: str):
        """Serialize the catalog and user histories to JSON (+ FAISS sidecar).

        Embeddings are not serialized; load_state() re-encodes items and
        then prefers the saved FAISS index if the sidecar file exists.
        """
        data = {
            'items': {
                item_id: {
                    'id': item.id,
                    'title': item.title,
                    'description': item.description,
                    'category': item.category,
                    'tags': item.tags,
                    'duration': item.duration,
                    'type': item.type
                }
                for item_id, item in self.items.items()
            },
            'user_history': {
                uid: {
                    'queries': h['queries'],
                    'clicks': h['clicks'],
                    # Persist dwell_times too -- the original dropped them,
                    # so record_click() raised KeyError after a reload.
                    'dwell_times': h['dwell_times'],
                    'favorites': list(h['favorites']),  # sets aren't JSON-serializable
                    'category_affinity': dict(h['category_affinity'])
                }
                for uid, h in self.user_history.items()
            }
        }

        Path(path).write_text(json.dumps(data, indent=2))

        # The FAISS index is binary; save it next to the JSON file.
        if self.index is not None:
            faiss.write_index(self.index, f"{path}.faiss")

    def load_state(self, path: str):
        """Restore catalog, user histories, and (if present) the FAISS index."""
        data = json.loads(Path(path).read_text())

        # Re-embed and re-index the catalog (JSON does not carry embeddings).
        self.add_items([MediaItem(**d) for d in data['items'].values()])

        for user_id, history in data['user_history'].items():
            self.user_history[user_id] = {
                'queries': history['queries'],
                'clicks': history['clicks'],
                # .get() keeps files written before dwell_times were saved loadable.
                'dwell_times': history.get('dwell_times', {}),
                'favorites': set(history['favorites']),
                'category_affinity': defaultdict(float, history['category_affinity'])
            }

        # Prefer the exact saved index over the freshly rebuilt one.
        faiss_path = f"{path}.faiss"
        if Path(faiss_path).exists():
            self.index = faiss.read_index(faiss_path)

Streamlit UI Implementation

# search_ui.py
import streamlit as st
from predictive_search import PredictiveSearchEngine, MediaItem

# Initialize
@st.cache_resource
def get_engine():
    """Construct the search engine once per process and index the demo catalog."""
    # A tiny fixed catalog spanning all four media types.
    catalog = [
        MediaItem("1", "BBC News at 10", "Evening news bulletin", "News", ["news", "uk"], 30, "tv"),
        MediaItem("2", "Planet Earth II", "Nature documentary series", "Documentary", ["nature", "bbc"], 60, "tv"),
        MediaItem("3", "Classic FM Relax", "Relaxing classical music", "Music", ["classical", "relax"], 180, "radio"),
        MediaItem("4", "Radio 4 Today", "Morning news program", "News", ["news", "radio"], 180, "radio"),
        MediaItem("5", "The Infinite Monkey Cage", "Science comedy panel show", "Science", ["science", "comedy"], 30, "podcast"),
    ]

    engine = PredictiveSearchEngine()
    engine.add_items(catalog)
    return engine

# Cached engine instance, shared across Streamlit reruns via st.cache_resource.
engine = get_engine()

# Sidebar: User profile
# NOTE(review): user_id is free text -- any typed value becomes a profile key.
st.sidebar.title("Profile")
user_id = st.sidebar.text_input("User ID", value="user_001")

# Favorites
favorites = engine.get_favorites(user_id)
if favorites:
    st.sidebar.subheader("Favorites")
    for fav in favorites:
        st.sidebar.write(f"⭐ {fav.title}")

# Main interface
st.title("🔍 Predictive Media Search")

query = st.text_input("Search TV, radio, music, podcasts...", 
                      placeholder="Try: 'evening news' or 'relaxing music'")

if query:
    # Personalized search; logs the query into the user's history.
    results = engine.search(query, user_id=user_id)
    
    # Results with explanations
    for result in results[:5]:
        col1, col2 = st.columns([3, 1])
        
        with col1:
            st.subheader(result.item.title)
            st.caption(f"{result.item.type.upper()} • {result.item.category} • {result.item.duration}min")
            st.write(result.item.description)
            st.info(f"🤖 {result.explanation}")
        
        with col2:
            # Reading user_history (a defaultdict) creates an entry for a new id.
            is_fav = result.item.id in engine.user_history[user_id]['favorites']
            
            if st.button("⭐" if is_fav else "☆", key=f"fav_{result.item.id}"):
                engine.toggle_favorite(user_id, result.item.id)
                st.rerun()  # re-run the script so the star reflects the new state
            
            if st.button("▶ Play", key=f"play_{result.item.id}"):
                # dwell_time=0 at click time -- presumably a real player would
                # report actual watch time later; TODO confirm intended signal.
                engine.record_click(user_id, result.item.id, dwell_time=0)
                st.success(f"Playing: {result.item.title}")

# Trending section
st.divider()
st.subheader("🔥 Trending Now")
trending = engine.get_trending(hours=24)

for item in trending[:5]:
    st.write(f"**{item.title}** — {item.category}")

# Query history
history = engine.user_history[user_id]['queries'][-10:]
if history:
    st.divider()
    st.subheader("Recent Searches")
    for h in history:
        st.caption(f"'{h['query']}' — {len(h['results'])} results")

The Bottom Line

Predictive search transforms media discovery from a pull model (user asks, system responds) to a push model (system anticipates, user confirms). The key insight is that search isn't about finding—it's about understanding. When your system genuinely understands what the user wants, it stops being a tool and starts being a companion.