import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/empty';
import 'rxjs/add/operator/defaultIfEmpty';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/do';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/switchMap';
import log from '@atlassian/jira-common-util-logging/src/log';
import StateContainer from './state-container';
import { createInjectableSource, type InjectableSource } from './util';

/** Identifier passed to the logger, and used as the prefix of errors thrown by this module. */
const LOCATION = 'servicedesk.common.epic-util.concurrency-limiter';

/**
 * Marker value substituted (via `defaultIfEmpty`) when the inner observable
 * completes without emitting, and filtered out again before anything reaches
 * the consumer.
 *
 * It is a unique Symbol rather than a string so that it can never compare
 * equal to a value the inner observable legitimately emits; a string marker
 * would cause a real emission of that same string to be silently dropped.
 */
const EMPTY_SENTINEL = Symbol('EMPTY_SENTINEL');

/**
 * Shape of the function that joins the (token-carrying) source stream with the
 * projection producing the inner observable, e.g. a wrapper around
 * `switchMap` or `mergeMap`.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type Operator = (source: Observable<any>, inner: (value?: any) => Observable<any>) => Observable<any>;

/** Join strategy used when the caller does not supply one: `switchMap`. */
const defaultOperator: Operator = (outer$, project) => outer$.switchMap(project);
/** Factory turning one source value of type `S` into the observable of `T` whose concurrency is limited. */
type InnerObservable<S, T> = (value: S) => Observable<T>;

/**
 * Limit concurrent calls to the inner observable using a token-based
 * mechanism.
 *
 * Every value from `source` must obtain a token from an internal
 * `StateContainer` before `inner` is invoked for it; at most
 * `availableTokenCount` tokens exist.
 *
 * The operator joining the source and inner observable is, by default,
 * `switchMap`.  The optional fourth argument can be used to change this.
 *
 * Beware this has a couple of sharp edges:
 *
 * 1) The first notification (value, error or completion) from the inner
 *    observable is considered as "done" for the purposes of concurrency
 *    limiting, and its corresponding token is returned to the pool at this
 *    point - even if the inner observable keeps emitting afterwards.
 *
 * 2) A token is only returned by this function when the inner observable
 *    notifies. If the joining operator unsubscribes from the inner observable
 *    before it has emitted, errored or completed (as the default `switchMap`
 *    does when a newer source value arrives), the token is not returned here.
 *    Empty inner observables are detected and logged as a warning; avoid
 *    using this with switchMap in codepaths which may return an empty
 *    observable.
 *
 * To guard against the latter, tokens have a `leaseTime` (documented default:
 * 10 seconds - the default itself lives in `StateContainer`, confirm there).
 * After this time the token is returned to the pool regardless of whether the
 * inner observable has emitted or not.
 *
 * To protect against the former, best practice is to ensure that the
 * observable returned by `inner` emits exactly one value.
 *
 * Errors raised by the inner observable are logged and swallowed: the inner
 * stream is replaced by an empty one, so they do not propagate through this
 * function's pipeline.
 *
 * Because of the sharp edges we'll limit this to servicedesk/common. Once
 * these are fixed or we're comfortable with the benefits, it'd be good to
 * graduate this to the top-level common directory.  There's also potential
 * for a suite of utils like this one that form the foundation of what's good
 * practice around managing requests (e.g., retry).
 *
 * @param source - Stream of values to feed to `inner`.
 * @param availableTokenCount - Maximum number of concurrent inner calls; must be at least 1.
 * @param inner - Factory creating the rate-limited observable for a source value.
 * @param operator - Strategy joining the source and `inner`; defaults to `switchMap`.
 * @param leaseTime - Optional lease duration forwarded unchanged to `StateContainer`.
 * @returns The values emitted by the inner observables, as combined by `operator`.
 * @throws Error when `availableTokenCount` is `NaN` or less than 1.
 */
export const withConcurrencyLimiting = <S, T>(
	source: Observable<S>,
	availableTokenCount: number,
	inner: InnerObservable<S, T>,
	operator: Operator = defaultOperator,
	leaseTime?: number,
): Observable<T> => {
	// `NaN < 1` is false, so NaN has to be rejected explicitly; otherwise a
	// limiter would be built with a meaningless token count.
	if (Number.isNaN(availableTokenCount) || availableTokenCount < 1) {
		throw new Error(`${LOCATION}: availableTokenCount must be at least 1`);
	}

	// `inject` lets the state container push values back into the stream
	// that feeds the token acquisition step below.
	const { injectableSource$, inject }: InjectableSource<S> = createInjectableSource(source);
	const stateContainer: StateContainer<S> = new StateContainer(
		availableTokenCount,
		// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
		inject as (arg1: S) => void,
		leaseTime,
	);

	// Keep `stateContainer` as the receiver when the method is used as a callback.
	const withToken = (v: S) => stateContainer.withToken.call(stateContainer, v);

	return operator(injectableSource$.mergeMap(withToken), ({ returnToken, value }) =>
		inner(value)
			// Make an empty inner observable emit a marker so that the token
			// is still returned on a `next` notification and the situation
			// can be reported below.
			// eslint-disable-next-line @typescript-eslint/consistent-type-assertions, @typescript-eslint/no-explicit-any
			.defaultIfEmpty(EMPTY_SENTINEL as any)
			// Hand the token back on whichever notification arrives first
			// (value, error or completion).
			.do(returnToken, returnToken, returnToken)
			// Log inner failures and swallow them instead of propagating.
			.catch((err) => {
				log.safeErrorWithoutCustomerData(LOCATION, err);
				return Observable.empty<never>();
			})
			.do((v) => {
				if (v === EMPTY_SENTINEL) {
					log.safeWarnWithoutCustomerData(
						LOCATION,
						'Empty observable detected. Do not use the concurrency-limiter with empty observables.',
					);
				}
			})
			// The marker is internal only; never expose it to consumers.
			.filter((v) => v !== EMPTY_SENTINEL),
	);
};
