Skip to content

Commit

Permalink
chore: add client code
Browse files Browse the repository at this point in the history
  • Loading branch information
drochetti committed Oct 30, 2023
1 parent f8b4e2e commit d8a1c83
Show file tree
Hide file tree
Showing 7 changed files with 423 additions and 3 deletions.
100 changes: 100 additions & 0 deletions lib/client.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import './config.dart';
import './exception.dart';
import './http.dart';
import './queue.dart';

abstract class Client {
Queue get queue;

Future<Map<String, dynamic>> run(
String id, {
String method = 'post',
String path = '',
Map<String, dynamic>? input,
});

Future<Map<String, dynamic>> subscribe(
String id, {
Map<String, dynamic>? input,
int pollInterval = 3000,
bool logs = false,
});
}

class FalClient implements Client {
final Config config;

@override
final Queue queue;

FalClient({
required this.config,
}) : queue = QueueClient(config: config);

@override
Future<Map<String, dynamic>> run(
String id, {
String method = 'post',
String path = '',
Map<String, dynamic>? input,
}) async {
return await sendRequest(
id,
config: config,
method: method,
input: input,
);
}

@override
Future<Map<String, dynamic>> subscribe(String id,
{Map<String, dynamic>? input,
int pollInterval = 3000, // 3 seconds
int timeout = 300000, // 5 minutes
bool logs = false,
Function(String)? onEnqueue,
Function(QueueStatus)? onQueueUpdate}) async {
final enqueued = await queue.submit(id, input: input);
final requestId = enqueued.requestId;

if (onEnqueue != null) {
onEnqueue(requestId);
}

return _pollForResult(id,
requestId: requestId,
pollInterval: pollInterval,
onQueueUpdate: onQueueUpdate,
timeout: timeout);
}

Future<Map<String, dynamic>> _pollForResult(
String id, {
required String requestId,
required int pollInterval,
required int timeout,
Function(QueueStatus)? onQueueUpdate,
}) async {
final expiryTime = DateTime.now().add(Duration(milliseconds: timeout));

while (true) {
if (DateTime.now().isAfter(expiryTime)) {
throw FalApiException(
message: 'Request timed out after \$timeout milliseconds.',
status: 408);
}
final queueStatus = await queue.status(id, requestId: requestId);

if (onQueueUpdate != null) {
onQueueUpdate(queueStatus);
}

if (queueStatus is CompletedStatus) {
return await queue.result(id, requestId: requestId);
}
await Future.delayed(Duration(milliseconds: pollInterval));
}
}
}

// final fal = FalClient(config: DEFAULT_CONFIG);
11 changes: 11 additions & 0 deletions lib/config.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
class Config {
String credentials;
String host;
String? proxyUrl;

Config({
this.credentials = '',
this.host = 'gateway.alpha.fal.ai',
this.proxyUrl,
});
}
56 changes: 56 additions & 0 deletions lib/exception.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
class FalApiException implements Exception {
final int status;
final String message;
final Map<String, dynamic>? body;

FalApiException({
required this.status,
required this.message,
this.body,
});

@override
String toString() {
return 'ApiException: $status - $message';
}
}

class ValidationErrorInfo {
final String msg;
final String type;

ValidationErrorInfo({
required this.msg,
required this.type,
});

factory ValidationErrorInfo.fromMap(Map<String, dynamic> json) {
return ValidationErrorInfo(
msg: json['msg'],
type: json['type'],
);
}
}

class ValidationException extends FalApiException {
final List<ValidationErrorInfo> errors;

ValidationException({
required int status,
required String message,
required this.errors,
}) : super(
status: status,
message: message,
);

factory ValidationException.fromMap(Map<String, dynamic> json) {
return ValidationException(
status: json['status'],
message: json['message'],
errors: (json['body'] as List<dynamic>)
.map((e) => ValidationErrorInfo.fromMap(e as Map<String, dynamic>))
.toList(),
);
}
}
75 changes: 75 additions & 0 deletions lib/http.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import 'dart:convert';

import 'package:http/http.dart' as http;

import './config.dart';
import './exception.dart';

String buildUrl(
String id, {
required Config config,
String method = 'post',
String path = '',
Map<String, dynamic>? input,
}) {
final pathValue = path.replaceAll(RegExp(r'^/|/{2,}'), '');
final params = method.toLowerCase() == 'get' && input != null
? Uri(queryParameters: input)
: null;
final queryParams =
params != null && params.query.isNotEmpty ? '?${params.query}' : '';

return 'https://$id.${config.host}/$pathValue$queryParams';
}

Future<Map<String, dynamic>> sendRequest(
String id, {
required Config config,
String method = 'post',
String path = '',
Map<String, dynamic>? input,
}) async {
final url = buildUrl(
id,
config: config,
method: method,
path: path,
input: input,
);
final headers = {
'Accept': 'application/json',
'Content-Type': 'application/json; charset=utf-8',
// 'User-Agent': getUserAgent(),
};
if (config.credentials.trim().isNotEmpty) {
headers['Authorization'] = 'Key ${config.credentials}';
}
if (config.proxyUrl != null) {
headers['x-fal-target-url'] = url;
}

final request = http.Request(
method.toUpperCase(),
Uri.parse(config.proxyUrl ?? url),
);
request.headers.addAll(headers);

if (input != null) {
request.body = jsonEncode(input);
}

final response = await request.send();
final body = await response.stream.bytesToString();

if (response.statusCode >= 200 && response.statusCode < 300) {
return jsonDecode(body);
}

if (response.statusCode == 422) {
final error = jsonDecode(body);
throw ValidationException.fromMap(error);
}

throw FalApiException(
status: response.statusCode, message: body, body: jsonDecode(body));
}
150 changes: 150 additions & 0 deletions lib/queue.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import './config.dart';
import './http.dart';

class EnqueueResult {
String requestId;

EnqueueResult(this.requestId);

factory EnqueueResult.fromMap(Map<String, dynamic> json) {
return EnqueueResult(json['request_id']);
}
}

class RequestLog {
String message;
String timestamp;

RequestLog({
required this.message,
required this.timestamp,
});

factory RequestLog.fromMap(Map<String, dynamic> json) {
return RequestLog(
message: json['message'],
timestamp: json['timestamp'],
);
}
}

abstract class QueueStatus {
String status;
String responseUrl;

QueueStatus(this.status, this.responseUrl);

factory QueueStatus.fromMap(Map<String, dynamic> json) {
switch (json['status']) {
case 'IN_PROGRESS':
return InProgressStatus.fromMap(json);
case 'COMPLETED':
return CompletedStatus.fromMap(json);
case 'IN_QUEUE':
return InQueueStatus.fromMap(json);
default:
throw Exception('Unknown status: ${json['status']}');
}
}
}

class InProgressStatus extends QueueStatus {
List<RequestLog> logs;

InProgressStatus({
required String responseUrl,
required this.logs,
}) : super('IN_PROGRESS', responseUrl);

factory InProgressStatus.fromMap(Map<String, dynamic> json) {
return InProgressStatus(
responseUrl: json['response_url'],
logs: (json['logs'] as List<dynamic>)
.map((e) => RequestLog.fromMap(e as Map<String, dynamic>))
.toList(),
);
}
}

class CompletedStatus extends QueueStatus {
List<RequestLog> logs;

CompletedStatus({
required String responseUrl,
required this.logs,
}) : super('COMPLETED', responseUrl);

factory CompletedStatus.fromMap(Map<String, dynamic> json) {
return CompletedStatus(
responseUrl: json['response_url'],
logs: (json['logs'] as List<dynamic>)
.map((e) => RequestLog.fromMap(e as Map<String, dynamic>))
.toList(),
);
}
}

class InQueueStatus extends QueueStatus {
int queuePosition;

InQueueStatus({
required String responseUrl,
required this.queuePosition,
}) : super('IN_QUEUE', responseUrl);

factory InQueueStatus.fromMap(Map<String, dynamic> json) {
return InQueueStatus(
responseUrl: json['response_url'],
queuePosition: json['queue_position'],
);
}
}

abstract class Queue {
Future<EnqueueResult> submit(
String id, {
String path = '',
Map<String, dynamic>? input,
});

Future<QueueStatus> status(
String id, {
required String requestId,
bool logs,
});

Future<Map<String, dynamic>> result(String id, {required String requestId});
}

class QueueClient implements Queue {
final Config config;

QueueClient({required this.config});

@override
Future<EnqueueResult> submit(String id,
{String path = '', Map<String, dynamic>? input}) async {
final result = await sendRequest(id,
config: config, path: '$path/fal/queue/submit', input: input);
return EnqueueResult.fromMap(result);
}

@override
Future<QueueStatus> status(String id,
{required String requestId, bool logs = false}) async {
final result = await sendRequest(id,
config: config,
method: 'get',
path: '/fal/queue/requests/$requestId/status');
return QueueStatus.fromMap(result);
}

@override
Future<Map<String, dynamic>> result(String id,
{required String requestId}) async {
return sendRequest(id,
config: config,
method: 'get',
path: '/fal/queue/requests/$requestId/response');
}
}
Loading

0 comments on commit d8a1c83

Please sign in to comment.