from fastapi import FastAPI, WebSocket, WebSocketDisconnect import whisper import webrtcvad import numpy as np from pydub import AudioSegment import scipy.io.wavfile as wavfile import io import asyncio app = FastAPI() # Whisper 모델 로드 (tiny 모델로 실시간성 유지) model = whisper.load_model("tiny") # VAD 설정 vad = webrtcvad.Vad() vad.set_mode(1) # 0~3 (3이 가장 엄격), 1은 중간 수준의 감도 # 클라이언트 관리 clients = {} # 발화 감지 설정 SAMPLE_RATE = 16000 # Whisper와 VAD가 요구하는 샘플레이트 FRAME_DURATION_MS = 30 # VAD 프레임 길이 (10, 20, 30ms 중 선택) SILENCE_DURATION = 1.0 # 침묵 지속 시간 (초) def is_speech_vad(audio_chunk): """webrtcvad를 사용한 발화 감지""" # 16-bit PCM으로 변환 audio = np.frombuffer(audio_chunk, dtype=np.int16) frame_size = (SAMPLE_RATE * FRAME_DURATION_MS / 1000) * 2 # 바이트 단위 if len(audio) < frame_size: return False return vad.is_speech(audio[:frame_size], SAMPLE_RATE) async def process_audio_in_memory(audio_buffer): """메모리에서 오디오 처리 및 텍스트 변환""" audio_segment = AudioSegment.from_file(io.BytesIO(audio_buffer), format="webm") # WAV 변환 (Mono, 16-bit PCM, 16kHz 샘플링) audio_segment = audio_segment.set_channels(1).set_frame_rate(SAMPLE_RATE).set_sample_width(2) wav_buffer = io.BytesIO() audio_segment.export(wav_buffer, format="wav") wav_buffer.seek(0) # Whisper로 음성 인식 result = model.transcribe(wav_buffer, fp16=False) return result["text"] @app.websocket("/audio-stream") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() client_id = str(id(websocket)) clients[client_id] = websocket print(f"Client {client_id} connected") audio_buffer = bytearray() last_speech_time = 0 silence_start = None try: while True: # 오디오 청크 수신 audio_chunk = await websocket.receive_bytes() # 오디오 데이터를 새로운 버퍼에 저장 (기존 데이터 누적 방지) audio_buffer = bytearray(audio_chunk) # 🔥 새로운 데이터로 덮어쓰기 # 수신 크기 확인 print(f"Received data size: {len(audio_chunk)} bytes") # 오디오 바이너리 데이터 => 숫자배열(numpy)로 해석 audio_np = np.frombuffer(audio_buffer, dtype=np.int16).copy() # WAV 파일로 저장 (덮어쓰기) output_file = "recorded_audio.wav" wavfile.write(output_file, 16000, audio_np) # STT 처리 stt_result = model.transcribe(output_file, language="ko") transcription = stt_result["text"] # 클라이언트에 데이터 전송 await websocket.send_text(transcription) # VAD로 발화 감지 # if is_speech_vad(audio_chunk): # last_speech_time = asyncio.get_event_loop().time() # silence_start = None # await websocket.send_text("Speech detected...") # else: # if silence_start is None: # silence_start = asyncio.get_event_loop().time() # elif (asyncio.get_event_loop().time() - silence_start) > SILENCE_DURATION and last_speech_time > 0: # # 침묵이 지속되면 음성 인식 수행 # transcription = await process_audio_in_memory(bytes(audio_buffer)) # await websocket.send_text(transcription) # audio_buffer = bytearray() # 버퍼 초기화 # silence_start = None # last_speech_time = 0 except WebSocketDisconnect: print(f"Client {client_id} disconnected") del clients[client_id] except Exception as e: print(f"Error: {e}") await websocket.send_text(f"Error: {str(e)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)