Skip to content

Commit

Permalink
Merge pull request #3 from gabrielmeloc22/message-concurrency-control
Browse files Browse the repository at this point in the history
add concurrency control
  • Loading branch information
gabrielmeloc22 authored Aug 14, 2024
2 parents 39edbe5 + ff5c100 commit d9c2e7c
Show file tree
Hide file tree
Showing 35 changed files with 1,699 additions and 179 deletions.
2 changes: 1 addition & 1 deletion apps/server/env.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
declare namespace NodeJS {
export interface ProcessEnv {
NODE_ENV: "development" | "production";
NODE_ENV: "development" | "production" | "test";
PORT?: string;
WS_PORT?: string;
DATABASE_URL: string;
Expand Down
12 changes: 10 additions & 2 deletions apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"build": "pnpm generate:schema && tsup src/index.ts",
"start": "node dist/index.js",
"dev": "pnpm generate:schema && cp ./schema/schema.graphql ../web/data && tsx watch --require dotenv/config src/index.ts",
"test": "vitest run",
"test:watch": "vitest",
"format": "pnpm biome format ./src --write",
"lint": "pnpm biome lint ./src",
"check": "pnpm biome check --apply ./src",
Expand All @@ -16,6 +18,7 @@
"dependencies": {
"@entria/graphql-mongo-helpers": "^1.1.2",
"@entria/graphql-mongoose-loader": "^4.4.0",
"@faker-js/faker": "^8.4.1",
"@koa/bodyparser": "^5.1.1",
"@koa/cors": "^5.0.0",
"@koa/router": "^12.0.1",
Expand All @@ -37,22 +40,27 @@
"koa": "^2.15.3",
"koa-logger": "^3.2.1",
"koa-mount": "^4.0.0",
"mongodb-memory-server": "^10.0.0",
"mongoose": "^8.4.3",
"regex-escape": "^3.4.10",
"supertest": "^7.0.0",
"tsup": "^8.1.0",
"ws": "^8.18.0"
},
"devDependencies": {
"@biomejs/biome": "^1.8.1",
"@types/ws": "^8.5.11",
"@types/bcrypt": "^5.0.2",
"@types/jsonwebtoken": "^9.0.6",
"@types/koa": "^2.15.0",
"@types/koa-logger": "^3.1.5",
"@types/koa__cors": "^5.0.0",
"@types/koa__router": "^12.0.4",
"@types/node": "^20.14.6",
"@types/supertest": "^6.0.2",
"@types/ws": "^8.5.11",
"tsx": "^4.15.6",
"typescript": "^5.4.5"
"typescript": "^5.4.5",
"vite-tsconfig-paths": "^5.0.1",
"vitest": "^2.0.5"
}
}
9 changes: 6 additions & 3 deletions apps/server/schema/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type Message implements Node {

"""mongoose_id"""
_id: String!
localId: Int!
from: User!
delivered: Boolean!
deliveredAt: DateTime
Expand Down Expand Up @@ -336,16 +337,18 @@ input LoginMutationInput {
scalar NonEmptyString

type SendMessagePayload {
message: Message
chat: ChatEdge
message: MessageEdge
clientMutationId: String
}

input SendMessageInput {
content: NonEmptyString

"""The recipient id, a user or a chat"""
to: String!
toId: String!

"""A int that identifies the message of a user in chat locally"""
localId: Int!
clientMutationId: String
}

Expand Down
4 changes: 3 additions & 1 deletion apps/server/scripts/generateSchema.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { schema } from "@/schemas";
import { printSchema } from "graphql";
import fs from "node:fs/promises";
import path from "node:path";

process.env.SCHEMA_GENERATION = "true";

(async () => {
const filePath = path.resolve(__dirname, "..", "schema", "schema.graphql");
const { schema } = await import("@/schemas");

await fs.writeFile(filePath, printSchema(schema));
})();
8 changes: 5 additions & 3 deletions apps/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { useServer } from "graphql-ws/lib/use/ws";
import Koa from "koa";
import logger from "koa-logger";
import mount from "koa-mount";
import * as ws from "ws";
import { WebSocketServer } from "ws";
import { buildContext } from "./context";
import { connectDb } from "./database";
import { schema } from "./schemas";
Expand Down Expand Up @@ -64,9 +64,11 @@ app.use(async (ctx) => {
const port = process.env.PORT ?? "4000";

const server = app.listen(port, async () => {
await connectDb();
if (process.env.NODE_ENV !== "test") {
await connectDb();
}

const wsServer = new ws.Server({
const wsServer = new WebSocketServer({
server,
path: "/graphql",
});
Expand Down
8 changes: 8 additions & 0 deletions apps/server/src/modules/chat/fixture.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { type Chat, ChatModel } from "./ChatModel";

export const createChat = async (data: Partial<Chat> | undefined = {}) => {
const chat = await ChatModel.create(data);
await chat.save();

return chat;
};
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { UserLoader } from "@/modules/user/UserLoader";
import { UserType } from "@/modules/user/UserType";
import { events, pubSub } from "@/pubsub";
import { GraphQLBoolean, GraphQLNonNull, GraphQLString } from "graphql";
import { fromGlobalId } from "graphql-relay";
import { subscriptionWithClientId } from "graphql-relay-subscription";
import { withFilter } from "graphql-subscriptions";

Expand Down Expand Up @@ -39,7 +40,7 @@ export const OnTypeSubscription = subscriptionWithClientId<
() => pubSub.asyncIterator(events.chat.typing),
(payload: TypingStatusPayload, input: TypingStatusInput) => {
if (input.chatId) {
return input.chatId === payload.chatId;
return fromGlobalId(input.chatId).id === payload.chatId;
}
return false;
},
Expand Down
5 changes: 1 addition & 4 deletions apps/server/src/modules/chat/util/getChat.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import type { Context } from "@/context";
import { ChatModel } from "../ChatModel";

export async function getChat(
ctx: Context | { userId: string },
args: { chatId: string },
) {
export async function getChat(ctx: Context, args: { chatId: string }) {
const currentUserId = "userId" in ctx ? ctx.userId : ctx.user?.id.toString();

const id = args.chatId;
Expand Down
3 changes: 3 additions & 0 deletions apps/server/src/modules/message/MessageModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface Message extends Document {
createdAt: Date;
updatedAt: Date;
chat: Types.ObjectId;
localId: number;
}

const MessageSchema = new Schema<Message>(
Expand All @@ -22,6 +23,7 @@ const MessageSchema = new Schema<Message>(
seenAt: Date,
content: { type: String, required: true },
chat: { type: Schema.Types.ObjectId, ref: "Chat", required: true },
localId: { type: Number, required: true },
},
{
timestamps: true,
Expand All @@ -30,5 +32,6 @@ const MessageSchema = new Schema<Message>(
);

MessageSchema.index({ createdAt: -1 });
MessageSchema.index({ from: 1, localId: 1, chat: 1 }, { unique: true });

export const MessageModel = model<Message>("Message", MessageSchema);
4 changes: 4 additions & 0 deletions apps/server/src/modules/message/MessageType.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Context } from "@/context";
import { connectionDefinitions } from "@entria/graphql-mongo-helpers";
import {
GraphQLBoolean,
GraphQLInt,
GraphQLNonNull,
GraphQLObjectType,
GraphQLString,
Expand All @@ -25,6 +26,9 @@ export const MessageType = new GraphQLObjectType<Message, Context>({
description: "mongoose_id",
resolve: (msg) => msg.id,
},
localId: {
type: new GraphQLNonNull(GraphQLInt),
},
from: {
type: new GraphQLNonNull(UserType),
resolve: (msg, _args, ctx) => UserLoader.load(ctx, msg.from),
Expand Down
115 changes: 32 additions & 83 deletions apps/server/src/modules/message/mutations/SendMessageMutation.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,23 @@
import type { Context } from "@/context";
import { ChatLoader } from "@/modules/chat/ChatLoader";
import { type Chat, ChatModel } from "@/modules/chat/ChatModel";
import { ChatConnection } from "@/modules/chat/ChatType";
import { getChat } from "@/modules/chat/util/getChat";
import { UserModel } from "@/modules/user/UserModel";
import { events, pubSub } from "@/pubsub";
import { GraphQLNonNull, GraphQLString } from "graphql";
import { GraphQLInt, GraphQLNonNull, GraphQLString } from "graphql";
import {
fromGlobalId,
mutationWithClientMutationId,
toGlobalId,
} from "graphql-relay";
import { GraphQLNonEmptyString } from "graphql-scalars";
import { startSession } from "mongoose";
import { MessageLoader } from "../MessageLoader";
import { MessageModel } from "../MessageModel";
import { MessageType } from "../MessageType";
import type { MessageSubscription } from "../subscription/OnMessage";
import type { Message } from "../MessageModel";
import { MessageConnection } from "../MessageType";
import { sendMessage } from "./lib/sendMessage";

type SendMessageInput = {
to: string;
export type SendMessageInput = {
toId: string;
content: string;
localId: number;
};

type SendMessageOutput = {
content: string;
createdAt: Date;
id: string;
chat: Chat | null;
};
type SendMessageOutput = { message: Message };

export const SendMessage = mutationWithClientMutationId<
SendMessageInput,
Expand All @@ -38,84 +27,44 @@ export const SendMessage = mutationWithClientMutationId<
name: "SendMessage",
inputFields: {
content: { type: GraphQLNonEmptyString },
to: {
toId: {
type: new GraphQLNonNull(GraphQLString),
description: "The recipient id, a user or a chat",
},
localId: {
type: new GraphQLNonNull(GraphQLInt),
description:
"A int that identifies the message of a user in chat locally",
},
},
outputFields: {
message: {
type: MessageType,
resolve: async (out, _, ctx) => MessageLoader.load(ctx, (await out).id),
},
chat: {
type: ChatConnection.edgeType,
resolve: async (res, _, ctx) => {
const { chat } = await res;
const node = await ChatLoader.load(ctx, chat?.id);

if (!node) {
return null;
}

return { cursor: toGlobalId("Chat", node.id), node };
type: MessageConnection.edgeType,
resolve: async (out, _, ctx) => {
const node = await MessageLoader.load(
ctx,
(await out).message._id as string,
);
if (!node) return null;

return {
cursor: toGlobalId("Message", node.id),
node,
};
},
},
},
mutateAndGetPayload: async ({ content, to }, ctx) => {
mutateAndGetPayload: async ({ toId, ...data }, ctx) => {
if (!ctx.user) {
throw new Error("Sender not specified");
}

const toId = fromGlobalId(to).id;
const selfMessage = toId === ctx.user.id.toString();

let chat = await getChat(ctx, { chatId: toId });

const recipient = await UserModel.findById(toId);

if (!chat && !recipient) {
throw new Error("Recipient not specified");
}

let newChat = false;

if (!chat) {
newChat = true;
chat = new ChatModel({
users: selfMessage ? [ctx.user.id] : [recipient?.id, ctx.user.id],
});
}

const message = new MessageModel({
content,
from: ctx.user,
const { message } = await sendMessage({
...data,
toId: fromGlobalId(toId).id,
ctx,
});

const session = await startSession();

try {
chat.lastMessage = message.id;
message.chat = chat.id;

await message.save({ session });
await chat.save({ session });
} finally {
await session.endSession();
}

await pubSub.publish(events.message.new, {
topic: events.message.new,
newMessageId: message.id,
chatId: chat.id,
newChat,
} satisfies MessageSubscription);

return {
id: toGlobalId("Message", message.id),
content: message.content,
createdAt: message.createdAt,
chat,
};
return { message };
},
});
Loading

0 comments on commit d9c2e7c

Please sign in to comment.