// File listing metadata: 517 lines, 16 KiB, JavaScript
// server.js
|
|
const fs = require("fs");
|
|
const path = require("path");
|
|
/**
 * Build the runtime configuration.
 *
 * Each value is resolved in this order: the JSON file at
 * /etc/kafkachat/config.json, then the matching environment variable
 * (where one exists), then a built-in default. A missing, unreadable or
 * malformed file is not fatal; it is reported and ignored.
 *
 * @returns {{kafka: {broker: string}, couchdb: {url: string},
 *   mail: {host: string, port: number, user: string, pass: string},
 *   app: {baseUrl: string, inviteExpiry: number}}} the resolved settings
 */
function loadConfig() {
  const configPath = "/etc/kafkachat/config.json";
  const defaultMailPort = 587;
  let fileConfig = {};

  try {
    if (fs.existsSync(configPath)) {
      const parsed = JSON.parse(fs.readFileSync(configPath, "utf8"));
      // A file holding `null` or a scalar is valid JSON but has no sections to read
      if (parsed !== null && typeof parsed === "object") {
        fileConfig = parsed;
        console.log("Loaded config from", configPath);
      } else {
        console.warn("Failed to load config file:", "top-level JSON value is not an object");
      }
    }
  } catch (err) {
    console.warn("Failed to load config file:", err.message);
  }

  // MAIL_PORT is text; parse it as base 10 and never let a bad value become NaN
  const envMailPort = Number.parseInt(process.env.MAIL_PORT || "", 10);
  const mailPort = fileConfig.mail?.port || (Number.isNaN(envMailPort) ? defaultMailPort : envMailPort);

  return {
    kafka: {
      broker: fileConfig.kafka?.broker || process.env.KAFKA_BROKER || "kafka-service.kafka.svc.cluster.local:9092",
    },
    couchdb: {
      // NOTE(review): the fallback URL embeds credentials; prefer supplying COUCHDB_URL or the config file
      url: fileConfig.couchdb?.url || process.env.COUCHDB_URL || "http://admin:Couchdb01@couchdb.couchdb.svc.cluster.local:5984",
    },
    mail: {
      host: fileConfig.mail?.host || process.env.MAIL_HOST || "smtp.eu.mailgun.org",
      port: mailPort,
      user: fileConfig.mail?.user || process.env.MAIL_USER || "postmaster@allarddcs.nl",
      pass: fileConfig.mail?.pass || process.env.MAIL_PASS || "",
    },
    app: {
      baseUrl: fileConfig.app?.baseUrl || process.env.APP_BASE_URL || "https://kafkachat-dev.allarddcs.nl",
      // 604800 equals 7 * 24 * 60 * 60, i.e. seven days if the unit is seconds (TODO confirm unit with consumers)
      inviteExpiry: fileConfig.app?.inviteExpiry || 604800,
    },
  };
}
|
|
// Resolved once at load time; every client below is configured from it.
const config = loadConfig();

const express = require("express");
const { Kafka } = require("kafkajs");

// CouchDB client bound to the configured server URL.
const nano = require("nano")(config.couchdb.url);

const nodemailer = require("nodemailer");

// SMTP transport used for invitation e-mails.
const mailer = nodemailer.createTransport({
  host: config.mail.host,
  port: config.mail.port,
  // Port 465 expects TLS from the first byte; other ports (e.g. the default 587)
  // connect in plain text and upgrade with STARTTLS, which needs `secure: false`.
  secure: Number(config.mail.port) === 465,
  auth: {
    user: config.mail.user,
    pass: config.mail.pass,
  },
});

const WebSocket = require("ws");
const cors = require("cors");
const bcrypt = require("bcrypt");

// Handles for the three CouchDB databases the API reads and writes.
const usersDb = nano.db.use("users");
const roomsDb = nano.db.use("rooms");
const invitesDb = nano.db.use("invites");
|
|
|
|
/**
 * Make sure the "users", "rooms" and "invites" databases exist.
 *
 * The databases are created one after another. A failure whose
 * `statusCode` is 412 is logged as "already exists"; any other failure
 * is rethrown to the caller.
 *
 * @returns {Promise<void>}
 */
async function initDatabases() {
  const requiredDatabases = ["users", "rooms", "invites"];

  for (const dbName of requiredDatabases) {
    let alreadyPresent = false;

    try {
      await nano.db.create(dbName);
    } catch (err) {
      if (err.statusCode !== 412) {
        throw err;
      }
      alreadyPresent = true;
    }

    const outcome = alreadyPresent
      ? `${dbName} database already exists`
      : `Created ${dbName} database`;
    console.log(outcome);
  }
}
|
|
|
|
// HTTP application: CORS middleware with its default options, plus JSON and
// URL-encoded body parsers that both accept bodies up to the same size limit.
const app = express();
const bodyLimit = '10mb';
app.use(cors());
app.use(express.json({ limit: bodyLimit }));
app.use(express.urlencoded({ limit: bodyLimit, extended: true }));

// Kafka client plus the producer and consumer shared by the whole process.
const kafka = new Kafka({ clientId: "chat-backend", brokers: [config.kafka.broker] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: "chat-broadcast" });

// WebSocket connections that are currently open.
const clients = new Set();
|
|
|
|
/**
 * Connect to Kafka, relay topic messages to WebSocket clients, and start the
 * HTTP + WebSocket server on port 3000.
 *
 * Consumer side: subscribes to every topic whose name does not start with
 * "__", parses each record as JSON, stamps it with `roomId = topic` and sends
 * it to every open socket that joined that room. Records that are not JSON
 * objects are skipped.
 *
 * Socket side: a `{type: "join", roomId}` message selects the room a socket
 * listens to; any other message is published to the Kafka topic named by its
 * `roomId`. Malformed messages are ignored instead of rejecting inside the
 * event listener.
 *
 * @returns {Promise<void>} resolves once the server has been asked to listen
 */
async function start() {
  await producer.connect();
  await consumer.connect();

  await consumer.subscribe({ topic: /^(?!__).*$/, fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ message, topic }) => {
      let msg;
      try {
        msg = JSON.parse(message.value.toString());
      } catch (e) {
        console.warn(`Skipping non-JSON message on topic ${topic}`);
        return;
      }
      // `null` and scalars are valid JSON but cannot carry a roomId
      if (msg === null || typeof msg !== "object") {
        console.warn(`Skipping non-object message on topic ${topic}`);
        return;
      }
      msg.roomId = topic;
      // Serialize once; the payload is the same for every recipient
      const payload = JSON.stringify(msg);
      for (const ws of clients) {
        if (ws.readyState === WebSocket.OPEN && ws.roomId === topic) {
          ws.send(payload);
        }
      }
    },
  });

  const server = app.listen(3000, () => console.log("REST API listening on port 3000"));
  const wss = new WebSocket.Server({ server });

  wss.on("connection", (ws) => {
    clients.add(ws);
    console.log("Client connected, total clients:", clients.size);

    ws.on("message", async (raw) => {
      let msg;
      try {
        msg = JSON.parse(raw);
      } catch {
        console.warn("Ignoring malformed WebSocket message");
        return;
      }
      if (msg === null || typeof msg !== "object") {
        console.warn("Ignoring malformed WebSocket message");
        return;
      }
      // Topics starting with "__" are excluded from the broadcast subscription
      // above, so clients must not be able to join or write to them.
      const { roomId } = msg;
      if (typeof roomId !== "string" || roomId === "" || roomId.startsWith("__")) {
        console.warn("Ignoring WebSocket message with invalid roomId");
        return;
      }
      if (msg.type === "join") {
        ws.roomId = roomId;
        return;
      }
      try {
        await producer.send({
          topic: roomId,
          messages: [{ value: JSON.stringify(msg) }],
        });
      } catch (err) {
        // A rejected promise inside an event listener would otherwise be unhandled
        console.error("Failed to publish message:", err?.message ?? err);
      }
    });

    // Without a listener, an "error" event on the socket would be thrown
    ws.on("error", (err) => {
      console.warn("WebSocket error:", err?.message ?? err);
    });

    ws.on("close", () => {
      clients.delete(ws);
      console.log("Client disconnected, total clients:", clients.size);
    });
  });

  console.log("WebSocket server running on ws://0.0.0.0:3000");
}
|
|
|
|
/**
 * Run `initDatabases()` followed by `start()`, retrying the pair on failure.
 *
 * Between failed attempts the function waits `delay` milliseconds. After the
 * last attempt fails it exits the process with code 1 straight away; there is
 * nothing left to retry, so no further wait takes place.
 *
 * @param {number} [retries=10] maximum number of attempts
 * @param {number} [delay=5000] pause between attempts, in milliseconds
 * @returns {Promise<void>} resolves as soon as one attempt succeeds
 */
async function startWithRetry(retries = 10, delay = 5000) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      await initDatabases();
      await start();
      return;
    } catch (err) {
      // Rejections are not guaranteed to be Error instances
      const reason = err?.message ?? err;
      if (attempt === retries) {
        console.error(`Start failed (attempt ${attempt}/${retries}), giving up`, reason);
        break;
      }
      console.error(`Start failed (attempt ${attempt}/${retries}), retrying in ${delay}ms...`, reason);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }

  console.error("Failed to start after maximum retries");
  process.exit(1);
}
|
|
|
|
// Kick off startup; a rejection that escapes the retry loop is logged.
startWithRetry().catch((err) => console.error(err));
|
|
|
|
|
|
// --- Messages ---
|
|
// GET /messages/:roomId — replay a room's Kafka topic from the beginning.
//
// A throw-away consumer (unique group id per request) reads the topic until
// no record has arrived for a while: 2s if the topic yields nothing at all,
// 300ms of silence after the most recent record otherwise. Responds with the
// parsed records as a JSON array, or 500 on failure. Records that are not
// valid JSON are skipped.
app.get("/messages/:roomId", async (req, res) => {
  const { roomId } = req.params;

  // Topics starting with "__" are deliberately excluded from the live
  // broadcast subscription; do not expose them through history either.
  if (roomId.startsWith("__")) {
    return res.status(404).json({ error: "Room not found" });
  }

  const tempConsumer = kafka.consumer({
    groupId: `history-${roomId}-${Date.now()}`,
  });

  try {
    await tempConsumer.connect();
    await tempConsumer.subscribe({ topic: roomId, fromBeginning: true });

    const messages = [];
    await new Promise((resolve, reject) => {
      let timer = setTimeout(resolve, 2000);
      tempConsumer
        .run({
          eachMessage: async ({ message }) => {
            try {
              messages.push(JSON.parse(message.value.toString()));
            } catch {
              console.warn(`Skipping non-JSON message in history of ${roomId}`);
            }
            // Restart the quiet-period countdown after every record
            clearTimeout(timer);
            timer = setTimeout(resolve, 300);
          },
        })
        .catch((err) => {
          // Surface a failed run() instead of leaving the promise floating
          clearTimeout(timer);
          reject(err);
        });
    });

    res.json(messages);
  } catch (err) {
    console.error("History fetch error:", err);
    res.status(500).json({ error: "Failed to fetch history" });
  } finally {
    // The response has already been sent; a failed disconnect is only logged
    try {
      await tempConsumer.disconnect();
    } catch (err) {
      console.warn("Failed to disconnect history consumer:", err?.message ?? err);
    }
  }
});
|
|
|
|
// --- Rooms ---
|
|
// GET /rooms?username=… — list the rooms the given user may see: every
// "public" room, plus each "private" room whose members include the user.
// Documents whose id starts with "_" are left out.
app.get("/rooms", async (req, res) => {
  const { username } = req.query;

  try {
    const { rows } = await roomsDb.list({ include_docs: true });
    const visible = [];

    for (const { doc } of rows) {
      if (doc._id.startsWith("_")) {
        continue;
      }
      const isPublic = doc.type === "public";
      const isMemberOfPrivate = doc.type === "private" && doc.members?.includes(username);
      if (isPublic || isMemberOfPrivate) {
        visible.push(doc);
      }
    }

    res.json(visible);
  } catch (err) {
    console.error("Failed to list rooms:", err);
    res.status(500).json({ error: "Failed to list rooms" });
  }
});
|
|
|
|
// PUT /rooms/:roomId — let the room's creator edit description, avatar and
// type. Only the fields present in the request body are changed; omitted
// fields keep their stored value.
// Responses: 200 updated document, 400 invalid type, 403 caller is not the
// creator, 404 unknown room, 500 other failures.
app.put("/rooms/:roomId", async (req, res) => {
  const { username, description, avatar, type } = req.body ?? {};

  // Rooms are listed only when their type is "public" or "private"
  if (type !== undefined && type !== "public" && type !== "private") {
    return res.status(400).json({ error: "Invalid room type" });
  }

  try {
    const room = await roomsDb.get(req.params.roomId);
    // `!username` stops an absent username from matching a room stored without a creator
    if (!username || room.createdBy !== username) {
      return res.status(403).json({ error: "Only the owner can edit this room" });
    }

    const updated = { ...room };
    if (description !== undefined) updated.description = description;
    if (avatar !== undefined) updated.avatar = avatar;
    if (type !== undefined) updated.type = type;

    await roomsDb.insert(updated);
    res.json(updated);
  } catch (err) {
    if (err?.statusCode === 404) {
      return res.status(404).json({ error: "Room not found" });
    }
    console.error("Failed to update room:", err);
    res.status(500).json({ error: "Failed to update room" });
  }
});
|
|
|
|
// GET /rooms/subscribed/:username — return the room documents for every id in
// the user's `subscribedRooms`. Ids whose lookup fails are silently dropped.
app.get("/rooms/subscribed/:username", async (req, res) => {
  // Resolve one room id to its document, or to null when the lookup fails.
  async function fetchRoomOrNull(id) {
    try {
      return await roomsDb.get(id);
    } catch {
      return null;
    }
  }

  try {
    const user = await usersDb.get(req.params.username);
    const roomIds = user.subscribedRooms || [];
    const lookups = roomIds.map((id) => fetchRoomOrNull(id));
    const rooms = await Promise.all(lookups);
    const found = rooms.filter((room) => Boolean(room));
    res.json(found);
  } catch (err) {
    res.status(500).json({ error: "Failed to fetch subscribed rooms" });
  }
});
|
|
|
|
|
|
/**
 * Escape text for interpolation into HTML element content or a
 * double-quoted attribute value.
 *
 * @param {*} value value to render; null/undefined become an empty string
 * @returns {string} the value with &, <, >, " and ' replaced by entities
 */
function escapeHtml(value) {
  return String(value ?? "")
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;")
    .replace(/'/g, "&#39;");
}

// POST /rooms/:roomId/invite — the room's creator invites an existing user.
//
// Adds the user to the room's `members`, stores a random single-use token in
// the invites database, and e-mails a join link if the invited user has an
// `email` field. The token lifetime comes from `config.app.inviteExpiry`
// (treated as seconds; the default 604800 reproduces the former fixed 7 days).
// Responses: 200 {success}, 403 caller is not the creator, 404 unknown
// invited user, 500 other failures (including mail delivery).
app.post("/rooms/:roomId/invite", async (req, res) => {
  const { invitedUsername, invitedBy } = req.body ?? {};

  try {
    let invitedUser;
    try {
      invitedUser = await usersDb.get(invitedUsername);
    } catch {
      return res.status(404).json({ error: "User not found" });
    }

    const room = await roomsDb.get(req.params.roomId);
    if (room.createdBy !== invitedBy) {
      return res.status(403).json({ error: "Only the owner can invite users" });
    }

    // Add to members
    const members = room.members || [];
    if (!members.includes(invitedUsername)) {
      await roomsDb.insert({ ...room, members: [...members, invitedUsername] });
    }

    // Generate token
    const expirySeconds = Number(config.app.inviteExpiry) || 604800;
    const now = Date.now();
    const token = require("crypto").randomBytes(32).toString("hex");
    await invitesDb.insert({
      _id: token,
      roomId: req.params.roomId,
      invitedUsername,
      invitedBy,
      createdAt: now,
      expiresAt: now + expirySeconds * 1000,
    });

    // Send email
    if (invitedUser.email) {
      const inviter = await usersDb.get(invitedBy);
      const inviterName = inviter.displayName || invitedBy;
      const inviteUrl = `${config.app.baseUrl}?invite=${token}`;
      const expiryDays = Math.max(1, Math.round(expirySeconds / 86400));
      const expiryText = `${expiryDays} day${expiryDays === 1 ? "" : "s"}`;

      // Display names, room names and descriptions are user-supplied: escape
      // them before they are placed into the HTML body.
      const safeInviter = escapeHtml(inviterName);
      const safeRoomName = escapeHtml(room.name);
      const descriptionHtml = room.description
        ? `<p style="color: #6b7280;">${escapeHtml(room.description)}</p>`
        : "";

      console.log(`Sending invite email to ${invitedUser.email}`);
      const info = await mailer.sendMail({
        from: `KafkaChat <${config.mail.user}>`,
        to: invitedUser.email,
        subject: `${inviterName} invited you to join "${room.name}"`,
        html: `
          <div style="font-family: sans-serif; max-width: 480px; margin: 0 auto;">
            <h2 style="color: #14b8a6;">You've been invited to KafkaChat</h2>
            <p><strong>${safeInviter}</strong> has invited you to join the private room <strong>${safeRoomName}</strong>.</p>
            ${descriptionHtml}
            <a href="${escapeHtml(inviteUrl)}" style="display: inline-block; margin-top: 16px; padding: 10px 20px; background: #14b8a6; color: white; border-radius: 8px; text-decoration: none;">
              Join ${safeRoomName}
            </a>
            <p style="margin-top: 24px; color: #9ca3af; font-size: 12px;">Log in as <strong>${escapeHtml(invitedUsername)}</strong>. This link expires in ${expiryText}.</p>
          </div>
        `,
      });
      console.log("Email sent:", info.messageId, "response:", info.response);
    }

    res.json({ success: true });
  } catch (err) {
    console.error("Invite error:", err);
    res.status(500).json({ error: "Failed to send invitation" });
  }
});
|
|
|
|
// Get invite info
|
|
// GET /invite/:token — describe a pending invitation.
// Responses: 200 {room, invitedUsername, invitedBy}, 410 when the token has
// expired, 404 when the token or its room cannot be loaded.
app.get("/invite/:token", async (req, res) => {
  try {
    const { expiresAt, roomId, invitedUsername, invitedBy } = await invitesDb.get(req.params.token);

    const hasExpired = expiresAt < Date.now();
    if (hasExpired) {
      return res.status(410).json({ error: "Invite link has expired" });
    }

    const room = await roomsDb.get(roomId);
    res.json({ room, invitedUsername, invitedBy });
  } catch (err) {
    res.status(404).json({ error: "Invalid invite link" });
  }
});
|
|
|
|
// Accept invite
|
|
// POST /invite/:token/accept — redeem an invitation for the user named in the
// request body: subscribe that user to the room and delete the token.
// Responses: 200 {success, roomId}, 403 token belongs to another user,
// 404 unknown token (same answer as GET /invite/:token), 410 expired token,
// 500 other failures.
app.post("/invite/:token/accept", async (req, res) => {
  const { username } = req.body ?? {};

  try {
    let invite;
    try {
      invite = await invitesDb.get(req.params.token);
    } catch (err) {
      // An unknown or already-used token is a client error, not a server fault
      if (err?.statusCode === 404) {
        return res.status(404).json({ error: "Invalid invite link" });
      }
      throw err;
    }

    if (invite.invitedUsername !== username) {
      return res.status(403).json({ error: "This invite is for a different user" });
    }
    if (invite.expiresAt < Date.now()) {
      return res.status(410).json({ error: "Invite link has expired" });
    }

    // Subscribe user to room
    const user = await usersDb.get(username);
    const subscribedRooms = user.subscribedRooms || [];
    if (!subscribedRooms.includes(invite.roomId)) {
      await usersDb.insert({ ...user, subscribedRooms: [...subscribedRooms, invite.roomId] });
    }

    // Delete token
    await invitesDb.destroy(invite._id, invite._rev);

    res.json({ success: true, roomId: invite.roomId });
  } catch (err) {
    console.error("Failed to accept invite:", err);
    res.status(500).json({ error: "Failed to accept invite" });
  }
});
|
|
|
|
// POST /rooms — create a room: a Kafka topic plus a document in the rooms
// database, both named after the room, and subscribe the creator to it.
//
// The name doubles as a Kafka topic name and a CouchDB document id, so it is
// limited to what both accept. The creator is looked up before anything is
// created, so an unknown user no longer leaves a topic and room behind.
// Responses: 200 {success}, 400 invalid input, 404 unknown creator,
// 409 room already exists, 500 other failures.
app.post("/rooms", async (req, res) => {
  const { name, type = "public", description = "", avatar = null, createdBy } = req.body ?? {};

  if (!name || typeof name !== "string" || /\s/.test(name)) {
    return res.status(400).json({ error: "Invalid room name — no spaces allowed" });
  }
  // Kafka topic names: letters, digits, ".", "_" and "-", at most 249 characters,
  // and not "." or "..". A leading "_" is reserved for CouchDB's own document ids
  // (and "__" topics are ignored by the broadcast consumer).
  const topicNamePattern = /^[a-zA-Z0-9._-]{1,249}$/;
  if (!topicNamePattern.test(name) || name.startsWith("_") || name === "." || name === "..") {
    return res.status(400).json({
      error: "Invalid room name — use only letters, digits, '.', '_' or '-', and do not start with '_'",
    });
  }
  if (type !== "public" && type !== "private") {
    return res.status(400).json({ error: "Invalid room type" });
  }
  if (!createdBy || typeof createdBy !== "string") {
    return res.status(400).json({ error: "createdBy is required" });
  }

  try {
    let user;
    try {
      user = await usersDb.get(createdBy);
    } catch (err) {
      if (err?.statusCode === 404) {
        return res.status(404).json({ error: "User not found" });
      }
      throw err;
    }

    const admin = kafka.admin();
    await admin.connect();
    try {
      await admin.createTopics({
        topics: [{ topic: name, numPartitions: 1, replicationFactor: 1 }],
      });
    } finally {
      // Release the admin connection even when topic creation fails
      await admin.disconnect();
    }

    try {
      await roomsDb.insert({
        _id: name,
        name,
        type,
        description,
        avatar,
        createdBy,
        createdAt: Date.now(),
        members: [createdBy],
      });
    } catch (err) {
      if (err?.statusCode === 409) {
        return res.status(409).json({ error: "Room already exists" });
      }
      throw err;
    }

    const subscribedRooms = user.subscribedRooms || [];
    if (!subscribedRooms.includes(name)) {
      await usersDb.insert({
        ...user,
        subscribedRooms: [...subscribedRooms, name],
      });
    }

    res.json({ success: true });
  } catch (err) {
    console.error("Failed to create room:", err);
    res.status(500).json({ error: err?.message || "Failed to create room" });
  }
});
|
|
|
|
// DELETE /rooms/:roomId — let the room's creator delete it.
//
// Removes the room document, strips the room id from every user's
// `subscribedRooms`, then deletes the Kafka topic. Once the room document is
// gone the remaining cleanup is best-effort: a failure for one user or for
// the topic is logged and does not abort the request.
// Responses: 200 {success}, 403 caller is not the creator, 404 unknown room,
// 500 other failures.
app.delete("/rooms/:roomId", async (req, res) => {
  const { username } = req.body ?? {};
  const { roomId } = req.params;

  try {
    let room;
    try {
      room = await roomsDb.get(roomId);
    } catch (err) {
      if (err?.statusCode === 404) {
        return res.status(404).json({ error: "Room not found" });
      }
      throw err;
    }

    // `!username` stops an absent username from matching a room stored without a creator
    if (!username || room.createdBy !== username) {
      return res.status(403).json({ error: "Only the owner can delete this room" });
    }

    // Delete from CouchDB
    await roomsDb.destroy(room._id, room._rev);

    // Remove from all users' subscribedRooms
    const usersResult = await usersDb.list({ include_docs: true });
    for (const { doc: user } of usersResult.rows) {
      if (!user?.subscribedRooms?.includes(roomId)) {
        continue;
      }
      try {
        await usersDb.insert({
          ...user,
          subscribedRooms: user.subscribedRooms.filter((r) => r !== roomId),
        });
      } catch (err) {
        console.warn(`Failed to unsubscribe ${user._id} from deleted room:`, err?.message ?? err);
      }
    }

    // Delete Kafka topic
    try {
      const admin = kafka.admin();
      await admin.connect();
      try {
        await admin.deleteTopics({ topics: [roomId] });
      } finally {
        // Release the admin connection even when the deletion fails
        await admin.disconnect();
      }
    } catch (err) {
      console.warn("Failed to delete Kafka topic:", err.message);
    }

    res.json({ success: true });
  } catch (err) {
    console.error("Failed to delete room:", err);
    res.status(500).json({ error: "Failed to delete room" });
  }
});
|
|
|
|
// POST /rooms/:roomId/subscribe — add the room to the user's `subscribedRooms`.
//
// The room must exist, and a "private" room can only be subscribed to by one
// of its members (the same rule GET /rooms uses to decide visibility).
// Responses: 200 {success}, 403 private room and caller is not a member,
// 404 unknown room, 500 other failures.
app.post("/rooms/:roomId/subscribe", async (req, res) => {
  const { username } = req.body ?? {};
  const { roomId } = req.params;

  try {
    let room;
    try {
      room = await roomsDb.get(roomId);
    } catch (err) {
      if (err?.statusCode === 404) {
        return res.status(404).json({ error: "Room not found" });
      }
      throw err;
    }

    if (room.type === "private" && !room.members?.includes(username)) {
      return res.status(403).json({ error: "This room is invite-only" });
    }

    const user = await usersDb.get(username);
    const subscribedRooms = user.subscribedRooms || [];
    if (!subscribedRooms.includes(roomId)) {
      await usersDb.insert({
        ...user,
        subscribedRooms: [...subscribedRooms, roomId],
      });
    }
    res.json({ success: true });
  } catch (err) {
    console.error("Failed to subscribe:", err);
    res.status(500).json({ error: "Failed to subscribe" });
  }
});
|
|
|
|
// POST /rooms/:roomId/unsubscribe — remove the room from the user's
// `subscribedRooms` and store the user document again.
// Responds 200 {success}, or 500 when loading or saving the user fails.
app.post("/rooms/:roomId/unsubscribe", async (req, res) => {
  const { username } = req.body;

  try {
    const user = await usersDb.get(username);
    const current = user.subscribedRooms || [];
    const isOtherRoom = (id) => id !== req.params.roomId;
    const subscribedRooms = current.filter(isOtherRoom);

    await usersDb.insert({ ...user, subscribedRooms });
    res.json({ success: true });
  } catch (err) {
    res.status(500).json({ error: "Failed to unsubscribe" });
  }
});
|
|
|
|
// --- Auth ---
|
|
// POST /auth/login — check a username/password pair against the stored bcrypt
// hash. Responds with the user document minus `passwordHash` on success, and
// with 401 for a wrong password or any failure during the lookup.
app.post("/auth/login", async (req, res) => {
  const { username, password } = req.body;
  const rejectLogin = () => res.status(401).json({ error: "Invalid credentials" });

  try {
    const user = await usersDb.get(username);
    const passwordMatches = await bcrypt.compare(password, user.passwordHash);
    if (!passwordMatches) {
      return rejectLogin();
    }

    // Never send the hash back to the client
    const profile = { ...user };
    delete profile.passwordHash;
    res.json(profile);
  } catch (err) {
    rejectLogin();
  }
});
|
|
|
|
// POST /auth/register — create a user document keyed by the username, storing
// a bcrypt hash of the password.
// Responses: 200 {success}, 400 missing or invalid username/password,
// 409 username taken, 500 other failures.
app.post("/auth/register", async (req, res) => {
  const { username, password, displayName } = req.body ?? {};

  // The username becomes the document id. Without one the database would
  // store the user under a generated id, and ids starting with "_" are
  // reserved by CouchDB.
  if (typeof username !== "string" || username.trim() === "" || username.startsWith("_")) {
    return res.status(400).json({ error: "Invalid username" });
  }
  if (typeof password !== "string" || password === "") {
    return res.status(400).json({ error: "Password is required" });
  }

  try {
    const passwordHash = await bcrypt.hash(password, 10);
    await usersDb.insert({
      _id: username,
      username,
      passwordHash,
      displayName: displayName || username,
      avatar: null,
      bio: "",
      status: "online",
      subscribedRooms: [],
      createdAt: Date.now(),
    });
    res.json({ success: true });
  } catch (err) {
    if (err?.statusCode === 409) {
      return res.status(409).json({ error: "Username already exists" });
    }
    console.error("Registration failed:", err);
    res.status(500).json({ error: "Registration failed" });
  }
});
|
|
|
|
// GET /auth/profile/:username — return the user document without its
// `passwordHash`; any lookup failure is answered with 404.
app.get("/auth/profile/:username", async (req, res) => {
  try {
    const user = await usersDb.get(req.params.username);

    // Copy, then drop the hash so it never leaves the server
    const profile = { ...user };
    delete profile.passwordHash;

    res.json(profile);
  } catch (err) {
    res.status(404).json({ error: "User not found" });
  }
});
|
|
|
|
// PUT /auth/profile/:username — merge the request body into the user document
// and return the result without `passwordHash`.
//
// Server-managed fields are never taken from the body: anything starting with
// "_" (CouchDB metadata such as `_id`, `_rev`, `_deleted`), plus `username`,
// `passwordHash`, `subscribedRooms` (changed through the subscribe/invite
// routes) and `createdAt`. Such keys are ignored rather than rejected, so a
// client that sends back the whole profile keeps working.
// Responses: 200 updated profile, 404 unknown user, 500 other failures.
app.put("/auth/profile/:username", async (req, res) => {
  const protectedFields = new Set(["username", "passwordHash", "subscribedRooms", "createdAt"]);

  try {
    const user = await usersDb.get(req.params.username);

    const body = req.body !== null && typeof req.body === "object" ? req.body : {};
    const changes = Object.fromEntries(
      Object.entries(body).filter(([key]) => !key.startsWith("_") && !protectedFields.has(key)),
    );

    const updated = { ...user, ...changes };
    await usersDb.insert(updated);

    const { passwordHash, ...profile } = updated;
    res.json(profile);
  } catch (err) {
    if (err?.statusCode === 404) {
      return res.status(404).json({ error: "User not found" });
    }
    console.error("Profile update failed:", err);
    res.status(500).json({ error: "Update failed" });
  }
});
|
|
|
|
// --- Static ---
|
|
// Serve the built frontend; every path not handled above falls back to
// index.html.
const frontendDir = path.join(__dirname, '../frontend');
app.use(express.static(frontendDir));

app.get('/{*path}', (req, res) => {
  const indexFile = path.join(frontendDir, 'index.html');
  res.sendFile(indexFile);
});
|