写了一个web接口,想高并发的请求这个接口,进行压力测试,所以服务端就实现了一个线程池。
代码从网上理解了之后写的。代码实例
简单的介绍:
首先实现一个Job接口,只要有方法实现了Do方法即可
定义个分发器结构体,主要是WorkPool线程池,用于存储Worker的JobChannel
init的时候,先初始化一个JobQueue队列,其他的函数调用这个线程池的时候,把任务放在这个队列即可。
然后Run的时候,创建多个Worker,起初的时候,woker会把自身的JobChannel先注册到线程池workerPool中,
然后worker.start就是for{select } 阻塞等待JobChannel中的job任务。
此时又启一个go d.Dispatcher() ,将JobQueue中的job任务放在worker的Jobchannle中。这样上面的for{select} 就可以拿到任务去执行。
注: maxWorkers 是内核CPU数量,本机4核,就是线程池可以放4个JobChannel,所以,在newWorker的时候,就创建了4个Worker来并发的处理job任务。
任务处理
- package workPool
-
- import "fmt"
-
- type Worker struct {
- WorkerPool chan chan Job
- JobChannel chan Job
- Quit chan bool
- }
-
- func NewWorker(workpool chan chan Job) *Worker {
- return &Worker{WorkerPool: workpool,JobChannel: make(chan Job),Quit: make(chan bool)}
- }
-
- func (w *Worker) Start() {
- go func() {
- for{
- w.WorkerPool <-w.JobChannel
- select {
- case job := <-w.JobChannel:
- if err := job.Do();err !=nil{
- fmt.Println("exec some failed ....")
- }
- case <-w.Quit:
- return
- }
- }
- }()
- }
-
- func (w *Worker) Stop() {
- go func() {
- w.Quit <-true
- }()
- }
实现一个分发器
- package workPool
-
- import "runtime"
-
- var(
- MaxWorkers = runtime.NumCPU()
- MaxQueue = 512
- )
-
- type Job interface {
- Do() error
- }
-
- var JobQueue chan Job
-
- type Dispatcher struct {
- MaxWorkers int
- WorkerPool chan chan Job
- Quit chan bool
- }
-
- func init() {
- runtime.GOMAXPROCS(MaxWorkers)
- JobQueue = make(chan Job,MaxQueue)
- dispatcher := NewDispatcher(MaxWorkers)
- dispatcher.Run()
- }
-
- func NewDispatcher(maxWorkers int) *Dispatcher {
- pool := make(chan chan Job,maxWorkers)
- return &Dispatcher{MaxWorkers: maxWorkers,WorkerPool: pool,Quit: make(chan bool)}
- }
-
- func (d *Dispatcher) Run() {
- for i:=0;i<d.MaxWorkers;i++{
- worker := NewWorker(d.WorkerPool)
- worker.Start()
- }
- go d.Dispatcher()
- }
-
- func (d *Dispatcher) Dispatcher() {
- for {
- select {
- case job := <-JobQueue:
- jobChannel := <-d.WorkerPool
- jobChannel <- job
- case <-d.Quit:
- return
- }
- }
- }
- main函数中可以这样使用
- package main
-
- import (
- "context_http/workPool"
- "fmt"
- "net/http"
- )
-
- type Msg struct {
- mobile string
- }
-
- func (m *Msg) Do() error {
- m.mobile = m.mobile+"_test"
- fmt.Println(m.mobile)
-
- return nil
- }
-
- func getMobile(w http.ResponseWriter,r *http.Request) {
- defer r.Body.Close()
-
- r.ParseForm()
- mobile := r.PostForm.Get("mobile")
-
- var work workPool.Job
- m := Msg{mobile: mobile}
- work = &m
- workPool.JobQueue <- work
- status := `{"status":"ok"}`
- w.Write([]byte(status))
- }
-
-
- func main() {
- http.HandleFunc("/test",getMobile)
- err := http.ListenAndServe(":8081",nil)
- if err !=nil{
- fmt.Println("server failure :",err)
- return
- }
- }