Streaming - WebSocket
Bodhi’s Streaming API delivers low-latency, real-time speech recognition in 12+ Indian languages, built for fast, accurate voice experiences.
📚 Requirements
Customer ID
API Key
Both can be found by creating an account here
SDK Example
To transcribe audio from an audio stream using one of the Bodhi SDKs, follow these steps.
Install the SDK
Open a terminal, go to the directory where you'd like to set up your project, and install the Bodhi SDK.
# Set environment variables
# export CUSTOMER_ID=<bodhi_customer_id>
# export API_KEY=<bodhi_api_key>
# Install the Bodhi Python SDK
pip install bodhi-sdk
Common Setup
import os

from bodhi import (
    BodhiClient,
    TranscriptionConfig,
    TranscriptionResponse,
    LiveTranscriptionEvents,
)

# Credentials are read from the environment variables set above
API_KEY = os.environ["API_KEY"]
CUSTOMER_ID = os.environ["CUSTOMER_ID"]

client = BodhiClient(api_key=API_KEY, customer_id=CUSTOMER_ID)

# Register event handlers (implementations shown below)
client.on(LiveTranscriptionEvents.Transcript, on_transcript)
client.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end)
client.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started)
client.on(LiveTranscriptionEvents.Error, on_error)
client.on(LiveTranscriptionEvents.Close, on_close)
Event handler callbacks receive real-time updates:
These optional asynchronous callbacks let you respond to real-time events during a transcription session. You can implement only the ones relevant to your use case—none are required.
on_transcript – Triggered whenever a transcript is available.
on_utterance_end – Triggered when the system detects the end of a spoken utterance.
on_speech_started – Triggered each time the user starts speaking after the completion of a previous utterance, marking the start of a new speech segment.
on_error – Triggered if any error occurs during processing.
on_close – Triggered when the WebSocket connection is closed.
async def on_transcript(response: TranscriptionResponse):
    print(f"Transcript: {response.text}")

async def on_utterance_end(response: TranscriptionResponse):
    print(f"UtteranceEnd: {response}")

async def on_speech_started(response: TranscriptionResponse):
    print(f"SpeechStarted: {response}")

async def on_error(e: Exception):
    print(f"Error: {str(e)}")

async def on_close():
    print("WebSocket connection closed.")
Transcribing from a Live Stream
import asyncio
import wave

config = TranscriptionConfig(model="hi-banking-v2-8khz")

# Start streaming connection
# Mock a live audio stream by reading a local file in chunks
# Replace this with the stream of audio coming from your telephony provider
with wave.open("loan.wav", "rb") as wf:
    sample_rate = wf.getframerate()
    config.sample_rate = sample_rate
    await client.start_connection(config=config)

    REALTIME_RESOLUTION = 0.02  # 20 ms
    byte_rate = sample_rate * wf.getsampwidth() * wf.getnchannels()
    data = wf.readframes(wf.getnframes())
    audio_cursor = 0
    while len(data):
        i = int(byte_rate * REALTIME_RESOLUTION)
        chunk, data = data[:i], data[i:]
        await client.send_audio_stream(chunk)
        audio_cursor += REALTIME_RESOLUTION
        await asyncio.sleep(REALTIME_RESOLUTION)

# Close connection and get final transcription result
result = await client.close_connection()
print("Final result:", result)
Transcribing from a Remote File URL
config = TranscriptionConfig(model="hi-banking-v2-8khz")
audio_url = "https://bodhi.navana.ai/audios/loan.wav"
result = await client.transcribe_remote_url(audio_url, config=config)
print("Final result:", result)
Transcribing from a Local File
import os
config = TranscriptionConfig(model="hi-banking-v2-8khz")
audio_file = os.path.join(os.path.dirname(__file__), "loan.wav")
result = await client.transcribe_local_file(audio_file, config=config)
print("Final result:", result)
Non-SDK Example
This script demonstrates how to stream audio data from a .wav file to an ASR server in real time over a WebSocket. It is a good way to get comfortable with the API before connecting it up for live calling. You can learn more about the connection lifecycle here.
All code snippets in this document come from the following Python demo. These snippets are meant for educational purposes; please refer to the demo link for the latest and complete code.
Establish connection
Connect to the server using the WebSocket URI, including the necessary authentication headers (x-api-key and x-customer-id).
# Set environment variables
# export CUSTOMER_ID=<bodhi_customer_id>
# export API_KEY=<bodhi_api_key>
import asyncio
import json
import os
import ssl
import sys
import uuid
import wave

import aiohttp

api_key = os.environ["API_KEY"]
customer_id = os.environ["CUSTOMER_ID"]
uri = "<bodhi_websocket_uri>"  # streaming endpoint; see the full demo for the exact URI
filepath = "loan.wav"
ssl_context = ssl.create_default_context()  # default SSL context for wss:// connections

request_headers = {
    "x-api-key": api_key,
    "x-customer-id": customer_id,
}
chunk_duration_ms = 100

connector = aiohttp.TCPConnector(ssl=ssl_context if uri.startswith("wss://") else None)
async with aiohttp.ClientSession(connector=connector, headers=request_headers) as session:
    try:
        async with session.ws_connect(uri) as ws:
            wf = wave.open(filepath, "rb")
            channels, sample_width, sample_rate, num_samples, _, _ = wf.getparams()
            print(
                f"Channels = {channels}, Sample Rate = {sample_rate} Hz, Sample width = {sample_width} bytes",
                file=sys.stderr,
            )
    except aiohttp.WSServerHandshakeError as e:
        print(f"WebSocket handshake failed with status code: {e.status}", file=sys.stderr)
        if e.status == 401:
            print("Invalid API key or customer ID.", file=sys.stderr)
        elif e.status == 402:
            print("Insufficient balance.", file=sys.stderr)
        elif e.status == 403:
            print("Customer has been deactivated.", file=sys.stderr)
    except aiohttp.ClientConnectionError as e:
        print(f"Connection error: {str(e)}", file=sys.stderr)
    except Exception as e:
        print(f"An error occurred: {str(e)}", file=sys.stderr)
        import traceback
        print("Full error traceback:", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
After this block of code succeeds, you have established a persistent connection with the WebSocket server.
The code also illustrates the errors that can occur during the WebSocket handshake. Read more here.
Configure connection
Now, you need to send a configuration containing the sample_rate, a unique transaction_id, and the model you wish to use. A list of available models can be found here ↗️
config_msg = json.dumps(
    {
        "config": {
            "sample_rate": sample_rate,
            "transaction_id": str(uuid.uuid4()),
            "model": "hi-banking-v2-8khz",
        }
    }
)
await ws.send_str(config_msg)
8000 Hz (8 kHz) is the default sample rate for telephony channels. All Bodhi models are specifically optimised for 8 kHz audio.
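If you are unsure of a file's sample rate, you can inspect the WAV header with Python's built-in wave module before configuring the connection. A minimal sketch, using the loan.wav example file from above:

import wave

with wave.open("loan.wav", "rb") as wf:
    sample_rate = wf.getframerate()
    print(f"Sample rate: {sample_rate} Hz")
    if sample_rate != 8000:
        # Bodhi models are optimised for 8 kHz audio; consider resampling first
        print("Warning: audio is not 8 kHz")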
Send and Receive Audio
For real-time transcription to take place, you need to send audio and receive transcripts at the same time.
Send Audio
async def send_audio(ws, wf, buffer_size, interval_seconds):
    while True:
        data = wf.readframes(buffer_size)
        if not data:
            break
        await ws.send_bytes(data)
        await asyncio.sleep(interval_seconds)

    # Send EOF JSON message
    EOF_MESSAGE = '{"eof": 1}'
    await ws.send_str(EOF_MESSAGE)
To send audio, we iteratively loop through the bytes of the audio file and send the data to the server. After sending a chunk of data, we sleep for some time (interval_seconds) to simulate real-time streaming. Finally, we indicate to the server that we will not be sending any more data by sending the eof signal. This lets Bodhi process and finalize the transcription response, ensuring that the client receives the final results after the entire audio has been processed.
Receive Transcriptions
async def receive_transcription(ws):
    complete_sentences = []
    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            try:
                response_data = json.loads(msg.data)
                call_id = response_data.get("call_id")
                segment_id = response_data.get("segment_id")
                transcript_type = response_data.get("type")
                transcript_text = response_data.get("text")
                end_of_stream = response_data.get("eos", False)
                if transcript_type == "complete" and transcript_text != "":
                    complete_sentences.append(transcript_text)
                print(
                    f"Received data: Call_id={call_id}, "
                    f"Segment_id={segment_id}, "
                    f"EOS={end_of_stream}, "
                    f"Type={transcript_type}, "
                    f"Text={transcript_text}"
                )
                if end_of_stream:
                    print("Complete transcript: ", ", ".join(complete_sentences))
                    break
            except json.JSONDecodeError:
                print(f"Received a non-JSON response: {msg.data}")
        elif msg.type == aiohttp.WSMsgType.ERROR:
            print(f"WebSocket error: {ws.exception()}")
            break
        elif msg.type == aiohttp.WSMsgType.CLOSED:
            break
Run them together
buffer_size = int(sample_rate * chunk_duration_ms / 1000)  # frames per chunk
interval_seconds = chunk_duration_ms / 1000.0
send_task = asyncio.create_task(send_audio(ws, wf, buffer_size, interval_seconds))
recv_task = asyncio.create_task(receive_transcription(ws))
await asyncio.gather(send_task, recv_task)
After sending the audio data, you will receive partial and complete transcription responses from the server. You can process partial responses for real-time feedback; however, a partial response may change by the time you receive the complete transcript, because as the server receives more audio context the model may revise its previous hypothesis to improve accuracy.
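For example, a client can keep only the latest partial hypothesis for display and commit text once a complete segment arrives. A minimal sketch, assuming for illustration that interim responses carry a type value other than "complete", such as "partial" (check the response structure for the exact values):

live_caption = ""  # latest, possibly revised, partial hypothesis
committed = []     # finalized segments

def handle_response(response_data: dict) -> None:
    global live_caption
    text = response_data.get("text", "")
    if response_data.get("type") == "complete":
        if text:
            committed.append(text)  # the segment is final, keep it
        live_caption = ""           # clear the interim display
    else:
        live_caption = text         # overwrite with the newest hypothesis
    print("Display:", " ".join(committed + [live_caption]).strip())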
Next Steps
Check the detailed response structure to see what information each response contains.
Check the error responses to ensure they are being handled appropriately
Check the feature overview to learn how to improve accuracy, handle background noise, and more.