import { v4 as uuid } from 'uuid';
import { t3dev } from 't3dev';
import { Mutex } from './mutex';
import { logError } from './utils-extra';
import { asyncSleep } from './utils';
import { Bag } from 'ecs/Bag';

/** A deferred unit of work; nothing runs until the function is invoked. */
type AsyncTask<R> = () => Promise<R>;

/** What a wrapped task settles to: the index it was started with, its id, and its result. */
type TaskResult<R> = [index: number, id: string, result: R];

/** A queued task that is told its index when started and reports it back in its `TaskResult`. */
type WrappedTask<R> = (index: number) => Promise<TaskResult<R>>;

/** Registers a task with the queue and returns a promise for that task's result. */
type TaskEnqueueFunc<R> = (task: AsyncTask<R>) => Promise<R>;

/** Runs the queued tasks and resolves with the collected results. */
type ProcessQueueFunc<R> = () => Promise<R[]>;

/** Receives the bag of in-flight task promises and reports whether a task was added to it. */
type GetAddTaskFunc<R> = (tasks: Bag<Promise<TaskResult<R>>>) => boolean;

/** The pair of functions handed back to callers of the queue factory. */
type AsyncTaskFunctions<R> = [
  enqueueTask: TaskEnqueueFunc<R>,
  processQueue: ProcessQueueFunc<R>
];

/**
 * Creates a queue that runs async tasks while a `Mutex` created with
 * `concurrentLimit` gates how many of them execute at once.
 *
 * Returns an `[enqueueTask, processQueue]` pair:
 * - `enqueueTask(task)` registers `task` and returns a promise for its
 *   result. Nothing is started until `processQueue` is called.
 * - `processQueue()` starts the pending tasks, waits for them to settle and
 *   resolves with their results in the order they were observed to finish.
 *   Tasks enqueued while it is still waiting on others are picked up too.
 *
 * A task that throws is logged and yields `undefined` as its result; the
 * promise returned by `enqueueTask` still resolves rather than rejects.
 *
 * @param concurrentLimit capacity passed to the `Mutex` that every task
 *   must obtain a ticket from before it runs
 * @returns the `[enqueueTask, processQueue]` tuple
 */
export function AsyncTaskQueue<T>(
  concurrentLimit: number
): AsyncTaskFunctions<T> {
  // number of tasks currently between acquiring and releasing a ticket
  let totalRunning = 0;
  // number of tasks that have finished, whether they succeeded or threw
  let completed = 0;
  // enqueued wrappers that `processQueue` has not started yet
  const pendingTasks: [id: string, task: WrappedTask<T>][] = [];
  // every wrapper waits for a ticket from this mutex before running its task
  const mutex = new Mutex(concurrentLimit);

  const processQueue: ProcessQueueFunc<T> = async () => {
    // What a started wrapper settles to. `done` is absent when the wrapper
    // itself rejected (as opposed to the user task throwing, which the
    // wrapper already handles).
    type Settled = { slot: number; done?: TaskResult<T> };

    const results: T[] = [];
    // In-flight wrappers keyed by a slot number that is never reused, so a
    // finished task can always be removed regardless of what else has been
    // added or removed in the meantime.
    const inFlight = new Map<number, Promise<Settled>>();
    let nextSlot = 0;

    // Moves one wrapper from `pendingTasks` into `inFlight`.
    const startNextTask = (): boolean => {
      const next = pendingTasks.shift();
      if (!next) return false;
      const [, task] = next;
      const slot = nextSlot++;
      const settled = task(slot).then(
        (done): Settled => ({ slot, done }),
        (err): Settled => {
          // Convert a rejection into a settled slot; leaving a rejected
          // promise in `inFlight` would make every later race reject again.
          logError(err, 'ASYNC TASK QUEUE PROCESSING ERROR:');
          return { slot };
        }
      );
      inFlight.set(slot, settled);
      return true;
    };

    // double loop so pendingTasks can grow and shrink during processing
    while (pendingTasks.length || inFlight.size || totalRunning) {
      // `totalRunning` only changes after a wrapper has awaited its ticket,
      // so it cannot move while this inner loop runs; starting a wrapper
      // here does not by itself run the task body.
      while (pendingTasks.length && totalRunning <= concurrentLimit) {
        startNextTask();
      }

      try {
        if (inFlight.size < 1) {
          // nothing of ours left to wait for
          break;
        }

        const { slot, done } = await Promise.race(
          Array.from(inFlight.values())
        );
        inFlight.delete(slot);
        if (done) {
          results.push(done[2]);
        }
      } finally {
        t3dev().log.log(
          totalRunning,
          'tasks running...',
          completed,
          'tasks completed '
        );
      }
    }

    t3dev().log.log('total tasks processed:', completed);
    return results;
  };

  /**
   * enqueues the passed function, to be started and called at some
   * point in the future when `processQueue` is called.
   * You should always ensure the promise is only begun inside the
   * passed function and not before, or you defeat the purpose of
   * this async pattern
   *
   * The returned promise resolves with the task's result as soon as the
   * task finishes, or with `undefined` if the task threw.
   */
  const enqueueTask: TaskEnqueueFunc<T> = async (task) => {
    const id = uuid();
    let started = false;

    return new Promise<T>((resolve) => {
      // this is the task that will be executed by the queue later
      const taskWrapper: WrappedTask<T> = async (index) => {
        if (started) throw new Error('cannot call task twice');
        started = true;
        const [ticket] = mutex.lock();
        const release = await ticket;
        totalRunning++;
        // stays unassigned (undefined at runtime) when the task throws
        let result!: T;
        try {
          result = await task();
        } catch (err) {
          logError(err, `TASK ${id} FAILED TO PROCESS:`);
        } finally {
          // hand the outcome to the caller first, then give the ticket back
          resolve(result);
          totalRunning--;
          completed++;
          release();
        }
        return [index, id, result];
      };
      pendingTasks.push([id, taskWrapper]);
    });
  };

  return [enqueueTask, processQueue];
}
