golang 的 chan select 實在太方便,其實任何提供了協程的語言都能很好且方便的支持 chan 和 select,因爲經常寫 typescript 腳本,於是我把這兩個組件實現到了一個 typescript,你可以直接使用我的庫來得到 chan 和 select,本文後續是實現代碼的分析,你也可以參照分析去任何支持協程的語言中把golang的特性發揚光大/images/emoticon/emoticon08.gif

實現可讀寫的 class RW

緊跟上篇文章 我們已經實現了 Reader 和 Writer 可以用來創建和通知讀寫任務了

但 Reader 只能讀 Writer 只能寫, chan 是可以讀寫的,所以我們現在來創建一個新的 class RW ,它可讀寫並帶有一個可選的讀寫緩衝。

RW 要實現可讀寫直接包含一個 Reader 和 Writer 的實例即可,要實現緩衝直接使用一個數組或鏈表即可,只要能滿足數組先進先出即可

class RW<T>
{
    // 讀寫緩存
    private list: Array<T> | undefined
    // 可取器
    private r_ = new Reader()
    // 寫入器
    private w_ = new Writer()
}

Ring

緩存需要先進先出,數組刪除第一個元素會很慢,鏈表則快很多,但是我們緩存長度是固定的數組可以一次分配足夠內存,鏈表則需要頻繁申請和釋放內存,爲此我使用原始數組實現了一個先進先出的 Ring 隊列來充當緩存,其定義如下

class Ring<T> {
    private offset_ = 0
    private size_ = 0
    constructor(private readonly arrs: Array<T>) {
    }
    get length(): number {
        return this.size_
    }
    get capacity(): number {
        return this.arrs.length
    }
    // 在隊列某位壓入一個數據,如果隊列已慢就返回 false
    push(val: T): boolean {
        const arrs = this.arrs
        const size = this.size_
        if (size == arrs.length) {
            return false
        }
        arrs[(this.offset_ + size) % arrs.length] = val
        this.size_++
        return true
    }
    // 如果隊列不爲空 就將其第一個元素 彈出,否則返回迭代器 {done:true}
    pop(): IteratorResult<T> {
        const size = this.size_
        if (size == 0) {
            return noResult
        }
        const val = this.arrs[this.offset_++]
        if (this.offset_ == this.arrs.length) {
            this.offset_ = 0
        }
        this.size_--
        return {
            value: val,
        }
    }
}

所以我的 RW 定義如下

class RW<T>
{
    // 讀寫緩存
    private list: Ring<T> | undefined
    // 可取器
    private r_ = new Reader()
    // 寫入器
    private w_ = new Writer()
    constructor(buf: number) {
        if (buf > 0) { // 如果構造傳入了緩存大小就創建一個緩存
            this.list = new Ring<T>(new Array<T>(buf))
        }
    }
}

read

我們的 class RW 包含了 Reader Writer 屬性和一個緩存,我們先看下要如何讀取,

  1. 首先我們應該判斷緩存中是否有數據,有就應該立刻將緩存讀出來
  2. 之後我們應該判斷是否已經關閉,已經關閉,就應該立刻返回一個代表關閉的值
  3. 沒有關閉就應該查看有沒有寫入任務(Writer 中記錄),有就應該通知寫入成功並把寫入的值作爲這裏讀取到的值返回給讀取者
  4. 如果也沒有寫入任何就向 Reader 註冊一個讀取任務,用於等待有值可讀時的通知

爲此定義了兩個函數 tryRead 和 read, tryRead 完成了步驟 1,2,3 如果成功了就不必執行 read 了,read 則單獨完成了步驟 4,下面是實現代碼

class RW<T>{
    tryRead(): IteratorResult<any> | undefined {
        const list = this.list
        if (list) { // 緩存存在
            const result = list.pop()
            if (!result.done) { // 緩存中有值,將緩存的值讀出來傳給調用者
                return result
            }
            // 緩存爲空 執行後續讀取代碼
        }

        // 已經關閉 返回一個 undefined 作爲 關閉標記
        if (this.isClosed) {
            return
        }
        // 從 writer 獲取值
        const w = this.w_
        if (w.isEmpty) { // writer 爲空沒有寫入任務
            return noResult //返回讀取失敗
        }
        return {
            value: w.invoke(), // writer 有寫入任務,則讓 writer 完成一個任務並把寫入的值傳遞給 讀取者
        }
    }
    read(callback: ReadCallback): ReadValue {
        // 直接註冊一個讀取任務,所以 read 之前必須要先調用 tryRead 以確定沒有值可讀
        return this.r_.connect(callback)
    }
}

write

我們還是先看下要如何寫入,

  1. 首先我們應該判斷是否已經關閉了,因爲 chan 一旦關閉就不可寫了
  2. 之後我們應該判斷是否有讀取任務,因爲如果有讀取任務則緩存一定爲空(都被讀完了才會有待讀取的任務),有則應該把值寫入給讀取任務
  3. 沒有讀取任務則將值寫入緩存
  4. 最後如果緩存已滿寫不進去,則註冊一個寫入任務等待寫入完成

爲此定義了兩個函數 tryWrite 和 write, tryWrite 完成了步驟 1,2,3 如果成功了就不必執行 write 了,write 則單獨完成了步驟 4,下面是實現代碼

class RW<T>{
    tryWrite(val: T): boolean | undefined {
        // 如果已經關閉 就返回一個 undefined 作爲關閉標記
        if (this.isClosed) {
            return
        }
        const r = this.r_
        if (r.isEmpty) { // 沒有讀取任務
            // 將數據寫入緩存
            return this.list?.push(val) ?? false // 緩存滿了 push 會返回失敗
        }
        // 存在讀取任務,則用值通知 讀取任務完成
        r.invoke({
            value: val,
        })
        return true // 返回寫入成功
    }
    write(callback: WriteCallback, reject: RejectCallback, val: T): WirteValue {
        // 直接註冊一個寫入任務,所以 write 之前必須要先調用 tryWrite 確定沒有值可寫
        return this.w_.connect(callback, reject, val)
    }
 }

完整 RW

下面是 class RW 的完整代碼

class RW<T>{
    private list: Ring<T> | undefined
    constructor(buf: number) {
        if (buf > 0) {
            this.list = new Ring<T>(new Array<T>(buf))
        }
    }
    private r_ = new Reader()
    private w_ = new Writer()
    tryRead(): IteratorResult<any> | undefined {
        // 讀取緩存
        const list = this.list
        if (list) {
            const result = list.pop()
            if (!result.done) {
                return result
            }
        }

        // 是否關閉
        if (this.isClosed) {
            return
        }
        // 讀取 writer
        const w = this.w_
        if (w.isEmpty) { // 沒有寫入者
            return noResult
        }
        return {
            value: w.invoke(),
        }
    }
    read(callback: ReadCallback): ReadValue {
        // 設置待讀
        return this.r_.connect(callback)
    }
    tryWrite(val: T): boolean | undefined {
        if (this.isClosed) {
            return
        }
        const r = this.r_
        if (r.isEmpty) { // 沒有讀取者
            // 寫入緩存
            return this.list?.push(val) ?? false
        }
        r.invoke({
            value: val,
        })
        return true
    }
    write(callback: WriteCallback, reject: RejectCallback, val: T): WirteValue {
        // 設置待寫
        return this.w_.connect(callback, reject, val)
    }
    close(): boolean {
        if (this.isClosed) {
            return false
        }
        this.isClosed = true
        this.w_.close()
        this.r_.close()
        const closed = this.closed_
        if (closed) {
            this.closed_ = undefined
            closed.resolve()
        }
        return true
    }
    wait(): undefined | Promise<void> {
        if (this.isClosed) {
            return
        }
        let closed = this.closed_
        if (closed) {
            return closed.promise
        }
        closed = new Completer<void>()
        this.closed_ = closed
        return closed.promise
    }
    private closed_: Completer<void> | undefined
    isClosed = false
    get length(): number {
        return this.list?.length ?? 0
    }
    get capacity(): number {
        return this.list?.capacity ?? 0
    }
}
實現 chan

到此我們其實就已經實現好了 chan 的功能,只是現在是 RW 用回調函數來通知 寫入和讀取完成,對於支持協程的語言我們只需要 使用一個 class Chan 來包裝下然後以 協程 的形式來等待而非回調函數來等待通知即可,因爲很簡單基本上是對 RW 的調用所以直接貼關鍵代碼了

export class Chan<T> implements ReadChannel<T>, WriteChannel<T> {
    // 存儲了底層的讀寫實現
    private rw_: RW<T>
    get rw(): RW<T> {
        return this.rw_
    }
    constructor(buf = 0) {
        // 依據參數創建是否帶緩存的底層讀寫器
        this.rw_ = new RW<T>(Math.floor(buf))
    }
    // golang <-chan 的 chan 讀取實現
    read(): IteratorResult<T> | Promise<IteratorResult<T>> {
        const rw = this.rw_
        const val = rw.tryRead()
        if (val === undefined) {
            // chan 已經關閉
            return noResult
        } else if (!val.done) {
            // 返回讀取到的值
            return val
        }
        // 使用 js 的 Promise 來等待
        return new Promise((resolve) => {
            rw.read(resolve) // 通知 Promise 完成
        })
    }
    // golang 沒有直接提供這個函數但可以使用 select default 來實現,這裏直接提供了
    tryRead(): IteratorResult<T> | undefined {
        const rw = this.rw_
        const val = rw.tryRead()
        if (val === undefined) {
            // chan 已經關閉
            return noResult
        } else if (!val.done) {
            // 返回讀取到的值
            return val
        }
        return undefined
    }
    // golang chan<- 的 chan 寫入實現
    write(val: T, exception?: boolean): boolean | Promise<boolean> {
        const rw = this.rw_
        const result = rw.tryWrite(val)
        if (result === undefined) {
            // chan 已經關閉
            if (exception) { // 依據調用參數(使用者自己決定) 是要拋出異常還是返回 false
                throw errChannelClosed
            }
            return false
        } else if (result) {
            // 寫入 chan 成功
            return true
        }
        // 使用 js 的 Promise 來等待
        return new Promise((resolve, reject) => {
            rw.write(resolve, exception ? reject : undefined, val) // 通知 Promise 成功或失敗
        })
    }
    //  golang 沒有直接提供這個函數但可以使用 select default 來實現,這裏直接提供了
    tryWrite(val: T, exception?: boolean): boolean {
        const rw = this.rw_
        const result = rw.tryWrite(val)
        if (result === undefined) {
            // chan 已經關閉
            if (exception) {
                throw errChannelClosed
            }
            return false
        } else if (result) {
            // 寫入 chan 成功
            return true
        }
        // 目前不可寫
        return false
    }
    // 這個返回已經緩存了多少個數據
    get length(): number {
        return this.rw.length
    }
    // 這裏返回緩衝區容量
    get capacity(): number {
        return this.rw.capacity
    }
    // 這裏實現 js 的迭代器,就可以使用 原生的 await for of 來讀取 chan 了
    async *[Symbol.asyncIterator](): AsyncGenerator<T> {
        while (true) {
            const val = await this.read()
            if (val.done) {
                break
            }
            yield val.value
        }
    }
}
實現 select

所有工具都其實都已經完備了,下篇文章將分析如何實現 select 來等待多個 chan