返回顶部
g

go-concurrencyGo并发模式

Production Go concurrency patterns — goroutines, channels, sync primitives, context, worker pools, pipelines, and graceful shutdown. Use when building concurrent Go applications or debugging race conditions.

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 1.0.0
安全检测
已通过
816
下载量
免费
免费
0
收藏
概述
安装方式
版本历史

go-concurrency

Go 并发模式

Go 并发生产模式,包括 goroutine、通道、同步原语和上下文管理。

使用场景

  • - 构建并发 Go 应用程序
  • 实现工作池和管道
  • 管理 goroutine 生命周期和取消
  • 调试竞态条件
  • 实现优雅关闭

并发原语

原语用途使用时机
goroutine轻量级并发执行任何并发工作
channel
goroutine 间通信 | 传递数据、信号通知 | | select | 多路通道操作复用 | 等待多个通道 | | sync.Mutex | 互斥锁 | 保护共享状态 | | sync.WaitGroup | 等待 goroutine 完成 | 协调 goroutine 完成 | | context.Context | 取消和超时 | 请求级生命周期管理 | | errgroup.Group | 带错误的并发任务 | 可能失败的并行工作 |

Go 并发格言: 不要通过共享内存来通信;而要通过通信来共享内存。

快速开始

go
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

results := make(chan string, 10)
var wg sync.WaitGroup

for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
select {
case <-ctx.Done():
return
case results <- fmt.Sprintf(Worker %d done, id):
}
}(i)
}

go func() { wg.Wait(); close(results) }()

for result := range results {
fmt.Println(result)
}
}

模式 1:工作池

go
type Job struct {
ID int
Data string
}

type Result struct {
JobID int
Output string
Err error
}

func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
results := make(chan Result)
var wg sync.WaitGroup

for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
select {
case <-ctx.Done():
return
default:
results <- Result{
JobID: job.ID,
Output: fmt.Sprintf(Processed: %s, job.Data),
}
}
}
}()
}

go func() { wg.Wait(); close(results) }()
return results
}

// 使用示例
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

jobs := make(chan Job, 100)
go func() {
for i := 0; i < 50; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf(job-%d, i)}
}
close(jobs)
}()

for result := range WorkerPool(ctx, 5, jobs) {
fmt.Printf(Result: %+v\n, result)
}
}

模式 2:扇出/扇入管道

go
// 阶段 1:生成值
func generate(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case <-ctx.Done(): return
case out <- n:
}
}
}()
return out
}

// 阶段 2:转换(运行多个实例实现扇出)
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case <-ctx.Done(): return
case out <- n * n:
}
}
}()
return out
}

// 扇入:将多个通道合并为一个
func merge(ctx context.Context, channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case <-ctx.Done(): return
case out <- n:
}
}
}(ch)
}

go func() { wg.Wait(); close(out) }()
return out
}

// 使用示例:扇出到 3 个平方计算器,扇入结果
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
c1 := square(ctx, in)
c2 := square(ctx, in)
c3 := square(ctx, in)

for result := range merge(ctx, c1, c2, c3) {
fmt.Println(result)
}
}

模式 3:带取消的 errgroup

go
import golang.org/x/sync/errgroup

func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]string, len(urls))

for i, url := range urls {
i, url := i, url
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, GET, url, nil)
if err != nil {
return fmt.Errorf(creating request for %s: %w, url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf(fetching %s: %w, url, err)
}
defer resp.Body.Close()
results[i] = fmt.Sprintf(%s: %d, url, resp.StatusCode)
return nil
})
}

if err := g.Wait(); err != nil {
return nil, err // 第一个错误通过 ctx 取消所有其他任务
}
return results, nil
}

// 带并发限制
func fetchWithLimit(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // 最大并发 goroutine 数
results := make([]string, len(urls))

for i, url := range urls {
i, url := i, url
g.Go(func() error {
result, err := fetchURL(ctx, url)
if err != nil { return err }
results[i] = result
return nil
})
}

return results, g.Wait()
}

模式 4:有界并发(信号量)

go
import golang.org/x/sync/semaphore

type RateLimitedWorker struct {
sem *semaphore.Weighted
}

func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
return &RateLimitedWorker{sem: semaphore.NewWeighted(maxConcurrent)}
}

func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
var (
wg sync.WaitGroup
mu sync.Mutex
errors []error
)

for _, task := range tasks {
if err := w.sem.Acquire(ctx, 1); err != nil {
return []error{err}
}
wg.Add(1)
go func(t func() error) {
defer wg.Done()
defer w.sem.Release(1)
if err := t(); err != nil {
mu.Lock()
errors = append(errors, err)
mu.Unlock()
}
}(task)
}

wg.Wait()
return errors
}

// 更简单的替代方案:基于通道的信号量
type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore { return make(chan struct{}, n) }
func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

模式 5:优雅关闭

go
func main() {
ctx, cancel := context.WithCancel(context.Background())

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

server := NewServer()
server.Start(ctx)

sig := <-sigCh
fmt.Printf(Received signal: %v\n, sig)
cancel() // 取消上下文以停止所有工作器

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 go-concurrency-patterns-1776420048 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 go-concurrency-patterns-1776420048 技能

通过命令行安装

skillhub install go-concurrency-patterns-1776420048

下载

⬇ 下载 go-concurrency v1.0.0(免费)

文件大小: 5.29 KB | 发布时间: 2026-4-17 18:27

v1.0.0 最新 2026-4-17 18:27
Initial release of Go concurrency patterns with practical production examples.

- Introduces patterns for goroutines, channels, synchronization primitives, context management, worker pools, pipelines, and graceful shutdown in Go.
- Provides quick start code and tables for choosing concurrency primitives.
- Includes implementations for worker pools, fan-out/fan-in pipelines, bounded concurrency (semaphores), errgroup for error handling and cancellation, and graceful shutdown.
- Aims to assist with concurrent Go application development and race condition debugging.

Archiver·手机版·闲社网·闲社论坛·羊毛社区· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2025 闲社网·线报更新论坛·羊毛分享社区·http://xianshe.com

p2p_official_large
返回顶部