import { MaybePromise, SingleOrArray } from '../utils';
import { DomainEvent, DomainEventId, DomainEventShape } from './DomainEvent';
import { Committed, DomainEventStore, UnknownDomainEventIdError } from './DomainEventStore';
import { LedgerStore } from './LedgerStore';
import { Projector, ProjectorError } from './Projector';
import { Publisher } from './Publisher';
import { Reactor } from './Reactor';

/**
 * @note Opaque type.
 */
declare const validProjectorRef: unique symbol;
export type ProjectorRef<TRepos, TDomainEvent extends DomainEventShape = DomainEvent> = {
  projector: Projector<TRepos, TDomainEvent>;
  repos: TRepos;
  clearProjectionFn: (repos: TRepos) => MaybePromise<void>;
} & { [validProjectorRef]: true };

/**
 * @note As a workaround TS's lack of support for existential types,
 *       ProjectionistConfig uses opaques type to require going through
 *       helper functions to guarantee ProjectorRef's internal consistency.
 *
 * @todo Consider moving clearProjectionFn definition to createProjector().
 */
export function createProjectorRef<TRepos, TDomainEvent extends DomainEventShape = DomainEvent>(
  projector: Projector<TRepos, TDomainEvent>,
  repos: TRepos,
  clearProjectionFn: (repos: TRepos) => MaybePromise<void>
): ProjectorRef<TRepos, TDomainEvent> {
  return {
    projector,
    repos,
    clearProjectionFn,
  } as ProjectorRef<TRepos, TDomainEvent>;
}

/**
 * @note Opaque type.
 */
declare const validReactorRef: unique symbol;
export type ReactorRef<TRepos, TDomainEvent extends DomainEventShape = DomainEvent> = {
  reactor: Reactor<TRepos, TDomainEvent>;
  repos: TRepos;
} & { [validReactorRef]: true };

/**
 *
 */
export function createReactorRef<TRepos, TDomainEvent extends DomainEventShape = DomainEvent>(
  reactor: Reactor<TRepos, TDomainEvent>,
  repos: TRepos
): ReactorRef<TRepos, TDomainEvent> {
  return {
    reactor,
    repos,
  } as ReactorRef<TRepos, TDomainEvent>;
}

/**
 *
 */
// eslint-disable-next-line  @typescript-eslint/no-explicit-any
export interface ProjectionistConfig<TRepos = any, TDomainEvent extends DomainEventShape = DomainEvent> {
  projectors: ProjectorRef<TRepos, TDomainEvent>[];
  reactors?: ReactorRef<TRepos, TDomainEvent>[];
}

export type ProjectorStatus = 'PLAYING' | 'STOPPED' | 'BROKEN';

export interface ProjectorCursor {
  name: string;
  status: ProjectorStatus;
  lastEventId: DomainEventId | null;
}

/**
 * Provide an interface exposing static members to allow for async creation of a
 * complete Projectionist via a static method.
 */
export type ProjectionistBuilder = typeof Projectionist;

/**
 * @note Reactors are not tracked in the ledger, they only play events as they
 *       happen, where they happen, i.e. : merged events are not sent to Reactors.
 *
 * @todo Consider compacting app history by using a ledgered projections set as
 *       seed and discarding event older than the seed ledger shows.
 *
 * @todo Consider that nothing prevents you from instancing multiple Projectionists
 *       hitting the same LedgerStore, and it's bad. It's probably not that bad
 *       for the DomainEventStore because of the id constraints.
 *       Look for a way to detect and prevent this.
 *       A singleton doesn't look like a good fit, you may want multiple Projectionist
 *       and they need to be configurable.
 *       Tracking LedgerStore refs in a static set is an option ( but there's no finalizer
 *       to automatically remove them when an instance is GCed ).
 *
 */
export class Projectionist<
  TConfig extends ProjectionistConfig<unknown, TDomainEvent>,
  TDomainEvent extends DomainEventShape = DomainEvent
> {
  private readonly projectorRefs: TConfig['projectors'];
  private readonly reactorRefs: TConfig['reactors'];

  /**
   *
   */
  private constructor(
    /** @todo Consider not having ProjectionistConfig in the ctor but passed on play/boot as arg. */
    config: TConfig,
    private ledger: Record<TConfig['projectors'][number]['projector']['name'], ProjectorCursor>,
    private readonly ledgerStore: LedgerStore,
    private readonly eventStore: DomainEventStore<TDomainEvent>,
    private readonly chunkSize = 1000
  ) {
    this.projectorRefs = config.projectors;
    this.reactorRefs = config.reactors;
  }

  /**
   * Build a consistent and ready Projectionist using an async factory method
   * and a private constructor to restrict creation methods.
   */
  // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
  static async create<
    TConfig extends ProjectionistConfig<unknown, TDomainEvent>,
    TDomainEvent extends DomainEventShape = DomainEvent
  >(config: TConfig, ledgerStore: LedgerStore, eventStore: DomainEventStore<TDomainEvent>, chunkSize = 1000) {
    return new Projectionist(
      config,
      await Projectionist.fetchLedger<TConfig, TDomainEvent>(config, ledgerStore),
      ledgerStore,
      eventStore,
      chunkSize
    );
  }

  /**
   * Retrieve existing cursors from the store and init new ones if necessary for
   * current config.
   */
  private static async fetchLedger<
    TConfig extends ProjectionistConfig<unknown, TDomainEvent>,
    TDomainEvent extends DomainEventShape = DomainEvent
  >(config: TConfig, ledgerStore: LedgerStore) {
    const ledger = {} as Record<TConfig['projectors'][number]['projector']['name'], ProjectorCursor>;
    const stored = {} as Record<string, ProjectorCursor>;

    /** @note Any 'dangling' entries in the LedgerStore will be retrieved. */
    (await ledgerStore.getAll()).forEach((cursor) => {
      stored[cursor.name] = cursor;
    });

    config.projectors.forEach((projectorRef) => {
      const projectorName: keyof typeof ledger = projectorRef.projector.name;
      /** Create a new cursor if none found. */
      ledger[projectorName] = stored[projectorName] ?? {
        name: projectorName,
        status: 'PLAYING',
        lastEventId: null,
      };
    });

    return ledger;
  }
  /**
   * Retrieve existing cursors from the store and init new ones if necessary for
   * current config.
   *
   * @todo Consider using static fetchLedger in _fetchLedger :
   *
   *        return Projectionist.fetchLedger<TConfig, TDomainEvent>(
   *          { projectors: this.projectorRefs } as TConfig, // , reactors: this.reactorRefs
   *            this.ledgerStore
   *         );
   */
  private async _fetchLedger() {
    const ledger = {} as Record<TConfig['projectors'][number]['projector']['name'], ProjectorCursor>;
    const stored = {} as Record<string, ProjectorCursor>;
    /** @note Any 'dangling' entries in the LedgerStore will be retrieved. */
    (await this.ledgerStore.getAll()).forEach((cursor) => {
      stored[cursor.name] = cursor;
    });

    this.projectorRefs.forEach((projectorRef) => {
      const projectorName: keyof typeof ledger = projectorRef.projector.name;
      /** Create a new cursor if none found. */
      ledger[projectorName] = stored[projectorName as string] ?? {
        name: projectorName as string,
        status: 'PLAYING',
        lastEventId: null,
      };
    });
    return ledger;
  }

  /**
   * @todo Consider :
   *         - retrieving only one batch ( and later stream ? ) from the EventStore
   *           starting from oldest lastEventId and using it for every projector
   *           starting from where each needs.
   *
   *         - retrieving only the type of events a projector cares about.
   *
   *         - catching up projections in memory and flushing the result to persistent
   *           storage when done.
   *
   * @todo Consider clearing projection if lastEventId is null.
   *
   * @note Does not catch EventStore error because it has to be handled differently than BrokenProjectorError.
   *
   * @todo Consider using an ad-hoc publisher, registering projectors, emitting
   *       all events then discarding ad-hoc publisher. This is probably one of
   *       the multiple necessary steps to allow proper cross-aggregate reference
   *       ( but not the only one since you also need to guarantee causal order
   *        of events holding cross aggregate refs, see tla specs notes).
   *
   *       There would still NOT be any order guarantee for multiple projector
   *       handling the same event.
   */
  async catchUp(): Promise<ProjectorError<TDomainEvent>[]> {
    this.ledger = await this._fetchLedger();
    // console.log('Projectionist.catchUp', this.ledger);

    /**
     * @todo Consider that using an ad-hoc publisher would require to identify
     *       the 'oldest' lastEventId. Then all retrieved events would be played
     *       for all configured projectors, so you better be confident that
     *       every handler is idempotent.
     *
     * @todo Consider that you only need to do this if you want to allow projections
     *       reading from other projections. And you probably don't want to do that
     *       at all. Re-read the x-agg recommendations doc, see if moving the handler
     *       in each projector and avoiding x-projections dependencies is enough and
     *       ok. If you really need to do it anyway, isn't the safe way to do
     *       rollback projection with store.getDomainEventsAt ? Yes but only
     *       if it's not also an x-agg ref : remember that causal order is very
     *       limited for now, x-agg is mostly concurrent/incomparable. There are
     *       ways to get some happens before relationships established for x-agg
     *       but we're not there yet.
     *       Reread about how to apply the idea of lamport timestamps here.
     */

    /** Track projectors breaking DURING the catchUp process. */
    const brokenProjectorNames = new Set();
    const report: ProjectorError<TDomainEvent>[] = [];

    console.groupCollapsed('Projectionist.catchUp');
    /** Process projectors sequentially. */
    for (let i = 0, length = this.projectorRefs.length; i < length; ++i) {
      const projectorName: keyof typeof this.ledger = this.projectorRefs[i].projector.name;

      console.log('Projectionist.catchUp : starting...', projectorName);
      let lastEventId = this.ledger[projectorName].lastEventId;

      /**
       * Start over if projector is broken.
       *
       * With this setup, if a projector keeps failing, it will be restarted on
       * *every* catchUp call, i.e. most likely every app restart.
       *
       * @todo Consider alternatives :
       *         - capping the number of retries ?
       *         - tracking and restarting from failing event pos ?
       *         - moving on ?
       */
      if (this.ledger[projectorName].status === 'BROKEN') {
        console.log('Projectionist.catchUp : BROKEN', projectorName);
        lastEventId = null;
        await this.projectorRefs[i].clearProjectionFn(this.projectorRefs[i].repos);
      }

      /** Retrieve events after projector position. */
      let events: Committed<TDomainEvent>[];
      try {
        events = lastEventId
          ? await this.eventStore.getAllAfter(lastEventId, this.chunkSize)
          : await this.eventStore.getAll(0, this.chunkSize);
      } catch (error) {
        if (error instanceof UnknownDomainEventIdError) {
          /**
           * Attempt to reboot if projector is ahead of eventStore.
           *
           * This may happen when emitting an event if a projector processed
           * the event before writing to the eventStore ends up failing.
           *
           * @todo Consider clearing projection first vs marking as broken first.
           */
          console.log('Projectionist.catchUp : AHEAD OF STORE', projectorName);

          this.ledger[projectorName].status = 'BROKEN';
          this.ledger[projectorName].lastEventId = null;
          this.ledgerStore.put(this.ledger[projectorName]);
          await this.projectorRefs[i].clearProjectionFn(this.projectorRefs[i].repos);
          events = await this.eventStore.getAll(0, this.chunkSize);
        } else {
          throw error;
        }
      }

      try {
        while (events.length) {
          console.log('Projectionist.catchUp : playing', events.length);

          /** Play retrieved events with the projector. */
          const errors = await this.projectorRefs[i].projector.play(events, this.projectorRefs[i].repos);

          if (errors.length) {
            brokenProjectorNames.add(projectorName);
            report.push(...errors);
          }

          /** Update ledger. */
          const { aggregateId, version } = events[events.length - 1];
          this.ledger[projectorName].status = brokenProjectorNames.has(projectorName) ? 'BROKEN' : 'PLAYING';
          this.ledger[projectorName].lastEventId = { aggregateId, version };

          /** Next chunk. */
          events = await this.eventStore.getAll(events[events.length - 1].id + 1, this.chunkSize);
        }
      } catch (error) {
        /**
         * Mark failed projector as BROKEN but allow other to go on.
         *
         * @todo ----> Consider what happens when a projector throws on play
         *       and the process is interrupted before it can be marked as broken
         *       and its cursor persisted
         */
        console.log('Projectionist.catchUp : failed', projectorName, error);
        this.ledger[projectorName].status = 'BROKEN';
        this.ledger[projectorName].lastEventId = this.projectorRefs[i].projector.getLastEventId();
        this.ledgerStore.put(this.ledger[projectorName]);
      }
      console.log('Projectionist.catchUp : done with', projectorName);
    }

    console.groupEnd();

    await this.persistLedger();
    return report;
  }

  /**
   * Catch-up all projectors then register all PLAYING projectors them with given publisher.
   *
   * @note Exceptions from catchUp are neither handled or swallowed here.
   *       PLAYING projectors are registered no matter what.
   */
  async play(publisher: Publisher<TDomainEvent>): Promise<ProjectorError<TDomainEvent>[]> {
    try {
      return await this.catchUp();
    } finally {
      await this.register(publisher);
    }
  }

  /**
   * Play freshly merged events.
   *
   * @note This is a stopgap until something like merge/conflict resolution events are
   *       introduced.
   *
   * @todo Consider that this probably require some form of lock !!!
   *
   * @todo Consider using an ad-hoc publisher, registering projectors, emitting
   *       all events then discarding ad-hoc publisher. This is probably one of
   *       the multiple necessary steps to allow proper cross-aggregate reference
   *       ( but not the only one since you also need to guarantee causal order
   *        of events holding cross aggregate refs, see tla specs notes).
   *
   *       There would still NOT be any order guarantee for multiple projectors
   *       handling the same event.
   *
   * @todo Consider if caching the ad-hoc publisher is worth the trouble, especially
   *       wrt the ease of starting fresh versus registering/unregistering projector
   *       changing status.
   */
  async playMerged(events: TDomainEvent[]): Promise<ProjectorError<TDomainEvent>[]> {
    if (!events.length) {
      return [];
    }

    console.warn('Projectionist.playMerged : starting...');
    const t0 = performance.now();

    /** Create an ad-hoc publisher-like event player. */
    const eventPlayer = new Player<TDomainEvent>();

    this.projectorRefs.forEach((projectorRef) => {
      const projectorName: keyof typeof this.ledger = projectorRef.projector.name;
      if (this.ledger[projectorName].status === 'PLAYING') {
        projectorRef.projector.registerPlayer(eventPlayer, projectorRef.repos, projectorName);
      }
    });

    let report: ProjectorError<TDomainEvent>[] = [];

    // try {
    /** Play merged events. */
    report = await eventPlayer.emit(events);
    const brokenProjectorNames = new Set(report.map((r) => r.projectorName));

    /** Update ledger. */
    const { aggregateId, version } = events[events.length - 1];

    this.projectorRefs.forEach((projectorRef) => {
      const projectorName: keyof typeof this.ledger = projectorRef.projector.name;
      this.ledger[projectorName].status = brokenProjectorNames.has(projectorName) ? 'BROKEN' : 'PLAYING';
      this.ledger[projectorName].lastEventId = { aggregateId, version };
    });
    // } catch (error) {
    //   /**
    //    * @todo Repurpose or discard this.
    //    */
    //   this.projectorRefs.forEach((projectorRef) => {
    //     const projectorName: keyof typeof this.ledger = projectorRef.projector.name;
    //     console.log('lastEventId: ', projectorRef.projector.getLastEventId());
    //     this.ledger[projectorName] = {
    //       name: projectorName,
    //       status: 'BROKEN',
    //       lastEventId: projectorRef.projector.getLastEventId(),
    //     };
    //   });

    //   const brokenProjectors = Object.values<ProjectorCursor>(this.ledger).filter((cursor) => cursor.status === 'BROKEN');
    //   await this.persistLedger();
    //   throw new BrokenProjectorError(
    //     brokenProjectors,
    //     error instanceof Error ? error.name : typeof error,
    //     error instanceof Error ? error.message : '' + error
    //   );
    // }

    /** @todo Consider this.bumpLedgerTo(events[events.length -1 ]); */
    await this.persistLedger();
    const t1 = performance.now();
    console.warn(`Projectionist.playMerged : done in ${t1 - t0} milliseconds.`);
    return report;
  }

  /**
   * @note Keep this as baseline to compare efficiency vs using ad-hoc publisher-like
   *       Player
   */
  //   async playMergedSequential(events: TDomainEvent[]): Promise<void> {
  //     if (!events.length) {
  //       return;
  //     }

  //     if (!this.ledger) {
  //       this.ledger = await this._fetchLedger();
  //     }

  //     console.warn('Projectionist.playMergedSequential : starting...');
  //     const t0 = performance.now();

  //     /** Process projectors sequentially. */
  //     for (
  //       let i = 0,
  //         projectorNames = Object.keys(this.projectorRefs) as Array<keyof TConfig['projectors']>,
  //         length = projectorNames.length;
  //       i < length;
  //       ++i
  //     ) {
  //       const projectorName = projectorNames[i];

  //       console.log('Projectionist.playMerged : starting', projectorName);
  //       //   let lastEventId = this.ledger[projectorName].lastEventId;

  //       if (this.ledger[projectorName].status === 'PLAYING') {
  //         try {
  //           //   console.log('Projectionist.playMerged', events.length); //, events);
  //           /** Play retrieved events with the projector. */

  //           await this.projectorRefs[projectorName].projector.play(events, this.projectorRefs[projectorName].repos);

  //           /** Update ledger. */
  //           const { aggregateId, version } = events[events.length - 1];
  //           this.ledger[projectorName].status = 'PLAYING';
  //           this.ledger[projectorName].lastEventId = { aggregateId, version };
  //         } catch (error) {
  //           /** Mark failed projector as BROKEN but allow other to go on. */
  //           console.log('Projectionist.playMerged : failed', projectorName, error);
  //           this.ledger[projectorName].status = 'BROKEN';
  //           this.ledger[projectorName].lastEventId = this.projectorRefs[projectorName].projector.getLastEventId();
  //           this.ledgerStore.put(this.ledger[projectorName]);
  //         }
  //       }
  //     }

  //     await this.persistLedger();
  //     this.reportBroken();
  //     const t1 = performance.now();
  //     console.warn(`Projectionist.playMergedSequential : done in ${t1 - t0} milliseconds.`);
  //   }

  /**
   * @todo Consider what is safer and cover mores scenarios between :
   *
   *         - clear ledger first, projections may not get cleared and further events
   *           get played over whatever state they were left in.
   *
   *         - clear projections first, ledger may not be updated and projections
   *           are not caught up to latest event before new events get played.
   */
  async replay(publisher: Publisher<TDomainEvent>): Promise<void> {
    this.resetLedger();
    await this.persistLedger();

    await Promise.all(
      Object.values(this.projectorRefs).map(async (projectorRef) => {
        await projectorRef.clearProjectionFn(projectorRef.repos);
      })
    );
    await this.play(publisher);
  }

  /**
   * Register all non-STOPPED projectors with publisher.
   *
   * @note Experimental : BROKEN projector are allowed to go on as well. Errors
   *       during play/merge are reported with a lot more infos.
   *
   * @todo Consider if the extra write to the ledger store affects read-after-write
   *       and also if it is worth worrying about performance wise.
   *
   * @todo --->>> Consider a 'afterAny' or using a 'done' promise returned by the publisher
   *       for a more optimistic setup relying even more on projector idempotence where
   *       projector are registered individually and all cursor of the ledger are updated
   *       at once.
   *       This setup makes should make eventProcessedCount and position more easily
   *       avalaible when updating the ledger.
   *
   * @todo Test Projectionist with an IdbDomainEventStore and figure out if '*'
   *       can be done after 'topic' handlers have started firing. If it's bad ?
   *       If it does need the workaround 'done' in emit ?
   *
   * @todo Consider emitting a SYNC_DONE event vs calling catchUp when SyncService
   *       posts a message. Does a SYNC_DONE event make sense ? Is it useful to know
   *       about this on other devices ?
   *
   * @todo Consider pausing during catchUp, playMerged, etc ... and then catchUp
   *       before resuming.
   */
  private async register(publisher: Publisher<TDomainEvent>) {
    this.projectorRefs.forEach((projectorRef) => {
      const projectorName: keyof typeof this.ledger = projectorRef.projector.name;
      if (this.ledger[projectorName].status !== 'STOPPED') {
        projectorRef.projector.register(publisher, projectorRef.repos);
      }
    });

    if (this.reactorRefs) {
      this.reactorRefs.forEach((reactorRef) => {
        reactorRef.reactor.register(publisher, reactorRef.repos);
      });
    }

    /** Register ledger update with given publisher. */
    publisher.afterAny((event) => {
      this.bumpLedgerTo(event);
      this.persistLedger();
    });
  }

  /**
   * Bump all non-STOPPED projector cursor to given event.
   */
  private bumpLedgerTo({ aggregateId, version }: TDomainEvent) {
    this.projectorRefs.forEach((projectorRef) => {
      const projectorName: keyof typeof this.ledger = projectorRef.projector.name;
      if (this.ledger[projectorName].status !== 'STOPPED') {
        this.ledger[projectorName].lastEventId = { aggregateId, version };
      }
    });
  }

  /**
   *
   */
  private resetLedger() {
    if (!this.ledger) {
      this.ledger = {} as Record<TConfig['projectors'][number]['projector']['name'], ProjectorCursor>;
    }

    this.projectorRefs.forEach((projectorRef) => {
      const projectorName: keyof typeof this.ledger = projectorRef.projector.name;
      this.ledger[projectorName] = {
        name: projectorName as string,
        status: 'PLAYING',
        lastEventId: null,
      };
    });
  }

  /**
   * @todo Consider that if ledgerStore.put is a transaction, either all cursors
   *       get updated or none get update.
   */
  private async persistLedger() {
    this.ledgerStore.put(Object.values(this.ledger));
  }
}

/**
 *
 */
export class BrokenProjectorError<TDomainEvent extends DomainEventShape = DomainEvent> extends Error {
  /**
   *
   */
  constructor(public readonly report: ProjectorError<TDomainEvent>[]) {
    const message = report.reduce<string>(
      (msg, projectorError) => `${msg}\t${projectorError.projectorName} on event type ${projectorError.event.type}\n`,
      'Some projector(s) failed :\n'
    );
    super(message);
    this.name = 'BrokenProjectorError';
  }
}

/**
 *
 */
export type PlayerSubscriber<TEvent extends DomainEventShape> = [
  subscriber: (event: TEvent) => Promise<void>,
  projectorName: string
];

/**
 *
 */
export class Player<TDomainEvent extends DomainEventShape = DomainEvent> {
  private subs: {
    [TdomainEventType in TDomainEvent['type']]?: PlayerSubscriber<TDomainEvent & { type: TdomainEventType }>[];
  } = {};

  /**
   *
   */
  on<TdomainEventType extends TDomainEvent['type']>(
    eventType: TdomainEventType,
    subscriber: PlayerSubscriber<TDomainEvent & { type: TdomainEventType }>
  ): this {
    (this.subs[eventType] || (this.subs[eventType] = []))?.push(subscriber);
    return this;
  }

  /**
   *
   */
  async emit(events: TDomainEvent[]): Promise<ProjectorError<TDomainEvent>[]> {
    const errors: ProjectorError<TDomainEvent>[] = [];

    for (let i = 0, length = events.length; i < length; ++i) {
      const event = events[i];
      await Promise.allSettled(
        this.subs[event.type as TDomainEvent['type']]?.map((subscriber) =>
          subscriber[0](event).catch((error) => {
            errors.push({
              error,
              projectorName: subscriber[1],
              event,
            });
          })
        ) ?? []
      );
    }
    return errors;
  }
}
