Production Go concurrency patterns — goroutines, channels, sync primitives, context, worker pools, pipelines, and graceful shutdown. Use when building concurrent Go applications or debugging race conditions.
Go 并发生产模式,包括 goroutine、通道、同步原语和上下文管理。
| 原语 | 用途 | 使用时机 |
|---|---|---|
| goroutine | 轻量级并发执行 | 任何并发工作 |
| channel | goroutine 之间的通信 | 在 goroutine 之间传递数据或信号 |
Go 并发格言: 不要通过共享内存来通信;而要通过通信来共享内存。
go
// main starts three workers that each report completion on a shared channel
// and prints every message it receives until the channel is closed.
func main() {
	// Bound the whole run so a stuck worker cannot block the program forever.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	results := make(chan string, 10)
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Either deliver the message or give up once ctx is done.
			select {
			case <-ctx.Done():
				return
			case results <- fmt.Sprintf("Worker %d done", id):
			}
		}(i)
	}

	// Close results once every worker has returned so the range loop below ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	for result := range results {
		fmt.Println(result)
	}
}
go
// Job is one unit of work handed to the pool: an identifier plus its payload.
type Job struct {
	ID   int
	Data string
}

// Result is what a worker reports back for a single Job.
type Result struct {
	JobID  int    // ID of the Job this result belongs to
	Output string // "Processed: " followed by the job's Data
	Err    error  // left nil by WorkerPool
}

// WorkerPool starts numWorkers goroutines that read from jobs and send one
// Result per Job on the returned unbuffered channel.
//
// The returned channel is closed once every worker has exited, which happens
// when jobs is closed and drained or when ctx is done. With numWorkers <= 0 no
// worker is started and the channel is closed right away.
func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
	results := make(chan Result)
	var wg sync.WaitGroup

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// Wait for work or cancellation, whichever comes first, so a
				// worker never stays parked on a jobs channel nobody closes.
				select {
				case <-ctx.Done():
					return
				case job, ok := <-jobs:
					if !ok {
						return
					}
					res := Result{
						JobID:  job.ID,
						Output: fmt.Sprintf("Processed: %s", job.Data),
					}
					// The send must also watch ctx: after cancellation the
					// consumer may have stopped reading.
					select {
					case <-ctx.Done():
						return
					case results <- res:
					}
				}
			}
		}()
	}

	// Close results only after the last worker is gone so no send can race
	// with the close.
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}
// 使用示例
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
jobs := make(chan Job, 100)
go func() {
for i := 0; i < 50; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf(job-%d, i)}
}
close(jobs)
}()
for result := range WorkerPool(ctx, 5, jobs) {
fmt.Printf(Result: %+v\n, result)
}
}
go
// Stage 1: emit values.
//
// generate sends each of nums, in order, on the returned unbuffered channel.
// The channel is closed after the last value has been sent or as soon as ctx
// is done, whichever happens first.
func generate(ctx context.Context, nums ...int) <-chan int {
	stream := make(chan int)

	emit := func() {
		defer close(stream)
		for idx := 0; idx < len(nums); idx++ {
			select {
			case stream <- nums[idx]:
			case <-ctx.Done():
				return
			}
		}
	}
	go emit()

	return stream
}
// Stage 2: transform (run several instances to fan out).
//
// square reads values from in and sends each value multiplied by itself on the
// returned unbuffered channel. The channel is closed when in is closed and
// drained, or when ctx is done while a send is pending.
func square(ctx context.Context, in <-chan int) <-chan int {
	squared := make(chan int)

	go func() {
		defer close(squared)
		for {
			v, open := <-in
			if !open {
				return
			}
			select {
			case squared <- v * v:
			case <-ctx.Done():
				return
			}
		}
	}()

	return squared
}
// Fan-in: combine several channels into one.
//
// merge forwards every value received from each of channels onto the returned
// unbuffered channel, using one goroutine per input. The returned channel is
// closed after all forwarding goroutines have finished; each finishes when its
// input is closed and drained or when ctx is done while a send is pending.
func merge(ctx context.Context, channels ...<-chan int) <-chan int {
	merged := make(chan int)
	var pending sync.WaitGroup

	forward := func(src <-chan int) {
		defer pending.Done()
		for v := range src {
			select {
			case merged <- v:
			case <-ctx.Done():
				return
			}
		}
	}

	for _, src := range channels {
		pending.Add(1)
		go forward(src)
	}

	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
// Usage example: fan out to three squarers, then fan their results back in.
func main() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	source := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

	// All squarers read from the same source channel, so each input value is
	// handled by exactly one of them.
	const fanOut = 3
	squarers := make([]<-chan int, 0, fanOut)
	for len(squarers) < fanOut {
		squarers = append(squarers, square(ctx, source))
	}

	for v := range merge(ctx, squarers...) {
		fmt.Println(v)
	}
}
go
import golang.org/x/sync/errgroup
func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, GET, url, nil)
if err != nil {
return fmt.Errorf(creating request for %s: %w, url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf(fetching %s: %w, url, err)
}
defer resp.Body.Close()
results[i] = fmt.Sprintf(%s: %d, url, resp.StatusCode)
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err // 第一个错误通过 ctx 取消所有其他任务
}
return results, nil
}
// With a concurrency limit.
//
// fetchWithLimit calls fetchURL for every URL with at most 10 calls in flight
// at once and returns the results at the same index as their URL. If any call
// fails, the first error is returned and the result slice is nil, so callers
// never see a partially filled slice.
func fetchWithLimit(ctx context.Context, urls []string) ([]string, error) {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(10) // maximum number of concurrently running goroutines

	results := make([]string, len(urls))
	for i, url := range urls {
		i, url := i, url // per-iteration copies for the closure (required before Go 1.22)
		g.Go(func() error {
			result, err := fetchURL(ctx, url)
			if err != nil {
				return err
			}
			// Each goroutine writes only its own index, so no lock is needed.
			results[i] = result
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}
go
import golang.org/x/sync/semaphore
// RateLimitedWorker runs tasks while holding slots of a weighted semaphore.
type RateLimitedWorker struct {
	sem *semaphore.Weighted
}

// NewRateLimitedWorker returns a worker whose semaphore is created with a
// total weight of maxConcurrent.
func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
	worker := new(RateLimitedWorker)
	worker.sem = semaphore.NewWeighted(maxConcurrent)
	return worker
}
// Do runs each task in its own goroutine, acquiring one semaphore slot before
// a task is started and releasing it when the task returns. It blocks until
// every started task has finished and returns the non-nil errors they
// produced, in completion order.
//
// If a slot cannot be acquired (Acquire returns an error, e.g. because ctx is
// done), no further tasks are started. Do still waits for the tasks already
// running, then returns their errors followed by the acquire error, so no
// goroutine outlives the call and no task error is lost.
func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error // guarded by mu while tasks are running
	)

	for _, task := range tasks {
		if err := w.sem.Acquire(ctx, 1); err != nil {
			// After Wait returns no goroutine touches errs any more, so the
			// append below needs no lock.
			wg.Wait()
			return append(errs, err)
		}
		wg.Add(1)
		go func(t func() error) {
			defer wg.Done()
			defer w.sem.Release(1)
			if err := t(); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(task)
	}

	wg.Wait()
	return errs
}
// A simpler alternative: a channel-based semaphore.
//
// Semaphore is a buffered channel of empty structs; every element currently in
// the buffer represents one held slot.
type Semaphore chan struct{}

// NewSemaphore returns a Semaphore whose buffer holds up to n slots.
func NewSemaphore(n int) Semaphore {
	slots := make(chan struct{}, n)
	return Semaphore(slots)
}

// Acquire takes one slot by sending into the channel; it blocks while the
// buffer is full.
func (s Semaphore) Acquire() {
	var token struct{}
	s <- token
}

// Release gives one slot back by receiving from the channel; it blocks while
// the buffer is empty.
func (s Semaphore) Release() {
	<-s
}
go
func main() {
ctx, cancel := context.WithCancel(context.Background())
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
server := NewServer()
server.Start(ctx)
sig := <-sigCh
fmt.Printf(Received signal: %v\n, sig)
cancel() // 取消上下文以停止所有工作器
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 go-concurrency-patterns-1776420048 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 go-concurrency-patterns-1776420048 技能
skillhub install go-concurrency-patterns-1776420048
文件大小: 5.29 KB | 发布时间: 2026-4-17 18:27