Data Flow Architecture
Overview
This document describes how data flows through Solatis from ingestion to storage, processing, and retrieval.
System Components
        ┌──────────────┐
        │   Clients    │  Web, Mobile, API
        └──────┬───────┘
               │
        ┌──────▼───────┐
        │  API Layer   │  Supabase Edge Functions
        │    + Auth    │  JWT Validation
        └──────┬───────┘
               │
     ┌─────────┼────────────┬────────────┐
     ▼         ▼            ▼            ▼
┌──────────┐ ┌──────┐ ┌──────────┐ ┌─────────┐
│ Database │ │  AI  │ │ Storage  │ │  Queue  │
│PostgreSQL│ │OpenAI│ │ AWS S3   │ │  Redis  │
└──────────┘ └──────┘ └──────────┘ └─────────┘
Data Flows
Document Upload Flow
1. User uploads document
↓
2. Client: Validate (size, type)
↓
3. API: Generate signed S3 URL
↓
4. Client: Upload directly to S3
↓
5. S3: Trigger webhook
↓
6. Edge Function: Process document
- Extract text
- Generate embeddings
- Create summary
↓
7. Store in PostgreSQL:
- documents table
- document_embeddings table
↓
8. Real-time: Notify client
↓
9. Client: Update UI
Detailed Steps:
Step 1-3: Pre-Upload
```typescript
// Client requests a signed upload URL
const { data } = await supabase.functions.invoke('get-upload-url', {
  body: {
    filename: 'document.pdf',
    contentType: 'application/pdf',
    size: 1024000
  }
});
// Returns:
// {
//   uploadUrl: 'https://s3.amazonaws.com/...',
//   documentId: 'uuid',
//   expiresIn: 3600
// }
```
Step 4: Direct Upload
```typescript
// Upload directly to S3 (no server proxy)
await fetch(uploadUrl, {
  method: 'PUT',
  body: file,
  headers: {
    'Content-Type': 'application/pdf'
  }
});
```
Step 5-6: Processing
```typescript
// S3 webhook triggers Edge Function
export async function handleUpload(event: S3Event) {
  const bucket = event.Records[0].s3.bucket.name;
  const key = event.Records[0].s3.object.key;
  // documentId is encoded in the object key by get-upload-url
  // (assuming keys of the form `uploads/<documentId>`)
  const documentId = key.split('/').pop();
  // Download from S3
  const file = await s3.getObject({ Bucket: bucket, Key: key });
  // Extract text
  const text = await extractText(file.Body);
  // Generate embeddings
  const chunks = chunkText(text);
  const embeddings = await Promise.all(
    chunks.map(chunk => openai.embeddings.create({
      model: 'text-embedding-3-large',
      input: chunk
    }))
  );
  // Store in database
  await supabase.from('documents').update({
    content: text,
    processing_status: 'complete'
  }).eq('id', documentId);
  await supabase.from('document_embeddings').insert(
    embeddings.map((emb, i) => ({
      document_id: documentId,
      chunk_index: i,
      content: chunks[i],
      embedding: emb.data[0].embedding
    }))
  );
}
```
Meeting Recording Flow
1. Google Meet bot joins meeting
↓
2. Records audio/video stream
↓
3. Meeting ends → Upload to S3
↓
4. Queue transcription job
↓
5. Whisper API: Transcribe
↓
6. Store transcript in PostgreSQL
↓
7. Queue AI analysis job
↓
8. GPT-4: Generate summary, extract action items
↓
9. Store analysis results
↓
10. Create tasks in Jira/Asana
↓
11. Send notifications (Slack, Email)
Search Query Flow
1. User enters search query
↓
2. Generate query embedding
↓
3. Vector similarity search (pgvector)
↓
4. Retrieve top K candidates (k=100)
↓
5. Apply filters (workspace, date, type)
↓
6. Optional: Re-rank results
↓
7. Return top N results (n=20)
↓
8. Client displays with highlights
Performance:
Average latency: 200-500ms
- Embedding generation: 50-100ms
- Vector search: 50-150ms
- Filtering & ranking: 50-100ms
- Network transfer: 50-150ms
Real-Time Collaboration
User A edits document
↓
1. Client: Optimistic update
↓
2. API: Validate & save
↓
3. PostgreSQL: Write to documents table
↓
4. Trigger: Notify Realtime channel
↓
5. Realtime: Broadcast to subscribers
↓
6. User B's Client: Receive update
↓
7. User B's Client: Apply change
WebSocket Protocol:
```typescript
// Subscribe to changes
const channel = supabase
  .channel(`document:${documentId}`)
  .on(
    'postgres_changes',
    {
      event: 'UPDATE',
      schema: 'public',
      table: 'documents',
      filter: `id=eq.${documentId}`
    },
    (payload) => {
      // Update UI with new content
      updateDocument(payload.new);
    }
  )
  .subscribe();
```
Processing Pipelines
Document Processing Pipeline
┌─────────────────────────────────────────┐
│ Document Upload │
└────────────────┬────────────────────────┘
│
┌─────────▼──────────┐
│ Validation Queue │
└─────────┬──────────┘
│
┌─────────▼──────────┐
│ Extraction Queue │ Extract text, OCR
└─────────┬──────────┘
│
┌─────────▼──────────┐
│ AI Processing │ Embeddings, Summary
└─────────┬──────────┘
│
┌─────────▼──────────┐
│ Storage Queue │ Save to DB
└─────────┬──────────┘
│
┌─────────▼──────────┐
│ Indexing Queue │ Create indexes
└─────────┬──────────┘
│
┌─────────▼──────────┐
│ Complete │
└────────────────────┘
Meeting Processing Pipeline
Recording → Transcription → Speaker ID →
Analysis → Summary → Action Items → Task Creation →
Notifications → Complete
Queue Management:
```typescript
// Redis-based job queue
import { Queue, Worker } from 'bullmq';
const transcriptionQueue = new Queue('transcription', {
  connection: redis
});
// Add job
await transcriptionQueue.add('transcribe', {
  meetingId: 'uuid',
  audioUrl: 's3://...',
  language: 'en'
}, {
  priority: 1,
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 1000
  }
});
// Process jobs
const worker = new Worker('transcription', async (job) => {
  const { meetingId, audioUrl } = job.data;
  // Download audio
  const audio = await downloadFromS3(audioUrl);
  // Transcribe
  const transcript = await whisper.transcribe(audio);
  // Save
  await supabase.from('meetings').update({
    transcript,
    status: 'transcribed'
  }).eq('id', meetingId);
  // Queue next step
  await analysisQueue.add('analyze', { meetingId });
}, {
  connection: redis,
  concurrency: 5
});
```
Data Storage
Hot Storage (PostgreSQL)
Active Data:
- Recent documents (< 90 days)
- Active meetings
- Current conversations
- User sessions
Characteristics:
- Fast read/write
- Indexed for search
- Real-time updates
- High availability
Warm Storage (S3)
Older Data:
- Documents > 90 days
- Archived meetings
- Processed recordings
- Backups
Characteristics:
- Lower cost
- Higher latency
- Long-term retention
- Lifecycle policies
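The hot/warm split above hinges on the 90-day boundary. It can be sketched as a simple routing helper (the function and its name are illustrative, not part of the codebase; the threshold mirrors the policy described):

```typescript
// Route a document to hot (PostgreSQL) or warm (S3) storage by age.
type StorageTier = 'hot' | 'warm';

const HOT_STORAGE_MAX_AGE_DAYS = 90;

function storageTierFor(lastAccessed: Date, now: Date = new Date()): StorageTier {
  const ageDays = (now.getTime() - lastAccessed.getTime()) / (1000 * 60 * 60 * 24);
  return ageDays < HOT_STORAGE_MAX_AGE_DAYS ? 'hot' : 'warm';
}
```

In practice the same check would drive the S3 lifecycle policies listed above rather than run per-request.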
Cache Layer (Redis)
Cached Data:
- Session data
- Frequently accessed docs
- API rate limits
- Real-time presence
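Reads against this layer typically follow a cache-aside pattern: check Redis, fall back to PostgreSQL on a miss, then populate the cache with a TTL. A minimal in-memory sketch (a plain `Map` stands in for Redis here; the `TtlCache` class is illustrative):

```typescript
// Minimal cache-aside sketch with TTL. In production Redis plays the
// role of this Map, and `load` would query PostgreSQL.
type Entry<T> = { value: T; expiresAt: number };

class TtlCache<T> {
  private store = new Map<string, Entry<T>>();
  constructor(private ttlMs: number) {}

  async get(key: string, load: () => Promise<T>): Promise<T> {
    const hit = this.store.get(key);
    if (hit && hit.expiresAt > Date.now()) return hit.value; // cache hit
    const value = await load();                              // miss: load from source
    this.store.set(key, { value, expiresAt: Date.now() + this.ttlMs });
    return value;
  }
}
```

For example, `new TtlCache<Document>(15 * 60 * 1000)` matches the 15-minute document TTL below.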
TTL Examples:
Sessions: 1 hour
Documents: 15 minutes
Search results: 5 minutes
Rate limits: 1 minute
Data Consistency
ACID Transactions
```sql
BEGIN;
-- Update document
UPDATE documents
SET content = 'new content',
    updated_at = NOW()
WHERE id = 'doc-uuid';
-- Log change
INSERT INTO audit_logs (
  entity_type,
  entity_id,
  action,
  user_id
) VALUES (
  'document',
  'doc-uuid',
  'update',
  'user-uuid'
);
COMMIT;
```
Eventual Consistency
Search Index:
Document updated →
Write to PostgreSQL (immediate) →
Reindex for search (async, ~1-5 sec delay) →
Search results updated
Monitoring & Observability
Metrics Tracked
System Metrics:
- Request rate
- Response time (p50, p95, p99)
- Error rate
- Database connections
- Queue depth
Business Metrics:
- Documents processed
- Searches performed
- API calls
- Active users
- Storage used
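The p50/p95/p99 response times above are computed from raw latency samples; a nearest-rank sketch (illustrative, not the production metrics pipeline):

```typescript
// Nearest-rank percentile over raw latency samples (ms).
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) throw new Error('no samples');
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(0, rank - 1)];
}
```

For example, `percentile(latencies, 95)` returns the sample that 95% of requests were at or below.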
Logging
```typescript
// Structured logging
logger.info('Document processed', {
  documentId,
  userId,
  processingTime: 1234,
  fileSize: 102400,
  chunks: 15,
  cost: 0.05
});
```
Tracing
```typescript
// Distributed tracing with OpenTelemetry
import { trace, SpanStatusCode } from '@opentelemetry/api';
const tracer = trace.getTracer('solatis');
const span = tracer.startSpan('process-document');
span.setAttribute('document.id', documentId);
span.setAttribute('document.size', fileSize);
try {
  await processDocument(documentId);
  span.setStatus({ code: SpanStatusCode.OK });
} catch (error) {
  span.recordException(error);
  span.setStatus({ code: SpanStatusCode.ERROR });
} finally {
  span.end();
}
```
Scalability Patterns
Horizontal Scaling
Stateless Services:
- Edge Functions auto-scale
- Multiple instances
- Load balanced
- No shared state
Vertical Scaling
Database:
- Larger instance sizes
- More CPU/memory
- Read replicas
- Connection pooling
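Connection pooling caps how many PostgreSQL connections the scaled-out services hold open at once. A minimal acquire/release sketch (a real deployment would rely on a dedicated pooler such as PgBouncer; this `Pool` class only illustrates the mechanics):

```typescript
// Semaphore-style pool: at most the initial resources are checked out at once;
// extra acquirers wait until a resource is released.
class Pool<T> {
  private idle: T[];
  private waiters: Array<(r: T) => void> = [];

  constructor(resources: T[]) {
    this.idle = [...resources];
  }

  async acquire(): Promise<T> {
    const r = this.idle.pop();
    if (r !== undefined) return r;
    return new Promise<T>(resolve => this.waiters.push(resolve)); // wait for a release
  }

  release(r: T): void {
    const waiter = this.waiters.shift();
    if (waiter) waiter(r);   // hand directly to a waiting acquirer
    else this.idle.push(r);
  }
}
```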
Sharding (Future)
Partition Strategy:
Shard by organization_id
- Shard 0: org_id % 4 == 0
- Shard 1: org_id % 4 == 1
- Shard 2: org_id % 4 == 2
- Shard 3: org_id % 4 == 3
Next Steps
- Supabase Architecture - Database design
- API Authentication - API usage
- MCP Server - AI server architecture
Last Updated: October 11, 2025