Traditional search is transactional: the user types a query, the system returns matches. Predictive search is conversational: the system understands intent, remembers context, and surfaces content the user didn't know to ask for. This guide implements a production-ready predictive search system using Python and Streamlit, with a focus on media applications where content discovery is as important as content consumption.
Architecture: The Three-Layer Search System
┌─────────────────────────────────────────┐
│ Layer 1: Signal Collection │
│ • Search queries & clicks │
│ • Watch history & dwell time │
│ • Explicit favorites & ratings │
│ • Time-of-day & device context │
└─────────────────────────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ Layer 2: Intent Modeling │
│ • Query embedding (sentence-transformers)│
│ • User embedding (behavior aggregation) │
│ • Context embedding (time, device) │
│ • Similarity scoring (cosine/FAISS) │
└─────────────────────────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ Layer 3: Result Assembly │
│ • Multi-source fusion (TV/Radio/Music) │
│ • Diversity injection (MMR algorithm) │
│ • Real-time personalization │
│ • Explanation generation │
└─────────────────────────────────────────┘
Implementation: Core Search Engine
# predictive_search.py
"""Production-ready predictive search for media applications."""
import json
import time
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict, Optional, Tuple

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
@dataclass
class MediaItem:
    """One searchable catalogue entry (programme, station, track, episode)."""
    id: str                # unique catalogue id; used as the dict key everywhere
    title: str
    description: str
    category: str          # e.g. 'News', 'Documentary', 'Music'
    tags: List[str]        # free-form keywords; folded into the embedding text
    duration: int          # length, presumably in minutes (UI renders "{duration}min")
    type: str              # 'tv', 'radio', 'music', 'podcast'
    # Set by PredictiveSearchEngine.add_items(); None until the item is indexed.
    embedding: Optional[np.ndarray] = None
@dataclass
class SearchResult:
    """A single scored search hit plus the reason it surfaced."""
    item: MediaItem    # the matched catalogue entry
    score: float       # similarity/fusion score; higher ranks first
    match_type: str    # 'title', 'semantic', 'behavioral', 'trending' (fusion also sets 'hybrid')
    explanation: str   # human-readable rationale, filled in by the engine after ranking
class PredictiveSearchEngine:
    """Hybrid search combining lexical, semantic, and behavioral signals.

    State held in memory:
      * a catalogue of MediaItem objects plus a FAISS inner-product index of
        their L2-normalized sentence embeddings (IP == cosine after norm),
      * per-user interaction history (queries, clicks, dwell times,
        favorites, category affinities) used for behavioral re-ranking.

    Persistence is explicit via save_state()/load_state().
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Load the sentence encoder and initialize empty state.

        Args:
            model_name: any sentence-transformers model id; the default is a
                small, fast, general-purpose model.
        """
        self.encoder = SentenceTransformer(model_name)
        self.index = None  # FAISS index; built by add_items()
        self.items: Dict[str, MediaItem] = {}
        # Row i of the FAISS index corresponds to self._id_order[i].  This
        # explicit mapping makes hit lookup O(1) and stays correct across
        # repeated add_items() calls.  (The previous
        # list(self.items.values())[idx] lookup was O(n) per hit and went
        # out of sync if add_items() was called a second time, because the
        # index was rebuilt from only the new items.)
        self._id_order: List[str] = []
        self.dimension = self.encoder.get_sentence_embedding_dimension()
        # Per-user behavior storage; defaultdict so first access creates a
        # fresh, empty profile.
        self.user_history: Dict[str, Dict] = defaultdict(lambda: {
            'queries': [],
            'clicks': [],
            'dwell_times': {},
            'favorites': set(),
            'category_affinity': defaultdict(float)
        })

    def add_items(self, items: List[MediaItem]):
        """Index media items for search.

        Safe to call repeatedly: new items are merged into the catalogue
        (replacing any existing item with the same id) and the FAISS index
        is rebuilt over the full catalogue so rows stay aligned with
        self._id_order.
        """
        texts = [
            f"{item.title}. {item.description}. {' '.join(item.tags)}"
            for item in items
        ]
        embeddings = self.encoder.encode(texts, convert_to_numpy=True)
        # Normalize so inner product == cosine similarity.
        faiss.normalize_L2(embeddings)
        for i, item in enumerate(items):
            item.embedding = embeddings[i]
            self.items[item.id] = item
        # Rebuild the index over everything we know about.
        self._id_order = [iid for iid, it in self.items.items()
                          if it.embedding is not None]
        matrix = np.ascontiguousarray(
            np.stack([self.items[iid].embedding for iid in self._id_order])
        )
        self.index = faiss.IndexFlatIP(self.dimension)
        self.index.add(matrix)

    def search(self, query: str, user_id: Optional[str] = None,
               top_k: int = 10) -> List[SearchResult]:
        """Execute predictive search with personalization.

        Pipeline: semantic kNN -> optional behavioral fusion -> MMR
        diversity re-rank -> explanation generation -> query logging.

        Args:
            query: free-text query.
            user_id: optional user key; enables personalization and logging.
            top_k: number of results to return.

        Returns:
            Ranked SearchResult list, at most top_k long.
        """
        # 1. Semantic search (over-fetch so fusion/MMR have candidates).
        semantic_results = self._semantic_search(query, top_k * 2)
        # 2. Behavioral boost (only if we already have history for this user).
        if user_id and user_id in self.user_history:
            behavioral_results = self._behavioral_search(user_id, top_k)
            semantic_results = self._fuse_results(semantic_results, behavioral_results)
        # 3. Diversity re-ranking (MMR).  NOTE: a previous version also
        # computed _generate_suggestions() here and discarded the result;
        # callers wanting autocompletions should call it directly.
        final_results = self._mmr_rerank(semantic_results, top_k, lambda_param=0.5)
        # 4. Attach human-readable explanations.
        for result in final_results:
            result.explanation = self._generate_explanation(result, user_id)
        # 5. Log the query for future behavioral learning.
        if user_id:
            self.user_history[user_id]['queries'].append({
                'query': query,
                'timestamp': time.time(),
                'results': [r.item.id for r in final_results[:5]]
            })
        return final_results

    def _semantic_search(self, query: str, top_k: int) -> List[SearchResult]:
        """Vector similarity search using FAISS (cosine via normalized IP)."""
        query_embedding = self.encoder.encode([query], convert_to_numpy=True)
        faiss.normalize_L2(query_embedding)
        scores, indices = self.index.search(query_embedding, top_k)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx == -1:  # FAISS pads with -1 when fewer than top_k rows exist
                continue
            item = self.items[self._id_order[idx]]
            results.append(SearchResult(
                item=item,
                score=float(score),
                match_type='semantic',
                explanation=''
            ))
        return results

    def _behavioral_search(self, user_id: str, top_k: int) -> List[SearchResult]:
        """Recommend based on the user's implicit preferences.

        Builds a "user embedding" as the mean of favorite-item embeddings,
        falling back to dwell-time-weighted recent clicks, and runs a kNN
        query with it.  Already-favorited items are filtered out.
        """
        history = self.user_history[user_id]
        if not history['clicks']:
            return []
        favorite_embeddings = []
        for item_id in history['favorites']:
            if item_id in self.items and self.items[item_id].embedding is not None:
                favorite_embeddings.append(self.items[item_id].embedding)
        if not favorite_embeddings:
            # No favorites yet: use the 10 most recent clicks, weighted by
            # dwell time in minutes so longer engagement counts more.
            for click in history['clicks'][-10:]:
                if click['id'] in self.items:
                    emb = self.items[click['id']].embedding
                    if emb is not None:
                        weight = click.get('dwell_time', 60) / 60
                        favorite_embeddings.append(emb * weight)
        if not favorite_embeddings:
            return []
        # Copy into a fresh float32 row vector: normalize_L2 works in place,
        # and normalizing a reshape *view* of the mean would be easy to break;
        # an explicit copy keeps the item embeddings untouched.
        user_vec = np.mean(favorite_embeddings, axis=0).reshape(1, -1).astype(np.float32)
        faiss.normalize_L2(user_vec)
        scores, indices = self.index.search(user_vec, top_k)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx == -1:
                continue
            item = self.items[self._id_order[idx]]
            if item.id not in history['favorites']:  # don't re-recommend favorites
                results.append(SearchResult(
                    item=item,
                    score=float(score) * 0.9,  # slightly below semantic weight
                    match_type='behavioral',
                    explanation=''
                ))
        return results

    def _fuse_results(self, semantic: List[SearchResult],
                      behavioral: List[SearchResult]) -> List[SearchResult]:
        """Combine and deduplicate results from multiple sources.

        Items appearing in both lists keep the higher score boosted by 10%
        and are relabeled 'hybrid'.
        """
        fused = {}
        for r in semantic:
            fused[r.item.id] = r
        for r in behavioral:
            if r.item.id in fused:
                fused[r.item.id].score = max(fused[r.item.id].score, r.score) * 1.1
                fused[r.item.id].match_type = 'hybrid'
            else:
                fused[r.item.id] = r
        return sorted(fused.values(), key=lambda x: x.score, reverse=True)

    def _mmr_rerank(self, results: List[SearchResult], top_k: int,
                    lambda_param: float = 0.5) -> List[SearchResult]:
        """Maximal Marginal Relevance re-ranking for diversity.

        lambda_param trades off relevance (1.0) against diversity (0.0).
        """
        selected: List[SearchResult] = []
        remaining = results.copy()
        while len(selected) < top_k and remaining:
            if not selected:
                # First item: pure relevance.
                best = max(remaining, key=lambda r: r.score)
            else:
                # Subsequent items: balance relevance vs redundancy.
                # Start at -inf, not -1: MMR scores can legitimately fall
                # below -1, and a -1 floor could leave best as None and
                # crash on selected.append(best).
                best_mmr_score = float('-inf')
                best = None
                for candidate in remaining:
                    relevance = candidate.score
                    # Redundancy = max similarity to anything already picked.
                    max_sim = max(
                        self._embedding_similarity(candidate.item, s.item)
                        for s in selected
                    )
                    mmr = lambda_param * relevance - (1 - lambda_param) * max_sim
                    if mmr > best_mmr_score:
                        best_mmr_score = mmr
                        best = candidate
            selected.append(best)
            remaining.remove(best)
        return selected

    def _embedding_similarity(self, a: MediaItem, b: MediaItem) -> float:
        """Cosine similarity between two item embeddings (0.0 if missing).

        Embeddings are L2-normalized at indexing time, so a plain dot
        product is the cosine.
        """
        if a.embedding is None or b.embedding is None:
            return 0.0
        return float(np.dot(a.embedding, b.embedding))

    def _generate_suggestions(self, query: str, user_id: Optional[str]) -> List[str]:
        """Generate query autocompletions from the user's own recent queries.

        Matches the last 50 logged queries by case-insensitive prefix and
        returns the 5 most frequent completions.
        """
        if not user_id or user_id not in self.user_history:
            return []
        prefix = query.lower()
        suggestions = []
        for past_query in self.user_history[user_id]['queries'][-50:]:
            q = past_query['query'].lower()
            if q.startswith(prefix) and q != prefix:
                suggestions.append(past_query['query'])
        from collections import Counter
        return [q for q, _ in Counter(suggestions).most_common(5)]

    def _generate_explanation(self, result: SearchResult, user_id: Optional[str]) -> str:
        """Generate a human-readable explanation for why a result appeared."""
        explanations = {
            'semantic': "Matches your search for related content",
            'behavioral': f"Based on your interest in {result.item.category}",
            'hybrid': "Matches your search and viewing preferences",
            'trending': f"Popular in {result.item.category} right now"
        }
        base = explanations.get(result.match_type, "Recommended for you")
        # Membership check first so the defaultdict doesn't create an empty
        # profile for users we have never seen.
        if (user_id and user_id in self.user_history
                and result.item.id in self.user_history[user_id]['favorites']):
            base += " (in your favorites)"
        return base

    def record_click(self, user_id: str, item_id: str, dwell_time: int):
        """Record a user interaction for behavioral learning.

        dwell_time feeds both the per-item record and the user's aggregate
        category affinity.
        """
        self.user_history[user_id]['clicks'].append({
            'id': item_id,
            'timestamp': time.time(),
            'dwell_time': dwell_time
        })
        self.user_history[user_id]['dwell_times'][item_id] = dwell_time
        if item_id in self.items:
            category = self.items[item_id].category
            self.user_history[user_id]['category_affinity'][category] += dwell_time

    def toggle_favorite(self, user_id: str, item_id: str) -> bool:
        """Add or remove an item from the user's favorites.

        Returns:
            True if the item is now a favorite, False if it was removed.
        """
        favorites = self.user_history[user_id]['favorites']
        if item_id in favorites:
            favorites.remove(item_id)
            return False
        else:
            favorites.add(item_id)
            return True

    def get_favorites(self, user_id: str) -> List[MediaItem]:
        """Get the user's favorite items (skipping ids no longer indexed)."""
        if user_id not in self.user_history:
            return []
        return [
            self.items[fid] for fid in self.user_history[user_id]['favorites']
            if fid in self.items
        ]

    def get_trending(self, category: Optional[str] = None,
                     hours: int = 24) -> List[MediaItem]:
        """Get trending items based on recent aggregate activity.

        Scores each item by total dwell time across all users within the
        window; returns up to 20 items, optionally filtered by category.
        """
        cutoff = time.time() - (hours * 3600)
        item_scores = defaultdict(float)
        for user_history in self.user_history.values():
            for click in user_history['clicks']:
                if click['timestamp'] > cutoff:
                    item_id = click['id']
                    if category is None or (item_id in self.items and
                                            self.items[item_id].category == category):
                        item_scores[item_id] += click.get('dwell_time', 60)
        sorted_items = sorted(item_scores.items(), key=lambda x: x[1], reverse=True)
        return [self.items[iid] for iid, _ in sorted_items[:20] if iid in self.items]

    def save_state(self, path: str):
        """Serialize catalogue and user data to JSON plus a .faiss sidecar.

        Embeddings are not stored in the JSON; load_state() re-encodes
        items and/or reads the sidecar index.
        """
        data = {
            'items': {k: {
                'id': v.id,
                'title': v.title,
                'description': v.description,
                'category': v.category,
                'tags': v.tags,
                'duration': v.duration,
                'type': v.type
            } for k, v in self.items.items()},
            'user_history': {
                k: {
                    'queries': v['queries'],
                    'clicks': v['clicks'],
                    # dwell_times was previously dropped on save, silently
                    # losing data on every save/load round trip; persist it.
                    'dwell_times': v['dwell_times'],
                    'favorites': list(v['favorites']),
                    'category_affinity': dict(v['category_affinity'])
                }
                for k, v in self.user_history.items()
            }
        }
        Path(path).write_text(json.dumps(data, indent=2))
        # FAISS indexes are binary; store them next to the JSON.
        if self.index is not None:
            faiss.write_index(self.index, f"{path}.faiss")

    def load_state(self, path: str):
        """Restore catalogue and user history from save_state() output."""
        data = json.loads(Path(path).read_text())
        items = [MediaItem(**item_data) for item_data in data['items'].values()]
        self.add_items(items)  # re-encodes and rebuilds the index
        for user_id, history in data['user_history'].items():
            self.user_history[user_id] = {
                'queries': history['queries'],
                'clicks': history['clicks'],
                # .get() keeps compatibility with state files written before
                # dwell_times was persisted.
                'dwell_times': history.get('dwell_times', {}),
                'favorites': set(history['favorites']),
                'category_affinity': defaultdict(float, history['category_affinity'])
            }
        # Prefer the serialized index when present (avoids re-encoding drift).
        faiss_path = f"{path}.faiss"
        if Path(faiss_path).exists():
            self.index = faiss.read_index(faiss_path)
Streamlit UI Implementation
# search_ui.py
import streamlit as st
from predictive_search import PredictiveSearchEngine, MediaItem
# Initialize
@st.cache_resource
def get_engine():
    """Create the shared search engine (cached once per Streamlit process)
    and preload it with a small demo media catalogue."""
    demo_catalog = [
        MediaItem("1", "BBC News at 10", "Evening news bulletin", "News", ["news", "uk"], 30, "tv"),
        MediaItem("2", "Planet Earth II", "Nature documentary series", "Documentary", ["nature", "bbc"], 60, "tv"),
        MediaItem("3", "Classic FM Relax", "Relaxing classical music", "Music", ["classical", "relax"], 180, "radio"),
        MediaItem("4", "Radio 4 Today", "Morning news program", "News", ["news", "radio"], 180, "radio"),
        MediaItem("5", "The Infinite Monkey Cage", "Science comedy panel show", "Science", ["science", "comedy"], 30, "podcast"),
    ]
    search_engine = PredictiveSearchEngine()
    search_engine.add_items(demo_catalog)
    return search_engine
# Shared engine instance (cached by @st.cache_resource across reruns).
engine = get_engine()

# Sidebar: user profile.
st.sidebar.title("Profile")
user_id = st.sidebar.text_input("User ID", value="user_001")

# Favorites list in the sidebar (empty until the user stars something).
favorites = engine.get_favorites(user_id)
if favorites:
    st.sidebar.subheader("Favorites")
    for fav in favorites:
        st.sidebar.write(f"⭐ {fav.title}")

# Main interface.
st.title("🔍 Predictive Media Search")
query = st.text_input("Search TV, radio, music, podcasts...",
                      placeholder="Try: 'evening news' or 'relaxing music'")

if query:
    results = engine.search(query, user_id=user_id)
    # Show the top results with the engine's generated explanations.
    for result in results[:5]:
        col1, col2 = st.columns([3, 1])
        with col1:
            st.subheader(result.item.title)
            st.caption(f"{result.item.type.upper()} • {result.item.category} • {result.item.duration}min")
            st.write(result.item.description)
            st.info(f"🤖 {result.explanation}")
        with col2:
            # NOTE(review): reaching into engine.user_history directly relies
            # on its defaultdict and creates an empty profile on first read.
            is_fav = result.item.id in engine.user_history[user_id]['favorites']
            if st.button("⭐" if is_fav else "☆", key=f"fav_{result.item.id}"):
                engine.toggle_favorite(user_id, result.item.id)
                st.rerun()  # refresh so the star state updates immediately
            if st.button("▶ Play", key=f"play_{result.item.id}"):
                # NOTE(review): dwell_time=0 means plays never contribute to
                # trending scores or behavioral weighting — confirm intended.
                engine.record_click(user_id, result.item.id, dwell_time=0)
                st.success(f"Playing: {result.item.title}")

# Trending section: site-wide activity over the last 24 hours.
st.divider()
st.subheader("🔥 Trending Now")
trending = engine.get_trending(hours=24)
for item in trending[:5]:
    st.write(f"**{item.title}** — {item.category}")

# This user's ten most recent logged queries.
history = engine.user_history[user_id]['queries'][-10:]
if history:
    st.divider()
    st.subheader("Recent Searches")
    for h in history:
        st.caption(f"'{h['query']}' — {len(h['results'])} results")
The Bottom Line
Predictive search transforms media discovery from a pull model (user asks, system responds) to a push model (system anticipates, user confirms). The key insight is that search isn't about finding—it's about understanding. When your system genuinely understands what the user wants, it stops being a tool and starts being a companion.