| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- from fastapi import FastAPI, WebSocket, WebSocketDisconnect
- import whisper
- import webrtcvad
- import numpy as np
- from pydub import AudioSegment
- import scipy.io.wavfile as wavfile
- import io
- import asyncio
- app = FastAPI()
- # Whisper 모델 로드 (tiny 모델로 실시간성 유지)
- model = whisper.load_model("tiny")
- # VAD 설정
- vad = webrtcvad.Vad()
- vad.set_mode(1) # 0~3 (3이 가장 엄격), 1은 중간 수준의 감도
- # 클라이언트 관리
- clients = {}
- # 발화 감지 설정
- SAMPLE_RATE = 16000 # Whisper와 VAD가 요구하는 샘플레이트
- FRAME_DURATION_MS = 30 # VAD 프레임 길이 (10, 20, 30ms 중 선택)
- SILENCE_DURATION = 1.0 # 침묵 지속 시간 (초)
- def is_speech_vad(audio_chunk):
- """webrtcvad를 사용한 발화 감지"""
- # 16-bit PCM으로 변환
- audio = np.frombuffer(audio_chunk, dtype=np.int16)
- frame_size = (SAMPLE_RATE * FRAME_DURATION_MS / 1000) * 2 # 바이트 단위
- if len(audio) < frame_size:
- return False
- return vad.is_speech(audio[:frame_size], SAMPLE_RATE)
- async def process_audio_in_memory(audio_buffer):
- """메모리에서 오디오 처리 및 텍스트 변환"""
- audio_segment = AudioSegment.from_file(io.BytesIO(audio_buffer), format="webm")
- # WAV 변환 (Mono, 16-bit PCM, 16kHz 샘플링)
- audio_segment = audio_segment.set_channels(1).set_frame_rate(SAMPLE_RATE).set_sample_width(2)
- wav_buffer = io.BytesIO()
- audio_segment.export(wav_buffer, format="wav")
- wav_buffer.seek(0)
- # Whisper로 음성 인식
- result = model.transcribe(wav_buffer, fp16=False)
- return result["text"]
- @app.websocket("/audio-stream")
- async def websocket_endpoint(websocket: WebSocket):
- await websocket.accept()
- client_id = str(id(websocket))
- clients[client_id] = websocket
- print(f"Client {client_id} connected")
- audio_buffer = bytearray()
- last_speech_time = 0
- silence_start = None
- try:
- while True:
- # 오디오 청크 수신
- audio_chunk = await websocket.receive_bytes()
- # 오디오 데이터를 새로운 버퍼에 저장 (기존 데이터 누적 방지)
- audio_buffer = bytearray(audio_chunk) # 🔥 새로운 데이터로 덮어쓰기
- # 수신 크기 확인
- print(f"Received data size: {len(audio_chunk)} bytes")
-
- # 오디오 바이너리 데이터 => 숫자배열(numpy)로 해석
- audio_np = np.frombuffer(audio_buffer, dtype=np.int16).copy()
- # WAV 파일로 저장 (덮어쓰기)
- output_file = "recorded_audio.wav"
- wavfile.write(output_file, 16000, audio_np)
- # STT 처리
- stt_result = model.transcribe(output_file, language="ko")
- transcription = stt_result["text"]
-
- # 클라이언트에 데이터 전송
- await websocket.send_text(transcription)
-
- # VAD로 발화 감지
- # if is_speech_vad(audio_chunk):
- # last_speech_time = asyncio.get_event_loop().time()
- # silence_start = None
- # await websocket.send_text("Speech detected...")
- # else:
- # if silence_start is None:
- # silence_start = asyncio.get_event_loop().time()
- # elif (asyncio.get_event_loop().time() - silence_start) > SILENCE_DURATION and last_speech_time > 0:
- # # 침묵이 지속되면 음성 인식 수행
- # transcription = await process_audio_in_memory(bytes(audio_buffer))
- # await websocket.send_text(transcription)
- # audio_buffer = bytearray() # 버퍼 초기화
- # silence_start = None
- # last_speech_time = 0
- except WebSocketDisconnect:
- print(f"Client {client_id} disconnected")
- del clients[client_id]
- except Exception as e:
- print(f"Error: {e}")
- await websocket.send_text(f"Error: {str(e)}")
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=8000)
|