Go 语言实战:构建强大的延迟任务队列
2024-02-01 12:50:08 软件 170观看
摘要介绍延迟队列是一种数据结构,用于处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有到达指定的时间点时才能从队列中取出并执行。在实际应用中,延迟队列可以用于处理各种需要延

介绍

延迟队列是一种数据结构,用于处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有到达指定的时间点时才能从队列中取出并执行。7YU28资讯网——每日最新资讯28at.com

在实际应用中,延迟队列可以用于处理各种需要延迟处理的任务,例如发送邮件提醒、订单自动取消、对超时任务的处理等。由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。7YU28资讯网——每日最新资讯28at.com

7YU28资讯网——每日最新资讯28at.com

Simple

在 Go 语言中,我们可以使用 time 包提供的计时器功能,通过使用 Go 中的 slice 存储延迟处理的任务,实现一个简单的延迟队列的功能。7YU28资讯网——每日最新资讯28at.com

示例代码:7YU28资讯网——每日最新资讯28at.com

type Task struct { ExecuteTime time.Time Job         func()}

首先,我们定义一个结构体 Task,它包含一个可以执行任务的函数 Job,和一个执行时间 ExecuteTime,这是期望执行该函数的时间。7YU28资讯网——每日最新资讯28at.com

示例代码:7YU28资讯网——每日最新资讯28at.com

type DelayQueue struct { TaskQueue []Task}

接下来,我们定义一个 DelayQueue 结构体,它拥有一个 TaskQueue,这是一个 Task 类型的切片,用于保存待执行任务的列表。7YU28资讯网——每日最新资讯28at.com

示例代码:7YU28资讯网——每日最新资讯28at.com

// 添加任务func (d *DelayQueue) AddTask(t Task) { d.TaskQueue = append(d.TaskQueue, t)}// 移除任务func (d *DelayQueue) RemoveTask() { d.TaskQueue = d.TaskQueue[1:]}// 执行任务func (d *DelayQueue) ExecuteTasks() { for len(d.TaskQueue) > 0 {  // 获取队列最顶部的任务  currentTask := d.TaskQueue[0]  // 如果执行时间还没到,等待  if time.Now().Before(currentTask.ExecuteTime) {   time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))  }  // 执行任务  currentTask.Job()  // 移除已执行的任务  d.RemoveTask() }}

DelayQueue 包含三个方法:7YU28资讯网——每日最新资讯28at.com

  • 第一个方法是 AddTask(t Task)。此方法将提供的任务 t 添加到 TaskQueue 的末尾。
  • 第二个方法是 RemoveTask()。此方法从 TaskQueue 中移除第一个任务。
  • 第三个方法是 ExecuteTasks()。此方法将执行 TaskQueue 中的所有任务。如果队列顶部任务的执行时间还未到,该方法将等待。一旦时间到了,它将会执行 Job 并从 TaskQueue 中移除该任务。

示例代码:7YU28资讯网——每日最新资讯28at.com

func main() { fmt.Println("Start DelayQueue") queue := DelayQueue{} firstTask := Task{  ExecuteTime: time.Now().Add(4 * time.Second),  Job: func() {   fmt.Println("Executed task 1 after delay")  }, } queue.AddTask(firstTask) secondTask := Task{  ExecuteTime: time.Now().Add(10 * time.Second),  Job: func() {   fmt.Println("Executed task 2 after delay")  }, } queue.AddTask(secondTask) queue.ExecuteTasks() fmt.Println("Done!")}

输出结果:7YU28资讯网——每日最新资讯28at.com

Start DelayQueueExecuted task 1 after delayExecuted task 2 after delayDone!

在示例代码中,我们创建了一个延时队列,将任务添加到队列中,并在指定的延时后执行它们。7YU28资讯网——每日最新资讯28at.com

通过使用这些结构体和方法,我们可以在 Go 中实现简单的延迟执行任务的功能。7YU28资讯网——每日最新资讯28at.com

但是,当 Go 程序重启时,存储在 slice 中的延迟处理的任务将全部丢失。7YU28资讯网——每日最新资讯28at.com

Complex

在 Go 程序中,如果想在重启后保留数据,我们可以将数据持久化到 Redis,可以使用 go-redis/redis 库[1]与 Redis 交互。而对于延迟队列的需求,则可以使用 Redis 的 ZSET(有序集合)特性来实现。7YU28资讯网——每日最新资讯28at.com

示例代码:7YU28资讯网——每日最新资讯28at.com

// 定义一个全局的redisdb变量var redisdb *redis.Client// 初始化连接func initClient() (err error) { redisdb = redis.NewClient(&redis.Options{  Addr:     "localhost:6379",  Password: "", // no password set  DB:       0,  // use default DB }) _, err = redisdb.Ping().Result() if err != nil {  return err } return nil}

全局变量 redisdb 是 redis.Client 类型的指针,用来保存到 Redis 客户端的引用。7YU28资讯网——每日最新资讯28at.com

initClient 函数初始化连接到 Redis 服务器,该服务器在本地主机的 6379 端口运行。它将一个新的 Redis 客户端分配给 redisdb 变量。如果连接成功,它就会 ping Redis 服务器以测试连接。7YU28资讯网——每日最新资讯28at.com

示例代码:7YU28资讯网——每日最新资讯28at.com

// 向队列中添加任务func addTaskToQueue(task string, executeTime int64) { err := redisdb.ZAdd("delay-queue", redis.Z{  Score:  float64(executeTime),  Member: task, }).Err() if err != nil {  panic(err) }}

addTaskToQueue 函数将具有执行时间的任务添加到 Redis 等待排序的集合 "delay-queue"。执行时间是一个 UNIX 时间戳,作为排序集合中的项目的 score,允许 Redis 按照他们应该执行的时间来排序项目。7YU28资讯网——每日最新资讯28at.com

示例代码:7YU28资讯网——每日最新资讯28at.com

// 从队列中获取并处理任务func getAndExecuteTasks() { for {  // 使用 ZRANGEBYSCORE 命令获取分数(时间戳)<= 当前时间的任务  tasks, err := redisdb.ZRangeByScore("delay-queue", redis.ZRangeBy{   Min: "-inf",   Max: fmt.Sprintf("%d", time.Now().Unix()),  }).Result()  if err != nil {   time.Sleep(1 * time.Second)   continue  }  // 处理任务  for _, task := range tasks {   fmt.Println("Executing task: ", task)   // 执行完任务后,用 ZREM 移除该任务   redisdb.ZRem("delay-queue", task)  }  // 暂停一秒  time.Sleep(1 * time.Second) }}

getAndExecuteTasks 函数不断检查 "delay-queue"。它提取队列中 score 小于或等于当前时间戳的任务,意味着这些任务现在应该执行或者他们应该在过去就已经执行。获取任务后,它打印任务(模拟执行)并从队列中删除任务。7YU28资讯网——每日最新资讯28at.com

示例代码:7YU28资讯网——每日最新资讯28at.com

func main() { err := initClient() if err != nil {  fmt.Println("redis connect error:", err)  return } // 添加一些测试任务 addTaskToQueue("task1", time.Now().Add(10*time.Second).Unix()) addTaskToQueue("task2", time.Now().Add(20*time.Second).Unix()) // 执行延迟队列中的任务 getAndExecuteTasks()}

输出结果:7YU28资讯网——每日最新资讯28at.com

Executing task:  task1Executing task:  task2

main 函数调用这些函数。首先,它初始化 Redis 客户端。如果初始化和连接成功,它将一些测试任务添加到队列中,并启动任务执行循环。7YU28资讯网——每日最新资讯28at.com

总结一下,这段 Go 代码使用 Redis 的 Sorted Set 数据类型创建了一个延时队列系统,其中的任务按照他们的执行时间进行排序,一个任务工作者循环获取并执行队列中的任务。这是一个简单而高效地实现作业调度系统的方法。7YU28资讯网——每日最新资讯28at.com

总结

本文我们分别实现简单版和复杂版的延迟队列,其中简单版延迟队列,只使用 Go 实现,复杂版延迟队列,使用 Go 和 Redis 实现。7YU28资讯网——每日最新资讯28at.com

(1) 只使用 Go 实现延迟队列:7YU28资讯网——每日最新资讯28at.com

优点:7YU28资讯网——每日最新资讯28at.com

  • 不需要外部依赖:只使用 Go 实现延迟队列,你不需要安装和维护外部的 Redis 服务器。

缺点:7YU28资讯网——每日最新资讯28at.com

  • 健壮性和持久性:如果程序崩溃或重新启动,延迟队列的数据可能会丢失。
  • 并发控制:使用 Go 内置的数据结构(如 channels 或 slices)在多个 goroutines 之间共享状态变量可能需要精细的并发控制,比如使用 mutexes 或者 channels。

(2) 使用 Go + Redis 实现延迟队列:7YU28资讯网——每日最新资讯28at.com

优点:7YU28资讯网——每日最新资讯28at.com

  • 数据持久性:Redis 提供了数据持久性,即使在程序重启或崩溃后,队列中的数据依然可以恢复。
  • 简化并发:Redis 提供的数据结构(如 sorted set)是原子操作,可以简化并发控制。
  • 功能强大:使用 Redis,你可以利用其提供的一些内建功能,如超时、TTL、持久化等。

缺点:7YU28资讯网——每日最新资讯28at.com

  • 需要额外的组件:使用 Redis 意味着需要安装和运行 Redis 服务器,这可能增加系统的复杂性和运维成本。
  • 网络延迟:如果 Go 程序和 Redis 服务器不在同一台机器上,网络延迟可能会影响延迟的准确性。

总的来说,如果我们对延迟队列的持久性、准确性和并发性有高要求,那么 Go + Redis 的方案可能会更适合。如果我们想要一个更简单的解决方案,并且可以容忍在程序崩溃时部分数据丢失,那么只使用 Go 实现可能会更合适。7YU28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-70436-0.htmlGo 语言实战:构建强大的延迟任务队列

声明:本网页内容旨在传播知识,不代表本站观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。

显示全文

上一篇:提升代码可读性的秘密武器——Pygments库详解!

下一篇:老板与秘书的故事理解CORS(跨域),真的超级简单

最新热点