/**
* Performance DB API service
*/
import datasette from './datasette.js'
import logger from '../utils/logger.js'
import { types } from '../utils/logging.js'
// ===========================================
// for now we are using a csv for these messages but we will probably end up moving to a table, so for now this can sit in the fake performance db api
import csv from 'csv-parser' // ToDo: remember to remove this from package.json when we move away from csv
import fs from 'node:fs'
/**
 * Message templates keyed by issue type, filled in by `initialiseMessages()`.
 *
 * Each value may carry `singular`, `plural`, `entities_singular`, `entities_plural` and
 * `allRows_message` templates, plus `dataset_specific_messages`: an object keyed by
 * dataset name holding the same kinds of templates for that dataset only.
 *
 * @type {Map<string, Object>}
 */
const messages = new Map()
/**
 * Streams a CSV file through the parser and calls `onRow` for every parsed row.
 *
 * The csv parser works on streams, so it is wrapped in a promise here. `pipe()` does not
 * forward errors from the source to the destination, so both the file stream and the
 * parser get their own 'error' handler; without them an unreadable file would never
 * settle the promise.
 *
 * @param {string} path - Path of the CSV file to read
 * @param {(row: Object) => void} onRow - Called once per parsed row
 * @returns {Promise<void>} Resolves when the whole file has been processed
 */
function readCsvRows (path, onRow) {
  return new Promise((resolve, reject) => {
    const source = fs.createReadStream(path)
    const parser = csv()
    source.on('error', reject)
    parser.on('error', reject)
    parser.on('data', (row) => {
      try {
        onRow(row)
      } catch (error) {
        // no point reading the rest of the file once a row has failed
        source.destroy()
        reject(error)
      }
    })
    parser.on('end', () => resolve())
    source.pipe(parser)
  })
}

/**
 * Returns the object that the templates of `row` should be written to: the entry for the
 * row's issue type, or its dataset-specific sub-entry when the row names a dataset other
 * than 'all'. Missing entries are created, so a row never dereferences `undefined`.
 *
 * @param {{issue_type: string, dataset?: string}} row - Parsed CSV row
 * @returns {Object} Target object for the row's message templates
 */
function messageTarget (row) {
  let messageInfo = messages.get(row.issue_type)
  if (!messageInfo) {
    messageInfo = { dataset_specific_messages: {} }
    messages.set(row.issue_type, messageInfo)
  }
  if (row.dataset && row.dataset !== 'all') {
    messageInfo.dataset_specific_messages[row.dataset] ??= {}
    return messageInfo.dataset_specific_messages[row.dataset]
  }
  return messageInfo
}

/**
 * Reads messages from CSV files and populates `messages` map.
 *
 * The three files are read one after another: field-level messages, entity-level
 * messages, then "all rows" messages. Rejects if a file cannot be read or parsed.
 *
 * @returns {Promise<void>}
 */
export async function initialiseMessages () {
  await readCsvRows('src/content/fieldIssueMessages.csv', (row) => {
    const target = messageTarget(row)
    target.singular = row.singular_message
    target.plural = row.plural_message
  })
  logger.debug('finished populating messages', { type: types.App })

  await readCsvRows('src/content/entityIssueMessages.csv', (row) => {
    const target = messageTarget(row)
    target.entities_singular = row.singular_message
    target.entities_plural = row.plural_message
  })

  await readCsvRows('src/content/allRowsIssueMessages.csv', (row) => {
    messageTarget(row).allRows_message = row.allRows_message
  })
  // Messages map is now populated
  logger.info('allRowsMessages stream end', { type: types.App })
}
// Populate `messages` while the module loads: with top-level await, importers only get
// this module once the CSV files have been read (or the import fails if reading fails).
await initialiseMessages()
// ===========================================
/**
 * Builds the SQL query that lists the error-severity issues of a resource within a
 * dataset, grouped by issue type and field, with a count per group.
 *
 * @param {string} resource - Resource identifier
 * @param {string} datasetId - Dataset identifier
 * @returns {string} SQL
 */
const datasetIssuesQuery = (resource, datasetId) => {
  // Double embedded single quotes so a value can never terminate its SQL string literal
  const quote = (value) => String(value).replaceAll("'", "''")
  return /* sql */ `
SELECT
i.field,
i.issue_type,
i.line_number,
i.value,
i.message,
CASE
WHEN COUNT(
CASE
WHEN it.severity == 'error' THEN 1
ELSE null
END
) > 0 THEN 'Needs fixing'
ELSE 'Live'
END AS status,
COUNT(i.issue_type) as num_issues
FROM
issue i
LEFT JOIN
issue_type it ON i.issue_type = it.issue_type
WHERE
i.resource = '${quote(resource)}'
AND i.dataset = '${quote(datasetId)}'
AND (it.severity == 'error')
GROUP BY i.issue_type, i.field
ORDER BY it.severity`
}
/**
* @typedef {Object} Dataset
* @property {string} status - One of: 'Not submitted', 'Error', 'Needs fixing', 'Warning', 'Live'
* @property {string} endpoint
* @property {number} issue_count
* @property {?string} error
*/
/**
* @typedef {Object} LpaOverview
* @property {Object.<string, Dataset>} datasets - Map of dataset names to Dataset objects
*/
/**
 * Doubles embedded single quotes so a value can be placed inside a single-quoted SQL
 * string literal without terminating it.
 *
 * @param {*} value - Value to embed (converted with String())
 * @returns {string} Escaped text, without the surrounding quotes
 */
const escapeSingleQuotes = (value) => String(value).replaceAll("'", "''")

/**
 * Builds one `select` producing a single (dataset, resource, entity count) row, used as
 * a member of the `entity_counts` union in the overview query.
 *
 * @param {string} dataset - Dataset name
 * @param {string} resource - Resource identifier
 * @param {number} entityCount - Entity count; callers pass validated integers only
 * @returns {string} SQL select fragment
 */
const entityCountSelectFragment = (dataset, resource, entityCount) => {
  return `select '${escapeSingleQuotes(dataset)}' as d, '${escapeSingleQuotes(resource)}' as r, ${entityCount} as e`
}

/**
 * Generates a query for LPA overview data
 *
 * @param {string} lpa - The LPA identifier
 * @param {Object} params - Query parameters
 * @param {string[]} params.datasetsFilter - List of dataset names to filter by
 * @param {Object[]} params.entityCounts - Array of entity count objects
 * @param {string} params.entityCounts[].dataset - Dataset name
 * @param {string} params.entityCounts[].resource - Resource identifier
 * @param {number=} params.entityCounts[].entityCount - Optional entity count
 * @returns {string} The generated SQL query
 */
export function lpaOverviewQuery (lpa, params) {
  let datasetClause = ''
  if (params.datasetsFilter) {
    const datasetString = params.datasetsFilter
      .map(dataset => `'${escapeSingleQuotes(dataset)}'`)
      .join(',')
    datasetClause = `AND rle.pipeline in (${datasetString})`
  }

  // Only non-negative integer counts are embedded (they are interpolated unquoted);
  // a missing entityCounts list is treated the same as an empty one.
  const entityCountsSelects = (params.entityCounts ?? [])
    .filter(({ entityCount }) => Number.isInteger(entityCount) && entityCount >= 0)
    .map(({ dataset, resource, entityCount }) => entityCountSelectFragment(dataset, resource, entityCount))

  if (entityCountsSelects.length === 0) {
    // add bogus select, to ensure the resulting SQL is valid
    entityCountsSelects.push(entityCountSelectFragment('none', 'none', 0))
  }

  return /* sql */`
with entity_counts as (
select d as dataset, r as resource, e as entity_count
from (
${entityCountsSelects.join(' union ')}
)
)
SELECT
REPLACE(rle.organisation, '-eng', '') as organisation,
rle.name,
rle.pipeline as dataset,
rle.endpoint,
rle.resource,
rle.latest_exception,
rle.latest_status as http_status,
coalesce(ec.entity_count, 0) as entity_count,
i.count_issues as issue_count,
i.responsibility,
i.fields,
case
when (rle.latest_status is null) then 'Not submitted'
when (rle.latest_status != '200') then 'Error'
when (i.severity = 'error') then 'Needs fixing'
else 'Live'
end as status,
case
when ((cast(rle.latest_status as integer) > 200)) then format('There was a %s error accessing the endpoint URL', rle.latest_status)
else null
end as error,
case
when (i.severity = 'info') then ''
else i.issue_type
end as issue_type,
case
when (i.severity = 'info') then ''
else i.severity
end as severity
FROM
reporting_latest_endpoints rle
LEFT JOIN
endpoint_dataset_issue_type_summary i ON rle.resource = i.resource AND rle.pipeline = i.dataset
LEFT OUTER JOIN
entity_counts ec ON ec.resource = rle.resource AND ec.dataset = rle.pipeline
WHERE
REPLACE(rle.organisation, '-eng', '') = '${escapeSingleQuotes(lpa)}'
${datasetClause}
ORDER BY
rle.organisation,
rle.name;`
}
/**
 * Row window size used by `getIssuesQuery` for paging.
 */
export const issuesQueryLimit = 1000
/**
* @typedef {Object} TaskMessageOptions
* @property {string} issue_type - Type of issue
* @property {number} num_issues - Number of issues
* @property {number} rowCount - Total row count
* @property {string} field - Field name
* @property {('html'|'text')} [format] - Output format
* @property {string} [dataset] - Dataset name for dataset-specific messages
*/
/**
 * Doubles embedded single quotes so a value can be placed inside a single-quoted SQL
 * string literal without terminating it.
 *
 * @param {*} value - Value to embed (converted with String())
 * @returns {string} Escaped text, without the surrounding quotes
 */
function escapeSqlString (value) {
  return String(value).replaceAll("'", "''")
}

/**
 * Escapes the characters that are significant in HTML text and attribute values.
 *
 * @param {*} value - Value to embed (converted with String())
 * @returns {string} HTML-safe text
 */
function escapeHtml (value) {
  return String(value)
    .replaceAll('&', '&amp;')
    .replaceAll('<', '&lt;')
    .replaceAll('>', '&gt;')
    .replaceAll('"', '&quot;')
    .replaceAll("'", '&#39;')
}

/**
 * Performance DB API service
 * @export
 * @default
 */
export default {
  /**
   * Query for the endpoint status rows of an LPA's dataset.
   *
   * @param {string} lpa - The LPA identifier
   * @param {string} datasetId - Dataset (pipeline) identifier
   * @returns {string} SQL
   */
  resourceStatusQuery (lpa, datasetId) {
    return /* sql */ `
select resource, endpoint_url, endpoint, status, latest_log_entry_date, days_since_200
from reporting_latest_endpoints
WHERE REPLACE(organisation, '-eng', '') = '${escapeSqlString(lpa)}'
AND pipeline = '${escapeSqlString(datasetId)}'`
  },

  datasetIssuesQuery,

  /**
   * Returns a task message based on the provided issue type, issue count, and entity count.
   *
   * Pass format = 'html' if you want the fields in the message to be marked up with span.column-name
   * (the field name is HTML-escaped in that case). Otherwise plain text message is returned.
   *
   * When the issue type is unknown, or no template exists for the requested variant, a generic
   * "<num_issues> issue of type <issue_type>" message is returned instead.
   *
   * @param {TaskMessageOptions} options - Task message options
   * @param {boolean} [entityLevel=false] - Whether to use entity-level or dataset level messaging
   *
   * @returns {string} The task message with the issue count inserted
   */
  getTaskMessage ({
    issue_type: issueType,
    num_issues: numIssues,
    rowCount,
    field,
    ...rest
  }, entityLevel = false) {
    const defaultMessage = `${numIssues} issue of type ${issueType}`

    const baseInfo = messages.get(issueType)
    if (!baseInfo) {
      logger.warn({
        message: `PerformanceDbApi.getTaskMessage(): Unknown issue type: ${issueType}`,
        type: types.App
      })
      return defaultMessage
    }

    let fieldName = field
    if (!fieldName) {
      logger.warn('performanceDbApi.getTaskMessage(): no field provided', { issueType })
      fieldName = 'value'
    }

    // dataset-specific templates, when present, take precedence over the generic ones
    const datasetInfo = rest.dataset ? baseInfo.dataset_specific_messages?.[rest.dataset] : undefined
    const messageInfo = datasetInfo ? { ...baseInfo, ...datasetInfo } : baseInfo

    let message
    if (Number.isInteger(rowCount) && numIssues >= rowCount) {
      message = messageInfo.allRows_message
    } else if (entityLevel) {
      message = numIssues === 1 ? messageInfo.entities_singular : messageInfo.entities_plural
    } else {
      message = numIssues === 1 ? messageInfo.singular : messageInfo.plural
    }
    if (!message) return defaultMessage

    const fieldText = rest.format === 'html'
      ? `<span class="column-name">${escapeHtml(fieldName)}</span>`
      : String(fieldName)
    const countText = String(numIssues)

    // Function replacers insert the text verbatim: with a plain string replacement,
    // sequences such as `$&` in a field name would be interpreted as patterns.
    return message
      .replaceAll('{num_issues}', () => countText)
      .replaceAll('{num_entries}', () => countText)
      .replaceAll('{column_name}', () => fieldText)
  },

  /**
   * Query for the endpoint/resource rows of an LPA's dataset.
   *
   * @param {string} lpa - The LPA identifier
   * @param {string} dataset - Dataset (pipeline) identifier
   * @returns {string} SQL
   */
  latestResourceQuery: (lpa, dataset) => {
    return /* sql */ `
SELECT rle.resource, rle.status, rle.endpoint, rle.endpoint_url, rle.status, rle.days_since_200, rle.exception, rle.resource_start_date as startDate
FROM reporting_latest_endpoints rle
LEFT JOIN resource_organisation ro ON rle.resource = ro.resource
LEFT JOIN organisation o ON REPLACE(ro.organisation, '-eng', '') = o.organisation
WHERE REPLACE(ro.organisation, '-eng', '') = '${escapeSqlString(lpa)}'
AND rle.pipeline = '${escapeSqlString(dataset)}'`
  },

  /**
   * Query for obtaining resource ids for given datasets
   *
   * @param {*} lpa
   * @param {{datasetsFilter: string[]}} params
   * @returns {string} SQL
   */
  latestResourcesQuery: (lpa, params) => {
    let datasetClause = ''
    if (params.datasetsFilter) {
      const datasetString = params.datasetsFilter
        .map(dataset => `'${escapeSqlString(dataset)}'`)
        .join(',')
      datasetClause = `AND rle.pipeline in (${datasetString})`
    }

    return /* sql */ `
select
rle.pipeline as dataset,
rle.resource as resource,
rle.resource_start_date as startDate
from reporting_latest_endpoints rle
where
REPLACE(organisation, '-eng', '') = '${escapeSqlString(lpa)}'
${datasetClause}`
  },

  /**
   * Query for datasets with active endpoints with error status.
   *
   * @param {string} lpa
   * @param {{datasetsFilter: string[]}} params
   * @returns {string} SQL
   */
  datasetErrorStatusQuery: (lpa, params) => {
    const datasetString = params.datasetsFilter
      .map(dataset => `'${escapeSqlString(dataset)}'`)
      .join(',')

    return /* sql */ `
select
dataset
from
provision_summary
where
coalesce("active_endpoint_count", 0) > 1 and coalesce("error_endpoint_count", 0) >= 1
and "organisation" = '${escapeSqlString(lpa)}'
and "dataset" in (${datasetString})
order by dataset asc`
  },

  /**
   * Get entity counts for a list of resources
   *
   * Best effort: a resource whose count cannot be obtained (no rows, query failure, or a
   * failure while building the query) is still returned, just without `entityCount`.
   *
   * @param {Array<Object>} resources - Array of resource objects
   * @param {string} resources[].resource - Resource identifier
   * @param {string} resources[].dataset - Dataset identifier
   * @returns {Promise<Array<Object>>} Array of objects containing resource, dataset and optional entityCount
   */
  async getEntityCounts (resources) {
    const requests = resources.map(async ({ resource, dataset }) => {
      try {
        // NOTE(review): `entityCountQuery` is not defined on this object in the code
        // visible here - confirm it is provided, otherwise every lookup ends up in the
        // catch branch below.
        const result = await datasette.runQuery(this.entityCountQuery(resource), dataset)
        if (result.formattedData.length === 0) {
          logger.info({ message: 'getEntityCounts(): No results for resource.', resource, dataset, type: types.App })
          return { resource, dataset }
        }
        return { resource, dataset, entityCount: result.formattedData[0].entity_count }
      } catch (error) {
        logger.warn('getEntityCounts(): could not obtain entity counts. Proceeding without them.',
          { type: types.App, errorMessage: error.message, errorStack: error.stack })
        return { resource, dataset }
      }
    })

    // every request handles its own failure, so none of these promises can reject
    return Promise.all(requests)
  },

  /**
   * Query counting the distinct entries of a resource that have a given issue.
   *
   * @param {{ params: { issue_type: string, issue_field: string }, resource: { resource: string } }} req
   * @returns {string} sql query
   */
  getEntitiesWithIssuesCountQuery: (req) => {
    const { issue_type: issueType, issue_field: issueField } = req.params
    const { resource: resourceId } = req.resource

    return /* sql */ `
SELECT count(DISTINCT entry_number) as count
FROM issue
WHERE resource = '${escapeSqlString(resourceId)}'
AND issue_type = '${escapeSqlString(issueType)}'
AND field = '${escapeSqlString(issueField)}'
`
  },

  /**
   * Query for one window of issues of a given type and field in a resource.
   *
   * @param {{ params: { issue_type: string, issue_field: string}, parsedParams: { pageNumber: number}, resource: { resource: string } }} req
   * @returns {string} sql query
   */
  getIssuesQuery: (req) => {
    const { issue_type: issueType, issue_field: issueField } = req.params
    const { pageNumber } = req.parsedParams
    const resourceId = req.resource.resource

    // start of the window (a multiple of issuesQueryLimit) that contains pageNumber;
    // never negative, and a NaN result simply omits the OFFSET clause below
    const offset = Math.max(0, Math.floor((pageNumber - 1) / issuesQueryLimit) * issuesQueryLimit)

    return /* sql */ `
SELECT i.field, i.line_number, entry_number, message, issue_type, value
FROM issue i
WHERE resource = '${escapeSqlString(resourceId)}'
AND issue_type = '${escapeSqlString(issueType)}'
AND field = '${escapeSqlString(issueField)}'
ORDER BY entry_number ASC
LIMIT ${issuesQueryLimit} ${offset ? `OFFSET ${offset}` : ''}`
  },

  /**
   * Get entry details for a specific resource
   * @param {string} resourceId - Resource identifier
   * @param {number} entryNumber - Entry number (an integer, or a string holding one)
   * @param {string} dataset - Dataset identifier
   * @returns {Promise<Array<Object>>} Array of entry objects containing field and value information
   * @throws {TypeError} If entryNumber is not an integer
   */
  async getEntry (resourceId, entryNumber, dataset) {
    logger.debug({ message: 'getEntry()', resourceId, entryNumber, dataset, type: types.App })

    // entryNumber is embedded unquoted, so only a genuine integer may reach the SQL text
    const isNonEmptyString = typeof entryNumber === 'string' && entryNumber.trim() !== ''
    const entry = isNonEmptyString ? Number(entryNumber) : entryNumber
    if (!Number.isInteger(entry)) {
      throw new TypeError(`getEntry(): entryNumber must be an integer, got: ${entryNumber}`)
    }

    // TODO: why do we order by rowid?
    const sql = /* sql */ `
select
fr.rowid,
fr.end_date,
fr.fact,
fr.entry_date,
fr.entry_number,
fr.resource,
fr.start_date,
ft.entity,
ft.field,
ft.entry_date,
ft.start_date,
ft.value
from
fact_resource fr
left join fact ft on fr.fact = ft.fact
where
fr.resource = '${escapeSqlString(resourceId)}'
and fr.entry_number = ${entry}
order by
fr.rowid`

    const result = await datasette.runQuery(sql, dataset)
    return result.formattedData
  }
}