fix(classes): cleanup resources, add cancellable timeouts, and fix bugs in several core utility classes
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/lik',
|
||||
version: '6.3.0',
|
||||
version: '6.3.1',
|
||||
description: 'Provides a collection of lightweight helpers and utilities for Node.js projects.'
|
||||
}
|
||||
|
||||
@@ -97,13 +97,20 @@ export class AsyncExecutionStack {
|
||||
private async executeExclusiveSlot(slot: IExecutionSlot<any>) {
|
||||
try {
|
||||
if (slot.timeout) {
|
||||
const result = await Promise.race([
|
||||
slot.funcToExecute(),
|
||||
plugins.smartdelay.delayFor(slot.timeout).then(() => {
|
||||
throw new Error('Timeout reached');
|
||||
}),
|
||||
]);
|
||||
slot.executionDeferred.resolve(result);
|
||||
const timeoutInstance = new plugins.smartdelay.Timeout(slot.timeout);
|
||||
try {
|
||||
const result = await Promise.race([
|
||||
slot.funcToExecute(),
|
||||
timeoutInstance.promise.then(() => {
|
||||
throw new Error('Timeout reached');
|
||||
}),
|
||||
]);
|
||||
timeoutInstance.cancel();
|
||||
slot.executionDeferred.resolve(result);
|
||||
} catch (error) {
|
||||
timeoutInstance.cancel();
|
||||
throw error;
|
||||
}
|
||||
} else {
|
||||
const result = await slot.funcToExecute();
|
||||
slot.executionDeferred.resolve(result);
|
||||
@@ -120,11 +127,18 @@ export class AsyncExecutionStack {
|
||||
try {
|
||||
// execute with optional timeout
|
||||
if (slot.timeout) {
|
||||
const result = await Promise.race([
|
||||
slot.funcToExecute(),
|
||||
plugins.smartdelay.delayFor(slot.timeout).then(() => { throw new Error('Timeout reached'); }),
|
||||
]);
|
||||
slot.executionDeferred.resolve(result);
|
||||
const timeoutInstance = new plugins.smartdelay.Timeout(slot.timeout);
|
||||
try {
|
||||
const result = await Promise.race([
|
||||
slot.funcToExecute(),
|
||||
timeoutInstance.promise.then(() => { throw new Error('Timeout reached'); }),
|
||||
]);
|
||||
timeoutInstance.cancel();
|
||||
slot.executionDeferred.resolve(result);
|
||||
} catch (error) {
|
||||
timeoutInstance.cancel();
|
||||
throw error;
|
||||
}
|
||||
} else {
|
||||
const result = await slot.funcToExecute();
|
||||
slot.executionDeferred.resolve(result);
|
||||
|
||||
@@ -5,6 +5,7 @@ export class BackpressuredArray<T> {
|
||||
private highWaterMark: number;
|
||||
public hasSpace = new plugins.smartrx.rxjs.Subject<'hasSpace'>();
|
||||
private itemsAvailable = new plugins.smartrx.rxjs.Subject<'itemsAvailable'>();
|
||||
private isDestroyed = false;
|
||||
|
||||
constructor(highWaterMark: number = 16) {
|
||||
this.data = [];
|
||||
@@ -14,7 +15,7 @@ export class BackpressuredArray<T> {
|
||||
push(item: T): boolean {
|
||||
this.data.push(item);
|
||||
this.itemsAvailable.next('itemsAvailable');
|
||||
|
||||
|
||||
const spaceAvailable = this.checkSpaceAvailable();
|
||||
if (spaceAvailable) {
|
||||
this.hasSpace.next('hasSpace');
|
||||
@@ -40,12 +41,17 @@ export class BackpressuredArray<T> {
|
||||
|
||||
waitForSpace(): Promise<void> {
|
||||
return new Promise<void>((resolve) => {
|
||||
if (this.checkSpaceAvailable()) {
|
||||
if (this.checkSpaceAvailable() || this.isDestroyed) {
|
||||
resolve();
|
||||
} else {
|
||||
const subscription = this.hasSpace.subscribe(() => {
|
||||
subscription.unsubscribe();
|
||||
resolve();
|
||||
const subscription = this.hasSpace.subscribe({
|
||||
next: () => {
|
||||
subscription.unsubscribe();
|
||||
resolve();
|
||||
},
|
||||
complete: () => {
|
||||
resolve();
|
||||
},
|
||||
});
|
||||
}
|
||||
});
|
||||
@@ -53,14 +59,28 @@ export class BackpressuredArray<T> {
|
||||
|
||||
waitForItems(): Promise<void> {
|
||||
return new Promise<void>((resolve) => {
|
||||
if (this.data.length > 0) {
|
||||
if (this.data.length > 0 || this.isDestroyed) {
|
||||
resolve();
|
||||
} else {
|
||||
const subscription = this.itemsAvailable.subscribe(() => {
|
||||
subscription.unsubscribe();
|
||||
resolve();
|
||||
const subscription = this.itemsAvailable.subscribe({
|
||||
next: () => {
|
||||
subscription.unsubscribe();
|
||||
resolve();
|
||||
},
|
||||
complete: () => {
|
||||
resolve();
|
||||
},
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* destroys the BackpressuredArray, completing all subjects
|
||||
*/
|
||||
public destroy() {
|
||||
this.isDestroyed = true;
|
||||
this.hasSpace.complete();
|
||||
this.itemsAvailable.complete();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,12 +15,18 @@ export class Interest<DTInterestId, DTInterestFullfillment> {
|
||||
public comparisonFunc: IInterestComparisonFunc<DTInterestId>;
|
||||
public destructionTimer = new plugins.smarttime.Timer(10000);
|
||||
public isFullfilled = false;
|
||||
private isDestroyed = false;
|
||||
|
||||
/**
|
||||
* a generic store to store objects in that are needed for fullfillment;
|
||||
*/
|
||||
public fullfillmentStore: any[] = [];
|
||||
|
||||
/**
|
||||
* a cancellable timeout for the markLostAfterDefault feature
|
||||
*/
|
||||
private markLostTimeout: InstanceType<typeof plugins.smartdelay.Timeout> | null = null;
|
||||
|
||||
/**
|
||||
* quick access to a string that makes the interest comparable for checking for similar interests
|
||||
*/
|
||||
@@ -39,12 +45,9 @@ export class Interest<DTInterestId, DTInterestFullfillment> {
|
||||
this.isFullfilled = true;
|
||||
this.fullfillmentStore = [];
|
||||
this.interestDeferred.resolve(objectArg);
|
||||
this.destroy(); // Remove from InterestMap immediately after fulfillment
|
||||
this.destroy();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
constructor(
|
||||
interestMapArg: InterestMap<DTInterestId, DTInterestFullfillment>,
|
||||
interestArg: DTInterestId,
|
||||
@@ -57,10 +60,17 @@ export class Interest<DTInterestId, DTInterestFullfillment> {
|
||||
this.options = optionsArg;
|
||||
|
||||
this.destructionTimer.completed.then(() => {
|
||||
this.destroy();
|
||||
if (!this.isDestroyed) {
|
||||
this.destroy();
|
||||
}
|
||||
});
|
||||
if (this.options?.markLostAfterDefault) {
|
||||
plugins.smartdelay.delayFor(this.options.markLostAfterDefault).then(this.markLost);
|
||||
this.markLostTimeout = new plugins.smartdelay.Timeout(this.options.markLostAfterDefault);
|
||||
this.markLostTimeout.promise.then(() => {
|
||||
if (!this.isDestroyed) {
|
||||
this.markLost();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,9 +82,28 @@ export class Interest<DTInterestId, DTInterestFullfillment> {
|
||||
* self destructs the interest
|
||||
*/
|
||||
public destroy() {
|
||||
if (this.isDestroyed) {
|
||||
return;
|
||||
}
|
||||
this.isDestroyed = true;
|
||||
|
||||
// Cancel timers to release references
|
||||
this.destructionTimer.reset();
|
||||
if (this.markLostTimeout) {
|
||||
this.markLostTimeout.cancel();
|
||||
this.markLostTimeout = null;
|
||||
}
|
||||
|
||||
// Clear the fulfillment store
|
||||
this.fullfillmentStore = [];
|
||||
|
||||
// Remove from the InterestMap
|
||||
this.interestMapRef.removeInterest(this);
|
||||
if (!this.isFullfilled && this.options.defaultFullfillment) {
|
||||
this.fullfillInterest(this.options.defaultFullfillment);
|
||||
|
||||
// Fulfill with default if not yet fulfilled (inlined to avoid mutual recursion)
|
||||
if (!this.isFullfilled && this.options?.defaultFullfillment) {
|
||||
this.isFullfilled = true;
|
||||
this.interestDeferred.resolve(this.options.defaultFullfillment);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -70,6 +70,8 @@ export class InterestMap<DTInterestId, DTInterestFullfillment> {
|
||||
if (!returnInterest) {
|
||||
returnInterest = newInterest;
|
||||
this.interestObjectMap.add(returnInterest);
|
||||
} else {
|
||||
newInterest.destroy(); // clean up abandoned Interest's timers
|
||||
}
|
||||
this.interestObservable.push(returnInterest);
|
||||
return returnInterest;
|
||||
@@ -131,4 +133,16 @@ export class InterestMap<DTInterestId, DTInterestFullfillment> {
|
||||
});
|
||||
return interest; // if an interest is found, the interest is returned, otherwise interest is null
|
||||
}
|
||||
|
||||
/**
|
||||
* destroys the InterestMap and cleans up all resources
|
||||
*/
|
||||
public destroy() {
|
||||
const interests = this.interestObjectMap.getArray();
|
||||
for (const interest of interests) {
|
||||
interest.destroy();
|
||||
}
|
||||
this.interestObjectMap.wipe();
|
||||
this.interestObservable.signalComplete();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,4 +20,18 @@ export class LoopTracker<T> {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* resets the loop tracker, clearing all tracked objects
|
||||
*/
|
||||
public reset() {
|
||||
this.referenceObjectMap.wipe();
|
||||
}
|
||||
|
||||
/**
|
||||
* destroys the loop tracker and its underlying ObjectMap
|
||||
*/
|
||||
public destroy() {
|
||||
this.referenceObjectMap.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,8 +62,15 @@ export class ObjectMap<T> {
|
||||
* remove key
|
||||
* @param functionArg
|
||||
*/
|
||||
public removeMappedUnique(uniqueKey: string) {
|
||||
const object = this.getMappedUnique(uniqueKey);
|
||||
public removeMappedUnique(uniqueKey: string): T {
|
||||
const object = this.fastMap.removeFromMap(uniqueKey);
|
||||
if (object !== undefined) {
|
||||
this.eventSubject.next({
|
||||
operation: 'remove',
|
||||
payload: object,
|
||||
});
|
||||
}
|
||||
return object;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -220,8 +227,13 @@ export class ObjectMap<T> {
|
||||
* wipe Objectmap
|
||||
*/
|
||||
public wipe() {
|
||||
for (const keyArg of this.fastMap.getKeys()) {
|
||||
this.fastMap.removeFromMap(keyArg);
|
||||
const keys = this.fastMap.getKeys();
|
||||
for (const keyArg of keys) {
|
||||
const removedObject = this.fastMap.removeFromMap(keyArg);
|
||||
this.eventSubject.next({
|
||||
operation: 'remove',
|
||||
payload: removedObject,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -243,4 +255,12 @@ export class ObjectMap<T> {
|
||||
public addAllFromOther(objectMapArg: ObjectMap<T>) {
|
||||
this.fastMap.addAllFromOther(objectMapArg.fastMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* destroys the ObjectMap, completing the eventSubject and clearing all entries
|
||||
*/
|
||||
public destroy() {
|
||||
this.wipe();
|
||||
this.eventSubject.complete();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,4 +116,12 @@ export class Stringmap {
|
||||
});
|
||||
this._triggerUntilTrueFunctionArray = filteredArray;
|
||||
}
|
||||
|
||||
/**
|
||||
* destroys the Stringmap, clearing all strings and pending triggers
|
||||
*/
|
||||
public destroy() {
|
||||
this._stringArray = [];
|
||||
this._triggerUntilTrueFunctionArray = [];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ export interface ITimedAggregatorOptions<T> {
|
||||
export class TimedAggregtor<T> {
|
||||
public options: ITimedAggregatorOptions<T>;
|
||||
private storageArray: T[] = [];
|
||||
private isStopped = false;
|
||||
|
||||
constructor(optionsArg: ITimedAggregatorOptions<T>) {
|
||||
this.options = optionsArg;
|
||||
@@ -15,9 +16,16 @@ export class TimedAggregtor<T> {
|
||||
|
||||
private aggregationTimer: plugins.smarttime.Timer;
|
||||
private checkAggregationStatus() {
|
||||
if (this.isStopped) {
|
||||
return;
|
||||
}
|
||||
const addAggregationTimer = () => {
|
||||
this.aggregationTimer = new plugins.smarttime.Timer(this.options.aggregationIntervalInMillis);
|
||||
this.aggregationTimer.completed.then(() => {
|
||||
if (this.isStopped) {
|
||||
this.aggregationTimer = null;
|
||||
return;
|
||||
}
|
||||
const aggregateForProcessing = this.storageArray;
|
||||
if (aggregateForProcessing.length === 0) {
|
||||
this.aggregationTimer = null;
|
||||
@@ -35,7 +43,29 @@ export class TimedAggregtor<T> {
|
||||
}
|
||||
|
||||
public add(aggregationArg: T) {
|
||||
if (this.isStopped) {
|
||||
return;
|
||||
}
|
||||
this.storageArray.push(aggregationArg);
|
||||
this.checkAggregationStatus();
|
||||
}
|
||||
|
||||
/**
|
||||
* stops the aggregation timer chain
|
||||
* @param flushRemaining if true, calls functionForAggregation with any remaining items
|
||||
*/
|
||||
public stop(flushRemaining: boolean = false) {
|
||||
this.isStopped = true;
|
||||
if (this.aggregationTimer) {
|
||||
this.aggregationTimer.reset();
|
||||
this.aggregationTimer = null;
|
||||
}
|
||||
if (flushRemaining && this.storageArray.length > 0) {
|
||||
const remaining = this.storageArray;
|
||||
this.storageArray = [];
|
||||
this.options.functionForAggregation(remaining);
|
||||
} else {
|
||||
this.storageArray = [];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ export class Tree<T> {
|
||||
}
|
||||
|
||||
compareTreePosition(leftArg: T, rightArg: T): number {
|
||||
return this.compareTreePosition(leftArg, rightArg);
|
||||
return this.symbolTree.compareTreePosition(leftArg, rightArg);
|
||||
}
|
||||
|
||||
remove(removeObjectArg: T): T {
|
||||
|
||||
Reference in New Issue
Block a user