Latency-aware routing for edge and AI pipelines
The Problem
Step-by-Step Deployment Recommendations
3
Wire your backend client to Flashback
# fb_s3_client.py
import boto3
from botocore.client import Config
def s3_client_for(endpoint, key_id, key_secret):
session = boto3.session.Session(
aws_access_key_id=key_id, aws_secret_access_key=key_secret
)
return session.client("s3", endpoint_url=endpoint, config=Config(signature_version="s3v4"))// fbS3Client.ts
import { S3Client } from "@aws-sdk/client-s3";
const clients = new Map<string, S3Client>();
export function s3ClientFor(endpoint: string, keyId: string, secret: string) {
if (!clients.has(endpoint)) {
clients.set(endpoint, new S3Client({
endpoint, region: "us-east-1",
credentials: { accessKeyId: keyId, secretAccessKey: secret },
forcePathStyle: true
}));
}
return clients.get(endpoint)!;
}4
Discover nodes and pull live latency
// fbNodeStats.js
const H = { Accept: "application/json", Authorization: `Bearer ${process.env.FB_JWT}` };
export async function listNodes() {
const r = await fetch("https://backend.flashback.tech/node", { headers: H });
return r.json(); // includes node metadata for routing tables
}
export async function nodeMinuteStats(bucketId) {
const url = new URL("https://backend.flashback.tech/stats/nodes/minute");
if (bucketId) url.searchParams.set("bucketId", bucketId);
const r = await fetch(url, { headers: H });
return r.json(); // includes per-node minute latency/ops
}5
Select the fastest Online node (with TTL + circuit breaker)
// pickEndpoint.ts
import { listNodes, nodeMinuteStats } from "./fbNodeStats";
const CACHE_TTL_MS = 60_000;
let cache = { endpoint: "", expires: 0 };
export async function pickEndpointFor(bucketId?: string) {
const now = Date.now();
if (cache.endpoint && cache.expires > now) return cache.endpoint;
const nodes = await listNodes(); // seed candidates
const stats = await nodeMinuteStats(bucketId); // recent latency/availability
// Rank Online nodes by recent latency; fall back to others if needed
const online = rankByLatency(merge(nodes, stats)).filter(n => n.status === "Online");
const chosen = (online[0] ?? rankByLatency(merge(nodes, stats))[0]);
cache = { endpoint: chosen.endpoint, expires: now + CACHE_TTL_MS };
return cache.endpoint;
}
// If a request times out or 5xx, invalidate cache so next call re-picks:
export function reportFailure() { cache.expires = 0; }6
Use the chosen endpoint for reads/writes
// fbStorage.ts
import { PutObjectCommand, GetObjectCommand } from "@aws-sdk/client-s3";
import { s3ClientFor } from "./fbS3Client";
import { pickEndpointFor, reportFailure } from "./pickEndpoint";
const KEY_ID = process.env.FB_KEY_ID!, SECRET = process.env.FB_KEY_SECRET!;
export async function putObject(bucket: string, key: string, body: Buffer) {
const endpoint = await pickEndpointFor(bucket);
const s3 = s3ClientFor(endpoint, KEY_ID, SECRET);
try {
await s3.send(new PutObjectCommand({ Bucket: bucket, Key: key, Body: body }));
} catch (e) { reportFailure(); throw e; }
}
export async function getObject(bucket: string, key: string) {
const endpoint = await pickEndpointFor(bucket);
const s3 = s3ClientFor(endpoint, KEY_ID, SECRET);
try {
return await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
} catch (e) { reportFailure(); throw e; }
}Last updated
Was this helpful?