Live Transcribe Example and Walkthrough

Last updated: 2023-12-09

Live Transcription is one of the advanced uses of the Audio Streaming service, and it works with any live transcription service from providers such as Google or Watson.

The following quick guide takes you through creating a local Live Transcribe server that works with RingCX Audio Streaming and Google Live Transcribe.

Important

It's assumed that you have gone through the Getting Started Guide.

Prerequisites

  • ngrok installed

  • A Google Cloud project with the Speech-to-Text API enabled, and service account credentials available to the client libraries

  • Node.js or Python 3, depending on which sample you run

Step.1 Start ngrok

In command line, run:

ngrok http 3333

This will start a tunnel and print both an HTTP and an HTTPS forwarding URL. In this instance, we want the secure connection, so look for:

https://xxxxxx.ngrok.io

Replace https with wss, so we have:

wss://xxxxxx.ngrok.io

This will be our streamingUrl.
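
If you prefer to derive this programmatically, ngrok exposes its active tunnels through a local inspection API (by default at http://127.0.0.1:4040). A minimal sketch, assuming ngrok is running with the command above:

    # Fetch the public HTTPS URL from ngrok's local API and rewrite it as wss://
    import json
    from urllib.request import urlopen

    with urlopen("http://127.0.0.1:4040/api/tunnels") as resp:
        tunnels = json.load(resp)["tunnels"]

    https_url = next(t["public_url"] for t in tunnels if t["public_url"].startswith("https"))
    print(https_url.replace("https://", "wss://", 1))  # our streamingUrl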

Step.2 Setup streaming profile

Please refer to Getting Started Guide Step.1.

Step.3 Start Local Server

Since we've already set up an ngrok tunnel, which publicly exposes our server on local port 3333 at wss://xxxxxx.ngrok.io, let's now start the server with the sample code below. It receives audio streaming segments and applies the Google Cloud Speech-to-Text service to convert them into text.
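
For reference, both servers below consume JSON messages over the WebSocket. Inferred from the handler code (the field values here are illustrative), a media message looks roughly like:

    {
        "event": "Media",
        "perspective": "Conference",
        "media": "<base64-encoded audio segment>"
    }

Connected, Start (which carries metadata.callId), and Stop events mark the rest of the call lifecycle.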

Sample Code

Node.js WebSocket server sample code:

    // Note: It takes a couple of seconds to connect to the Google server, then the transcription will begin

    const WebSocket = require("ws");
    // Imports the Google Cloud client library
    const speech = require('@google-cloud/speech');

    const wss = new WebSocket.Server({ port: 3333 }, () => {
        // address() is only available once the server is actually listening
        console.log(`Server started on port: ${wss.address().port}`);
    });

    // Creates a client
    const client = new speech.SpeechClient();
    const request = {
        config: {
            encoding: "MULAW",
            sampleRateHertz: 8000,
            languageCode: 'en-US',
        },
        interimResults: false, // If you want interim results, set this to true
    };

    let callId; // set when the "Start" event arrives

    // Handle Web Socket Connection
    wss.on("connection", function connection(ws) {
        console.log("New Connection Initiated");
        //Create a recognize stream
        const recognizeStream = client
          .streamingRecognize(request)
          .on('error', console.error)
          .on('data', data =>
            process.stdout.write(
              data.results[0] && data.results[0].alternatives[0]
                ? `========\n Transcription: ${data.results[0].alternatives[0].transcript}\n            Confidence: ${data.results[0].alternatives[0].confidence}\n`
                : '\n\nReached transcription time limit, press Ctrl+C\n'
            )
            );

        ws.on("message", function incoming(message) {
            const msg = JSON.parse(message);
            switch (msg.event) {
                case "Connected":
                    console.log(`A new call has connected.`);
                    console.log(msg);
                    break;
                case "Start":
                    console.log('Starting Media Stream');
                    callId = msg.metadata.callId;
                    break;
                case "Media":
                    switch (msg.perspective) {
                        // Here we only do client side transcription
                        case 'Conference':
                            recognizeStream.write(msg.media);
                            break;
                    }
                    break;
                case "Stop":
                    console.log(`Call Has Ended`);
                    recognizeStream.end()
                    break;
            }
        });
    });
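
Before running the Node.js sample, install its dependencies and point the Google client at your service account credentials (the filename server.js and the credentials path are placeholders):

    npm install ws @google-cloud/speech
    export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
    node server.js
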
Python WebSocket server sample code:

import argparse
import asyncio
import base64
import json
import logging
import queue
import re
import sys
import threading

import websockets
from google.cloud import speech

logging.basicConfig(level=logging.INFO)

BUFFER_COUNT = 5 # to add up audio segments to 100ms as recommended by Google

def listen_print_loop(responses):
    """Iterates through server responses and prints them.

    The responses passed is a generator that will block until a response
    is provided by the server.

    Each response may contain multiple results, and each result may contain
    multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we
    print only the transcription for the top alternative of the top result.

    In this case, responses are provided for interim results as well. If the
    response is an interim one, print a line feed at the end of it, to allow
    the next result to overwrite it, until the response is a final one. For the
    final one, print a newline to preserve the finalized transcription.
    """
    num_chars_printed = 0
    for response in responses:
        if not response.results:
            continue
        # The `results` list is consecutive. For streaming, we only care about
        # the first result being considered, since once it's `is_final`, it
        # moves on to considering the next utterance.
        result = response.results[0]
        if not result.alternatives:
            continue
        # Display the transcription of the top alternative.
        transcript = result.alternatives[0].transcript
        # Display interim results, but with a carriage return at the end of the
        # line, so subsequent lines will overwrite them.
        #
        # If the previous result was longer than this one, we need to print
        # some extra spaces to overwrite the previous result
        overwrite_chars = " " * (num_chars_printed - len(transcript))
        if not result.is_final:
            sys.stdout.write(f'{transcript}{overwrite_chars}\r')
            sys.stdout.flush()
            num_chars_printed = len(transcript)
        else:
            print(transcript + overwrite_chars)
            # Exit recognition if any of the transcribed phrases could be
            # one of our keywords.
            if re.search(r"\b(exit|quit)\b", transcript, re.I):
                print("Exiting..")
                break
            num_chars_printed = 0

class Transcoder(object):
    """
    Converts audio chunks to text
    """
    def __init__(self):
        self.buff = queue.Queue()
        self.closed = False
        self.transcript = None
        self.client = speech.SpeechClient()
        self.config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.MULAW,
            sample_rate_hertz=8000,
            language_code="en-US",
            max_alternatives=1,
            model='phone_call'
        )
        self.streaming_config = speech.StreamingRecognitionConfig(
            config=self.config, interim_results=True,
        )
        # Start up the streaming speech call in a background thread
        t = threading.Thread(target=self.process)
        t.daemon = True
        t.start()


    def process(self):
        """
        Audio stream recognition and result parsing
        """
        audio_generator = self.stream_generator()
        requests = (speech.StreamingRecognizeRequest(audio_content=content)
                    for content in audio_generator)

        responses = self.client.streaming_recognize(self.streaming_config, requests)
        listen_print_loop(responses)

    def stream_generator(self):
        while not self.closed:
            chunk = self.buff.get()
            if chunk is None:
                return
            data = [chunk]
            while True:
                try:
                    chunk = self.buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break
            yield b''.join(data)

    def write(self, data):
        """
        Writes data to the buffer
        """
        self.buff.put(data)

    def exit(self):
        self.closed = True

def log_message(message: str) -> None:
    logging.info(f"Message: {message}")

async def handle(websocket, path):
    logging.info(path)
    transcoder = Transcoder()
    buffer_counter = 0
    buffer = b""
    async for messageStr in websocket:
        # logging.info(messageStr)
        message = json.loads(messageStr)
        if message["event"] is not None and message["event"] == "Connected":
            logging.info("Consumed ACK")
        elif message["event"] is not None and message["event"] == "Start":
            print("start")
        elif message["event"] is not None and message["event"] == "Media":
            buffer_counter += 1
            media = message["media"]
            media_bytes = base64.b64decode(media)
            if buffer_counter > BUFFER_COUNT:
                transcoder.write(buffer)
                buffer_counter=0
                buffer = b""
            else:
                buffer = buffer + media_bytes
        elif message["event"] is not None and message["event"] == "Stop":
            transcoder.exit()
            break

async def main():
    parser = argparse.ArgumentParser(description='Starts up a simple WebSocket server and sends messages to all connected consumers')
    parser.add_argument('--port',"-p", help='port number of the producer sending websocket data')
    parser.add_argument('--hostname',"-n", help='hostname of the producer websocket')

    args = parser.parse_args()

    if args.hostname is None:
        logging.info('No Hostname was supplied, defaulting to 127.0.0.1')
        args.hostname = '127.0.0.1'

    if args.port is None:
        logging.info('No port was supplied, defaulting to 3333')
        args.port = 3333

    logging.info("Server started on host: " + args.hostname + ":" + str(args.port))
    async with websockets.serve(handle, args.hostname, args.port, ping_interval=1, ping_timeout=500000):
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())
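
To run the Python sample, install its dependencies and set your credentials the same way (the filename server.py is a placeholder):

    pip install websockets google-cloud-speech
    export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
    python server.py --hostname 127.0.0.1 --port 3333

Note that the two-argument handler signature handle(websocket, path) matches older releases of the websockets library; recent versions pass only the connection object to the handler.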

Start the server and make a phone call. It will then take a couple of seconds to connect to the Google service. After that, transcribed text will start to show up in your console.


Additional Notes

To achieve better overall performance, you might want to look into the details on: