Extremely Popular
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778import type { Context } from '../../context.ts'import { StreamingApi } from '../../utils/stream.ts'
export interface SSEMessage { data: string event?: string id?: string retry?: number}
export class SSEStreamingApi extends StreamingApi { constructor(writable: WritableStream, readable: ReadableStream) { super(writable, readable) }
async writeSSE(message: SSEMessage) { const data = message.data .split('\n') .map((line) => { return `data: ${line}` }) .join('\n')
const sseData = [ message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`, message.retry && `retry: ${message.retry}`, ] .filter(Boolean) .join('\n') + '\n\n'
await this.write(sseData) }}
const run = async ( stream: SSEStreamingApi, cb: (stream: SSEStreamingApi) => Promise<void>, onError?: (e: Error, stream: SSEStreamingApi) => Promise<void>) => { try { await cb(stream) } catch (e) { if (e instanceof Error && onError) { await onError(e, stream)
await stream.writeSSE({ event: 'error', data: e.message, }) } else { console.error(e) } } finally { stream.close() }}
export const streamSSE = ( c: Context, cb: (stream: SSEStreamingApi) => Promise<void>, onError?: (e: Error, stream: SSEStreamingApi) => Promise<void>) => { const { readable, writable } = new TransformStream() const stream = new SSEStreamingApi(writable, readable)
c.header('Transfer-Encoding', 'chunked') c.header('Content-Type', 'text/event-stream') c.header('Cache-Control', 'no-cache') c.header('Connection', 'keep-alive')
run(stream, cb, onError)
return c.newResponse(stream.responseReadable)}