import throttle from 'lodash/throttle';

/**
 * Abstraction for a batch handler: accumulates writes in an in-memory buffer and hands them to
 * `consume` as one batch when `flush` is called.
 *
 * Every `write` returns a promise that settles with the outcome of the batch the item was flushed
 * in: it resolves with the value `consume` resolved with, or rejects with the reason `consume`
 * failed with.
 *
 * @typeParam T - Type of a single buffered item.
 * @typeParam U - Result produced by `consume` for a whole batch.
 */
export class BatchWriter<T, U> {
  /** Pending entries: the item (or `undefined`) plus the settle callbacks of its `write` promise. */
  protected readonly buffer: Array<
    [chunk: T | undefined, resolve: (result: U) => void, reject: (reason: unknown) => void]
  > = [];

  /**
   * @param consume - Receives the defined items of a flushed batch, in write order.
   */
  constructor(private readonly consume: (batch: Array<T>) => Promise<U>) {}

  /**
   * Buffers `item` until the next flush.
   *
   * @param item - Item to append to the buffer.
   * @returns A promise settled by the flush that processes this item.
   */
  public async write(item: T): Promise<U> {
    return new Promise<U>((resolve, reject) => {
      this.buffer.push([item, resolve, reject]);
    });
  }

  /**
   * Drains the buffer and passes its defined items to `consume`. Does nothing when the buffer is
   * empty.
   *
   * @throws Rethrows the failure of `consume` after rejecting every pending write with it.
   */
  public async flush(): Promise<void> {
    // Note: synchronously remove from buffer before async processing to avoid dupes
    const pending = this.buffer.splice(0);
    if (pending.length === 0) {
      return;
    }

    // Single pass over the drained entries; `undefined` chunks are not forwarded to `consume`,
    // but their promises are still settled below.
    const batchValues: Array<T> = [];
    for (const [item] of pending) {
      if (item !== undefined) {
        batchValues.push(item);
      }
    }

    try {
      const result = await this.consume(batchValues);
      for (const [, resolve] of pending) {
        resolve(result);
      }
    } catch (err) {
      for (const [, , reject] of pending) {
        reject(err);
      }
      throw err;
    }
  }

  /** @returns The number of entries currently waiting to be flushed. */
  public getBufferSize(): number {
    return this.buffer.length;
  }
}

/**
 * Batch writer that requests a throttled flush with every write, which yields batched flushes on
 * `interval` ms.
 *
 * @typeParam T - Type of a single buffered item.
 * @typeParam U - Result produced by `consume` for a whole batch.
 */
export class ThrottledBatchWriter<T, U> extends BatchWriter<T, U> {
  private readonly throttledFlush: () => void;

  /**
   * @param consume - Batch handler forwarded to the base writer.
   * @param interval - Throttle window, in milliseconds, passed to `throttle`.
   * @param onError - Optional callback invoked with the reason when a scheduled flush fails.
   */
  constructor(consume: (batch: Array<T>) => Promise<U>, interval: number, onError?: (reason: unknown) => void) {
    super(consume);

    // Note: { leading: false } ensures underlying flush method is only called at the end of `interval`. This
    // ensures the first execution of flush queues up a call to the underlying flush method instead of calling it
    // immediately
    this.throttledFlush = throttle(
      () => {
        // flush() rejects every pending write promise before rethrowing, so writers already see the
        // failure. Always attach a handler here: `.catch(undefined)` would let the rethrown error
        // surface as an unhandled rejection whenever no `onError` was supplied.
        this.flush().catch((reason: unknown) => onError?.(reason));
      },
      interval,
      { leading: false },
    );
  }

  /**
   * Buffers `item` and requests a throttled flush.
   *
   * @param item - Item to append to the buffer.
   * @returns A promise settled by the flush that processes this item.
   */
  public async write(item: T): Promise<U> {
    const promise = super.write(item);
    this.throttledFlush();
    return promise;
  }
}

/**
 * Throttled batch writer with a bounded buffer: a write is accepted only while fewer than
 * `maxBufferLength` entries are pending; otherwise the item is dropped.
 *
 * @typeParam T - Type of a single buffered item.
 * @typeParam U - Result produced by `consume` for a whole batch.
 */
export class MaxLengthThrottledBatchWriter<T, U> extends ThrottledBatchWriter<T, U | undefined> {
  /**
   * @param consume - Batch handler forwarded to the base writer.
   * @param interval - Throttle window, in milliseconds, forwarded to the base writer.
   * @param maxBufferLength - Number of pending entries at which further writes are dropped.
   * @param onError - Optional callback forwarded to the base writer.
   */
  constructor(
    consume: (batch: Array<T>) => Promise<U>,
    interval: number,
    // Parameter property: declared and assigned by the compiler, no manual assignment needed.
    private readonly maxBufferLength: number,
    onError?: (reason: unknown) => void,
  ) {
    super(consume, interval, onError);
  }

  /**
   * Buffers `item` if there is room for it.
   *
   * @param item - Item to append to the buffer.
   * @returns The batch result once the item is flushed, or `undefined` (without buffering the
   *   item or requesting a flush) when the buffer is already at `maxBufferLength`.
   */
  public async write(item: T): Promise<U | undefined> {
    if (this.buffer.length < this.maxBufferLength) {
      return super.write(item);
    }
    // Buffer is full: the item is intentionally dropped.
    return undefined;
  }
}
