#!/usr/bin/env python3
"""Voice Loop — a minimal on-device voice agent. CUDA / Linux.

Microphone audio is segmented with Silero VAD (optionally refined by
Smart Turn) and answered by a model served by llama-server.  By default the
utterance is transcribed locally with Moonshine; with --audio-mode the audio
is sent to Gemma 4 directly via llama-server (mmproj).
Kokoro FastAPI speaks the response. WebRTC AEC3 enables voice interrupt.

Usage:
    uv run voice_loop_cuda_audio.py              # defaults
    uv run voice_loop_cuda_audio.py --no-tts     # text out only
    uv run voice_loop_cuda_audio.py --no-aec     # keypress interrupt only
    uv run voice_loop_cuda_audio.py --no-chime   # no chime / ticks while generating
"""

import argparse
import asyncio
import base64
import io
import json
import os
import queue
import re
import select
import sys
import tempfile
import termios
import threading
import time as _time
import tty
import wave
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.error import URLError
from urllib.request import Request, urlopen

import numpy as np
import pyaudio  # noqa: F401  (kept: imported by the original file)
import sounddevice as sd
import torch

# Larger audio buffer via 'high' latency
sd.default.latency = 'high'


def list_audio_devices():
    """List available audio devices for debugging."""
    devices = sd.query_devices()
    print("\n=== Available Audio Devices ===")
    for i, d in enumerate(devices):
        input_ch = d['max_input_channels']
        output_ch = d['max_output_channels']
        if input_ch > 0 or output_ch > 0:
            flags = []
            if input_ch > 0:
                flags.append("INPUT")
            if output_ch > 0:
                flags.append("OUTPUT")
            print(f" [{i}] {d['name']} ({', '.join(flags)})")
    default_in = sd.query_devices(kind='input')
    default_out = sd.query_devices(kind='output')
    print(f"\nDefault input device: {sd.default.device[0]} - {default_in['name'] if default_in else 'None'}")
    print(f"Default output device: {sd.default.device[1]} - {default_out['name'] if default_out else 'None'}")
    print("================================\n")


SAMPLE_RATE = 16000
CHUNK_SAMPLES = 512
MAX_HISTORY = 10
CHIME_SR = 24000
_DIR = Path(__file__).parent
_SENT_END = re.compile(r'(?<=[.!?])\s+')
_SENT_MIN_CHARS = 20
_GAP_BLANK_SAMPLES = int(0.15 * 16000)

# Prompt sent together with the audio clip in --audio-mode.
_TRANSCRIBE_PROMPT = (
    "Transcribe the following speech segment. Follow these specific "
    "instructions for formatting the answer:\n"
    "* Only output the transcription, with no newlines.\n"
    "* When transcribing numbers, write the digits, i.e. write 1.7 and not "
    "one point seven, and write 3 instead of three."
)


def _split_sentences(text: str) -> list[str]:
    """Split *text* at sentence punctuation into speakable pieces.

    Pieces shorter than ``_SENT_MIN_CHARS`` are merged with the following
    piece; a trailing short remainder is returned as its own piece.
    """
    parts, carry = [], ""
    for p in _SENT_END.split(text.strip()):
        p = p.strip()
        if not p:
            continue
        carry = f"{carry} {p}".strip() if carry else p
        if len(carry) >= _SENT_MIN_CHARS:
            parts.append(carry)
            carry = ""
    if carry:
        parts.append(carry)
    return parts


def load_system_prompt(include_memory: bool = False) -> str:
    """Return SOUL.md (plus MEMORY.md when requested) joined by blank lines.

    Files are read from the script directory; missing or empty files are
    skipped, so the result may be an empty string.
    """
    names = ("SOUL.md", "MEMORY.md") if include_memory else ("SOUL.md",)
    parts = [(_DIR / n).read_text().strip() for n in names if (_DIR / n).exists()]
    return "\n\n".join(p for p in parts if p)


def _fade_tone(freq, dur, amp=0.6):
    """Return a sine tone of *freq* Hz lasting *dur* seconds with a raised-cosine envelope."""
    n = int(dur * CHIME_SR)
    t = np.linspace(0, dur, n, dtype=np.float32)
    env = 0.5 * (1 - np.cos(2 * np.pi * np.arange(n) / max(1, n - 1)))
    return amp * np.sin(2 * np.pi * freq * t) * env


def _silence(dur):
    """Return *dur* seconds of zeros at the chime sample rate."""
    return np.zeros(int(dur * CHIME_SR), dtype=np.float32)


def make_chime(duration=30.0, tick_every=1.5):
    """Build the chime buffer: a two-tone head followed by periodic soft ticks.

    Args:
        duration: total buffer length in seconds.
        tick_every: spacing between ticks in seconds.

    Returns:
        float32 array of ``int(duration * CHIME_SR)`` samples.
    """
    head = np.concatenate([_fade_tone(880, 0.09), _silence(0.03), _fade_tone(1320, 0.10)])
    tick = _fade_tone(550, 0.04, amp=0.18)
    total = int(duration * CHIME_SR)
    buf = np.zeros(total, dtype=np.float32)
    # Clamp so a duration shorter than the head does not raise on assignment.
    head_len = min(len(head), total)
    buf[:head_len] = head[:head_len]
    # range() rejects a zero step, which a tiny tick_every would produce.
    step = max(1, int(tick_every * CHIME_SR))
    for pos in range(len(head), total, step):
        end = min(pos + len(tick), total)
        buf[pos:end] = tick[:end - pos]
    return buf


def _lang_from_voice(v: str) -> str:
    """Map a voice name such as ``af_heart`` to a language code via its first letter.

    Names without a ``<letter>_`` prefix, or with an unknown letter, map to ``en-us``.
    """
    prefix = v[:1] if len(v) > 1 and v[1] == '_' else ''
    return {
        'a': 'en-us', 'b': 'en-gb', 'e': 'es', 'f': 'fr-fr',
        'h': 'hi', 'i': 'it', 'j': 'ja', 'p': 'pt-br', 'z': 'cmn',
    }.get(prefix, 'en-us')


def _to_pcm16(audio):
    """Scale a float array by 32767, clip to the int16 range and cast to int16."""
    return (audio * 32767).clip(-32768, 32767).astype(np.int16)


def _write_wav(target, audio, sr=SAMPLE_RATE):
    """Write *audio* as mono 16-bit PCM WAV to *target* (a path or a binary file object)."""
    with wave.open(target, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sr)
        wf.writeframes(_to_pcm16(audio).tobytes())


def save_wav(audio, sr=SAMPLE_RATE):
    """Write *audio* to a new temporary ``.wav`` file and return its path.

    The caller is responsible for deleting the file.
    """
    # Close the temp-file handle right away; only the reserved name is needed.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        path = tmp.name
    _write_wav(path, audio, sr)
    return path


def load_smart_turn():
    """Load the Smart Turn ONNX model and return a ``predict(audio) -> float`` callable.

    The model is cached under the system temp directory and downloaded on
    first use.  ``predict`` looks at the last 8 seconds of 16 kHz float32 audio.
    """
    import onnxruntime as ort
    from transformers import WhisperFeatureExtractor

    model_path = os.path.join(tempfile.gettempdir(), "smart_turn_v3", "smart_turn_v3.2_cpu.onnx")
    if not os.path.exists(model_path):
        print("Downloading Smart Turn v3.2 model...", flush=True)
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        import urllib.request
        # Download to a side file and rename, so an interrupted download is
        # never mistaken for a complete cached model on the next run.
        # NOTE(review): confirm the remote file name against the model repo.
        part_path = model_path + ".part"
        urllib.request.urlretrieve(
            "https://huggingface.co/pipecat-ai/smart-turn-v3/resolve/main/smart-turn_v3.2-cpu.onnx",
            part_path)
        os.replace(part_path, model_path)
    session = ort.InferenceSession(model_path, providers=["CPUExecutionProvider"])
    extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-tiny")

    def predict(audio_float32: np.ndarray) -> float:
        max_samples = 8 * SAMPLE_RATE
        audio_float32 = audio_float32[-max_samples:]
        features = extractor(
            audio_float32, sampling_rate=SAMPLE_RATE, max_length=max_samples,
            padding="max_length", return_attention_mask=False, return_tensors="np",
        )
        outputs = session.run(None, {"input_features": features.input_features.astype(np.float32)})
        return float(outputs[0].flatten()[0])

    return predict


def _vad_prob(vad, chunk):
    """Run the VAD on one chunk and return the speech probability as a plain number."""
    p = vad(torch.from_numpy(chunk), SAMPLE_RATE)
    return p.item() if hasattr(p, "item") else p


def _get_ref_segment(tts_concat, pos, length):
    """Return ``tts_concat[pos:pos + length]``, zero-padded to exactly *length* samples."""
    if pos >= len(tts_concat):
        return np.zeros(length, dtype=np.float32)
    seg = tts_concat[pos:pos + length]
    if len(seg) < length:
        return np.concatenate([seg, np.zeros(length - len(seg), dtype=np.float32)])
    return seg


# ============================================================================
# Llama-server API Integration (Multimodal Audio)
# ============================================================================

class LlamaServerClient:
    """Client for llama-server with multimodal audio support."""

    DEFAULT_MODEL = "gemma-4-12b-it-UD-Q8_K_XL.gguf"

    def __init__(self, base_url="http://localhost:8080"):
        self.base_url = f"{base_url.rstrip('/')}/v1"
        self._model = None

    @staticmethod
    def _encode_audio(audio_data):
        """Return *audio_data* as a base64 string; raw bytes are encoded, strings pass through."""
        if isinstance(audio_data, (bytes, bytearray, memoryview)):
            return base64.b64encode(bytes(audio_data)).decode("ascii")
        return audio_data

    def _build_request(self, messages, max_tokens, temperature, audio_data, stream):
        """Build the POST request for ``/chat/completions``.

        When the final message is a user message its content is rewritten as
        a list of content parts: the audio clip first (if *audio_data* is
        given), then the text.  The caller's list and dicts are not modified.
        """
        messages = [dict(m) for m in messages]
        if messages and messages[-1].get("role") == "user":
            content = []
            if audio_data is not None:
                content.append({
                    "type": "input_audio",
                    "input_audio": {"data": self._encode_audio(audio_data), "format": "wav"},
                })
            user_content = messages[-1]["content"]
            if isinstance(user_content, str):
                content.append({"type": "text", "text": user_content})
            else:
                # Keep text parts; keep pre-attached audio only when no
                # explicit audio_data replaces it.
                content.extend(
                    item for item in user_content
                    if item.get("type") == "text"
                    or (audio_data is None and item.get("type") == "input_audio")
                )
            messages[-1]["content"] = content
        data = {
            "model": self._model or self.DEFAULT_MODEL,
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "stream": stream,
            "backend_sampling": False,
            "reasoning_control": True,
            "return_progress": True,
        }
        return Request(
            f"{self.base_url}/chat/completions",
            data=json.dumps(data).encode('utf-8'),
            headers={"Content-Type": "application/json"},
            method='POST',
        )

    def _call_chat(self, messages, max_tokens=200, temperature=0.7, audio_data=None):
        """Call llama-server chat completions with optional audio.

        Args:
            messages: chat messages; the last one is normally the user turn.
            audio_data: WAV clip as a base64 string or raw bytes, or None.

        Returns:
            The assistant message text ("" when the server returns null content).

        Raises:
            RuntimeError: on a connection or HTTP error.
        """
        req = self._build_request(messages, max_tokens, temperature, audio_data, stream=False)
        try:
            with urlopen(req, timeout=120) as resp:
                result = json.loads(resp.read().decode('utf-8'))
        except URLError as e:
            raise RuntimeError(f"Llama-server API error: {e}") from e
        return result['choices'][0]['message']['content'] or ""

    def _call_chat_stream(self, messages, max_tokens=200, temperature=0.7, audio_data=None):
        """Call llama-server chat completions with streaming and optional audio.

        Yields:
            Non-empty text fragments as they arrive.

        Raises:
            RuntimeError: on a connection or HTTP error.
        """
        req = self._build_request(messages, max_tokens, temperature, audio_data, stream=True)
        try:
            with urlopen(req, timeout=120) as resp:
                for raw in resp:
                    line = raw.decode('utf-8').strip()
                    if not line.startswith("data:"):
                        continue
                    payload = line[5:].strip()
                    # The end-of-stream sentinel is not JSON.
                    if payload == "[DONE]":
                        break
                    try:
                        event = json.loads(payload)
                    except json.JSONDecodeError:
                        # Best effort: skip a malformed event, keep streaming.
                        continue
                    choices = event.get('choices') or []
                    if not choices:
                        continue
                    # 'content' may be absent or null (e.g. a role-only delta).
                    token = (choices[0].get('delta') or {}).get('content')
                    if token:
                        yield token
        except URLError as e:
            raise RuntimeError(f"Llama-server API error: {e}") from e

    def set_model(self, model_name):
        """Set the model name sent with subsequent requests."""
        self._model = model_name


# ============================================================================
# Kokoro FastAPI Integration (from tts-speak skill)
# ============================================================================

class KokoroFastAPIClient:
    """Client for Kokoro TTS FastAPI server."""

    def __init__(self, base_url="http://localhost:8880"):
        self.base_url = base_url

    def synthesize(self, text, voice="af_heart", lang="en-us", speed=1.0):
        """Synthesize speech and return ``(samples, sample_rate)``.

        Args:
            text: text to speak.
            voice: Kokoro voice name.
            lang: accepted for interface compatibility; not sent to the server.
            speed: playback speed multiplier.

        Raises:
            RuntimeError: on a connection or HTTP error.
        """
        data = {
            "input": text,
            # NOTE(review): "voice" is assumed to be the field the
            # OpenAI-compatible speech endpoint reads — confirm against the
            # server schema; "voice_id" is kept for compatibility.
            "voice": voice,
            "voice_id": voice,
            "speed": speed,
        }
        req = Request(
            f"{self.base_url}/v1/audio/speech",
            data=json.dumps(data).encode('utf-8'),
            headers={"Content-Type": "application/json"},
            method='POST',
        )
        try:
            with urlopen(req, timeout=60) as resp:
                audio_bytes = resp.read()
        except URLError as e:
            raise RuntimeError(f"Kokoro TTS API error: {e}") from e
        import soundfile as sf
        samples, sr = sf.read(io.BytesIO(audio_bytes), dtype='float32')
        return samples, sr


def main():
    """Parse arguments, load the models and run the listen/respond loop until Ctrl+C."""
    ap = argparse.ArgumentParser(description="Voice Loop — a minimal on-device voice agent (Gemma 4 Audio)")
    B = argparse.BooleanOptionalAction
    ap.add_argument("--tts", action=B, default=True, help="Kokoro TTS output")
    ap.add_argument("--smart_turn", action=B, default=True, help="Smart Turn v3 endpoint detection")
    ap.add_argument("--aec", action=B, default=True, help="WebRTC AEC3 voice interrupt")
    ap.add_argument("--chime", action=B, default=True,
                    help="Chime on utterance + soft ticks while generating (default: on)")
    ap.add_argument("--memory", action="store_true",
                    help="Read/write MEMORY.md (auto-update durable facts, consolidate every 5 turns)")
    ap.add_argument("--audio-mode", action="store_true",
                    help="Send audio directly to Gemma (experimental)")
    ap.add_argument("--llama-url", default="http://localhost:8080",
                    help="Llama-server API base URL")
    ap.add_argument("--tts-url", default="http://localhost:8880",
                    help="Kokoro FastAPI TTS URL")
    ap.add_argument("--silence-ms", type=int, default=700)
    ap.add_argument("--record", nargs="?", const="", metavar="FILE",
                    help="Record mic to WAV for debugging (default: tmp/recording-TIMESTAMP.wav)")
    ap.add_argument("--voice", default="af_heart", help="Kokoro voice")
    ap.add_argument("--list-devices", action="store_true",
                    help="List available audio devices and exit")
    ap.add_argument("--input-device", type=int, default=None,
                    help="Input device index (use --list-devices to see options)")
    ap.add_argument("--output-device", type=int, default=None,
                    help="Output device index (use --list-devices to see options)")
    args = ap.parse_args()

    if args.list_devices:
        list_audio_devices()
        sys.exit(0)

    if args.input_device is not None:
        sd.default.device[0] = args.input_device
    if args.output_device is not None:
        sd.default.device[1] = args.output_device

    if args.record == "":
        tmp_dir = _DIR / "tmp"
        tmp_dir.mkdir(exist_ok=True)
        args.record = str(tmp_dir / f"recording-{_time.strftime('%Y%m%d-%H%M%S')}.wav")

    # Number of consecutive non-speech chunks that ends an utterance.
    silence_limit = max(1, int(args.silence_ms / (CHUNK_SAMPLES / SAMPLE_RATE * 1000)))

    print("Loading Silero VAD...", flush=True)
    from silero_vad import load_silero_vad
    vad = load_silero_vad(onnx=True)

    # The listen loop consults smart_turn; load it here and fall back to the
    # plain silence timeout when it cannot be loaded.
    smart_turn = None
    if args.smart_turn:
        print("Loading Smart Turn...", flush=True)
        try:
            smart_turn = load_smart_turn()
        except Exception as e:
            print(f" Warning: Smart Turn unavailable, using silence timeout only: {e}", file=sys.stderr)

    print("Loading Moonshine (transcription)...", flush=True)
    from moonshine_voice import Transcriber, get_model_for_language
    ms_path, ms_arch = get_model_for_language("en")
    moonshine = Transcriber(model_path=str(ms_path), model_arch=ms_arch)

    print(f"Connecting to Llama-server at {args.llama_url}...", flush=True)
    llama = LlamaServerClient(args.llama_url)

    kokoro = None
    if args.tts:
        print(f"Connecting to Kokoro TTS at {args.tts_url}...", flush=True)
        try:
            kokoro = KokoroFastAPIClient(args.tts_url)
            kokoro.synthesize("test", voice=args.voice)  # connectivity probe
            print(" Kokoro TTS connected!", flush=True)
        except Exception as e:
            print(f" Warning: Kokoro TTS connection failed: {e}", file=sys.stderr)
            kokoro = None

    make_aec_processor = None
    if args.aec:
        from livekit.rtc import AudioFrame
        from livekit.rtc.apm import AudioProcessingModule
        WF = 160  # samples per frame handed to the audio processing module

        def _to_i16(x):
            """Convert float samples to int16, zero-padded to a full frame."""
            s = _to_pcm16(x)
            return np.pad(s, (0, max(0, WF - len(s)))) if len(s) < WF else s

        def _frame(b):
            return AudioFrame(b.tobytes(), sample_rate=SAMPLE_RATE,
                              num_channels=1, samples_per_channel=WF)

        def make_aec_processor():
            """Return ``process(mic, ref)`` that echo-cancels *mic* using *ref* as the far end."""
            apm = AudioProcessingModule(echo_cancellation=True, noise_suppression=True)

            def process(mic, ref):
                cleaned = np.zeros_like(mic)
                for i in range(0, len(mic), WF):
                    mic_part = mic[i:i + WF]
                    mic_f = _frame(_to_i16(mic_part))
                    apm.process_reverse_stream(_frame(_to_i16(ref[i:i + WF])))
                    apm.process_stream(mic_f)
                    out = np.frombuffer(bytes(mic_f.data), dtype=np.int16).astype(np.float32) / 32767
                    cleaned[i:i + WF] = out[:len(mic_part)]
                return cleaned

            return process

        print(" AEC: WebRTC AEC3 (LiveKit APM)")

    executor = ThreadPoolExecutor(max_workers=1)
    chime_sound = make_chime() if args.chime else None
    audio_q: queue.Queue[np.ndarray] = queue.Queue()
    record_buf: list[np.ndarray] | None = [] if args.record else None

    def callback(indata, frames, time, status):
        """Input-stream callback: queue (and optionally record) each mic chunk."""
        if status:
            print(status, file=sys.stderr)
        chunk = indata[:, 0].copy()
        if record_buf is not None:
            record_buf.append(chunk)
        audio_q.put(chunk)

    def drain_audio_q():
        """Discard every chunk currently waiting in the mic queue."""
        while not audio_q.empty():
            try:
                audio_q.get_nowait()
            except queue.Empty:
                break

    def transcribe(audio_data):
        """Transcribe float audio with Moonshine and return the joined text."""
        result = moonshine.transcribe_without_streaming(audio_data.tolist(), SAMPLE_RATE)
        return " ".join(l.text for l in result.lines if l.text).strip()

    def llm_generate(messages, max_tokens=200, temperature=0.7, audio_data=None):
        """Generate response from Llama-server with optional audio input."""
        return llama._call_chat(messages, max_tokens=max_tokens,
                                temperature=temperature, audio_data=audio_data)

    def stream_sentences(messages, max_tokens=200, temperature=0.7, audio_data=None):
        """Yield sentences as LLM generates them via streaming API.

        A worker thread reads the token stream and cuts it at sentence
        boundaries.  A sentence shorter than ``_SENT_MIN_CHARS`` is carried
        into the next one instead of being dropped.
        """
        q: queue.Queue[str | None] = queue.Queue()
        cancel = threading.Event()

        def _worker():
            buffer = ""
            try:
                for token in llama._call_chat_stream(messages, max_tokens=max_tokens,
                                                     temperature=temperature,
                                                     audio_data=audio_data):
                    if cancel.is_set():
                        return
                    buffer += token
                    search_from = 0
                    while True:
                        m = _SENT_END.search(buffer, search_from)
                        if not m:
                            break
                        sentence = buffer[:m.start()].strip()
                        if len(sentence) >= _SENT_MIN_CHARS:
                            q.put(sentence)
                            buffer = buffer[m.end():]
                            search_from = 0
                        else:
                            # Too short to speak alone: look for a later boundary.
                            search_from = m.end()
            except Exception as e:
                print(f" [LLM error: {e}]", file=sys.stderr)
            finally:
                # Flush whatever text remains, then signal end of stream.
                if buffer.strip() and not cancel.is_set():
                    q.put(buffer.strip())
                q.put(None)

        threading.Thread(target=_worker, daemon=True).start()
        try:
            while True:
                s = q.get()
                if s is None:
                    return
                yield s
        finally:
            cancel.set()

    def speak_tts(text):
        """Synthesize *text* and play it to completion (no interrupt support)."""
        samples, sr = kokoro.synthesize(text, voice=args.voice, lang=_lang_from_voice(args.voice))
        sd.play(samples, sr)
        sd.wait()

    _mem_path = _DIR / "MEMORY.md"

    def _read_memory():
        return _mem_path.read_text() if _mem_path.exists() else "# Memory\n"

    def _run_memory(prompt, max_tokens, temperature, label):
        """Run a single-turn memory prompt; return the stripped reply or None on failure."""
        try:
            return llama._call_chat(
                [{"role": "user", "content": prompt}],
                max_tokens=max_tokens, temperature=temperature,
            ).strip()
        except Exception as e:
            print(f" [{label} failed: {e}]", file=sys.stderr)
            return None

    def update_memory(heard, response):
        """Ask the model for new durable facts in *heard* and append them to MEMORY.md."""
        result = _run_memory(
            f"Current memory:\n{_read_memory()}\n\n"
            f"User said: {heard}\n\n"
            "Did the user state a new durable fact about themselves? "
            "If yes, output one short fact per line starting with '- '. "
            "If no, output ONLY: NONE. Do not invent facts.",
            max_tokens=60, temperature=0.2, label="memory update",
        )
        if result and "NONE" not in result.upper():
            lines = [l for l in result.splitlines() if l.strip().startswith("-")]
            if lines:
                with open(_mem_path, "a") as f:
                    f.write("\n" + "\n".join(lines) + "\n")
                print(f" [memory +{len(lines)}]", flush=True)

    def consolidate_memory():
        """Ask the model to rewrite MEMORY.md; keep the result only if it starts with '# Memory'."""
        if not _mem_path.exists():
            return
        result = _run_memory(
            f"Here is a memory file about a user:\n\n{_read_memory()}\n\n"
            "Rewrite it: merge duplicates, remove transient/session-specific "
            "items (questions asked, topics discussed, tests), keep only "
            "durable facts (identity, preferences, relationships, location, "
            "ongoing projects). Output the cleaned file, starting with '# Memory' "
            "followed by bullets starting with '- '. No explanation.",
            max_tokens=300, temperature=0.2, label="memory consolidation",
        )
        if result and result.startswith("# Memory"):
            _mem_path.write_text(result + "\n")
            print(" [memory consolidated]", flush=True)

    def _sys_messages():
        """Return the system-prompt message list (empty when there is no prompt)."""
        sp = load_system_prompt(include_memory=args.memory)
        return [{"role": "system", "content": sp}] if sp else []

    def _wait_for_chime_gap():
        """Wait until we're in a silent gap between ticks."""
        if chime_sound is None or chime_started_at[0] == 0:
            return
        # Timings mirror make_chime(): head = 0.09 + 0.03 + 0.10 s, 0.04 s ticks.
        CHIME_HEAD = 0.22
        TICK_DUR = 0.04
        TICK_EVERY = 1.5
        t = _time.monotonic() - chime_started_at[0]
        if t < CHIME_HEAD:
            _time.sleep(CHIME_HEAD - t)
            return
        phase = (t - CHIME_HEAD) % TICK_EVERY
        if phase < TICK_DUR:
            _time.sleep(TICK_DUR - phase + 0.005)

    def play_tts_stream(sentence_source):
        """Play TTS for a sentence source with AEC support.

        Args:
            sentence_source: a full response string, or an iterable of sentences.

        Returns:
            True when playback was interrupted by a keypress or by speech.
        """
        if isinstance(sentence_source, str):
            sentence_iter = iter(_split_sentences(sentence_source) or [sentence_source])
        else:
            sentence_iter = sentence_source
        drain_audio_q()
        out_stream, interrupted = None, False
        # 16 kHz copy of everything played so far: the echo-canceller reference.
        tts_16k_buf: list[np.ndarray] = []
        _cache_arr = np.array([], dtype=np.float32)
        _cache_len = 0
        state = {"play_start": None, "consec_speech": 0, "mic_pos": 0}
        aec_process = make_aec_processor() if make_aec_processor else None

        def _get_tts_concat():
            """Return the concatenated reference, rebuilt only when new chunks were added."""
            nonlocal _cache_arr, _cache_len
            if len(tts_16k_buf) != _cache_len:
                _cache_arr = np.concatenate(tts_16k_buf) if tts_16k_buf else np.array([], dtype=np.float32)
                _cache_len = len(tts_16k_buf)
            return _cache_arr

        def _append_ref(chunk_samples, sr):
            """Append played audio to the reference, resampled to SAMPLE_RATE if needed."""
            if aec_process is None:
                return
            if sr == SAMPLE_RATE:
                tts_16k_buf.append(chunk_samples.astype(np.float32))
            else:
                idx = np.arange(0, len(chunk_samples), sr / SAMPLE_RATE)
                tts_16k_buf.append(
                    np.interp(idx, np.arange(len(chunk_samples)), chunk_samples).astype(np.float32)
                )

        def _speech_run_detected(cleaned):
            """Update the consecutive-speech counter; True once 5 chunks in a row are speech."""
            if _vad_prob(vad, cleaned.astype(np.float32)) > 0.8:
                state["consec_speech"] += 1
                return state["consec_speech"] >= 5
            state["consec_speech"] = 0
            return False

        def check_barge_in():
            """Echo-cancel queued mic chunks against the reference and detect user speech."""
            if not (aec_process and state["play_start"]
                    and _time.monotonic() - state["play_start"] >= 0.5):
                return False
            tts_concat = _get_tts_concat()
            if not len(tts_concat):
                return False
            while not audio_q.empty():
                mic_chunk = audio_q.get_nowait()
                if len(mic_chunk) < CHUNK_SAMPLES:
                    continue
                ref = _get_ref_segment(tts_concat, state["mic_pos"], len(mic_chunk))
                state["mic_pos"] += len(mic_chunk)
                if _speech_run_detected(aec_process(mic_chunk, ref)):
                    return True
            return False

        def pad_gap_and_check():
            """Drain mic chunks from the inter-sentence gap with reverb blanking."""
            if aec_process is None:
                return False
            blanked = 0
            while not audio_q.empty():
                mic_chunk = audio_q.get_nowait()
                if len(mic_chunk) < CHUNK_SAMPLES:
                    continue
                # Nothing was playing during the gap: pad the reference with silence.
                silence_ref = np.zeros(len(mic_chunk), dtype=np.float32)
                tts_16k_buf.append(silence_ref)
                state["mic_pos"] += len(mic_chunk)
                if blanked < _GAP_BLANK_SAMPLES:
                    state["consec_speech"] = 0
                    blanked += len(mic_chunk)
                    continue
                if _speech_run_detected(aec_process(mic_chunk, silence_ref)):
                    return True
            return False

        async def _play():
            nonlocal out_stream, interrupted
            loop = asyncio.get_running_loop()
            synth_q: asyncio.Queue = asyncio.Queue(maxsize=1)

            async def _synthesizer():
                """Run kokoro.synthesize() in a thread."""
                async def _synth(text):
                    return await loop.run_in_executor(
                        None,
                        lambda t=text: kokoro.synthesize(
                            t, voice=args.voice, speed=1.0,
                            lang=_lang_from_voice(args.voice),
                        ),
                    )

                GROUP = 2  # sentences synthesized per request
                buf: list[str] = []
                try:
                    for sentence in sentence_iter:
                        if interrupted:
                            break
                        buf.append(sentence)
                        if len(buf) == GROUP:
                            await synth_q.put(await _synth(" ".join(buf)))
                            buf = []
                    if buf and not interrupted:
                        await synth_q.put(await _synth(" ".join(buf)))
                except Exception as e:
                    # Without this the player below would wait on the queue forever.
                    print(f" [TTS error: {e}]", file=sys.stderr)
                await synth_q.put(None)

            synth_task = asyncio.create_task(_synthesizer())
            first_sentence = True
            try:
                while True:
                    item = await synth_q.get()
                    if item is None or interrupted:
                        break
                    samples, sr = item
                    if not first_sentence and pad_gap_and_check():
                        interrupted = True
                        print(" [voice interrupt]", flush=True)
                        break
                    if out_stream is None:
                        if chime_sound is not None:
                            _wait_for_chime_gap()
                            sd.stop()
                        device = sd.default.device[1] if args.output_device is None else args.output_device
                        out_stream = sd.OutputStream(samplerate=sr, channels=1,
                                                     dtype="float32", device=device)
                        out_stream.start()
                        drain_audio_q()
                        vad.reset_states()
                        state["play_start"] = _time.monotonic()
                        state["consec_speech"] = 0
                    first_sentence = False
                    _append_ref(samples, sr)
                    data = samples.reshape(-1, 1)
                    for i in range(0, len(data), 4096):
                        # A pending keypress on stdin interrupts playback.
                        if select.select([sys.stdin], [], [], 0)[0]:
                            sys.stdin.read(1)
                            interrupted = True
                        elif check_barge_in():
                            interrupted = True
                            print(" [voice interrupt]", flush=True)
                        if interrupted:
                            break
                        out_stream.write(data[i:i + 4096])
                    if interrupted:
                        break
            finally:
                synth_task.cancel()
                try:
                    await synth_task
                except asyncio.CancelledError:
                    pass
                if out_stream:
                    out_stream.stop()
                    out_stream.close()

        asyncio.run(_play())
        if interrupted and state["consec_speech"] < 3:
            print(" [interrupted]")
        drain_audio_q()
        vad.reset_states()
        return interrupted

    def process_utterance(audio, history):
        """Answer one captured utterance and append the exchange to *history*.

        Errors are reported on stderr and do not propagate.
        """
        print(f" ({len(audio) / SAMPLE_RATE:.1f}s)")
        if chime_sound is not None:
            print(" *chime*", flush=True)
            sd.play(chime_sound, CHIME_SR)
            chime_started_at[0] = _time.monotonic()
        heard, response = "", ""
        try:
            messages = _sys_messages()
            for h in history[-MAX_HISTORY:]:
                messages += [{"role": "user", "content": h["user"]},
                             {"role": "assistant", "content": h["assistant"]}]
            if args.audio_mode:
                # Convert raw PCM to WAV bytes in memory
                wav_buf = io.BytesIO()
                _write_wav(wav_buf, audio, SAMPLE_RATE)
                audio_b64 = base64.b64encode(wav_buf.getvalue()).decode("ascii")
                # Add the new user turn; the client attaches the audio part.
                messages.append({"role": "user", "content": _TRANSCRIBE_PROMPT})
                response = llm_generate(messages, audio_data=audio_b64)
                print(f"\n> {response}\n", flush=True)
                if kokoro and response:
                    play_tts_stream(response)
                elif chime_sound is not None:
                    _wait_for_chime_gap()
                    sd.stop()
            else:
                # Text mode: transcription -> LLM
                heard = transcribe(audio)
                print(f" [{heard}]")
                messages.append({"role": "user", "content": heard})
                response_parts: list[str] = []

                def _collecting(gen):
                    """Pass sentences through while recording and printing them."""
                    for s in gen:
                        response_parts.append(s)
                        print(f"> {s}", flush=True)
                        yield s
                    print()

                if kokoro:
                    play_tts_stream(_collecting(stream_sentences(messages)))
                else:
                    for _ in _collecting(stream_sentences(messages)):
                        pass
                    if chime_sound is not None:
                        _wait_for_chime_gap()
                        sd.stop()
                response = " ".join(response_parts)
                print()
            history.append({"user": heard, "assistant": response})
            if len(history) > MAX_HISTORY:
                history.pop(0)
            if args.memory:
                update_memory(heard, response)
                if len(history) % 5 == 0:
                    consolidate_memory()
        except Exception as e:
            print(f"\nError: {e}\n", file=sys.stderr)
            if chime_sound is not None:
                sd.stop()  # do not leave the chime ticking after a failure

    history, buf = [], []
    chime_started_at = [0.0]
    speaking, silent_chunks = False, 0
    old_term = termios.tcgetattr(sys.stdin)
    tty.setcbreak(sys.stdin.fileno())
    # Everything after setcbreak runs under try/finally so the terminal is
    # restored even when the greeting request or the audio stream fails.
    try:
        mode = "audio" if args.audio_mode else "text"
        print(f"\nListening (mode: {mode}, tts: {args.tts}, silence: {args.silence_ms}ms, smart_turn: {args.smart_turn})")
        tts_hint = (" Speak or press any key to interrupt TTS." if args.aec
                    else " Press any key to interrupt TTS.") if args.tts else ""
        print(f"Speak into your microphone. Ctrl+C to quit.{tts_hint}\n", flush=True)

        greeting = llm_generate(_sys_messages() + [
            {"role": "user", "content": (
                "Greet the user as Voice Loop in one short sentence. "
                "If my name is in memory, use it and ask how you can help. "
                "Otherwise, ask for my name."
            )},
        ], max_tokens=60)
        print(f"> {greeting}\n", flush=True)
        if kokoro:
            speak_tts(greeting)

        with sd.InputStream(
            samplerate=SAMPLE_RATE, channels=1, dtype="float32",
            blocksize=CHUNK_SAMPLES, callback=callback,
            device=sd.default.device[0] if args.input_device is None else args.input_device,
        ):
            while True:
                chunk = audio_q.get()
                if len(chunk) < CHUNK_SAMPLES:
                    continue
                speech_prob = _vad_prob(vad, chunk)
                if speech_prob > 0.5:
                    if not speaking:
                        speaking = True
                        print("[listening...]", end="", flush=True)
                    silent_chunks = 0
                    buf.append(chunk)
                elif speaking:
                    silent_chunks += 1
                    buf.append(chunk)
                    if silent_chunks < silence_limit:
                        continue
                    if smart_turn and buf:
                        prob = smart_turn(np.concatenate(buf))
                        print(f" [turn prob: {prob:.2f}]", end="", flush=True)
                        if prob < 0.5:
                            # Turn not judged complete yet: keep listening.
                            silent_chunks = 0
                            continue
                    # Send accumulated buffer to Gemma as raw audio
                    audio_data = np.concatenate(buf)
                    process_utterance(audio_data, history)
                    buf.clear()
                    speaking, silent_chunks = False, 0
                    vad.reset_states()
    except KeyboardInterrupt:
        print("\nBye!")
    finally:
        executor.shutdown(wait=False)
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_term)
        if args.record and record_buf:
            full = np.concatenate(record_buf)
            _write_wav(args.record, full, SAMPLE_RATE)
            print(f"Recorded {len(full) / SAMPLE_RATE:.1f}s to {args.record}", flush=True)


if __name__ == "__main__":
    main()