Data Flow Architecture

Overview

This document describes how data flows through Solatis from ingestion to storage, processing, and retrieval.

System Components

┌──────────────┐
│   Clients    │  Web, Mobile, API
└──────┬───────┘

┌──────▼───────┐
│  API Layer   │  Supabase Edge Functions
│  + Auth      │  JWT Validation
└──────┬───────┘

    ┌──┴──────┬────────────┬─────────┐
    ▼         ▼            ▼         ▼
┌──────────┐ ┌──────┐ ┌──────────┐ ┌─────────┐
│ Database │ │  AI  │ │ Storage  │ │  Queue  │
│PostgreSQL│ │OpenAI│ │  AWS S3  │ │  Redis  │
└──────────┘ └──────┘ └──────────┘ └─────────┘

Data Flows

Document Upload Flow

1. User uploads document

2. Client: Validate (size, type)

3. API: Generate signed S3 URL

4. Client: Upload directly to S3

5. S3: Trigger webhook

6. Edge Function: Process document
   - Extract text
   - Generate embeddings
   - Create summary

7. Store in PostgreSQL:
   - documents table
   - document_embeddings table

8. Real-time: Notify client

9. Client: Update UI

Detailed Steps:

Steps 1-3: Pre-Upload

```typescript
// Client requests a signed upload URL
const { data } = await supabase.functions.invoke('get-upload-url', {
  body: {
    filename: 'document.pdf',
    contentType: 'application/pdf',
    size: 1024000
  }
});

// data contains:
// {
//   uploadUrl: 'https://s3.amazonaws.com/...',
//   documentId: 'uuid',
//   expiresIn: 3600
// }
```

Step 4: Direct Upload

```typescript
// Upload directly to S3 (no server proxy)
await fetch(uploadUrl, {
  method: 'PUT',
  body: file,
  headers: {
    'Content-Type': 'application/pdf'
  }
});
```

Steps 5-6: Processing

```typescript
// S3 webhook triggers Edge Function
export async function handleUpload(event: S3Event) {
  const bucket = event.Records[0].s3.bucket.name;
  const key = event.Records[0].s3.object.key;
  // The object key encodes the document ID, e.g. `uploads/<documentId>/<filename>`
  const documentId = key.split('/')[1];

  // Download from S3
  const file = await s3.getObject({ Bucket: bucket, Key: key });

  // Extract text
  const text = await extractText(file.Body);

  // Generate embeddings
  const chunks = chunkText(text);
  const embeddings = await Promise.all(
    chunks.map(chunk => openai.embeddings.create({
      model: 'text-embedding-3-large',
      input: chunk
    }))
  );

  // Store in database
  await supabase.from('documents').update({
    content: text,
    processing_status: 'complete'
  }).eq('id', documentId);

  await supabase.from('document_embeddings').insert(
    embeddings.map((emb, i) => ({
      document_id: documentId,
      chunk_index: i,
      content: chunks[i],
      embedding: emb.data[0].embedding
    }))
  );
}
```

Meeting Recording Flow

1. Google Meet bot joins meeting

2. Records audio/video stream

3. Meeting ends → Upload to S3

4. Queue transcription job

5. Whisper API: Transcribe

6. Store transcript in PostgreSQL

7. Queue AI analysis job

8. GPT-4: Generate summary, extract action items

9. Store analysis results

10. Create tasks in Jira/Asana

11. Send notifications (Slack, Email)
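The status progression above can be sketched as a small state machine that each worker consults to queue the next stage. The stage names here are illustrative, not the system's actual column values:

```typescript
// Hypothetical status progression for the meeting pipeline above.
const MEETING_PIPELINE = [
  'recorded',      // steps 1-3: bot recorded and uploaded to S3
  'transcribed',   // steps 4-6: Whisper transcript stored
  'analyzed',      // steps 7-9: GPT-4 summary and action items stored
  'tasks_created', // step 10: Jira/Asana tasks created
  'notified'       // step 11: Slack/email notifications sent
] as const;

type MeetingStatus = typeof MEETING_PIPELINE[number];

// Given the current status, return the next pipeline stage (or null when done).
function nextStage(status: MeetingStatus): MeetingStatus | null {
  const i = MEETING_PIPELINE.indexOf(status);
  return i >= 0 && i < MEETING_PIPELINE.length - 1 ? MEETING_PIPELINE[i + 1] : null;
}
```

Encoding the order in one place keeps workers decoupled: each one finishes its stage, looks up the successor, and enqueues it.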

Search Query Flow

1. User enters search query

2. Generate query embedding

3. Vector similarity search (pgvector)

4. Retrieve top K candidates (k=100)

5. Apply filters (workspace, date, type)

6. Optional: Re-rank results

7. Return top N results (n=20)

8. Client displays with highlights
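Steps 2-4 hinge on cosine similarity between the query embedding and stored chunk embeddings. In production pgvector computes this server-side with its distance operators; the in-memory sketch below shows the same top-K operation (names are illustrative):

```typescript
// Cosine similarity between two embedding vectors.
function cosineSim(a: number[], b: number[]): number {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return dot / (Math.sqrt(na) * Math.sqrt(nb));
}

interface Candidate { id: string; embedding: number[]; }

// Return the k candidates most similar to the query embedding.
function topK(query: number[], candidates: Candidate[], k: number): Candidate[] {
  return [...candidates]
    .sort((x, y) => cosineSim(query, y.embedding) - cosineSim(query, x.embedding))
    .slice(0, k);
}
```

With k=100 this yields the candidate set that filtering and optional re-ranking then narrow to the final n=20.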

Performance:

Average latency: 200-500ms
- Embedding generation: 50-100ms
- Vector search: 50-150ms
- Filtering & ranking: 50-100ms
- Network transfer: 50-150ms

Real-Time Collaboration

User A edits document

1. Client: Optimistic update

2. API: Validate & save

3. PostgreSQL: Write to documents table

4. Trigger: Notify Realtime channel

5. Realtime: Broadcast to subscribers

6. User B's Client: Receive update

7. User B's Client: Apply change

WebSocket Protocol:

```typescript
// Subscribe to changes
const channel = supabase
  .channel(`document:${documentId}`)
  .on(
    'postgres_changes',
    {
      event: 'UPDATE',
      schema: 'public',
      table: 'documents',
      filter: `id=eq.${documentId}`
    },
    (payload) => {
      // Update UI with new content
      updateDocument(payload.new);
    }
  )
  .subscribe();
```
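Steps 1-2 of the flow (optimistic update, then validate and save) also imply a rollback path when the save is rejected. A minimal sketch, with `save` standing in for the Supabase update call and made synchronous only to keep the example small:

```typescript
interface LocalDoc { content: string; }

// Apply the edit locally first, then persist; roll back on rejection.
function optimisticEdit(
  local: LocalDoc,
  newContent: string,
  save: (content: string) => boolean
): boolean {
  const previous = local.content;
  local.content = newContent; // step 1: UI reflects the edit immediately
  if (save(newContent)) {     // step 2: validate & save via the API
    return true;
  }
  local.content = previous;   // save rejected: restore the previous content
  return false;
}
```

The optimistic write is what keeps typing latency independent of the network round-trip; the rollback is what keeps the UI honest when validation fails.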

Processing Pipelines

Document Processing Pipeline

┌─────────────────────────────────────────┐
│           Document Upload               │
└────────────────┬────────────────────────┘

       ┌─────────▼──────────┐
       │  Validation Queue  │
       └─────────┬──────────┘

       ┌─────────▼──────────┐
       │  Extraction Queue  │ Extract text, OCR
       └─────────┬──────────┘

       ┌─────────▼──────────┐
       │  AI Processing     │ Embeddings, Summary
       └─────────┬──────────┘

       ┌─────────▼──────────┐
       │  Storage Queue     │ Save to DB
       └─────────┬──────────┘

       ┌─────────▼──────────┐
       │  Indexing Queue    │ Create indexes
       └─────────┬──────────┘

       ┌─────────▼──────────┐
       │     Complete       │
       └────────────────────┘

Meeting Processing Pipeline

Recording → Transcription → Speaker ID → 
Analysis → Summary → Action Items → Task Creation → 
Notifications → Complete

Queue Management:

```typescript
// Redis-based job queue
import { Queue, Worker } from 'bullmq';

const transcriptionQueue = new Queue('transcription', {
  connection: redis
});
const analysisQueue = new Queue('analysis', {
  connection: redis
});

// Add job
await transcriptionQueue.add('transcribe', {
  meetingId: 'uuid',
  audioUrl: 's3://...',
  language: 'en'
}, {
  priority: 1,
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 1000
  }
});

// Process jobs
const worker = new Worker('transcription', async (job) => {
  const { meetingId, audioUrl } = job.data;

  // Download audio
  const audio = await downloadFromS3(audioUrl);

  // Transcribe
  const transcript = await whisper.transcribe(audio);

  // Save
  await supabase.from('meetings').update({
    transcript,
    status: 'transcribed'
  }).eq('id', meetingId);

  // Queue the next pipeline stage
  await analysisQueue.add('analyze', { meetingId });
}, {
  connection: redis,
  concurrency: 5
});
```

Data Storage

Hot Storage (PostgreSQL)

Active Data:

  • Recent documents (< 90 days)
  • Active meetings
  • Current conversations
  • User sessions

Characteristics:

  • Fast read/write
  • Indexed for search
  • Real-time updates
  • High availability

Warm Storage (S3)

Older Data:

  • Documents > 90 days
  • Archived meetings
  • Processed recordings
  • Backups

Characteristics:

  • Lower cost
  • Higher latency
  • Long-term retention
  • Lifecycle policies
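The hot/warm split above reduces to a single tiering rule: anything untouched for more than 90 days belongs in S3. A sketch of that decision (the function name and `lastAccessed` input are illustrative):

```typescript
const HOT_RETENTION_DAYS = 90;

// Decide which tier a document belongs in, given when it was last accessed.
function storageTier(lastAccessed: Date, now: Date = new Date()): 'hot' | 'warm' {
  const ageDays = (now.getTime() - lastAccessed.getTime()) / (1000 * 60 * 60 * 24);
  return ageDays > HOT_RETENTION_DAYS ? 'warm' : 'hot';
}
```

A scheduled job would apply this rule in bulk, moving qualifying rows' payloads to S3 and leaving a pointer in PostgreSQL.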

Cache Layer (Redis)

Cached Data:

  • Session data
  • Frequently accessed docs
  • API rate limits
  • Real-time presence

TTL Examples:

Sessions: 1 hour
Documents: 15 minutes
Search results: 5 minutes
Rate limits: 1 minute
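The TTL table above maps directly onto a cache-aside pattern. In production this is Redis with `SET ... EX`; a `Map` stands in here, and the kind names are illustrative:

```typescript
// Per-kind TTLs mirroring the table above.
const TTL_MS: Record<string, number> = {
  session: 60 * 60 * 1000,  // 1 hour
  document: 15 * 60 * 1000, // 15 minutes
  search: 5 * 60 * 1000,    // 5 minutes
  ratelimit: 60 * 1000      // 1 minute
};

interface Entry { value: unknown; expiresAt: number; }
const cache = new Map<string, Entry>();

function cacheSet(kind: string, key: string, value: unknown, now = Date.now()): void {
  cache.set(`${kind}:${key}`, { value, expiresAt: now + (TTL_MS[kind] ?? 60_000) });
}

function cacheGet(kind: string, key: string, now = Date.now()): unknown {
  const entry = cache.get(`${kind}:${key}`);
  if (!entry || entry.expiresAt <= now) return undefined; // miss or expired
  return entry.value;
}
```

On a miss the caller falls through to PostgreSQL and repopulates the cache, so stale entries age out rather than being invalidated eagerly.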

Data Consistency

ACID Transactions

```sql
BEGIN;

-- Update document
UPDATE documents
SET content = 'new content',
    updated_at = NOW()
WHERE id = 'doc-uuid';

-- Log change
INSERT INTO audit_logs (
  entity_type,
  entity_id,
  action,
  user_id
) VALUES (
  'document',
  'doc-uuid',
  'update',
  'user-uuid'
);

COMMIT;
```

Eventual Consistency

Search Index:

Document updated →
Write to PostgreSQL (immediate) →
Reindex for search (async, ~1-5 sec delay) →
Search results updated
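The write-then-reindex sequence above can be sketched as a pending-jobs queue drained asynchronously. Maps stand in for PostgreSQL and the search index, and all names are illustrative:

```typescript
// In-memory stand-ins for the search index and the async reindex queue.
const searchIndex = new Map<string, string>();
const pendingReindex: Array<{ id: string; content: string }> = [];

// The row write is immediate; the reindex job is merely enqueued.
function writeDocument(id: string, content: string, db: Map<string, string>): void {
  db.set(id, content);                  // immediate PostgreSQL write
  pendingReindex.push({ id, content }); // reindexed async (~1-5 sec later in production)
}

// A background worker drains the queue and updates the search index.
function drainReindexQueue(): void {
  while (pendingReindex.length > 0) {
    const job = pendingReindex.shift()!;
    searchIndex.set(job.id, job.content); // search results now reflect the update
  }
}
```

Between the write and the drain, reads from the primary see the new content while search results briefly serve the old version; that window is the eventual-consistency gap described above.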

Monitoring & Observability

Metrics Tracked

System Metrics:

  • Request rate
  • Response time (p50, p95, p99)
  • Error rate
  • Database connections
  • Queue depth

Business Metrics:

  • Documents processed
  • Searches performed
  • API calls
  • Active users
  • Storage used

Logging

```typescript
// Structured logging
logger.info('Document processed', {
  documentId,
  userId,
  processingTime: 1234,
  fileSize: 102400,
  chunks: 15,
  cost: 0.05
});
```

Tracing

```typescript
// Distributed tracing with OpenTelemetry
const span = tracer.startSpan('process-document');
span.setAttribute('document.id', documentId);
span.setAttribute('document.size', fileSize);

try {
  await processDocument(documentId);
  span.setStatus({ code: SpanStatusCode.OK });
} catch (error) {
  span.recordException(error);
  span.setStatus({ code: SpanStatusCode.ERROR });
} finally {
  span.end();
}
```

Scalability Patterns

Horizontal Scaling

Stateless Services:

  • Edge Functions auto-scale
  • Multiple instances
  • Load balanced
  • No shared state

Vertical Scaling

Database:

  • Larger instance sizes
  • More CPU/memory
  • Read replicas
  • Connection pooling

Sharding (Future)

Partition Strategy:

Shard by organization_id
- Shard 0: org_id % 4 == 0
- Shard 1: org_id % 4 == 1
- Shard 2: org_id % 4 == 2
- Shard 3: org_id % 4 == 3
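Since IDs elsewhere in this system are UUIDs, the modulo above would in practice apply to an integer hash of `organization_id`. A routing sketch (FNV-1a is chosen here only for illustration; any stable hash works):

```typescript
const SHARD_COUNT = 4;

// FNV-1a: a simple, stable 32-bit string hash.
function fnv1a(s: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h;
}

// Route an organization to its shard index.
function shardFor(orgId: string): number {
  return fnv1a(orgId) % SHARD_COUNT;
}
```

The essential property is determinism: every service that touches an organization's data must compute the same shard, so the hash and shard count have to be fixed and shared.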

Last Updated: October 11, 2025

Released under the MIT License.