

Building AI-Powered Features: OpenAI, Anthropic, and Gemini Integration


AI is transforming how we build applications, but integrating AI APIs properly requires careful consideration of streaming, error handling, cost management, and user experience. In this guide, we’ll build a production-ready AI integration layer that works with multiple providers.

Why Multi-Provider Support Matters

Relying on a single AI provider creates vendor lock-in and limits your flexibility. Different providers excel at different tasks, and pricing, latency, and model capabilities vary between them.

By supporting multiple providers, you can choose the best tool for each job and have fallback options.

Architecture Overview

We’ll build a unified API interface that abstracts provider-specific details:

// Unified interface for all AI providers
interface AIProvider {
  name: string
  chat(messages: Message[], options?: ChatOptions): Promise<AIResponse>
  stream(messages: Message[], options?: ChatOptions): AsyncIterable<AIChunk>
  // Optional, so provider wrappers without an embeddings endpoint still satisfy the interface
  embeddings?(texts: string[]): Promise<number[][]>
}

interface AIChunk {
  content: string
  done: boolean
}

interface Message {
  role: 'system' | 'user' | 'assistant'
  content: string
}

interface ChatOptions {
  model?: string
  temperature?: number
  maxTokens?: number
  stopSequences?: string[]
}

interface AIResponse {
  content: string
  usage: {
    promptTokens: number
    completionTokens: number
    totalTokens: number
  }
  model: string
  finishReason: 'stop' | 'length' | 'content_filter'
}

Implementation: OpenAI Provider

Let’s implement the OpenAI provider with proper error handling:

import OpenAI from 'openai'

export class OpenAIProvider implements AIProvider {
  name = 'openai'
  private client: OpenAI

  constructor(apiKey: string) {
    this.client = new OpenAI({ apiKey })
  }

  async chat(
    messages: Message[],
    options: ChatOptions = {}
  ): Promise<AIResponse> {
    try {
      const response = await this.client.chat.completions.create({
        model: options.model || 'gpt-4-turbo-preview',
        messages: messages.map(m => ({
          role: m.role,
          content: m.content,
        })),
        temperature: options.temperature ?? 0.7,
        max_tokens: options.maxTokens,
        stop: options.stopSequences,
      })

      const choice = response.choices[0]

      return {
        content: choice.message.content || '',
        usage: {
          promptTokens: response.usage?.prompt_tokens || 0,
          completionTokens: response.usage?.completion_tokens || 0,
          totalTokens: response.usage?.total_tokens || 0,
        },
        model: response.model,
        finishReason: this.mapFinishReason(choice.finish_reason),
      }
    } catch (error) {
      throw this.handleError(error)
    }
  }

  async *stream(
    messages: Message[],
    options: ChatOptions = {}
  ): AsyncIterable<AIChunk> {
    try {
      const stream = await this.client.chat.completions.create({
        model: options.model || 'gpt-4-turbo-preview',
        messages: messages.map(m => ({
          role: m.role,
          content: m.content,
        })),
        temperature: options.temperature ?? 0.7,
        max_tokens: options.maxTokens,
        stop: options.stopSequences,
        stream: true,
      })

      for await (const chunk of stream) {
        const choice = chunk.choices[0]
        if (choice?.delta?.content) {
          yield { content: choice.delta.content, done: false }
        }
        // The finish reason arrives on a final chunk with an empty delta,
        // so emit the terminal "done" chunk separately
        if (choice?.finish_reason != null) {
          yield { content: '', done: true }
        }
      }
    } catch (error) {
      throw this.handleError(error)
    }
  }

  private handleError(error: unknown): Error {
    if (error instanceof OpenAI.APIError) {
      // Rate limit
      if (error.status === 429) {
        return new Error('Rate limit exceeded. Please try again later.')
      }

      // Invalid API key
      if (error.status === 401) {
        return new Error('Invalid API key')
      }

      // Context too long
      if (error.code === 'context_length_exceeded') {
        return new Error('Message too long. Please reduce the input length.')
      }

      return new Error(`OpenAI API error: ${error.message}`)
    }

    return new Error('Unknown error occurred')
  }

  private mapFinishReason(
    reason: string | null
  ): 'stop' | 'length' | 'content_filter' {
    switch (reason) {
      case 'stop':
        return 'stop'
      case 'length':
        return 'length'
      case 'content_filter':
        return 'content_filter'
      default:
        return 'stop'
    }
  }
}

Implementation: Anthropic Provider

Anthropic’s API differs from OpenAI’s, but we can wrap it in the same interface:

import Anthropic from '@anthropic-ai/sdk'

export class AnthropicProvider implements AIProvider {
  name = 'anthropic'
  private client: Anthropic

  constructor(apiKey: string) {
    this.client = new Anthropic({ apiKey })
  }

  async chat(
    messages: Message[],
    options: ChatOptions = {}
  ): Promise<AIResponse> {
    try {
      // Extract system message if present
      const systemMessage = messages.find(m => m.role === 'system')
      const chatMessages = messages.filter(m => m.role !== 'system')

      const response = await this.client.messages.create({
        model: options.model || 'claude-3-5-sonnet-20241022',
        max_tokens: options.maxTokens || 4096,
        temperature: options.temperature ?? 0.7,
        system: systemMessage?.content,
        messages: chatMessages.map(m => ({
          role: m.role === 'user' ? 'user' : 'assistant',
          content: m.content,
        })),
        stop_sequences: options.stopSequences,
      })

      return {
        content: response.content[0].type === 'text'
          ? response.content[0].text
          : '',
        usage: {
          promptTokens: response.usage.input_tokens,
          completionTokens: response.usage.output_tokens,
          totalTokens: response.usage.input_tokens + response.usage.output_tokens,
        },
        model: response.model,
        finishReason: this.mapStopReason(response.stop_reason),
      }
    } catch (error) {
      throw this.handleError(error)
    }
  }

  async *stream(
    messages: Message[],
    options: ChatOptions = {}
  ): AsyncIterable<AIChunk> {
    try {
      const systemMessage = messages.find(m => m.role === 'system')
      const chatMessages = messages.filter(m => m.role !== 'system')

      const stream = await this.client.messages.stream({
        model: options.model || 'claude-3-5-sonnet-20241022',
        max_tokens: options.maxTokens || 4096,
        temperature: options.temperature ?? 0.7,
        system: systemMessage?.content,
        messages: chatMessages.map(m => ({
          role: m.role === 'user' ? 'user' : 'assistant',
          content: m.content,
        })),
        stop_sequences: options.stopSequences,
      })

      for await (const event of stream) {
        if (event.type === 'content_block_delta' &&
            event.delta.type === 'text_delta') {
          yield {
            content: event.delta.text,
            done: false,
          }
        }

        if (event.type === 'message_stop') {
          yield {
            content: '',
            done: true,
          }
        }
      }
    } catch (error) {
      throw this.handleError(error)
    }
  }

  private handleError(error: unknown): Error {
    if (error instanceof Anthropic.APIError) {
      if (error.status === 429) {
        return new Error('Rate limit exceeded')
      }
      if (error.status === 401) {
        return new Error('Invalid API key')
      }
      return new Error(`Anthropic API error: ${error.message}`)
    }
    return new Error('Unknown error occurred')
  }

  private mapStopReason(
    reason: string | null
  ): 'stop' | 'length' | 'content_filter' {
    switch (reason) {
      case 'end_turn':
        return 'stop'
      case 'max_tokens':
        return 'length'
      case 'stop_sequence':
        return 'stop'
      default:
        return 'stop'
    }
  }
}

Unified AI Service

Now create a service that manages multiple providers:

export class AIService {
  private providers: Map<string, AIProvider> = new Map()
  private defaultProvider: string

  constructor(config: {
    openai?: { apiKey: string }
    anthropic?: { apiKey: string }
    defaultProvider: 'openai' | 'anthropic'
  }) {
    if (config.openai) {
      this.providers.set('openai', new OpenAIProvider(config.openai.apiKey))
    }

    if (config.anthropic) {
      this.providers.set('anthropic', new AnthropicProvider(config.anthropic.apiKey))
    }

    this.defaultProvider = config.defaultProvider
  }

  async chat(
    messages: Message[],
    options?: ChatOptions & { provider?: string }
  ): Promise<AIResponse> {
    const providerName = options?.provider || this.defaultProvider
    const provider = this.providers.get(providerName)

    if (!provider) {
      throw new Error(`Provider ${providerName} not configured`)
    }

    return provider.chat(messages, options)
  }

  async *stream(
    messages: Message[],
    options?: ChatOptions & { provider?: string }
  ): AsyncIterable<AIChunk> {
    const providerName = options?.provider || this.defaultProvider
    const provider = this.providers.get(providerName)

    if (!provider) {
      throw new Error(`Provider ${providerName} not configured`)
    }

    yield* provider.stream(messages, options)
  }
}
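The multi-provider setup also enables automatic failover, one of the benefits mentioned earlier. A minimal sketch (the `withFallback` helper is my own, not part of any SDK) that tries a list of attempts in order and returns the first success:

```typescript
// Hypothetical helper: run each attempt in order, return the first success.
// Useful for falling back from a primary provider to a secondary one.
async function withFallback<T>(
  attempts: Array<() => Promise<T>>
): Promise<T> {
  let lastError: unknown = new Error('withFallback: no attempts provided')
  for (const attempt of attempts) {
    try {
      return await attempt()
    } catch (error) {
      lastError = error
    }
  }
  throw lastError
}
```

With the `AIService` above (assuming it is instantiated as `aiService`), this would look like `withFallback([() => aiService.chat(messages, { provider: 'openai' }), () => aiService.chat(messages, { provider: 'anthropic' })])`.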

Next.js API Route with Streaming

Integrate with Next.js for streaming AI responses:

// app/api/ai/chat/route.ts
import { NextRequest } from 'next/server'
import { aiService } from '@/lib/ai-service'

export async function POST(req: NextRequest) {
  const { messages, provider, model } = await req.json()

  // Create a ReadableStream that forwards chunks as Server-Sent Events
  const encoder = new TextEncoder()
  const stream = new ReadableStream({
    async start(controller) {
      try {
        for await (const chunk of aiService.stream(messages, {
          provider,
          model,
        })) {
          controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`))
        }
      } catch (error) {
        const message = error instanceof Error ? error.message : 'Unknown error'
        controller.enqueue(encoder.encode(`data: ${JSON.stringify({ error: message })}\n\n`))
      } finally {
        controller.close()
      }
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  })
}

Client-Side Hook

Create a React hook for easy consumption:

import { useState } from 'react'

export function useAIChat() {
  const [messages, setMessages] = useState<Message[]>([])
  const [isStreaming, setIsStreaming] = useState(false)
  const [error, setError] = useState<string | null>(null)

  const sendMessage = async (
    content: string,
    options?: { provider?: string; model?: string }
  ) => {
    const userMessage: Message = { role: 'user', content }
    setMessages(prev => [...prev, userMessage])
    setIsStreaming(true)
    setError(null)

    try {
      const response = await fetch('/api/ai/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: [...messages, userMessage],
          ...options,
        }),
      })

      if (!response.ok) {
        throw new Error('Failed to get AI response')
      }

      const reader = response.body?.getReader()
      if (!reader) throw new Error('Response has no body')
      const decoder = new TextDecoder()
      let assistantContent = ''
      let buffer = ''

      // Create assistant message placeholder
      setMessages(prev => [
        ...prev,
        { role: 'assistant', content: '' },
      ])

      while (true) {
        const { done, value } = await reader.read()
        if (done) break

        // Buffer partial frames: a network chunk can end mid-event
        buffer += decoder.decode(value, { stream: true })
        const frames = buffer.split('\n\n')
        buffer = frames.pop() ?? ''

        for (const frame of frames) {
          if (!frame.startsWith('data: ')) continue
          const data = JSON.parse(frame.slice(6))

          if (data.error) {
            throw new Error(data.error)
          }

          if (data.content) {
            assistantContent += data.content
            // Replace the placeholder with the accumulated content
            setMessages(prev => [
              ...prev.slice(0, -1),
              { role: 'assistant', content: assistantContent },
            ])
          }
        }
      }
    } catch (err) {
      setError(err instanceof Error ? err.message : 'Unknown error')
      // Remove assistant message placeholder on error
      setMessages(prev => prev.slice(0, -1))
    } finally {
      setIsStreaming(false)
    }
  }

  return {
    messages,
    isStreaming,
    error,
    sendMessage,
  }
}
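The hook’s parsing loop can also be factored into a small pure helper, which is easier to unit test. A sketch (the `parseSSE` name and shape are my own, not a standard API) that splits a buffer into complete `data:` payloads and returns any trailing partial frame for the caller to keep buffering:

```typescript
// Split an SSE buffer into complete "data:" payloads; anything after the
// last "\n\n" is an incomplete frame and is returned as `rest`.
function parseSSE(buffer: string): { events: string[]; rest: string } {
  const frames = buffer.split('\n\n')
  const rest = frames.pop() ?? ''
  const events = frames
    .filter(f => f.startsWith('data: '))
    .map(f => f.slice(6))
  return { events, rest }
}
```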

Cost Optimization Strategies

1. Caching Responses

import { Redis } from '@upstash/redis'

const redis = new Redis({
  url: process.env.UPSTASH_REDIS_REST_URL!,
  token: process.env.UPSTASH_REDIS_REST_TOKEN!,
})

async function getCachedResponse(key: string): Promise<string | null> {
  return redis.get(key)
}

async function cacheResponse(
  key: string,
  value: string,
  ttl: number = 3600
): Promise<void> {
  await redis.setex(key, ttl, value)
}
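Caching needs a stable key. One option (an assumption on my part, not from the snippet above) is to hash the model name plus the full message list with Node’s built-in crypto module:

```typescript
import { createHash } from 'node:crypto'

// Derive a deterministic cache key: identical model + messages → identical key.
function cacheKey(
  model: string,
  messages: Array<{ role: string; content: string }>
): string {
  const payload = JSON.stringify({ model, messages })
  return `ai:chat:${createHash('sha256').update(payload).digest('hex')}`
}
```

Check `getCachedResponse(cacheKey(model, messages))` before calling the provider, and consider caching only deterministic requests (e.g. `temperature: 0`), since sampled outputs vary between calls.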

2. Token Counting

import { encode } from 'gpt-tokenizer'

function estimateTokens(text: string): number {
  return encode(text).length
}

function estimateCost(
  promptTokens: number,
  completionTokens: number,
  model: string
): number {
  const pricing = {
    'gpt-4-turbo-preview': {
      prompt: 0.01 / 1000,
      completion: 0.03 / 1000,
    },
    'claude-3-5-sonnet-20241022': {
      prompt: 0.003 / 1000,
      completion: 0.015 / 1000,
    },
  }

  const rates = pricing[model as keyof typeof pricing]
  if (!rates) {
    throw new Error(`No pricing configured for model: ${model}`)
  }

  return (
    promptTokens * rates.prompt +
    completionTokens * rates.completion
  )
}
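As a sanity check on the arithmetic, a request of 1,200 prompt tokens and 300 completion tokens at the `gpt-4-turbo-preview` rates above works out to:

```typescript
// 1,200 prompt tokens at $0.01/1K plus 300 completion tokens at $0.03/1K
const promptCost = 1200 * (0.01 / 1000)       // ≈ $0.012
const completionCost = 300 * (0.03 / 1000)    // ≈ $0.009
const totalCost = promptCost + completionCost // ≈ $0.021
```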

3. Request Batching

class BatchProcessor {
  private queue: Array<{
    messages: Message[]
    resolve: (response: AIResponse) => void
    reject: (error: Error) => void
  }> = []

  private batchSize = 5
  private batchTimeout = 1000
  private timer: ReturnType<typeof setTimeout> | null = null

  async process(messages: Message[]): Promise<AIResponse> {
    return new Promise((resolve, reject) => {
      this.queue.push({ messages, resolve, reject })

      if (this.queue.length >= this.batchSize) {
        this.flush()
      } else if (!this.timer) {
        // Schedule at most one pending flush instead of one timer per push
        this.timer = setTimeout(() => this.flush(), this.batchTimeout)
      }
    })
  }

  private async flush() {
    if (this.timer) {
      clearTimeout(this.timer)
      this.timer = null
    }

    const batch = this.queue.splice(0, this.batchSize)
    if (batch.length === 0) return

    // Process batch in parallel
    const results = await Promise.allSettled(
      batch.map(item => aiService.chat(item.messages))
    )

    results.forEach((result, index) => {
      if (result.status === 'fulfilled') {
        batch[index].resolve(result.value)
      } else {
        batch[index].reject(result.reason)
      }
    })
  }
}

Error Handling Best Practices

Retry with Exponential Backoff

async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number = 3,
  baseDelay: number = 1000
): Promise<T> {
  let lastError: Error = new Error('withRetry: no attempts made')

  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn()
    } catch (error) {
      lastError = error instanceof Error ? error : new Error(String(error))

      // Don't retry errors that will never succeed
      if (
        lastError.message.includes('Invalid API key') ||
        lastError.message.includes('content_filter')
      ) {
        throw lastError
      }

      // Skip the backoff delay after the final attempt
      if (i === maxRetries - 1) break

      // Exponential backoff: 1s, 2s, 4s, 8s...
      const delay = baseDelay * Math.pow(2, i)
      await new Promise(resolve => setTimeout(resolve, delay))
    }
  }

  throw lastError!
}
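The fixed `baseDelay * 2^i` schedule works, but when many clients hit a rate limit simultaneously they all retry in lockstep. Adding jitter spreads the retries out; a sketch of the "full jitter" variant (this helper is illustrative, not from the original):

```typescript
// Full-jitter backoff: a uniform random delay in [0, min(maxMs, baseMs * 2^attempt)).
function backoffDelay(
  attempt: number,
  baseMs: number = 1000,
  maxMs: number = 30_000
): number {
  const ceiling = Math.min(maxMs, baseMs * 2 ** attempt)
  return Math.floor(Math.random() * ceiling)
}
```

Swapping `const delay = baseDelay * Math.pow(2, i)` for `const delay = backoffDelay(i, baseDelay)` in `withRetry` is enough to decorrelate clients.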

Production Checklist

✅ Rate Limiting: Implement per-user rate limits
✅ Monitoring: Track usage, costs, and errors
✅ Logging: Log all AI interactions for debugging
✅ Content Filtering: Check for harmful content
✅ Cost Alerts: Set up alerts for unusual spending
✅ Fallback Provider: Automatically switch on failures
✅ User Feedback: Allow users to report issues
✅ Privacy: Don’t log sensitive user data
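The rate-limiting item can be sketched as a fixed-window counter. This in-memory version is illustrative only; production deployments typically back rate limits with Redis so they survive restarts and work across instances:

```typescript
// Fixed-window per-user rate limiter (in-memory sketch, not production-ready).
class RateLimiter {
  private hits = new Map<string, { count: number; windowStart: number }>()

  constructor(private limit: number, private windowMs: number) {}

  // Returns true if the request is allowed within the current window.
  allow(userId: string, now: number = Date.now()): boolean {
    const entry = this.hits.get(userId)
    if (!entry || now - entry.windowStart >= this.windowMs) {
      // Start a fresh window for this user
      this.hits.set(userId, { count: 1, windowStart: now })
      return true
    }
    if (entry.count >= this.limit) return false
    entry.count++
    return true
  }
}
```

In the API route, call `limiter.allow(userId)` before invoking `aiService` and return a 429 response when it is false.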

Conclusion

Building production-ready AI integrations requires more than just calling an API. By implementing proper error handling, streaming, cost optimization, and multi-provider support, you can build robust AI-powered features that scale.

What’s Next?

Want to learn more advanced techniques like function calling, embeddings, and RAG? Upgrade to Premium to access our comprehensive AI integration course.
