# Measuring Latency

{% hint style="info" %}
When measuring the latency of any client-server system, you must minimise network latency as much as possible. To do so, please run your client on servers located in India, or get in touch with us for an on-premise deployment.
{% endhint %}

***

### ⚙️ Running Instructions

1. Save the full script as `latency.py` in your `bodhi-streaming-asr-example` repo, which you can find [here](https://github.com/navana-tech/bodhi-streaming-asr-example).
2. Activate your environment:

```
   source <env_name>/bin/activate
```

3\. Run the script:

```
   python latency.py -f path/to/audio.wav
```

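> Make sure the audio file is a mono, 16-bit WAV file (e.g., with an 8 kHz or 16 kHz sample rate).
>
> The script reads `API_KEY` and `CUSTOMER_ID` from environment variables (a `.env` file is also loaded), so set both before running.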

### 🧵 Full Script

```python
import argparse
import asyncio
import base64
import json
import sys
import wave
import websockets
import ssl
import uuid
import os
from dotenv import load_dotenv
load_dotenv()

# TLS context that run() passes to websockets.connect for wss:// URIs.
# NOTE(review): hostname checking and certificate verification are both
# disabled below, so the server's identity is not validated even though the
# API key is sent over this connection. Presumably this is meant to allow
# self-signed certificates on on-premise deployments -- confirm, and prefer
# the default (verifying) context when talking to the hosted endpoint.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# Audio is streamed in chunks of this duration so that sending approximates
# real-time playback.
REALTIME_RESOLUTION = 0.020  # seconds per chunk

async def run(data, api_key, customer_id, channels, sample_width, sample_rate, uri):
    """Stream raw PCM audio over a WebSocket at real-time pace and report latency.

    A config message is sent first, then the audio in chunks of roughly
    REALTIME_RESOLUTION seconds, then an ``eof`` message. While sending, every
    non-``complete`` message from the server is used to compare how much audio
    has been sent with how much audio the transcript covers.

    Args:
        data: Raw PCM frames (bytes) read from the WAV file.
        api_key: Value for the ``x-api-key`` request header.
        customer_id: Value for the ``x-customer-id`` request header.
        channels: Number of audio channels in ``data``.
        sample_width: Bytes per sample.
        sample_rate: Frames per second.
        uri: WebSocket URI; ``wss://`` URIs use the module-level ``ssl_context``.

    Raises:
        ValueError: If the audio parameters give a non-positive byte rate.
    """
    frame_size = sample_width * channels
    byte_rate = frame_size * sample_rate
    if byte_rate <= 0:
        raise ValueError("channels, sample_width and sample_rate must all be positive")

    # Send whole frames only, so a sample is never split across two messages,
    # and at least one frame per chunk, so the send loop always makes progress.
    chunk_bytes = max(1, int(sample_rate * REALTIME_RESOLUTION)) * frame_size
    audio_cursor = 0.0  # seconds of audio sent so far

    request_headers = {
        "x-api-key": api_key,
        "x-customer-id": customer_id,
    }

    # NOTE(review): newer websockets releases (14+) name this keyword
    # `additional_headers` -- confirm against the version pinned by the repo.
    connect_kwargs = {"extra_headers": request_headers}
    if uri.startswith("wss://"):
        connect_kwargs["ssl"] = ssl_context

    async with websockets.connect(uri, **connect_kwargs) as ws:
        # NOTE(review): the model name is fixed while sample_rate comes from
        # the file -- confirm the two are expected to match.
        await ws.send(json.dumps({
            "config": {
                "sample_rate": sample_rate,
                "transaction_id": str(uuid.uuid4()),
                "model": "hi-banking-v2-8khz",
                "parse_number": True,
                "aux": True,
            }
        }))

        async def sender(ws):
            """Send the audio chunk by chunk, pausing for each chunk's duration."""
            nonlocal audio_cursor
            try:
                # Walk the buffer by offset instead of re-slicing the
                # remainder, which would copy the rest of the audio each time.
                for start in range(0, len(data), chunk_bytes):
                    chunk = data[start:start + chunk_bytes]
                    await ws.send(chunk)
                    # Use the real chunk length so a short final chunk does
                    # not over-count the audio that was sent.
                    chunk_seconds = len(chunk) / byte_rate
                    audio_cursor += chunk_seconds
                    await asyncio.sleep(chunk_seconds)

                await ws.send(json.dumps({"eof": 1}))
            except Exception as e:
                print(f'Error while sending: {e}')
                raise

        async def receiver(ws):
            """Consume server messages and accumulate latency statistics."""
            transcript_cursor = 0.0  # seconds of audio covered by transcripts
            min_latency = float("inf")
            max_latency = 0
            avg_latency_num = 0
            avg_latency_den = 0
            measurements = 0

            try:
                async for msg in ws:
                    msg = json.loads(msg)
                    if msg.get('type') == 'complete':
                        continue

                    segment_meta = msg.get('segment_meta')
                    if segment_meta is None:
                        # Show unexpected messages instead of dying on a
                        # KeyError that nobody would ever see.
                        print(f'Skipping message without segment_meta: {msg}')
                        continue

                    # Gap measured against the previous transcript position.
                    cur_max_latency = audio_cursor - transcript_cursor

                    current_offset = 0
                    timestamps = segment_meta['timestamps']
                    if timestamps:
                        current_offset = timestamps[-1]

                    transcript_cursor = segment_meta['start_time'] + current_offset
                    # Gap measured against the updated transcript position.
                    cur_min_latency = audio_cursor - transcript_cursor

                    avg_latency_num += ((cur_min_latency + cur_max_latency) / 2) * current_offset
                    avg_latency_den += current_offset

                    max_latency = max(max_latency, cur_max_latency)
                    min_latency = min(min_latency, cur_min_latency)
                    measurements += 1

                    print(f'Measuring... Audio sent till now = {audio_cursor:.3f}, Transcript for audio till now = {transcript_cursor:.3f}')
            except websockets.exceptions.ConnectionClosedError:
                pass
            except Exception as e:
                # Report here: gather() below collects exceptions silently.
                print(f'Error while receiving: {e}')
                raise

            print(f'Avg latency: {avg_latency_num / (avg_latency_den or 1):.3f}')
            if measurements:
                print(f'Min latency: {min_latency:.3f}')
                print(f'Max latency: {max_latency:.3f}')
            print('Note all latencies include network latency')

        # Both coroutines print their own errors before re-raising, so the
        # collected exceptions can be dropped without hiding a failure.
        await asyncio.gather(sender(ws), receiver(ws), return_exceptions=True)

def main():
    """Parse the command line, load the WAV file and run the latency measurement.

    Credentials are read from the ``CUSTOMER_ID`` and ``API_KEY`` environment
    variables.

    Returns:
        0 on success, or 1 when credentials are missing or the audio file
        cannot be used. The script entry point passes the value to sys.exit.
    """
    # Parse arguments first so that --help and usage errors work even when
    # the credentials have not been set yet.
    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--file", type=str, required=True, help="WAV audio file path")
    parser.add_argument("-u", "--uri", type=str, default="wss://bodhi.navana.ai", help="WebSocket server URI")
    args = parser.parse_args()

    customer_id = os.environ.get("CUSTOMER_ID")
    api_key = os.environ.get("API_KEY")

    if not api_key or not customer_id:
        print("Please set API key and customer ID in environment variables.", file=sys.stderr)
        return 1

    try:
        with wave.open(args.file, 'rb') as fh:
            params = fh.getparams()
            data = fh.readframes(params.nframes)
    except (OSError, wave.Error, EOFError) as e:
        print(f'Could not read WAV file {args.file}: {e}', file=sys.stderr)
        return 1

    # Explicit check instead of assert, which is stripped under `python -O`.
    if params.sampwidth != 2:
        print('WAV must be 16-bit.', file=sys.stderr)
        return 1

    if params.nchannels != 1:
        # Only warn: the audio is still streamed exactly as it is stored.
        print(f'Warning: expected a mono WAV file, got {params.nchannels} channels.', file=sys.stderr)

    asyncio.run(run(data, api_key, customer_id, params.nchannels, params.sampwidth, params.framerate, args.uri))
    return 0

# Script entry point: use main()'s return value as the exit status, treating
# a falsy result (None or 0) as success.
if __name__ == '__main__':
    exit_status = main() or 0
    sys.exit(exit_status)
```

### 🧠 Miscellaneous Notes

#### How to compute seconds from bytes

```python
byte_rate = sample_width * sample_rate * channels
duration = num_bytes / byte_rate
```

#### How to compute `transcript_cursor`

```python
current_offset = 0
timestamps = msg['segment_meta']['timestamps']
if timestamps:
    current_offset = timestamps[-1]


transcript_cursor = msg['segment_meta']['start_time'] + current_offset
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://navana.gitbook.io/bodhi/quickstart/streaming-websocket/measuring-latency.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
