Noise is added to OpenAI Realtime Speech-to-Speech voice data in Workers

For Workers & Pages, what is the name of the domain?

https://cwhdt-hono-api.dev-a42.workers.dev/

What is the error number?

I have no error number.

What is the error message?

I have no error message.

What is the issue or error you’re encountering?

I am trying to implement Twilio + the OpenAI Realtime API over WebSocket in Workers. Interactions using OpenAI’s text-based Realtime API work correctly on Cloudflare, and the Realtime Speech-to-Speech code running in a Node.js environment (locally or on AWS ECS) sends and receives voice data correctly, even though it performs essentially the same processing. I suspect an issue with how voice data is handled over WebSocket in the Cloudflare environment (perhaps a bug in a library?), but my investigation has reached a dead end.

What steps have you taken to resolve the issue?

I have been investigating the following:

  • I rewrote Twilio’s official sample (GitHub - twilio-samples/speech-assistant-openai-realtime-api-node), which uses Fastify, to use Hono (which I am more familiar with).
  • I confirmed that voice data can be exchanged using Twilio + Realtime API in both a local Node.js environment and AWS ECS.
  • I verified the operation of the text-based Realtime API in the same two environments.
  • I modified the code for both text-based and voice-based APIs for the Cloudflare environment.
  • I confirmed that the text-based application works in the deployed environment and when starting the local environment with wrangler dev.
  • When using Twilio for voice-based communication, both locally with wrangler dev and in the deployed environment, the audio data returned from OpenAI is severely noisy and unintelligible (the data may be corrupted rather than merely noisy).
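One way to narrow this down would be to inspect the base64 `media.payload` values on both sides of the Worker. The sketch below uses hypothetical helpers (`roundTripsIntact` and `ulawSilenceRatio` are names invented here, not part of the original code) to check whether a payload survives a base64 round trip and how much of the decoded G.711 μ-law stream is silence; it assumes payloads are base64-encoded μ-law and that `atob`/`btoa` are available (they are in Workers and modern Node.js):

```typescript
// Hypothetical diagnostic helpers, not part of the handler in this post.

// roundTripsIntact: checks that a base64 media payload survives a decode/encode
// round trip, i.e. the base64 text itself was not mangled in transit.
function roundTripsIntact(payloadB64: string): boolean {
	return btoa(atob(payloadB64)) === payloadB64;
}

// ulawSilenceRatio: fraction of decoded bytes that are G.711 u-law near-silence
// (0xFF / 0x7F encode zero amplitude). A real speech payload usually contains
// some silence bytes; a stream of uniformly random bytes usually does not.
function ulawSilenceRatio(payloadB64: string): number {
	const bytes = atob(payloadB64);
	if (bytes.length === 0) return 0;
	let silent = 0;
	for (let i = 0; i < bytes.length; i++) {
		const b = bytes.charCodeAt(i);
		if (b === 0xff || b === 0x7f) silent++;
	}
	return silent / bytes.length;
}
```

Comparing these values for the same chunk as emitted by OpenAI and as delivered to Twilio could help distinguish transport corruption from an audio-format mismatch.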

What are the steps to reproduce the issue?

Use Twilio, build a Hono app, and register the wsVoiceHandler from the following code.

/**
 * WebSocket voice handler for the Cloudflare environment (consolidated version)
 *
 * This file provides a voice-conversation server that uses the OpenAI Realtime API
 * in the Cloudflare environment. All required dependencies are bundled into this
 * single file.
 */

import type { Context } from "hono";
import type { Env } from "hono/types";

// ========== Constants ==========

// OpenAI Realtime API URL
const REALTIME_API_URL =
	"wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01";

// System message
const SYSTEM_MESSAGE = "Respond simply.";

// Voice setting
const VOICE = "alloy";

// ========== Logger functions ==========

/**
 * Log level type definition
 */
type LogLevel = "log" | "warn" | "error";

/**
 * Log save result type definition
 */
type LogResult = {
	success: boolean;
	error?: string | Error;
	[key: string]: unknown;
};

/**
 * General-purpose logger function
 *
 * @param message Log message
 * @param isLocal Whether this is the local environment (true: local, false: production)
 * @param level Log level ('log' | 'warn' | 'error')
 * @param sessionId Session ID (optional, used for the log file name)
 * @returns Promise<LogResult> Result of saving the log
 */
async function logMessage(
	message: string,
	isLocal: boolean,
	level: LogLevel = "log",
	sessionId = "app",
): Promise<LogResult> {
	// Prepend the log level to the message
	const formattedMessage = `[${level.toUpperCase()}] ${message}`;

	try {
		if (isLocal) {
			// Local environment: save to a file via the audio-saver-api
			const response = await fetch(
				`http://localhost:3001/save-log?sessionId=${sessionId}`,
				{
					method: "POST",
					body: formattedMessage,
					headers: {
						"Content-Type": "text/plain",
					},
				},
			);
			// Parse the response and return the result
			return (await response.json()) as LogResult;
		}

		// Production environment: output to the console only
		console[level](formattedMessage);
		return { success: true } as LogResult;
	} catch (error) {
		// Error handling: return only a result object
		return { success: false, error } as LogResult;
	}
}

// ========== Utility functions ==========

/**
 * Get the current time in Japan Standard Time
 */
function nowJst() {
	const now = new Date();
	return now.toLocaleString("ja-JP", { timeZone: "Asia/Tokyo" });
}

/**
 * Create a WebSocket for connecting to the OpenAI Realtime API in the Cloudflare Workers environment
 *
 * @param openai_api_key OpenAI API key
 * @returns WebSocket connection for the Cloudflare Workers environment
 */
async function createCloudflareRealtimeApiWebSocket(
	openai_api_key: string,
): Promise<WebSocket> {
	try {
		// Prepare the OpenAI Realtime API URL
		const apiUrl = REALTIME_API_URL.replace("wss://", "https://");
		console.log("👺Connecting to OpenAI Realtime API:", apiUrl);

		// Send a WebSocket upgrade request using the fetch API
		const response = await fetch(apiUrl, {
			headers: {
				Authorization: `Bearer ${openai_api_key}`,
				"OpenAI-Beta": "realtime=v1",
				Upgrade: "websocket",
				Connection: "Upgrade",
				"Sec-WebSocket-Version": "13",
				"Sec-WebSocket-Key": btoa(Math.random().toString(36).substring(2, 15)),
			},
		});

		// @ts-ignore - ignore the type error for this Cloudflare Workers-specific API
		const webSocket = response.webSocket;

		if (!webSocket) {
			throw new Error(
				"Failed to establish WebSocket connection: response.webSocket is null",
			);
		}

		// Accept the WebSocket connection
		// @ts-ignore - ignore the type error for this Cloudflare Workers-specific API
		webSocket.accept();

		// Add error handling
		webSocket.addEventListener("error", (error: Event) => {
			console.error("👺WebSocket connection error:", error);
		});

		console.log("👺OpenAI Realtime API WebSocket connection established");
		return webSocket;
	} catch (error) {
		console.error("👺WebSocket connection error:", error);
		throw error;
	}
}

// ========== Shared WebSocket interface and helpers ==========

/**
 * Common WebSocket interface
 * so the code works in both the Cloudflare and Node.js environments
 */
interface WebSocketLike {
	send(data: string): void;
	readyState?: number;
}

/**
 * Create the session initialization message
 */
const createSessionUpdateMessage = () => {
	return {
		type: "session.update",
		session: {
			turn_detection: { type: "server_vad" },
			input_audio_format: "g711_ulaw",
			output_audio_format: "g711_ulaw",
			voice: VOICE,
			instructions: SYSTEM_MESSAGE,
			modalities: ["text", "audio"],
			temperature: 0.8,
		},
	};
};

/**
 * Handle the start of speech input
 * Cut off the AI's response mid-way
 */
const handleSpeechStartedEvent = (
	markQueue: string[],
	responseStartTimestampTwilio: number | null,
	latestMediaTimestamp: number,
	lastAssistantItem: string | null,
	openAiWs: WebSocketLike,
	serverWs: WebSocketLike,
	streamSid: string | null,
) => {
	if (markQueue.length > 0 && responseStartTimestampTwilio != null) {
		const elapsedTime = latestMediaTimestamp - responseStartTimestampTwilio;

		if (lastAssistantItem) {
			const truncateEvent = {
				type: "conversation.item.truncate",
				item_id: lastAssistantItem,
				content_index: 0,
				audio_end_ms: elapsedTime,
			};
			openAiWs.send(JSON.stringify(truncateEvent));
		}

		serverWs.send(
			JSON.stringify({
				event: "clear",
				streamSid: streamSid,
			}),
		);

		return {
			markQueue: [],
			lastAssistantItem: null,
			responseStartTimestampTwilio: null,
		};
	}

	return { markQueue, lastAssistantItem, responseStartTimestampTwilio };
};

/**
 * Send a mark message
 * used to check whether playback of the AI's response has finished
 */
const sendMark = (
	connection: WebSocketLike,
	streamSid: string | null,
	markQueue: string[],
) => {
	if (streamSid) {
		const markEvent = {
			event: "mark",
			streamSid: streamSid,
			mark: { name: "responsePart" },
		};
		connection.send(JSON.stringify(markEvent));
		markQueue.push("responsePart");
	}
	return markQueue;
};

/**
 * Handle audio data from OpenAI
 */
const handleAudioDelta = (
	response: {
		delta: string;
		item_id?: string;
	},
	streamSid: string | null,
	serverWs: WebSocketLike,
	responseStartTimestampTwilio: number | null,
	latestMediaTimestamp: number,
	lastAssistantItem: string | null,
	markQueue: string[],
) => {
	const audioDelta = {
		event: "media",
		streamSid: streamSid,
		media: { payload: response.delta },
	};
	serverWs.send(JSON.stringify(audioDelta));

	// First delta from a new response starts the elapsed time counter
	let newResponseStartTimestampTwilio = responseStartTimestampTwilio;
	if (!responseStartTimestampTwilio) {
		newResponseStartTimestampTwilio = latestMediaTimestamp;
	}

	let newLastAssistantItem = lastAssistantItem;
	if (response.item_id) {
		newLastAssistantItem = response.item_id;
	}

	const newMarkQueue = sendMark(serverWs, streamSid, [...markQueue]);

	return {
		responseStartTimestampTwilio: newResponseStartTimestampTwilio,
		lastAssistantItem: newLastAssistantItem,
		markQueue: newMarkQueue,
	};
};

/**
 * Handle media messages from Twilio
 */
const handleMediaMessage = (
	data: {
		media: {
			payload: string;
			timestamp?: number;
		};
	},
	openAiWs: WebSocketLike,
) => {
	const audioAppend = {
		type: "input_audio_buffer.append",
		audio: data.media.payload,
	};
	openAiWs.send(JSON.stringify(audioAppend));
};

/**
 * Create the conversation-start message
 */
const createConversationItem = () => {
	return {
		type: "conversation.item.create",
		item: {
			type: "message",
			role: "user",
			content: [],
		},
	};
};

/**
 * Create a response-creation request
 */
const createResponseItem = () => {
	return {
		type: "response.create",
		response: {
			modalities: ["text", "audio"],
			instructions: SYSTEM_MESSAGE,
		},
	};
};

// ========== Main handler function ==========

// Extended environment variable type
type EnvWithOpenAI = Env & {
	OPENAI_API_KEY?: string;
	ENVIRONMENT?: string;
};

/**
 * Sets up the WebSocket server for the Cloudflare environment
 * Handles voice conversations using the OpenAI Realtime API
 */
export const wsVoiceHandler = async (
	c: Context<{ Bindings: EnvWithOpenAI }>,
) => {
	// Determine the environment (whether this is the local environment)
	// Defaults to false; true only when the environment variable exists and equals "LOCAL"
	const isLocalEnvironment: boolean = Boolean(
		c.env.ENVIRONMENT && c.env.ENVIRONMENT === "LOCAL",
	);

	// Handled differently from the Node.js version
	// Create the WebSocket pair
	const webSocketPair = new WebSocketPair();
	// WebSocket connection between Cloudflare's edge and the client (Twilio)
	const client = webSocketPair[0];
	// WebSocket connection operated on inside the Worker script
	const server = webSocketPair[1];

	try {
		// Not present in the Node.js version
		// Upgrade the WebSocket connection
		const upgradeHeader = c.req.header("Upgrade");
		if (!upgradeHeader || upgradeHeader !== "websocket") {
			return c.text("Expected Upgrade: websocket", 400);
		}

		// Connection-specific state
		let streamSid: string | null = null;
		let latestMediaTimestamp = 0;
		let lastAssistantItem: string | null = null;
		let markQueue: string[] = [];
		let responseStartTimestampTwilio: number | null = null;

		// Get OPENAI_API_KEY from the environment variables
		const OPENAI_API_KEY = c.env.OPENAI_API_KEY;
		if (!OPENAI_API_KEY) {
			console.error("OpenAI API Key is not set");
			return c.text("OpenAI API Key is not set", 500);
		}

		// Not present in the Node.js version
		// Connection-state management for the OpenAI server
		let openAiConnected = false;
		let conversationStarted = false;

		// Create the WebSocket connection to OpenAI
		const openAiWs = await createCloudflareRealtimeApiWebSocket(OPENAI_API_KEY);

		// Initialize the session
		const initializeSession = () => {
			const sessionUpdate = createSessionUpdateMessage();
			openAiWs.send(JSON.stringify(sessionUpdate));
		};

		// Handler for when the connection to the OpenAI server is established
		openAiWs.addEventListener("open", async () => {
			await logMessage(
				"Connected to the OpenAI Realtime API",
				isLocalEnvironment,
				"log",
				streamSid || "unknown",
			);
			openAiConnected = true; // Not present in the Node.js version
			setTimeout(initializeSession, 100);
		});

		// Listen for messages from the OpenAI WebSocket (and send to client if necessary)
		openAiWs.addEventListener("message", async (event: MessageEvent) => {
			try {
				// Check whether the data is an ArrayBuffer
				const response =
					event.data instanceof ArrayBuffer
						? JSON.parse(new TextDecoder().decode(event.data))
						: JSON.parse(event.data);

				// Log error events only
				if (response.type === "error") {
					await logMessage(
						`Response: ${JSON.stringify(response)}`,
						isLocalEnvironment,
						"log",
						streamSid || "unknown",
					);
				}

				// Not present in the Node.js version
				if (response.type === "session.created") {
					openAiConnected = true;
				}

				if (response.type === "response.audio.delta" && response.delta) {
					// Process the audio data using the shared logic
					const result = handleAudioDelta(
						response,
						streamSid,
						server,
						responseStartTimestampTwilio,
						latestMediaTimestamp,
						lastAssistantItem,
						markQueue,
					);

					responseStartTimestampTwilio = result.responseStartTimestampTwilio;
					lastAssistantItem = result.lastAssistantItem;
					markQueue = result.markQueue;
				}

				if (response.type === "input_audio_buffer.speech_started") {
					const result = handleSpeechStartedEvent(
						markQueue,
						responseStartTimestampTwilio,
						latestMediaTimestamp,
						lastAssistantItem,
						openAiWs,
						server,
						streamSid,
					);
					markQueue = result.markQueue;
					lastAssistantItem = result.lastAssistantItem;
					responseStartTimestampTwilio = result.responseStartTimestampTwilio;
				}
			} catch (error) {
				const errorMessage = `Error processing OpenAI message: ${error}, Raw message: ${typeof event.data === "string" ? event.data : "binary data"}`;
				await logMessage(
					errorMessage,
					isLocalEnvironment,
					"error",
					streamSid || "unknown",
				);
			}
		});

		// Handle incoming messages from Twilio
		server.addEventListener("message", async (event: MessageEvent) => {
			try {
				// Check whether the data is an ArrayBuffer
				const data =
					event.data instanceof ArrayBuffer
						? JSON.parse(new TextDecoder().decode(event.data))
						: JSON.parse(event.data);

				switch (data.event) {
					case "media":
						latestMediaTimestamp = data.media.timestamp;

						if (openAiWs.readyState === WebSocket.OPEN) {
							// Process the media message using the shared logic
							handleMediaMessage(data, openAiWs);

							// Not present in the Node.js version
							// If the conversation has not started yet, start it
							if (openAiConnected && !conversationStarted) {
								// Create an empty conversation item (for voice input)
								openAiWs.send(JSON.stringify(createConversationItem()));
								// Send a response-creation request
								openAiWs.send(JSON.stringify(createResponseItem()));
								conversationStarted = true;
							}
						}
						break;
					case "start":
						streamSid = data.start.streamSid;
						await logMessage(
							`Incoming stream has started: ${streamSid}`,
							isLocalEnvironment,
							"log",
							streamSid || "unknown",
						);

						// Reset start and media timestamp on a new stream
						responseStartTimestampTwilio = null;
						latestMediaTimestamp = 0;
						break;
					case "mark":
						if (markQueue.length > 0) {
							markQueue.shift();
						}
						break;
					default:
						console.log("👺Received non-media event:", data.event);
						break;
				}
			} catch (error) {
				const errorMessage = `Error parsing Twilio message: ${error}, Message: ${typeof event.data === "string" ? event.data : "binary data"}`;
				await logMessage(
					errorMessage,
					isLocalEnvironment,
					"error",
					streamSid || "unknown",
				);
			}
		});

		// Handle connection close
		server.addEventListener("close", async () => {
			await logMessage(
				"Client disconnected",
				isLocalEnvironment,
				"log",
				streamSid || "unknown",
			);
			if (openAiWs.readyState === WebSocket.OPEN) openAiWs.close();
		});

		// Handle WebSocket close and errors
		openAiWs.addEventListener("close", async () => {
			await logMessage(
				"Disconnected from the OpenAI Realtime API",
				isLocalEnvironment,
				"log",
				streamSid || "unknown",
			);
			// Not present in the Node.js version
			openAiConnected = false;
		});

		// Handle errors on the OpenAI WebSocket side
		openAiWs.addEventListener("error", async (error: Event) => {
			await logMessage(
				`Error in the OpenAI WebSocket: ${error}`,
				isLocalEnvironment,
				"error",
				streamSid || "unknown",
			);
		});
	} catch (e) {
		const errorMessage = `Error setting up WebSocket: ${e}`;
		await logMessage(errorMessage, isLocalEnvironment, "error", "setup_error");
		return c.text("Internal Server Error", 500);
	}

	// Not present in the Node.js version
	// Accept the WebSocket connection
	server.accept();
	// Return the response
	return new Response(null, {
		status: 101,
		webSocket: client,
	});
};
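For completeness of the reproduction steps: the Twilio side is assumed to point its Media Stream at this handler via TwiML along these lines (the /ws-voice path is hypothetical; substitute whatever route the handler is mounted on in the Hono app):

```xml
<Response>
  <Connect>
    <Stream url="wss://cwhdt-hono-api.dev-a42.workers.dev/ws-voice" />
  </Connect>
</Response>
```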