// AI · LLMs
Building AI-Powered Features: OpenAI, Anthropic, and Gemini Integration
AI is transforming how we build applications, but integrating AI APIs properly requires careful consideration of streaming, error handling, cost management, and user experience. In this guide, we’ll build a production-ready AI integration layer that works with multiple providers.
Why Multi-Provider Support Matters
Relying on a single AI provider creates vendor lock-in and limits your flexibility. Different providers excel at different tasks:
- OpenAI (GPT-4): Best for general reasoning, code generation, and chat
- Anthropic (Claude): Superior for long-context tasks, analysis, and safety
- Google (Gemini): Excellent for multimodal tasks and cost efficiency
By supporting multiple providers, you can choose the best tool for each job and have fallback options.
Architecture Overview
We’ll build a unified API interface that abstracts provider-specific details:
// Unified interface for all AI providers
interface AIProvider {
name: string
chat(messages: Message[], options?: ChatOptions): Promise<AIResponse>
stream(messages: Message[], options?: ChatOptions): AsyncIterable<AIChunk>
// Optional: not every provider exposes an embeddings endpoint, and the
// providers implemented below only cover chat and stream
embeddings?(texts: string[]): Promise<number[][]>
}
interface Message {
role: 'system' | 'user' | 'assistant'
content: string
}
interface ChatOptions {
model?: string
temperature?: number
maxTokens?: number
stopSequences?: string[]
}
interface AIResponse {
content: string
usage: {
promptTokens: number
completionTokens: number
totalTokens: number
}
model: string
finishReason: 'stop' | 'length' | 'content_filter'
}
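The stream() methods yield AIChunk values, which the listing above doesn't define. Here is the minimal shape that the provider implementations and the client hook later in this post assume:
interface AIChunk {
  content: string
  done: boolean
}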
Implementation: OpenAI Provider
Let’s implement the OpenAI provider with proper error handling:
import OpenAI from 'openai'
export class OpenAIProvider implements AIProvider {
name = 'openai'
private client: OpenAI
constructor(apiKey: string) {
this.client = new OpenAI({ apiKey })
}
async chat(
messages: Message[],
options: ChatOptions = {}
): Promise<AIResponse> {
try {
const response = await this.client.chat.completions.create({
model: options.model || 'gpt-4-turbo-preview',
messages: messages.map(m => ({
role: m.role,
content: m.content,
})),
temperature: options.temperature ?? 0.7,
max_tokens: options.maxTokens,
stop: options.stopSequences,
})
const choice = response.choices[0]
return {
content: choice.message.content || '',
usage: {
promptTokens: response.usage?.prompt_tokens || 0,
completionTokens: response.usage?.completion_tokens || 0,
totalTokens: response.usage?.total_tokens || 0,
},
model: response.model,
finishReason: this.mapFinishReason(choice.finish_reason),
}
} catch (error) {
throw this.handleError(error)
}
}
async *stream(
messages: Message[],
options: ChatOptions = {}
): AsyncIterable<AIChunk> {
try {
const stream = await this.client.chat.completions.create({
model: options.model || 'gpt-4-turbo-preview',
messages: messages.map(m => ({
role: m.role,
content: m.content,
})),
temperature: options.temperature ?? 0.7,
max_tokens: options.maxTokens,
stop: options.stopSequences,
stream: true,
})
for await (const chunk of stream) {
const choice = chunk.choices[0]
const content = choice?.delta?.content
if (content) {
yield { content, done: false }
}
// The final chunk carries a finish_reason and no content, so emit an
// explicit done marker (mirroring the Anthropic provider below)
if (choice?.finish_reason) {
yield { content: '', done: true }
}
}
} catch (error) {
throw this.handleError(error)
}
}
private handleError(error: unknown): Error {
if (error instanceof OpenAI.APIError) {
// Rate limit
if (error.status === 429) {
return new Error('Rate limit exceeded. Please try again later.')
}
// Invalid API key
if (error.status === 401) {
return new Error('Invalid API key')
}
// Context too long
if (error.code === 'context_length_exceeded') {
return new Error('Message too long. Please reduce the input length.')
}
return new Error(`OpenAI API error: ${error.message}`)
}
return new Error('Unknown error occurred')
}
private mapFinishReason(
reason: string | null
): 'stop' | 'length' | 'content_filter' {
switch (reason) {
case 'stop':
return 'stop'
case 'length':
return 'length'
case 'content_filter':
return 'content_filter'
default:
return 'stop'
}
}
}
Implementation: Anthropic Provider
Anthropic’s API differs in a few ways (the system prompt is a separate field, max_tokens is required, and stop reasons use different names), but we can wrap it in the same interface:
import Anthropic from '@anthropic-ai/sdk'
export class AnthropicProvider implements AIProvider {
name = 'anthropic'
private client: Anthropic
constructor(apiKey: string) {
this.client = new Anthropic({ apiKey })
}
async chat(
messages: Message[],
options: ChatOptions = {}
): Promise<AIResponse> {
try {
// Extract system message if present
const systemMessage = messages.find(m => m.role === 'system')
const chatMessages = messages.filter(m => m.role !== 'system')
const response = await this.client.messages.create({
model: options.model || 'claude-3-5-sonnet-20241022',
max_tokens: options.maxTokens || 4096,
temperature: options.temperature ?? 0.7,
system: systemMessage?.content,
messages: chatMessages.map(m => ({
role: m.role === 'user' ? 'user' : 'assistant',
content: m.content,
})),
stop_sequences: options.stopSequences,
})
return {
content: response.content[0].type === 'text'
? response.content[0].text
: '',
usage: {
promptTokens: response.usage.input_tokens,
completionTokens: response.usage.output_tokens,
totalTokens: response.usage.input_tokens + response.usage.output_tokens,
},
model: response.model,
finishReason: this.mapStopReason(response.stop_reason),
}
} catch (error) {
throw this.handleError(error)
}
}
async *stream(
messages: Message[],
options: ChatOptions = {}
): AsyncIterable<AIChunk> {
try {
const systemMessage = messages.find(m => m.role === 'system')
const chatMessages = messages.filter(m => m.role !== 'system')
const stream = await this.client.messages.stream({
model: options.model || 'claude-3-5-sonnet-20241022',
max_tokens: options.maxTokens || 4096,
temperature: options.temperature ?? 0.7,
system: systemMessage?.content,
messages: chatMessages.map(m => ({
role: m.role === 'user' ? 'user' : 'assistant',
content: m.content,
})),
stop_sequences: options.stopSequences,
})
for await (const event of stream) {
if (event.type === 'content_block_delta' &&
event.delta.type === 'text_delta') {
yield {
content: event.delta.text,
done: false,
}
}
if (event.type === 'message_stop') {
yield {
content: '',
done: true,
}
}
}
} catch (error) {
throw this.handleError(error)
}
}
private handleError(error: unknown): Error {
if (error instanceof Anthropic.APIError) {
if (error.status === 429) {
return new Error('Rate limit exceeded')
}
if (error.status === 401) {
return new Error('Invalid API key')
}
return new Error(`Anthropic API error: ${error.message}`)
}
return new Error('Unknown error occurred')
}
private mapStopReason(
reason: string | null
): 'stop' | 'length' | 'content_filter' {
switch (reason) {
case 'end_turn':
return 'stop'
case 'max_tokens':
return 'length'
case 'stop_sequence':
return 'stop'
default:
return 'stop'
}
}
}
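Implementation: Gemini Provider
Gemini fits the same pattern. The sketch below assumes a recent @google/generative-ai SDK and the gemini-1.5-pro model name; it omits error mapping for brevity, so treat the field mappings as a starting point and check them against the SDK docs rather than as a drop-in implementation:
import { GoogleGenerativeAI } from '@google/generative-ai'
export class GeminiProvider implements AIProvider {
  name = 'gemini'
  private client: GoogleGenerativeAI
  constructor(apiKey: string) {
    this.client = new GoogleGenerativeAI(apiKey)
  }
  async chat(
    messages: Message[],
    options: ChatOptions = {}
  ): Promise<AIResponse> {
    const systemMessage = messages.find(m => m.role === 'system')
    const chatMessages = messages.filter(m => m.role !== 'system')
    const modelName = options.model || 'gemini-1.5-pro'
    const model = this.client.getGenerativeModel({
      model: modelName,
      systemInstruction: systemMessage?.content,
    })
    const result = await model.generateContent({
      contents: chatMessages.map(m => ({
        // Gemini calls the assistant role 'model'
        role: m.role === 'user' ? 'user' : 'model',
        parts: [{ text: m.content }],
      })),
      generationConfig: {
        temperature: options.temperature ?? 0.7,
        maxOutputTokens: options.maxTokens,
        stopSequences: options.stopSequences,
      },
    })
    const usage = result.response.usageMetadata
    return {
      content: result.response.text(),
      usage: {
        promptTokens: usage?.promptTokenCount ?? 0,
        completionTokens: usage?.candidatesTokenCount ?? 0,
        totalTokens: usage?.totalTokenCount ?? 0,
      },
      model: modelName,
      // Map candidate finish reasons here if you need more than 'stop'
      finishReason: 'stop',
    }
  }
  async *stream(
    messages: Message[],
    options: ChatOptions = {}
  ): AsyncIterable<AIChunk> {
    const systemMessage = messages.find(m => m.role === 'system')
    const chatMessages = messages.filter(m => m.role !== 'system')
    const model = this.client.getGenerativeModel({
      model: options.model || 'gemini-1.5-pro',
      systemInstruction: systemMessage?.content,
    })
    const result = await model.generateContentStream({
      contents: chatMessages.map(m => ({
        role: m.role === 'user' ? 'user' : 'model',
        parts: [{ text: m.content }],
      })),
      generationConfig: {
        temperature: options.temperature ?? 0.7,
        maxOutputTokens: options.maxTokens,
      },
    })
    for await (const chunk of result.stream) {
      const text = chunk.text()
      if (text) {
        yield { content: text, done: false }
      }
    }
    yield { content: '', done: true }
  }
}
To register it, add a gemini entry to the AIService config below and call providers.set('gemini', new GeminiProvider(apiKey)) in the constructor.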
Unified AI Service
Now create a service that manages multiple providers:
export class AIService {
private providers: Map<string, AIProvider> = new Map()
private defaultProvider: string
constructor(config: {
openai?: { apiKey: string }
anthropic?: { apiKey: string }
defaultProvider: 'openai' | 'anthropic'
}) {
if (config.openai) {
this.providers.set('openai', new OpenAIProvider(config.openai.apiKey))
}
if (config.anthropic) {
this.providers.set('anthropic', new AnthropicProvider(config.anthropic.apiKey))
}
this.defaultProvider = config.defaultProvider
}
async chat(
messages: Message[],
options?: ChatOptions & { provider?: string }
): Promise<AIResponse> {
const providerName = options?.provider || this.defaultProvider
const provider = this.providers.get(providerName)
if (!provider) {
throw new Error(`Provider ${providerName} not configured`)
}
return provider.chat(messages, options)
}
async *stream(
messages: Message[],
options?: ChatOptions & { provider?: string }
): AsyncIterable<AIChunk> {
const providerName = options?.provider || this.defaultProvider
const provider = this.providers.get(providerName)
if (!provider) {
throw new Error(`Provider ${providerName} not configured`)
}
yield* provider.stream(messages, options)
}
}
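The API route in the next section imports a shared aiService instance from @/lib/ai-service. A minimal way to create that singleton (the environment variable names here are assumptions, not part of the service above):
// lib/ai-service.ts: export a shared instance for the rest of the app
export const aiService = new AIService({
  openai: process.env.OPENAI_API_KEY
    ? { apiKey: process.env.OPENAI_API_KEY }
    : undefined,
  anthropic: process.env.ANTHROPIC_API_KEY
    ? { apiKey: process.env.ANTHROPIC_API_KEY }
    : undefined,
  defaultProvider: 'openai',
})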
Next.js API Route with Streaming
Integrate with Next.js for streaming AI responses:
// app/api/ai/chat/route.ts
import { NextRequest } from 'next/server'
import { aiService } from '@/lib/ai-service'
export async function POST(req: NextRequest) {
const { messages, provider, model } = await req.json()
// Create a ReadableStream for streaming response
const stream = new ReadableStream({
async start(controller) {
try {
for await (const chunk of aiService.stream(messages, {
provider,
model,
})) {
// Send Server-Sent Events
const data = `data: ${JSON.stringify(chunk)}\n\n`
controller.enqueue(new TextEncoder().encode(data))
}
} catch (error) {
const message = error instanceof Error ? error.message : 'Unknown error'
const errorData = `data: ${JSON.stringify({ error: message })}\n\n`
controller.enqueue(new TextEncoder().encode(errorData))
} finally {
controller.close()
}
},
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
})
}
Client-Side Hook
Create a React hook for easy consumption:
import { useState } from 'react'
import type { Message } from '@/lib/ai-service' // wherever the shared types live
export function useAIChat() {
const [messages, setMessages] = useState<Message[]>([])
const [isStreaming, setIsStreaming] = useState(false)
const [error, setError] = useState<string | null>(null)
const sendMessage = async (
content: string,
options?: { provider?: string; model?: string }
) => {
const userMessage: Message = { role: 'user', content }
setMessages(prev => [...prev, userMessage])
setIsStreaming(true)
setError(null)
try {
const response = await fetch('/api/ai/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: [...messages, userMessage],
...options,
}),
})
if (!response.ok) {
throw new Error('Failed to get AI response')
}
const reader = response.body?.getReader()
const decoder = new TextDecoder()
let assistantContent = ''
let placeholderAdded = false
// Create assistant message placeholder
setMessages(prev => [
...prev,
{ role: 'assistant', content: '' },
])
placeholderAdded = true
while (true) {
const { done, value } = await reader!.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
// Simplification: this assumes each read contains whole SSE events;
// production code should buffer partial events across reads
const lines = chunk.split('\n\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6))
if (data.error) {
throw new Error(data.error)
}
if (data.content) {
assistantContent += data.content
// Update last message
setMessages(prev => [
...prev.slice(0, -1),
{ role: 'assistant', content: assistantContent },
])
}
}
}
}
} catch (err) {
setError(err instanceof Error ? err.message : 'Something went wrong')
// Remove the assistant placeholder on error, if one was added
if (placeholderAdded) {
setMessages(prev => prev.slice(0, -1))
}
} finally {
setIsStreaming(false)
}
}
return {
messages,
isStreaming,
error,
sendMessage,
}
}
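To consume the hook, a minimal chat component might look like this (the component name, import path, and markup are illustrative):
'use client'
import { useState } from 'react'
import { useAIChat } from '@/hooks/use-ai-chat'
export function ChatPanel() {
  const { messages, isStreaming, error, sendMessage } = useAIChat()
  const [input, setInput] = useState('')
  return (
    <div>
      {messages.map((message, i) => (
        <p key={i}>
          <strong>{message.role}:</strong> {message.content}
        </p>
      ))}
      {error && <p role="alert">{error}</p>}
      <form
        onSubmit={event => {
          event.preventDefault()
          if (!input.trim() || isStreaming) return
          sendMessage(input)
          setInput('')
        }}
      >
        <input value={input} onChange={event => setInput(event.target.value)} />
        <button type="submit" disabled={isStreaming}>
          Send
        </button>
      </form>
    </div>
  )
}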
Cost Optimization Strategies
1. Caching Responses
import { Redis } from '@upstash/redis'
const redis = new Redis({
url: process.env.UPSTASH_REDIS_REST_URL!,
token: process.env.UPSTASH_REDIS_REST_TOKEN!,
})
async function getCachedResponse(key: string): Promise<string | null> {
return redis.get<string>(key)
}
async function cacheResponse(
key: string,
value: string,
ttl: number = 3600
): Promise<void> {
await redis.setex(key, ttl, value)
}
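Here is one way to wire those helpers around aiService.chat: hash the messages and options into a cache key (the key scheme and helper name are just examples) so identical requests are answered from Redis. This pays off mainly for repeated prompts, such as FAQ-style queries at low temperature:
import { createHash } from 'crypto'
async function cachedChat(
  messages: Message[],
  options?: ChatOptions
): Promise<string> {
  // Identical message + option combinations map to the same key
  const key =
    'ai:chat:' +
    createHash('sha256')
      .update(JSON.stringify({ messages, options }))
      .digest('hex')
  const cached = await getCachedResponse(key)
  if (cached) return cached
  const response = await aiService.chat(messages, options)
  await cacheResponse(key, response.content)
  return response.content
}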
2. Token Counting
import { encode } from 'gpt-tokenizer'
// gpt-tokenizer matches OpenAI tokenization; treat counts for other providers as rough estimates
function estimateTokens(text: string): number {
return encode(text).length
}
function estimateCost(
promptTokens: number,
completionTokens: number,
model: string
): number {
// USD per token; prices change, so verify against current provider pricing
const pricing = {
'gpt-4-turbo-preview': {
prompt: 0.01 / 1000,
completion: 0.03 / 1000,
},
'claude-3-5-sonnet-20241022': {
prompt: 0.003 / 1000,
completion: 0.015 / 1000,
},
}
const rates = pricing[model as keyof typeof pricing]
// Unknown models have no pricing entry; return 0 instead of NaN
if (!rates) return 0
return (
promptTokens * rates.prompt +
completionTokens * rates.completion
)
}
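Putting token counting and pricing together, you can log an estimated cost for each completed request. A sketch (where you record the number is up to you):
async function chatWithCostTracking(messages: Message[]): Promise<AIResponse> {
  const model = 'gpt-4-turbo-preview'
  const response = await aiService.chat(messages, { model })
  const cost = estimateCost(
    response.usage.promptTokens,
    response.usage.completionTokens,
    model
  )
  console.log(`Estimated cost for ${model}: $${cost.toFixed(4)}`)
  return response
}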
3. Request Batching
class BatchProcessor {
private queue: Array<{
messages: Message[]
resolve: (response: AIResponse) => void
reject: (error: Error) => void
}> = []
private batchSize = 5
private batchTimeout = 1000
private timer: ReturnType<typeof setTimeout> | null = null
async process(messages: Message[]): Promise<AIResponse> {
return new Promise((resolve, reject) => {
this.queue.push({ messages, resolve, reject })
if (this.queue.length >= this.batchSize) {
this.flush()
} else if (!this.timer) {
// Schedule a single delayed flush so partial batches still get sent
this.timer = setTimeout(() => this.flush(), this.batchTimeout)
}
})
}
private async flush() {
if (this.timer) {
clearTimeout(this.timer)
this.timer = null
}
if (this.queue.length === 0) return
const batch = this.queue.splice(0, this.batchSize)
// Process batch in parallel
const results = await Promise.allSettled(
batch.map(item => aiService.chat(item.messages))
)
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
batch[index].resolve(result.value)
} else {
batch[index].reject(result.reason)
}
})
}
}
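Callers just await their own result and the processor handles the grouping. For example, inside an async job that summarizes several documents:
const batcher = new BatchProcessor()
async function summarizeDocuments(docs: string[]): Promise<string[]> {
  const responses = await Promise.all(
    docs.map(doc =>
      batcher.process([
        { role: 'user', content: `Summarize the following document:\n\n${doc}` },
      ])
    )
  )
  return responses.map(response => response.content)
}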
Error Handling Best Practices
Retry with Exponential Backoff
async function withRetry<T>(
fn: () => Promise<T>,
maxRetries: number = 3,
baseDelay: number = 1000
): Promise<T> {
let lastError: Error
for (let i = 0; i < maxRetries; i++) {
try {
return await fn()
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
// Don't retry errors that will never succeed
if (
lastError.message.includes('Invalid API key') ||
lastError.message.includes('content_filter')
) {
throw lastError
}
// Exponential backoff: 1s, 2s, 4s, ... (skip the wait after the final attempt)
if (i < maxRetries - 1) {
const delay = baseDelay * Math.pow(2, i)
await new Promise(resolve => setTimeout(resolve, delay))
}
}
}
throw lastError!
}
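Wrap any provider call that can fail transiently. For example, inside a request handler:
async function handleUserMessage(messages: Message[]): Promise<AIResponse> {
  // Rate limits and network blips are retried; auth and content-filter errors are not
  return withRetry(() => aiService.chat(messages))
}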
Production Checklist
✅ Rate Limiting: Implement per-user rate limits
✅ Monitoring: Track usage, costs, and errors
✅ Logging: Log all AI interactions for debugging
✅ Content Filtering: Check for harmful content
✅ Cost Alerts: Set up alerts for unusual spending
✅ Fallback Provider: Automatically switch on failures (see the sketch below)
✅ User Feedback: Allow users to report issues
✅ Privacy: Don’t log sensitive user data
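For the fallback item, the multi-provider service makes a simple wrapper possible. A minimal sketch, assuming both providers are configured (the provider order is just an example):
async function chatWithFallback(
  messages: Message[],
  options?: ChatOptions
): Promise<AIResponse> {
  try {
    return await aiService.chat(messages, { ...options, provider: 'openai' })
  } catch (primaryError) {
    console.warn('Primary provider failed, falling back to Anthropic', primaryError)
    return aiService.chat(messages, { ...options, provider: 'anthropic' })
  }
}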
Conclusion
Building production-ready AI integrations requires more than just calling an API. By implementing proper error handling, streaming, cost optimization, and multi-provider support, you can build robust AI-powered features that scale.
What’s Next?
Want to learn more advanced techniques like function calling, embeddings, and RAG? Upgrade to Premium to access our comprehensive AI integration course with:
- Advanced RAG implementation
- Vector database integration
- Function calling patterns
- Production deployment guides
- Cost optimization deep-dives
- Real-world case studies