Source: models/requestData.js

import * as v from 'valibot'
import logger from '../utils/logger.js'
import { types } from '../utils/logging.js'
import axios from 'axios'
import config from '../../config/index.js'
import ResponseDetails from './responseDetails.js'

const ResponseDetailsOptions = v.optional(v.object({
  severity: v.optional(v.pipe(v.string(), v.minLength(2))),
  issue: v.optional(v.object({
    issueType: v.pipe(v.string(), v.minLength(1)),
    field: v.pipe(v.string(), v.minLength(1))
  }))
}))

/**
 * Holds the response data of the 'http://ASYNC-REQUEST-API-HOST/requests/:result-id' endpoint.
 *
 * Allows getting the result details by invoking {@link fetchResponseDetails}
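 *
 * A rough usage sketch (`rawRequestData` stands in for the payload returned by
 * that endpoint; it is illustrative, not part of this module):
 * @example
 * const result = new ResultData(rawRequestData)
 * if (result.isComplete() && !result.isFailed()) {
 *   const details = await result.fetchResponseDetails()
 * }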
 */
export default class ResultData {
  constructor (responseData) {
    Object.assign(this, responseData)
  }

  /**
   * Fetches the check result details, optionally filtered by issue (type and field) or by severity (but not both).
   *
   * @param {number} [pageOffset=0] - Zero-based page offset
   * @param {number} [limit=50] - Results per page limit
   * @param {Object} [opts] - Filter options
   * @param {string} [opts.severity] - Filter by severity
   * @param {Object} [opts.issue] - Filter by issue
   * @param {string} [opts.issue.issueType] - Issue type to filter by
   * @param {string} [opts.issue.field] - Field to filter by
   * @returns {Promise<ResponseDetails>} Response details
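   *
   * A rough sketch of filtering by issue (assumes `resultData` is a ResultData
   * instance; the issue type and field values are illustrative, not taken from
   * the Request API):
   * @example
   * const details = await resultData.fetchResponseDetails(0, 50, {
   *   issue: { issueType: 'invalid-value', field: 'geometry' }
   * })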
   */
  async fetchResponseDetails (pageOffset = 0, limit = 50, opts = { severity: undefined }) {
    v.parse(ResponseDetailsOptions, opts)

    const url = new URL(`${config.asyncRequestApi.url}/${config.asyncRequestApi.requestsEndpoint}/${this.id}/response-details`)
    url.searchParams.append('offset', pageOffset * limit)
    url.searchParams.append('limit', limit)
    if (opts.issue) {
      const { issueType, field } = opts.issue
      // 'missing column' is an issue type we made up: the Request API does not use it, so we can't filter by that value
      if (issueType !== 'missing column') {
        url.searchParams.append('jsonpath', `$.issue_logs[*]."issue-type"=="${issueType}" && $.issue_logs[*]."field"=="${field}"`)
      }
    } else if (opts.severity) {
      url.searchParams.append('jsonpath', `$.issue_logs[*].severity=="${opts.severity}"`)
    }

    // we do an initial request, check how many records there are via the 'x-pagination-total-results' header,
    // and fetch the rest if needed
    const response = await axios.get(url, { timeout: 30000 })
    const totalResults = Number.parseInt(response.headers['x-pagination-total-results'], 10)
    const responses = [...response.data]
    if (Number.isInteger(totalResults) && totalResults > response.data.length) {
      // re-use the same URL, but let fetchPaginated set its own offset/limit per page
      const urlTemplate = new URL(url)
      urlTemplate.searchParams.delete('offset')
      urlTemplate.searchParams.delete('limit')

      const paginationOpts = { limit, offset: response.data.length, maxOffset: totalResults }
      const restResponses = await fetchPaginated(urlTemplate, paginationOpts)
      responses.push(...restResponses.flatMap(resp => resp.data))
    }

    // we're not using the x-pagination-offset and x-pagination-limit headers: we've already
    // fetched all the records, so there's no need for pagination controls on the table
    const pagination = {
      totalResults: `${totalResults}`,
      offset: '0',
      limit: `${totalResults}`
    }

    return new ResponseDetails(this.id, responses, pagination, this.getColumnFieldLog())
  }

  isFailed () {
    return this.status === 'FAILED'
  }

  getType () {
    return this.type
  }

  getError () {
    if (!this.response) {
      logger.warn('trying to get error when there are none', { requestId: this.id })
      return { message: 'An unknown error occurred.' }
    }

    return this.response.error
  }

  hasErrors () {
    if (!this.response || !this.response.data) {
      logger.warn('trying to check for errors when there are none', { requestId: this.id })
      return true
    }
    if (this.response.data['error-summary'] == null) {
      logger.warn('trying to check for errors but there is no error-summary', { requestId: this.id })
      return true
    }
    return this.response.data['error-summary'].length > 0
  }

  isComplete () {
    const finishedProcessingStatuses = ['COMPLETE', 'FAILED']
    return finishedProcessingStatuses.includes(this.status)
  }

  /**
   * Returns the column-field-log entries from the response data, or an empty
   * array when the response doesn't contain any.
   *
   * @returns {any[]}
   */
  getColumnFieldLog () {
    if (!this.response || !this.response.data || !this.response.data['column-field-log']) {
      logger.warn('trying to get column field log when there is none', { requestId: this.id })
      return []
    }
    return this.response.data['column-field-log']
  }

  getParams () {
    return this.params
  }

  getId () {
    return this.id
  }
}

/**
 * Returns a generator of offset values, starting at `offset` and stepping by
 * `limit` while strictly less than `maxOffset`.
 *
 * @param {number} limit
 * @param {number} offset
 * @param {number} maxOffset
 * @returns {Generator<number>}
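 *
 * A worked example of the values produced:
 * @example
 * [...offsets(50, 100, 220)] // => [100, 150, 200]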
 */
function * offsets (limit, offset, maxOffset) {
  let currentOffset = offset
  while (currentOffset < maxOffset) {
    yield currentOffset
    currentOffset += limit
  }
}

/**
 * Starts up to `numTasks` tasks, pulling one offset per task from the generator.
 *
 * @param {number} numTasks max number of tasks to start
 * @param {Generator<number>} gen offset generator
 * @param {Function} taskFactory (taskIndex, offset) => Promise<any>
 * @returns {Promise<any>[]} array of started task promises
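 *
 * A minimal sketch (`fetchPage` is a hypothetical task factory returning a promise):
 * @example
 * const tasks = startRequests(3, offsets(50, 0, 500), (i, offset) => fetchPage(offset))
 * const results = await Promise.all(tasks)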
 */
function startRequests (numTasks, gen, taskFactory) {
  const tasks = []
  for (let i = 0; i < numTasks; ++i) {
    const offsetItem = gen.next()
    if (!offsetItem.done) {
      const p = taskFactory(i, offsetItem.value)
      tasks.push(p)
    } else {
      break
    }
  }
  return tasks
}

/**
 * Given a task factory function, executes a number of async tasks in parallel,
 * with at most `options.concurrency` tasks in flight at a time.
 *
 * Note: the tasks should be IO bound.
 *
 * If any of the tasks fail, the whole operation fails (in other words:
 * no partial results).
 *
 * @param {Object} options
 * @param {number} options.concurrency max number of tasks in flight at a time
 * @param {Function} options.taskFn (offset) => Promise<any>
 * @param {Object} options.offsetInfo { limit, offset, maxOffset }
 * @returns {Promise<any[]>} task results, ordered by offset
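 *
 * A minimal sketch (`fetchPage` is a hypothetical, IO-bound task function):
 * @example
 * const pages = await fetchBatched({
 *   concurrency: 4,
 *   taskFn: (offset) => fetchPage(offset),
 *   offsetInfo: { limit: 50, offset: 0, maxOffset: 500 }
 * })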
 */
async function fetchBatched (options) {
  // Note: a more involved strategy of launching requests via Promise.any()
  // and immediately replacing each completed promise with a new one
  // didn't really behave as expected - most of the work ended up in a single promise.
  // This one's simpler and seems to actually do what's expected.
  const { concurrency, taskFn, offsetInfo } = options
  const results = []
  const gen = offsets(offsetInfo.limit, offsetInfo.offset, offsetInfo.maxOffset)
  const newTask = async (index, offset) => {
    logger.debug('fetchBatched(): starting task', { task: index, offset, type: types.DataFetch })
    const p = taskFn(offset).then((val) => {
      logger.debug('fetchBatched(): finishing task', { task: index, offset, type: types.DataFetch })
      return { val, index, offset }
    })
    return p
  }

  let promises = startRequests(concurrency, gen, newTask)

  do {
    const completed = await Promise.all(promises)
    results.push(...completed)
    promises = startRequests(concurrency, gen, newTask)
    logger.debug(`fetchBatched(): completed ${completed.length} tasks`, { type: types.DataFetch })
  } while (promises.length > 0)

  logger.info(`fetchBatched(): completed ${results.length} requests`, { type: types.DataFetch })
  results.sort((r1, r2) => r1.offset - r2.offset)
  return results.map(r => r.val)
}

/**
 * Fetches pages of results from `url` concurrently (at most 4 requests in flight),
 * one page per offset produced by {@link offsets}.
 *
 * @param {URL} url base URL; its 'offset' and 'limit' query params are set per page
 * @param {Object} options
 * @param {number} options.limit page size
 * @param {number} options.offset starting offset
 * @param {number} options.maxOffset offset at which to stop (exclusive)
 * @returns {Promise<Object[]>} one response per fetched page, ordered by offset
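 *
 * A rough sketch (the URL is illustrative): fetch records 100..499 in pages of 50,
 * with at most 4 requests in flight at a time:
 * @example
 * const responses = await fetchPaginated(
 *   new URL('https://async-request-api.example/requests/123/response-details'),
 *   { limit: 50, offset: 100, maxOffset: 500 }
 * )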
 */
export const fetchPaginated = async (url, { limit, offset, maxOffset }) => {
  const taskFn = async (offset) => {
    const thisUrl = new URL(url)
    thisUrl.searchParams.set('offset', offset)
    thisUrl.searchParams.set('limit', limit)
    const result = await axios.get(thisUrl, { timeout: 10000 })
    return result
  }

  return await fetchBatched({ concurrency: 4, taskFn, offsetInfo: { limit, offset, maxOffset } })
}