import { Uuid } from '../domain/Uuid';
import { Heap } from './Heap';
import { MaybePromise } from './utilityTypes';

/**
 *
 */
interface Job {
  id: symbol | Uuid; // symbol;
  priority: number;
  promise?: Promise<unknown>;
  markReadyToRun?: (value?: unknown) => void;
  markDone?: (value?: unknown) => void;
  cancelPending?: (reason?: unknown) => void;
  earlyRejectRunning?: (reason?: unknown) => void;
}

type JobId = Job['id'];

/**
 *
 */
interface SchedulerStats {
  lastRun: number;
  running: number;
  waiting: number;
}

const priorityComparator = (a: Job, b: Job) => a.priority - b.priority;

/**
 *
 */
export class JobCancelledError extends Error {
  constructor() {
    super('Job cancelled.');
    this.name = 'JobCancelledError';
  }
}

/**
 *
 */
export class JobLimitExceededError extends Error {
  constructor(limit: number) {
    super(`The queue is full : job limit ( ${limit} ) on the waiting list exceeded.`);
    this.name = 'JobLimitExceededError';
  }
}

/**
 *
 */
export class SchedulerLockedError extends Error {
  constructor() {
    super(`Scheduler is locked and is not accepting new jobs.`);
    this.name = 'SchedulerLockedError';
  }
}

/**
 * Thomas :
 *   Based on https://github.com/mmomtchev/Queue
 *   For an alternate lib, https://github.com/sindresorhus/p-queue
 *
 * Greg :
 *   The class name Queue was misleading.
 *
 *   The purpose of this class is to do non-preemptive priority scheduling with
 *   throttling and control over the number of possible concurrently running jobs.
 *
 *   Non-preemptive : once a job is scheduled, it will run to completion.
 *
 *   It does use a queue for the waiting list, but there's no concept of external
 *   consumer for the jobs. ( Although nothing prevents some form of out-of-band
 *   communication between the producer and some recipient to be defined as part
 *   of a job ).
 *
 *   It runs the jobs itself and make the results available to the job producer
 *   when done.
 *
 *   It 'dequeues' from the waiting list when a slot for concurrent running job is free,
 *   not when something gets on the queue.
 *
 *   The library that this class was originally based on offered two APIs :
 *
 *     - wait-end-flush
 *         Wait in line for a job slot with an 'id ticket'.
 *         Once a slot is available it is attributed to that 'id ticket'.
 *         Execution resume -> job begins.
 *         Once job is done, release the slot with end('id ticket').
 *         Optionally wait for the queue to be empty with flush.
 *
 *     - run ( a wrapper around wait-end-flush )
 *        Send a job with some priority to the scheduler.
 *        Wait for job result.
 *
 *   We remove the lower level wait-end-flush API.
 *
 *   We add an optional 'id ticket' argument to the run API, in order to be able
 *   to electively opt out of the waiting list.
 *
 *
 *
 *   I'm not sure it makes sense for it to 'block' if the waiting list is full.
 *   Isn't this already the purpose of the waiting list ?
 *   For now if some maximum capacity is set, it will throw a JobLimitExceededError
 *   when trying to add jobs above that limit.
 *   Whatever is using the scheduler can react to that exception accordingly.
 *   We can discuss about some automatic eviction mechanism based on priority
 *   later, if it is ever needed.
 *
 *   Starvation is possible : if higher priority jobs keep being added, lower
 *   priority jobs may never be run.
 *
 * @todo Clarify language : job, jobFunc, slot.
 */
export class Scheduler {
  running: Map<JobId, Job> = new Map(); // Job[] = [];
  waiting = new Heap(priorityComparator);
  lastRun = 0;
  private _locked = false;

  constructor(
    public readonly maxConcurrentJobs = 1,
    public readonly throttleTimeoutInMilliseconds = 100,
    public readonly capacity = Number.MAX_SAFE_INTEGER
  ) {
    this.maxConcurrentJobs = maxConcurrentJobs < 1 ? 1 : maxConcurrentJobs;
    this.throttleTimeoutInMilliseconds = throttleTimeoutInMilliseconds < 0 ? 0 : maxConcurrentJobs;
    this.capacity = capacity < 1 ? 1 : capacity;
  }

  /**
   * Dequeue next job in the waiting queue.
   *
   * @note This is not what the original library called 'dequeue'.
   */
  private _dequeue(): void {
    /** @todo Figure out why this.running.size < this.maxConcurrentJobs is not checked here in OG. */
    if (this.running.size < this.maxConcurrentJobs) {
      const nextJob = this.waiting.pop();
      nextJob?.markReadyToRun?.();
    }
  }

  /**
   * Return true if found a job and freed occupied slot for given jobId.
   */
  private _freeSlot(id: JobId): boolean {
    const job = this.running.get(id);
    if (job) {
      this.running.delete(id);
      job.markDone?.(); /** Used to wait in in Scheduler.lockAndFlush. */
      this._dequeue();
      return true;
    }

    return false;
  }

  /**
   *
   */
  private async _waitForSlot(id: JobId, priority = 0): Promise<Job> {
    const job: Job = {
      id,
      priority,
    };

    /** Wait in line if no free slot. */
    if (this.running.size >= this.maxConcurrentJobs) {
      /**
       * This promise may be resolved by endRunningJob when another running
       * job completes and this job reached the front of the waiting queue.
       *
       * Or it can be rejected if the job is cancelled.
       */
      job.promise = new Promise((resolve, reject) => {
        job.markReadyToRun = resolve;
        job.cancelPending = reject;
      });

      /** Add the job to the waiting queue and wait in line. */
      this.waiting.push(job);
      await job.promise;
    }

    /**
     * Assign a new promise when the job is running.
     * This promise will be resolved when the job is done or rejected if it is
     * cancelled as the job is running.
     *
     * Cancelling doesn't stop a job mid-execution, it only frees the slot and
     * reject early for whoever is waiting on that job to complete.
     *
     * @todo Test job cancelled that also throws inside the job.
     */
    job.promise = new Promise((resolve, reject) => {
      job.markDone = resolve;
      job.earlyRejectRunning = reject;
    }).catch((e) => {
      /**
       * Catch reject on earlyRejectRunning here, so it doesn't
       * go unhandled.
       *
       * Return the error so that _runJob can rethrow and have the main promise
       * from Scheduler.run reject with the correct error.
       */
      return e;
    });

    this.running.set(id, job);

    /** Throttle if necessary. */
    while (Date.now() - this.lastRun < this.throttleTimeoutInMilliseconds) {
      await new Promise((resolve) => setTimeout(resolve, this.throttleTimeoutInMilliseconds - Date.now() + this.lastRun));
    }
    this.lastRun = Date.now();

    return job;
  }

  /**
   * Allow early rejection if the job is cancelled by rejecting the slot promise.
   *
   * @note It races the job function promise against the slot promise.
   *       It also allows the thrown JobCancelledError to be caught by
   *       the original Scheduler.run caller.
   *
   * @todo Should we ever need to warn the original Scheduler.run caller, that
   *       a running job was meant to be cancelled but somehow completed before
   *       being early rejected, we could probably also await slot.promise
   *       before returning. Then throw the result if it's a JobCancelledError
   *       or return it if's it not.
   *
   * @todo Find a way to avoid the dirty as Promise<R> assertion.
   *       By construction the slot promise can only reject before the job function is done,
   *       but surely there is a better way.
   */
  private async _runJob<R>(slot: Job, job: () => MaybePromise<R>): Promise<R> {
    const result = await Promise.race([slot.promise, job()]);
    if (result instanceof JobCancelledError) {
      throw result;
    }
    return result as Promise<R>;
  }

  /**
   * Run a job, i.e. :
   *     - wait in line until a slot is free,
   *     - execute the job,
   *     - resolve when the job is complete with the return value of the job.
   *     - reject if the job errored or was cancelled.
   */
  run<R>(job: () => MaybePromise<R>, priority = 0, jobId = Symbol()): Promise<R> {
    if (this._locked) {
      throw new SchedulerLockedError();
    }
    if (this.waiting.size >= this.capacity) {
      throw new JobLimitExceededError(this.capacity);
    }

    return this._waitForSlot(jobId, priority)
      .then((slot) => this._runJob(slot, job))
      .finally(() => {
        this._freeSlot(jobId);
      });
  }

  /**
   *
   */
  get stats(): SchedulerStats {
    return {
      running: this.running.size,
      waiting: this.waiting.size,
      lastRun: this.lastRun,
    };
  }

  /**
   * Return true if found a pending job to cancel for given jobId.
   */
  cancelPending(jobId: JobId): boolean {
    const cancelledJob = this.waiting.remove(
      {
        id: jobId,
        priority: -1, // dummy value, not used to find a job by its jobId.
      },
      (a, b) => a.id === b.id
    );
    if (cancelledJob) {
      cancelledJob.cancelPending?.(new JobCancelledError());
      return true;
    }

    return false;
  }

  /**
   *
   */
  cancelAllPending(): void {
    /** Swap with a new waiting queue. */
    const cancelledJobs = this.waiting;
    this.waiting = new Heap(priorityComparator);

    /** Cancel jobs in order of priority. */
    let next: Job | undefined;
    while ((next = cancelledJobs.pop()) !== undefined) {
      next.cancelPending?.(new JobCancelledError());
    }
  }

  /**
   * Return true if found a running job to cancel for given jobId.
   *
   * @note You cannot really cancel an async function mid-execution unless it
   *       explicitely supports such mechanism, e.g. :
   *         - using fetch with an AbortController
   *         - listening / periodically checking for a signal to abort and reject
   *           from inside the async function.
   *
   *       What we can do is wrap and reject on behalf of that async function,
   *       so that whatever is awaiting for it to resolve can move on.
   *
   * @todo Figure how bad this is wrt concurrency.
   *       It may be ok,
   *       see https://jakearchibald.com/2015/tasks-microtasks-queues-and-schedules/
   */
  cancelRunning(jobId: JobId): boolean {
    const cancelledJob = this.running.get(jobId);
    if (cancelledJob) {
      this.running.delete(jobId);
      cancelledJob.earlyRejectRunning?.(new JobCancelledError());
      this._dequeue();
      return true;
    }

    return false;
  }

  /**
   * Return true if found a job, running or pending, to cancel for given jobId.
   *
   * @todo Figure how bad this is wrt concurrency.
   *       It may be ok,
   *       see https://jakearchibald.com/2015/tasks-microtasks-queues-and-schedules/
   */
  cancel(jobId: JobId): boolean {
    if (this.cancelRunning(jobId)) {
      return true;
    }

    return this.cancelPending(jobId);
  }

  /**
   * @note Don't dequeue while iterating over running jobs Map.
   *       It may 'work' as long as dequeuing involves resolving a promise,
   *       because the promise callback should only be called after execution
   *       context stack is empty. But it won't otherwise.
   *       See https://stackoverflow.com/questions/35940216/es6-is-it-dangerous-to-delete-elements-from-set-map-during-set-map-iteration
   *
   */
  cancelAllRunning(): void {
    for (const [jobId, cancelledJob] of this.running) {
      this.running.delete(jobId);
      cancelledJob.earlyRejectRunning?.(new JobCancelledError());
    }

    for (let i = this.maxConcurrentJobs; i > 0; --i) {
      this._dequeue();
    }
  }

  /**
   *
   */
  cancelAll(): void {
    this.cancelAllPending();
    this.cancelAllRunning();
  }

  /**
   * Lock Scheduler and cancel all pending and running job.
   */
  shutdown(): void {
    this._locked = true;
    this.cancelAll();
  }

  /**
   * Lock scheduler and return a promise that resolves when all jobs are done.
   *
   * @note Awaiting a 'shared promise' from multiple places seems ok,
   *       see https://gist.github.com/pozorfluo/f2a7ae39cb02f2e2e7742683019324ef
   *
   * @note I'm not sure about how the author of the original library intended to use
   *       this, but there was no guarantee for the queue to be empty when the returned
   *       promise resolved. More jobs could be added to the waiting list during throttling
   *       timeout as flush waited for running jobs to complete ¯\_(ツ)_/¯.
   *
   *       We lock the Scheduler as flush is called, wait for the waiting queue
   *       to be emptied, collect all the running promises and await Promise.allSettled.
   */
  async lockAndFlush(): Promise<void> {
    this._locked = true;

    /**
     * @todo Consider holding a ref to the underlying array for the waiting queue
     *       in the scheduler and calling Heap.make on it.
     *       This would make it possible to ditch this loop, getNode and the
     *       try/catch in favor of Promise.allSettled.
     */
    while (this.waiting.size > 0) {
      try {
        await this.waiting.getNode(this.waiting.size - 1)?.promise;
      } catch (e) {
        // This promise can reject with JobCancelledError, ignore.
      }
    }

    /** And then finish on the running queue. */
    await Promise.allSettled([...this.running.values()].map((job) => job.promise));
  }

  /**
   * Unlock Scheduler.
   */
  unlock(): void {
    this._locked = false;
  }
}
