用信号量限制并发

当执行CPU密集型的任务(如调整图像大小)时,创建比CPU核数更多的协程毫无意义。

事情并不会进行的更快,甚至可能损失部分性能,由于维护额外的状态记录和协程切换开销。

一种限制并发性(例如同时启动的协程数量)的方法是使用信号量(semaphore)。

您可以进入(获取)和离开(释放)信号量。

信号量对象具有固定容量大小,如果您超过它的容量,获取操作将被阻塞,直到释放操作释放它。

有缓冲的数据通道(buffered channel)天然就是信号量。

下面是使用数据通道作为信号量在给定时间限制激活的协程数量的例子:

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	semaphoreSize      = 4
	mu                 sync.Mutex
	totalTasks         int
	curConcurrentTasks int
	maxConcurrentTasks int
)

func timeConsumingTask() {
	mu.Lock()
	totalTasks++
	curConcurrentTasks++
	if curConcurrentTasks > maxConcurrentTasks {
		maxConcurrentTasks = curConcurrentTasks
	}
	mu.Unlock()

	// in real system this would be a CPU intensive operation
	time.Sleep(10 * time.Millisecond)

	mu.Lock()
	curConcurrentTasks--
	mu.Unlock()
}

func main() {
	sem := make(chan struct{}, semaphoreSize)
	var wg sync.WaitGroup
	for i := 0; i < 32; i++ {
		// acquire semaphore
		sem <- struct{}{}
		wg.Add(1)

		go func() {
			timeConsumingTask()
			// release semaphore
			<-sem
			wg.Done()
		}()
	}

	// wait for all task to finish
	wg.Wait()

	fmt.Printf("total tasks         : %d\n", totalTasks)
	fmt.Printf("max concurrent tasks: %d\n", maxConcurrentTasks)
}

我们使用了等待协程结束中描述的技术来等待所有任务完成。

通常并发任务正确的数量应该等于CPU核心数量,该值可以通过 runtime.NumCPU()取得。

最后更新于