熟悉 ruby 的开发同学一定对 rails 以及 sidekiq 印象颇深,前者作为 ruby 语言开发的网页程序框架,提供了许多脚手架工具以及语法糖,充分提高了开发者的开发效率。而后者则作为异步任务处理系统,依赖 redis 存储任务元数据,并且按照 FIFO 的策略进行任务执行。通过 sidekiq,能够在请求处理的过程中将繁重的逻辑计算放在异步任务中,减少 IO 阻塞,提升响应速率。

而我们今天要介绍的 gocraft/work 则号称是 golang 版本的 sidekiq。与 sidekiq 类似的是,gocraft/work 也是依赖于 redis 存储任务元数据,并且也支持 schedule job 与 cron job。

快速上手

我们将用 gocraft/work 开发一个 demo,从而了解 gocraft/work 的特性。

package main

import (
    "fmt"
    "log"

    "github.com/garyburd/redigo/redis"
    "github.com/gocraft/work"
)

var redisPool = &redis.Pool{
    MaxActive: 5,
    MaxIdle:   5,
    Wait:      true,
    Dial: func() (redis.Conn, error) {
        return redis.Dial("tcp", ":6379")
    },
}

var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)

func enqueueJob(job string, payload work.Q) {
    _, err := enqueuer.Enqueue(job, payload)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Enqueued:", job, "with Paylod:", payload)
}

func enqueueEmail() {
    enqueueJob(
        "send_email",
        work.Q{"address": "[email protected]", "subject": "hello world", "customer_id": 4},
    )
}

func enqueueS3() {
    enqueueJob(
        "upload_s3",
        work.Q{"bucket": "my-s3-bucket"},
    )
}

在 enqueue.go 文件里,我们先实例化了一个 Enqueuer.在实例化 enqueuer 的时候我们要提供 redis 的 namespace 以及连接池.我们可以通过调用 enqueuer.Enqueue 将任务入队,在将任务入队时,我们则要提供任务的名称以及元数据。

package main

import (
    "fmt"
    "os"
    "os/signal"

    "github.com/gocraft/work"
)

type Context struct {
    customerID int64
}

func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
    fmt.Println("Starting job:", job.Name)
    return next()
}

func (c *Context) FindCustomer(job *work.Job, next work.NextMiddlewareFunc) error {
    // If there's a customer_id param, set it in the context for future middleware and handlers to use.
    if _, ok := job.Args["customer_id"]; ok {
        c.customerID = job.ArgInt64("customer_id")
        if err := job.ArgError(); err != nil {
            return err
        }
    }

    return next()
}

func (c *Context) SendEmail(job *work.Job) error {
    // Extract arguments:
    addr := job.ArgString("address")
    subject := job.ArgString("subject")
    if err := job.ArgError(); err != nil {
        return err
    }

    // Go ahead and send the email...
    fmt.Println("Send email to:", addr, "with subject:", subject, "and customer id:", c.customerID)

    return nil
}

func (c *Context) Export(job *work.Job) error {
    bucket := job.ArgString("bucket")
    fmt.Println("Upload:", bucket)

    return nil
}

func startProcess() {
    // Make a new pool. Arguments:
    // Context{} is a struct that will be the context for the request.
    // 10 is the max concurrency
    // "my_app_namespace" is the Redis namespace
    // redisPool is a Redis pool
    pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)

    // Add middleware that will be executed for each job
    pool.Middleware((*Context).Log)
    pool.Middleware((*Context).FindCustomer)

    // Map the name of jobs to handler functions
    pool.Job("send_email", (*Context).SendEmail)

    // Customize options:
    pool.JobWithOptions("upload_s3", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)

    // Start processing jobs
    pool.Start()

    // Wait for a signal to quit:
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, os.Interrupt, os.Kill)
    <-signalChan

    // Stop the pool
    pool.Stop()
}

在 process. go 文件里,我们声明了 middleware function。Middleware function 在每个 job 都会被执行。job work.Job 则声明了任务处理器,通过 pool.Job(“send_email”, (Context).SendEmail) 将任务处理器与对应路由注册。