All files channel.ts

68.53% Statements 61/89
82.35% Branches 14/17
75% Functions 6/8
68.53% Lines 61/89

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 901x 1x 1x 1x 1x 1x 1x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 5x 1x 1x 4x 5x                     5x 4x 4x 4x 5x 4x 4x 5x 1x 1x 4x 4x 4x 4x 4x 2x 2x 4x 4x 4x             4x 4x 7x 4x 4x     4x 7x 4x 4x     4x 4x                 4x 1x 1x 1x 2x 2x 2x 1x  
/**
 * Lifecycle states reported by a channel.
 *
 * The numeric values are spelled out explicitly so that inserting or
 * reordering members can never silently renumber them.
 */
export enum ChannelState {
    /** Nothing is buffered and nobody is waiting. */
    EMPTY = 0,
    /** At least one receiver is waiting for a value. */
    RECEIVER = 1,
    /** At least one value is buffered and waiting to be received. */
    DATA = 2,
    /** The channel has been closed. */
    CLOSED = 3,
}
 
/**
 * Unbounded in-memory FIFO channel between synchronous senders and
 * asynchronous receivers.
 *
 * - `send()` never blocks: the value is handed to the oldest waiting receiver,
 *   or buffered when nobody is waiting.
 * - `receive()` resolves with the oldest buffered value, or waits for the next
 *   `send()`.
 * - `close()` is terminal: buffered values are discarded, waiting receivers are
 *   rejected, and later `send()` / `receive()` calls fail with `ChannelError`.
 *
 * The channel is its own async iterator, so `for await (const v of channel)`
 * consumes values until the channel is closed.
 */
export class Channel<T> implements AsyncIterable<T> {
    private state: ChannelState = ChannelState.EMPTY;

    /** Values sent while no receiver was waiting, oldest first. */
    private dataQueue: T[] = [];

    /** Settle callbacks of `receive()` calls still waiting for a value, oldest first. */
    private receiverQueue: {
        resolve: (value: T) => void;
        reject: (reason: Error) => void;
    }[] = [];

    /** Current lifecycle state of the channel. */
    get State(): ChannelState {
        return this.state;
    }

    /**
     * Delivers `data` to the oldest waiting receiver, or buffers it when no
     * receiver is waiting.
     *
     * @param data Value to publish. Any value of `T` is allowed, including `undefined`.
     * @throws ChannelError if the channel is closed.
     */
    send(data: T): void {
        if (this.state === ChannelState.CLOSED) {
            throw new ChannelError("Channel is closed");
        }

        const receiver =
            this.state === ChannelState.RECEIVER ? this.receiverQueue.shift() : undefined;

        if (receiver !== undefined) {
            receiver.resolve(data);
            if (this.receiverQueue.length === 0) {
                this.state = ChannelState.EMPTY;
            }
            return;
        }

        this.dataQueue.push(data);
        this.state = ChannelState.DATA;
    }

    /**
     * Takes the next value from the channel.
     *
     * The waiting receiver is registered synchronously during the call, so a
     * `send()` issued right after `receive()` (without awaiting) reaches it.
     *
     * @returns A promise for the oldest buffered value, or for the next value
     *          sent when nothing is buffered.
     * @throws ChannelError (as a rejection) if the channel is already closed,
     *         or if it is closed while this call is still waiting.
     */
    async receive(): Promise<T> {
        if (this.state === ChannelState.CLOSED) {
            throw new ChannelError("Channel is closed");
        }

        // Decide by queue length, not by the dequeued value: `undefined` can be
        // a legitimate payload and must not be mistaken for "nothing buffered".
        if (this.state === ChannelState.DATA && this.dataQueue.length > 0) {
            const data = this.dataQueue.shift() as T;
            if (this.dataQueue.length === 0) {
                this.state = ChannelState.EMPTY;
            }
            return data;
        }

        return new Promise<T>((resolve, reject) => {
            // Keep `reject` as well so close() can release this waiter.
            this.receiverQueue.push({ resolve, reject });
            this.state = ChannelState.RECEIVER;
        });
    }

    /**
     * Closes the channel. Calling it again is a no-op.
     *
     * Every `receive()` still waiting is rejected with `ChannelError`, so
     * awaiting callers (including `for await` loops) finish instead of hanging
     * forever. Buffered values are dropped, since `receive()` refuses to hand
     * them out once the channel is closed.
     */
    close(): void {
        if (this.state === ChannelState.CLOSED) {
            return;
        }
        this.state = ChannelState.CLOSED;
        this.dataQueue.length = 0;

        // Detach the waiters first so the queue is already empty while they settle.
        const pending = this.receiverQueue.splice(0);
        for (const receiver of pending) {
            receiver.reject(new ChannelError("Channel is closed"));
        }
    }

    /** The channel acts as its own async iterator; all iterations share one stream. */
    [Symbol.asyncIterator](): AsyncIterator<T> {
        return this;
    }

    /**
     * Async-iterator step: yields the next received value, or reports
     * completion once receiving fails because the channel is closed.
     */
    async next(): Promise<IteratorResult<T, any>> {
        try {
            const value = await this.receive();
            return { value, done: false };
        } catch {
            // A failed receive ends the iteration; make sure the channel is closed.
            this.close();
            return { value: undefined, done: true };
        }
    }
}
 
/**
 * Error raised by channel operations, e.g. when the channel is closed.
 *
 * Instances report "ChannelError" as their `name`, so they can be told apart
 * from plain `Error`s in logs and in `catch` blocks.
 */
class ChannelError extends Error {
    // Set as an instance field right after the Error base constructor has run.
    override name = "ChannelError";

    constructor(message: string) {
        super(message);
    }
}