Skip to content
This repository has been archived by the owner on Dec 10, 2024. It is now read-only.

Commit

Permalink
refactor(mqtt): mqtt topic & add provider
Browse files Browse the repository at this point in the history
  • Loading branch information
saeidex committed Nov 5, 2024
1 parent 81e652b commit 83ee7a9
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 30 deletions.
1 change: 1 addition & 0 deletions apps/next/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@ubus/configs": "workspace:*",
"@ubus/db": "workspace:*",
"@ubus/hooks": "workspace:*",
"@ubus/mqtt": "workspace:^",
"@ubus/ui": "workspace:*",
"@ubus/validators": "workspace:*",
"embla-carousel-autoplay": "^8.3.0",
Expand Down
40 changes: 40 additions & 0 deletions apps/next/src/app/dashboard/buses/[bussId]/page.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"use client";

import { MqttProvider, useMqttSubscription } from "@ubus/mqtt";

interface BusLocationPageProps {
params: {
bussId: string;
};
}

const BusLocationPage = (props: BusLocationPageProps) => {
return (
<MqttProvider>
<BusLocationContent {...props} />
</MqttProvider>
);
};

const BusLocationContent = (props: BusLocationPageProps) => {
const { data: location, error } = useMqttSubscription(props.params.bussId);

// if (isLoading) return <p>Loading...</p>;
if (error)
return (
<p className="grid h-dvh place-content-center place-items-center gap-4 text-destructive">
An error has occurred: {error.message}
</p>
);

return (
<div className="grid h-dvh place-content-center place-items-center gap-4">
<h1>Bus Location</h1>
<p>
{location ? `Current Location: ${location}` : "Waiting for location..."}
</p>
</div>
);
};

export default BusLocationPage;
2 changes: 0 additions & 2 deletions apps/next/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ export const env = createEnv({
*/
client: {
// NEXT_PUBLIC_CLIENTVAR: z.string(),
NEXT_PUBLIC_MQTT_CLUSTER_URL: z.string(),
},
/**
* Destructure all variables from `process.env` to make sure they aren't tree-shaken away.
Expand All @@ -34,7 +33,6 @@ export const env = createEnv({
NODE_ENV: process.env.NODE_ENV,

// NEXT_PUBLIC_CLIENTVAR: process.env.NEXT_PUBLIC_CLIENTVAR,
NEXT_PUBLIC_MQTT_CLUSTER_URL: process.env.NEXT_PUBLIC_MQTT_CLUSTER_URL,
},
skipValidation:
!!process.env.CI || process.env.npm_lifecycle_event === "lint",
Expand Down
3 changes: 2 additions & 1 deletion packages/mqtt/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
},
"dependencies": {
"@tanstack/react-query": "^5.59.16",
"mqtt": "^5.10.1"
"mqtt": "^5.10.1",
"react": "^18.3.1"
}
}
18 changes: 15 additions & 3 deletions packages/mqtt/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import type { MqttClient } from "mqtt";
import type { IClientOptions, MqttClient } from "mqtt";
import mqtt from "mqtt";

import { MQTT_HOST, MqttOptions } from "./mqtt.config";
import { env } from "./env";

export const mqttClient: MqttClient = mqtt.connect(MQTT_HOST, MqttOptions);
export const MqttOptions: IClientOptions = {
clientId: `mqtt_${Math.random().toString(16).slice(3)}`,
username: env.NEXT_PUBLIC_MQTT_USERNAME,
password: env.NEXT_PUBLIC_MQTT_PASSWORD,
clean: true,
connectTimeout: 4000,
reconnectPeriod: 1000,
};

export const mqttClient: MqttClient = mqtt.connect(
env.NEXT_PUBLIC_MQTT_HOST,
MqttOptions,
);
22 changes: 22 additions & 0 deletions packages/mqtt/src/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { createEnv } from "@t3-oss/env-nextjs";
import { z } from "zod";

export const env = createEnv({
server: {
NODE_ENV: z.enum(["development", "production"]).optional(),
},
client: {
NEXT_PUBLIC_MQTT_HOST: z.string().min(1),
NEXT_PUBLIC_MQTT_USERNAME: z.string().min(1),
NEXT_PUBLIC_MQTT_PASSWORD: z.string().min(1),
NEXT_PUBLIC_MQTT_TOPIC: z.string().min(1),
},
experimental__runtimeEnv: {
NEXT_PUBLIC_MQTT_HOST: process.env.NEXT_PUBLIC_MQTT_HOST,
NEXT_PUBLIC_MQTT_USERNAME: process.env.NEXT_PUBLIC_MQTT_USERNAME,
NEXT_PUBLIC_MQTT_PASSWORD: process.env.NEXT_PUBLIC_MQTT_PASSWORD,
NEXT_PUBLIC_MQTT_TOPIC: process.env.NEXT_PUBLIC_MQTT_TOPIC,
},
skipValidation:
!!process.env.CI || process.env.npm_lifecycle_event === "lint",
});
1 change: 1 addition & 0 deletions packages/mqtt/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./client";
export * from "./provider";
export * from "./service";
export * from "./topics";
14 changes: 0 additions & 14 deletions packages/mqtt/src/mqtt.config.ts

This file was deleted.

16 changes: 16 additions & 0 deletions packages/mqtt/src/provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import React from "react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";

interface MqttProviderProps {
children: React.ReactNode;
}

const queryClient = new QueryClient();

export const MqttProvider = (props: MqttProviderProps) => {
return React.createElement(
QueryClientProvider,
{ client: queryClient },
props.children,
);
};
19 changes: 11 additions & 8 deletions packages/mqtt/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,28 @@ import { getBusLocationTopic } from "./topics";
export const useMqttSubscription = (busId: string) => {
const topic = getBusLocationTopic(busId);

return useQuery<string, Error>({
return useQuery({
queryKey: [topic],
queryFn: async () => {
return new Promise<string>((resolve) => {
client.on("message", (receivedTopic, message) => {
queryFn: () => {
return new Promise<string>((resolve, reject) => {
client.on("message", (receivedTopic, payload) => {
if (receivedTopic === topic) {
resolve(message.toString());
resolve(payload.toString());
}
});

client.subscribe(topic, { qos: 1 }, (error) => {
if (error) {
console.error("Subscribe error: ", error);
} else {
console.log(`Subscribed to topic: ${topic}`);
reject(
new Error(
`Subscribe error: ${error.message || "Mqtt Subscription Error"}`,
),
);
}
});
});
},
refetchInterval: 1000 * 3,
staleTime: 1000 * 60,
});
};
5 changes: 4 additions & 1 deletion packages/mqtt/src/topics.ts
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
export const getBusLocationTopic = (id: string) => `buses/${id}/location`;
import { env } from "./env";

export const getBusLocationTopic = (busId: string) =>
`${env.NEXT_PUBLIC_MQTT_TOPIC}/${busId}`;
6 changes: 6 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion turbo.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
"AUTH_GOOGLE_SECRET",
"AUTH_REDIRECT_PROXY_URL",
"AUTH_SECRET",
"PORT"
"PORT",
"NEXT_PUBLIC_MQTT_HOST",
"NEXT_PUBLIC_MQTT_USERNAME",
"NEXT_PUBLIC_MQTT_PASSWORD",
"NEXT_PUBLIC_MQTT_TOPIC"
],
"globalPassThroughEnv": [
"NODE_ENV",
Expand Down

0 comments on commit 83ee7a9

Please sign in to comment.