Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish data to flyxc #48

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ vite.config.ts.timestamp*
coverage/
*.sql
*.gz
.env.*.local
3 changes: 3 additions & 0 deletions bin/_emit-schema-ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ const typescriptCode = await definition
column.optional = false
}

if (column.propertyName === 'flyXcToken') {
column.propertyName = 'flyXCToken'
}
if (column.propertyName === 'neighbours') {
column.propertyType = 'Neighbour[]'
}
Expand Down
4 changes: 3 additions & 1 deletion frontend/src/db-entities.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export interface NodesEntity {
channelUtilization?: number
createdAt: string
firmwareVersion?: string
flyXcToken?: string
flyXCToken?: string
hardwareModel?: number
hasDefaultChannel?: boolean
inbox?: MessageIn[]
Expand Down Expand Up @@ -99,6 +99,8 @@ export interface PositionsEntity {
createdAt: string
from: number
gatewayId?: number
groundSpeed?: number
groundTrack?: number
id: number
latitude?: number
longitude?: number
Expand Down
25 changes: 25 additions & 0 deletions frontend/src/entrypoints/js/maps/node-details-modal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { LoadedState } from './loaded-state.tsx'
import { Position } from './position.tsx'
import { DateTime } from 'luxon'
import _ from 'lodash'
import { CopyIcon } from '../utils/icon-constants.ts'
import { Tooltip } from '../components/tooltip.tsx'

interface Props {
allNodes?: Record<number, NodesEntityForUI>
Expand Down Expand Up @@ -77,6 +79,28 @@ export function NodeDetailsModal({ node, onClose, allNodes }: Props) {
.map((t) => DateTime.fromISO(t))
.min()!

function flyXCToken(node: NodesEntityForUI) {
const flyXCToken = node.flyXCToken

if (!flyXCToken) {
return
}
return (
<div>
<NameValue name="Fly XC ID" value={flyXCToken} />

<Tooltip tooltipText="Copy link to clipboard" className="border-sm inline-block rounded border ml-3">
<CopyIcon
className="w-5 h-5 inline-block p-0.5"
onClick={() => {
navigator.clipboard.writeText(flyXCToken)
}}
/>
</Tooltip>
</div>
)
}

return (
<Modal onClose={onClose} isOpen={true} header={node.longName || `UNKNOWN`}>
{image}
Expand All @@ -91,6 +115,7 @@ export function NodeDetailsModal({ node, onClose, allNodes }: Props) {
<div>
<NameValue name="Hex ID" value={node.nodeIdHex} /> / <NameValue name="ID" value={node.nodeId} />
</div>
{flyXCToken(node)}
<div>
<NameValue name="Hardware" value={hardwareModel} /> {firmwareVersion}
</div>
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"lodash": "^4.17.21",
"mocha": "^10.6.0",
"nodemon": "^3.1.4",
"pg-boss": "^10.1.1",
"postcss": "^8.4.39",
"prettier": "^3.3.2",
"protobufjs-cli": "^1.1.2",
Expand Down
5 changes: 3 additions & 2 deletions src/config/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { fileURLToPath } from 'url'
import { dbConnectionOptions } from '#config/db-connection-opts-parser'
import _ from 'lodash'
import { DateTime } from 'luxon'
import PgBoss from 'pg-boss'

// https://github.com/trancong12102/typeorm-naming-strategies/blob/master/src/postgres-naming.strategy.ts
class SnakeNamingStrategy extends DefaultNamingStrategy implements NamingStrategyInterface {
Expand Down Expand Up @@ -40,8 +41,8 @@ class SnakeNamingStrategy extends DefaultNamingStrategy implements NamingStrateg
const __filename = fileURLToPath(import.meta.url)
const __dirname = path.dirname(__filename)

const dbUrl = process.env.DATABASE_URL || `sqlite:///${__dirname}/../../tmp/paragliding-meshmap.sqlite3`

export const dbUrl = process.env.DATABASE_URL || `sqlite:///${__dirname}/../../tmp/paragliding-meshmap.sqlite3`
export const pgBoss = new PgBoss(dbUrl)
const { dbConnectionOpts, driver } = dbConnectionOptions(dbUrl)

export const connString = <DataSourceOptions>{
Expand Down
32 changes: 30 additions & 2 deletions src/entity/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import TextMessage from './text_message.js'
import _ from 'lodash'
import { BROADCAST_ADDR } from '#helpers/utils'
import { v5 as uuidv5 } from 'uuid'
import { AppDataSource } from '#config/data-source'
import { AppDataSource, pgBoss } from '#config/data-source'
import { Configs } from '#entity/configs'
import { randomUUID } from 'node:crypto'
import { flyXCLog } from '#helpers/logger'

@Entity()
export default class Node extends BaseTypeWithoutPrimaryKey {
Expand Down Expand Up @@ -207,9 +208,27 @@ export default class Node extends BaseTypeWithoutPrimaryKey {
positionPrecisionBits: BaseType.sanitizeNumber(position.precisionBits),
satsInView: BaseType.sanitizeNumber(position.satsInView),
})
await this.maybeSendCoordinates(node, position)
return await repository.save(node)
}

private static async maybeSendCoordinates(node: Node, position: Position) {
if (!(position.latitude && position.longitude && node.flyXCToken)) {
return
}

flyXCLog(`Queuing position update for ${node.shortName} (${node.flyXCToken})`)
await pgBoss.send('fly-xc', {
type: 'position',
user_id: node.flyXCToken,
time: DateTime.now().toMillis(),
latitude: position.latitude / 10000000,
longitude: position.longitude / 10000000,
altitude: position.altitude,
ground_speed: position.groundSpeed || 0,
})
}

static async updateNeighbors(trx: EntityManager, neighborInfo: NeighbourInfo) {
const repository = trx.getRepository(Node)
const node = (await repository.findOne({ where: { nodeId: neighborInfo.nodeId } })) || new Node({ nodeId: neighborInfo.nodeId })
Expand Down Expand Up @@ -254,10 +273,19 @@ export default class Node extends BaseTypeWithoutPrimaryKey {
}
}

outboundMessage(tm: Pick<TextMessage, 'from' | 'text' | 'to' | 'createdAt'>, purgeOlderThan: Duration) {
async outboundMessage(tm: Pick<TextMessage, 'from' | 'text' | 'to' | 'createdAt'>, purgeOlderThan: Duration) {
const now = DateTime.now()
this.outbox ||= []
this.outbox.unshift({ to: tm.to, text: tm.text, time: tm.createdAt.toISOString() })
if (tm.to === BROADCAST_ADDR) {
flyXCLog(`Queuing text message update for ${this.shortName} (${this.flyXCToken})`)
await pgBoss.send('fly-xc', {
type: 'message',
user_id: this.flyXCToken,
time: tm.createdAt.getTime(),
message: tm.text,
})
}
if (purgeOlderThan) {
this.outbox = this.outbox.filter((msg) => {
const messageAge = now.diff(DateTime.fromISO(msg.time))
Expand Down
1 change: 1 addition & 0 deletions src/helpers/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export const errLog = debug('meshmap:error')
export const perfLog = debug('meshmap:perf')
export const decodeLog = debug('meshmap:decode')
export const protobufDecode = debug('meshmap:pb')
export const flyXCLog = debug('meshmap:flyxc')
57 changes: 56 additions & 1 deletion src/mqtt/main.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { errLog, perfLog } from '#helpers/logger'
import { errLog, flyXCLog, perfLog } from '#helpers/logger'
import { processMessage } from '#mqtt/decoder'
import { dumpStats, purgeData } from '#mqtt/mqtt-orm'
import mqtt from 'async-mqtt'
Expand All @@ -9,6 +9,8 @@ import PQueue from 'p-queue'
import os from 'os'
import { DataSource } from 'typeorm'
import { Configs } from '#entity/configs'
import { pgBoss } from '#config/data-source'
import _ from 'lodash'

export async function mqttProcessor(db: DataSource, cliOptions: MQTTCLIOptions) {
const logger = debug('meshmap:mqtt')
Expand All @@ -27,6 +29,59 @@ export async function mqttProcessor(db: DataSource, cliOptions: MQTTCLIOptions)
await clientIdConfig.save(db)
}

await pgBoss.start()
await pgBoss.createQueue('fly-xc')

pgBoss.work(
'fly-xc',
{
batchSize: 10,
pollingIntervalSeconds: 10,
},
async (jobs) => {
const flyXCApiKey = process.env.FLYXC_API_KEY
const flyXCApiUrl = process.env.FLYXC_API_URL

if (flyXCApiKey && flyXCApiUrl) {
return await Promise.all(
jobs.map(async (job) => {
flyXCLog(`Picked up job`, job.data)
try {
const requestHeaders = {
Accept: 'application/json',
'Content-Type': 'application/json',
}

const response = await fetch(flyXCApiUrl, {
method: 'POST',
headers: _.assign(
{
Authorization: `Bearer ${flyXCApiKey}`,
},
requestHeaders
),
body: JSON.stringify(job.data),
})
flyXCLog(`Sending job to ${flyXCApiUrl}`)
flyXCLog({ requestHeaders })
flyXCLog({ body: JSON.stringify(job.data) })
flyXCLog(`Response`, response)

if (response.status === 200) {
return await response.text()
} else {
throw 'Failed to send data'
}
} catch (e) {
flyXCLog(`Failed to send data`, e)
throw e
}
})
)
}
}
)

const clientId = clientIdConfig.value!.toString()
const client = mqtt.connect(cliOptions.mqttBrokerUrl, {
username: cliOptions.mqttUsername,
Expand Down
2 changes: 1 addition & 1 deletion src/mqtt/mqtt-orm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export async function saveTextMessage(db: DataSource, envelope: meshtastic.Servi
const [from, to] = await Promise.all([Node.findOrBuild(trx, tm.from), Node.findOrBuild(trx, tm.to)])
await trx.save(tm)

from.outboundMessage(tm, purgeOlderThan)
await from.outboundMessage(tm, purgeOlderThan)
to.inboundMessage(tm, purgeOlderThan)

await trx.save([from, to], { reload: false })
Expand Down
27 changes: 25 additions & 2 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3244,6 +3244,13 @@ create-require@^1.1.0:
resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333"
integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==

cron-parser@^4.0.0:
version "4.9.0"
resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.9.0.tgz#0340694af3e46a0894978c6f52a6dbb5c0f11ad5"
integrity sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==
dependencies:
luxon "^3.2.1"

cross-spawn@^7.0.0, cross-spawn@^7.0.2:
version "7.0.3"
resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.3.tgz#f73a85b9d5d41d045551c177e2882d4ac85728a6"
Expand Down Expand Up @@ -5517,7 +5524,7 @@ lru-cache@^6.0.0:
dependencies:
yallist "^4.0.0"

luxon@^3.4.4:
luxon@^3.2.1, luxon@^3.4.4:
version "3.5.0"
resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.5.0.tgz#6b6f65c5cd1d61d1fd19dbf07ee87a50bf4b8e20"
integrity sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==
Expand Down Expand Up @@ -6311,6 +6318,15 @@ perfect-debounce@^1.0.0:
resolved "https://registry.yarnpkg.com/perfect-debounce/-/perfect-debounce-1.0.0.tgz#9c2e8bc30b169cc984a58b7d5b28049839591d2a"
integrity sha512-xCy9V055GLEqoFaHoC1SoLIaLmWctgCUaBaWxDZ7/Zx4CTyX7cJQLJOok/orfjZAh9kEYpjJa4d0KcJmCbctZA==

pg-boss@^10.1.1:
version "10.1.1"
resolved "https://registry.yarnpkg.com/pg-boss/-/pg-boss-10.1.1.tgz#7367c292b86bbeb558dbdeb358d1c7a4c55950fe"
integrity sha512-2t7gz5nEUYFabj8czWWFRUSyPDQ5t+K/EF5l9Q5lHn2iwyPPKgIfwK+8LKgRfyHRUePTDQhogsGcwOlNczfZ5Q==
dependencies:
cron-parser "^4.0.0"
pg "^8.5.1"
serialize-error "^8.1.0"

pg-cloudflare@^1.1.1:
version "1.1.1"
resolved "https://registry.yarnpkg.com/pg-cloudflare/-/pg-cloudflare-1.1.1.tgz#e6d5833015b170e23ae819e8c5d7eaedb472ca98"
Expand Down Expand Up @@ -6352,7 +6368,7 @@ pg-types@^2.1.0:
postgres-date "~1.0.4"
postgres-interval "^1.1.0"

pg@^8.12.0:
pg@^8.12.0, pg@^8.5.1:
version "8.12.0"
resolved "https://registry.yarnpkg.com/pg/-/pg-8.12.0.tgz#9341724db571022490b657908f65aee8db91df79"
integrity sha512-A+LHUSnwnxrnL/tZ+OLfqR1SxLN3c/pgDztZ47Rpbsd4jUytsTtwQo/TLPRzPJMp/1pbhYVhH9cuSZLAajNfjQ==
Expand Down Expand Up @@ -7123,6 +7139,13 @@ [email protected]:
range-parser "~1.2.1"
statuses "2.0.1"

serialize-error@^8.1.0:
version "8.1.0"
resolved "https://registry.yarnpkg.com/serialize-error/-/serialize-error-8.1.0.tgz#3a069970c712f78634942ddd50fbbc0eaebe2f67"
integrity sha512-3NnuWfM6vBYoy5gZFvHiYsVbafvI9vZv/+jlIigFn4oP4zjNPK3LhcY0xSCgeb1a5L8jO71Mit9LlNoi2UfDDQ==
dependencies:
type-fest "^0.20.2"

serialize-javascript@^6.0.2:
version "6.0.2"
resolved "https://registry.yarnpkg.com/serialize-javascript/-/serialize-javascript-6.0.2.tgz#defa1e055c83bf6d59ea805d8da862254eb6a6c2"
Expand Down
Loading