经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
Go并发处理
来源:cnblogs  作者:pebblecome  时间:2021/2/18 15:35:51  对本文有异议

写了一个web接口,想高并发的请求这个接口,进行压力测试,所以服务端就实现了一个线程池。

代码从网上理解了之后写的。代码实例

简单的介绍:

  首先实现一个Job接口,只要有方法实现了Do方法即可

  定义个分发器结构体,主要是WorkPool线程池,用于存储Worker的JobChannel

  init的时候,先初始化一个JobQueue队列,其他的函数调用这个线程池的时候,把任务放在这个队列即可。

  然后Run的时候,创建多个Worker,起初的时候,woker会把自身的JobChannel先注册到线程池workerPool中

  然后worker.start就是for{select } 阻塞等待JobChannel中的job任务

  此时又启一个go d.Dispatcher() ,将JobQueue中的job任务放在worker的Jobchannle中。这样上面的for{select} 就可以拿到任务去执行

  

注: maxWorkers 是内核CPU数量,本机4核,就是线程池可以放4个JobChannel,所以,在newWorker的时候,就创建了4个Worker来并发的处理job任务。

 

任务处理

  1. package workPool
  2.  
  3. import "fmt"
  4.  
  5. type Worker struct {
  6. WorkerPool chan chan Job
  7. JobChannel chan Job
  8. Quit chan bool
  9. }
  10.  
  11. func NewWorker(workpool chan chan Job) *Worker {
  12. return &Worker{WorkerPool: workpool,JobChannel: make(chan Job),Quit: make(chan bool)}
  13. }
  14.  
  15. func (w *Worker) Start() {
  16. go func() {
  17. for{
  18. w.WorkerPool <-w.JobChannel
  19. select {
  20. case job := <-w.JobChannel:
  21. if err := job.Do();err !=nil{
  22. fmt.Println("exec some failed ....")
  23. }
  24. case <-w.Quit:
  25. return
  26. }
  27. }
  28. }()
  29. }
  30.  
  31. func (w *Worker) Stop() {
  32. go func() {
  33. w.Quit <-true
  34. }()
  35. }

  

 

实现一个分发器

  1. package workPool
  2.  
  3. import "runtime"
  4.  
  5. var(
  6. MaxWorkers = runtime.NumCPU()
  7. MaxQueue = 512
  8. )
  9.  
  10. type Job interface {
  11. Do() error
  12. }
  13.  
  14. var JobQueue chan Job
  15.  
  16. type Dispatcher struct {
  17. MaxWorkers int
  18. WorkerPool chan chan Job
  19. Quit chan bool
  20. }
  21.  
  22. func init() {
  23. runtime.GOMAXPROCS(MaxWorkers)
  24. JobQueue = make(chan Job,MaxQueue)
  25. dispatcher := NewDispatcher(MaxWorkers)
  26. dispatcher.Run()
  27. }
  28.  
  29. func NewDispatcher(maxWorkers int) *Dispatcher {
  30. pool := make(chan chan Job,maxWorkers)
  31. return &Dispatcher{MaxWorkers: maxWorkers,WorkerPool: pool,Quit: make(chan bool)}
  32. }
  33.  
  34. func (d *Dispatcher) Run() {
  35. for i:=0;i<d.MaxWorkers;i++{
  36. worker := NewWorker(d.WorkerPool)
  37. worker.Start()
  38. }
  39. go d.Dispatcher()
  40. }
  41.  
  42. func (d *Dispatcher) Dispatcher() {
  43. for {
  44. select {
  45. case job := <-JobQueue:
  46. jobChannel := <-d.WorkerPool
  47. jobChannel <- job
  48. case <-d.Quit:
  49. return
  50. }
  51. }
  52. }

  

  1. main函数中可以这样使用
  1. package main
  2.  
  3. import (
  4. "context_http/workPool"
  5. "fmt"
  6. "net/http"
  7. )
  8.  
  9. type Msg struct {
  10. mobile string
  11. }
  12.  
  13. func (m *Msg) Do() error {
  14. m.mobile = m.mobile+"_test"
  15. fmt.Println(m.mobile)
  16.  
  17. return nil
  18. }
  19.  
  20. func getMobile(w http.ResponseWriter,r *http.Request) {
  21. defer r.Body.Close()
  22.  
  23. r.ParseForm()
  24. mobile := r.PostForm.Get("mobile")
  25.  
  26. var work workPool.Job
  27. m := Msg{mobile: mobile}
  28. work = &m
  29. workPool.JobQueue <- work
  30. status := `{"status":"ok"}`
  31. w.Write([]byte(status))
  32. }
  33.  
  34.  
  35. func main() {
  36. http.HandleFunc("/test",getMobile)
  37. err := http.ListenAndServe(":8081",nil)
  38. if err !=nil{
  39. fmt.Println("server failure :",err)
  40. return
  41. }
  42. }

  

原文链接:http://www.cnblogs.com/pebblecome/p/14357918.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号