You are serving LLM responses via streaming (SSE or WebSocket). The LLM starts generating a helpful response, then suddenly veers into toxic, harmful, or off-topic territory. You cannot wait for the full response: by then, the user has already read the toxic content. You need to monitor the stream token by token and cut it off the moment things go wrong.
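As a library-independent illustration of the idea, here is a minimal sketch: tokens are buffered into chunks, each chunk is scored, and a chunk is released only if its score stays under a threshold. The scorer `toy_toxicity`, its `FLAGGED` word list, and the function `monitor_stream` are hypothetical stand-ins and are not part of the library used in the scenarios that follow.

```python
from typing import Callable, Iterable, List, Tuple

# Hypothetical word list for the toy scorer; a real system would use a trained classifier.
FLAGGED = {"hate", "kill", "die", "stupid", "destroy"}


def toy_toxicity(text: str) -> float:
    """Return the fraction of words in `text` that appear in FLAGGED (0.0 to 1.0)."""
    words = [w.strip(".,!?").lower() for w in text.split()]
    if not words:
        return 0.0
    return sum(w in FLAGGED for w in words) / len(words)


def monitor_stream(
    tokens: Iterable[str],
    score: Callable[[str], float],
    threshold: float,
    chunk_size: int = 4,
) -> Tuple[str, bool]:
    """Buffer tokens into chunks; release a chunk only if it scores below `threshold`.

    Returns (delivered_text, stopped_early).
    """
    delivered: List[str] = []
    buffer: List[str] = []
    for token in tokens:
        buffer.append(token)
        if len(buffer) < chunk_size:
            continue
        if score(" ".join(buffer)) >= threshold:
            # Drop the offending chunk and end the stream.
            return " ".join(delivered), True
        delivered.extend(buffer)
        buffer = []
    # Score whatever is left when the stream ends on a partial chunk.
    if buffer:
        if score(" ".join(buffer)) >= threshold:
            return " ".join(delivered), True
        delivered.extend(buffer)
    return " ".join(delivered), False


text, stopped = monitor_stream(
    "Preheat the oven to 350 degrees then kill the recipe and die".split(),
    toy_toxicity,
    threshold=0.2,
)
print(stopped, repr(text))  # prints: True 'Preheat the oven to'
```

Holding each chunk back until it has been scored adds a small delay, but it is what keeps a failing chunk from ever reaching the user.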
Scenario 1: Normal Response Completes Successfully
Create a monitor with toxicity and coherence checks. The stream completes without issues.
monitor = StreamingEvaluator.with_defaults()
monitor.add_eval("toxicity", detect_toxicity, threshold=0.2, pass_above=False, weight=2.0)
monitor.add_eval("coherence", check_coherence, threshold=0.4, pass_above=True, weight=1.0)

normal_response = (
    "To make a classic pasta carbonara, start by cooking the spaghetti "
    "in salted boiling water. While the pasta cooks, mix egg yolks with "
    "grated pecorino cheese. Pan fry the guanciale until crispy. Combine "
    "the hot pasta with the egg mixture and toss with the crispy guanciale."
)

for token in simulate_llm_stream(normal_response):
    result = monitor.process_token(token)
    if result:
        tox = result.scores.get("toxicity", 0)
        coh = result.scores.get("coherence", 0)
        print(f" chunk {result.chunk_index}: tox={tox:.2f} coh={coh:.2f} "
              f"{'OK' if result.all_passed else 'ALERT'}")

final = monitor.finalize()
print(f"\nStream completed. Passed: {final.passed} | Chunks: {final.total_chunks}")
All chunks show “OK” and the stream completes normally.
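The `threshold` and `pass_above` arguments decide whether a chunk passes each check. A plausible reading, sketched here, is that `pass_above=True` requires the score to reach the threshold (coherence), while `pass_above=False` requires it to stay under the threshold (toxicity). The helper `passes` is hypothetical, and how the library treats a score exactly equal to the threshold is an assumption.

```python
def passes(score: float, threshold: float, pass_above: bool) -> bool:
    """Hypothetical helper: interpret a score against a threshold.

    Assumption: the boundary value counts as a pass when pass_above=True
    and as a failure when pass_above=False.
    """
    return score >= threshold if pass_above else score < threshold


# Toxicity: lower is better, so the score must stay under 0.2.
print(passes(0.05, threshold=0.2, pass_above=False))  # True
print(passes(0.35, threshold=0.2, pass_above=False))  # False
# Coherence: higher is better, so the score must reach 0.4.
print(passes(0.70, threshold=0.4, pass_above=True))   # True
```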
Scenario 2: Toxic Response Is Cut Mid-Stream
The response starts fine, then turns toxic. The EarlyStopPolicy.strict() policy kills the stream immediately on the first violation.
monitor = StreamingEvaluator.for_safety(toxicity_threshold=0.3)
monitor.add_eval(
    "toxicity",
    detect_toxicity,
    threshold=0.15,
    pass_above=False,
    weight=2.0,
)
monitor.set_policy(EarlyStopPolicy.strict())

toxic_response = (
    "Here's a recipe for chocolate cake. First, preheat your oven "
    "to 350 degrees. Then I hate to say this but you should destroy "
    "all the stupid ingredients. Kill the recipe and die. "
    "Anyway, mix the flour with sugar."
)

for token in simulate_llm_stream(toxic_response):
    result = monitor.process_token(token)
    if result:
        tox = result.scores.get("toxicity", 0)
        status = "OK" if result.all_passed else "!!! TOXIC"
        print(f" chunk {result.chunk_index}: tox={tox:.2f} {status}")
        if result.should_stop:
            print(f"\n >>> STREAM CUT at chunk {result.chunk_index}")
            print(f" >>> Reason: {result.stop_reason}")
            break

final = monitor.finalize()
print(f"\n Early stopped: {final.early_stopped}")
print(f" Text before cut: '{final.final_text[:80]}...'")
Expected output:
 chunk 0: tox=0.00 OK
 chunk 1: tox=0.00 OK
 chunk 2: tox=0.00 OK
 chunk 3: tox=0.18 !!! TOXIC

 >>> STREAM CUT at chunk 3
 >>> Reason: toxicity threshold exceeded

 Early stopped: True
 Text before cut: 'Here's a recipe for chocolate cake. First, preheat your oven to 350...'
The user only sees the safe portion of the response. The toxic content is never delivered.
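When the stream is relayed over SSE, the client also needs to learn that the response was cut rather than finished. The sketch below formats already-vetted chunks as SSE frames and appends a terminal event. The function `to_sse` and the event names `stopped` and `done` are illustrative assumptions and are not part of any library shown here.

```python
import json
from typing import Iterable, Iterator, Optional, Tuple


def to_sse(chunks: Iterable[Tuple[str, Optional[str]]]) -> Iterator[str]:
    """Turn (text, stop_reason) pairs into SSE frames.

    A pair with stop_reason=None is a chunk that passed its checks and is sent.
    The first pair with a stop_reason ends the stream: its text is withheld
    and a terminal `stopped` event is emitted instead.
    """
    for text, stop_reason in chunks:
        if stop_reason is not None:
            payload = json.dumps({"reason": stop_reason})
            yield f"event: stopped\ndata: {payload}\n\n"
            return
        payload = json.dumps({"text": text})
        yield f"data: {payload}\n\n"
    # Reached only when no chunk triggered a stop.
    yield "event: done\ndata: {}\n\n"


frames = list(to_sse([
    ("Preheat the oven.", None),
    ("(withheld)", "toxicity threshold exceeded"),
    ("Mix the flour.", None),  # never reached
]))
for frame in frames:
    print(frame, end="")
```

A distinct terminal event lets the client show a neutral notice instead of leaving a sentence hanging.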
Scenario 3: Response Drifts Off-Topic
The response starts on-topic (bread baking) but gradually drifts into physics. The topic score degrades over time.
monitor = StreamingEvaluator.for_quality(min_chunk_size=10)
monitor.add_eval("on_topic", track_topic, threshold=0.3, pass_above=True)
monitor.add_eval("coherence", check_coherence, threshold=0.4, pass_above=True)

drifting_response = (
    "To bake bread, you need flour, water, yeast, and salt. "
    "Mix the ingredients and knead the dough for ten minutes. "
    "Speaking of minutes, time is a fascinating concept in physics. "
    "Einstein showed that time is relative. The speed of light "
    "is approximately 300 million meters per second. Quantum mechanics "
    "suggests that particles exist in superposition until observed."
)

for token in simulate_llm_stream(drifting_response, words_per_chunk=5):
    result = monitor.process_token(token)
    if result:
        topic = result.scores.get("on_topic", 0)
        bar = "#" * int(topic * 10)
        print(f" chunk {result.chunk_index}: topic={topic:.2f} |{bar:<10}| "
              f"{'on-topic' if topic >= 0.3 else 'DRIFTING'}")

final = monitor.finalize()
print(f"\nFinal passed: {final.passed}")
You can see the topic score start high (cooking keywords present) then drop as the response shifts to physics.
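One simple way to produce a score with this shape is keyword overlap: count how many words in each chunk belong to a topic vocabulary. The sketch below is a hypothetical stand-in, not the `track_topic` function used in the scenario; the vocabulary `BREAD_TERMS` and the function `keyword_topic_score` are assumptions made for illustration.

```python
from typing import Set

# Hypothetical topic vocabulary for the bread-baking example.
BREAD_TERMS: Set[str] = {
    "bread", "bake", "flour", "water", "yeast",
    "salt", "dough", "knead", "mix", "ingredients",
}


def keyword_topic_score(chunk: str, vocabulary: Set[str]) -> float:
    """Return the fraction of words in `chunk` found in `vocabulary` (0.0 to 1.0)."""
    words = [w.strip(".,!?").lower() for w in chunk.split()]
    if not words:
        return 0.0
    return sum(w in vocabulary for w in words) / len(words)


on_topic = keyword_topic_score("Mix the ingredients and knead the dough", BREAD_TERMS)
drifted = keyword_topic_score("Einstein showed that time is relative", BREAD_TERMS)
print(f"{on_topic:.2f} {drifted:.2f}")  # prints: 0.57 0.00
```

Keyword overlap is cheap enough to run on every chunk, but it misses paraphrases; an embedding-based similarity would be a heavier alternative.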
Scenario 4: Callbacks for Logging and Alerting
Register callbacks that run after every chunk and when the stream is stopped early. Use these for logging, metrics, or alerting.
incidents = []

def on_chunk_alert(chunk_result):
    """Called after every chunk. Log violations."""
    if not chunk_result.all_passed:
        incidents.append({
            "chunk": chunk_result.chunk_index,
            "scores": dict(chunk_result.scores),
        })

def on_emergency_stop(reason, text):
    """Called when stream is killed."""
    incidents.append({
        "type": "EMERGENCY_STOP",
        "reason": str(reason),
        "text_length": len(text),
    })

monitor = StreamingEvaluator(
    config=StreamingConfig(
        min_chunk_size=5,
        on_chunk_callback=on_chunk_alert,
        on_stop_callback=on_emergency_stop,
        enable_early_stop=True,
    ),
)
monitor.add_eval(
    "toxicity",
    detect_toxicity,
    threshold=0.1,
    pass_above=False,
    weight=1.0,
)

adversarial = (
    "I'd be happy to help! However, I hate people who are stupid "
    "and they should all die. Just kidding! Let me actually help you."
)

for token in simulate_llm_stream(adversarial):
    result = monitor.process_token(token)
    if result and result.should_stop:
        break

monitor.finalize()

print(f"Incidents logged: {len(incidents)}")
for inc in incidents:
    if inc.get("type") == "EMERGENCY_STOP":
        print(f" STOP: {inc['reason']}")
    else:
        print(f" Violation at chunk {inc['chunk']}: "
              f"toxicity={inc['scores'].get('toxicity', 0):.2f}")
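Once incidents are collected in this shape, they can be reduced to a few numbers for a metrics dashboard or an alert. The function `summarize_incidents` is a hypothetical helper; it assumes only the two dictionary layouts written by the callbacks, and the `sample` list is made-up data for illustration.

```python
from typing import Any, Dict, List


def summarize_incidents(incidents: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Reduce a list of incident dicts to counts and the worst toxicity score."""
    violations = [i for i in incidents if i.get("type") != "EMERGENCY_STOP"]
    stops = [i for i in incidents if i.get("type") == "EMERGENCY_STOP"]
    worst = max(
        (v["scores"].get("toxicity", 0.0) for v in violations),
        default=0.0,  # no violations recorded
    )
    return {
        "violations": len(violations),
        "stopped": bool(stops),
        "stop_reason": stops[0]["reason"] if stops else None,
        "max_toxicity": worst,
    }


# Made-up sample data in the same layout as the `incidents` list.
sample = [
    {"chunk": 2, "scores": {"toxicity": 0.25}},
    {"type": "EMERGENCY_STOP", "reason": "toxicity threshold exceeded", "text_length": 58},
]
summary = summarize_incidents(sample)
print(summary)
```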
You now have real-time safety monitoring. Next, learn how to auto-generate an entire test pipeline from a plain-English description of your application.
Next: AutoEval
Describe your app and get an auto-configured testing pipeline you can export to CI/CD.