You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

105 lines
3.2 KiB
Go

package main
import (
"fmt"
"log"
"os"
models "code.locsi.com/locsi/api/models"
"code.locsi.com/locsi/queue/controllers"
"code.locsi.com/locsi/queue/tasks"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis"
"github.com/hibiken/asynq"
)
// redisAddr is the host:port address of the Redis instance used both by the
// asynq server started in main and by the Redis client in healthHandler.
const redisAddr = "queue-redis-master:6379"
// main starts the queue worker process. It connects to the database, serves
// the /healthz HTTP endpoint in the background, and then runs the asynq
// server with the task handlers registered below. It only returns (via
// log.Fatalf) if the asynq server fails to run.
func main() {
	// Connecting to the database here instead of in each task so we can just
	// maintain one connection. Otherwise we could flood the db with
	// connections and block other users.
	models.ConnectDatabase()

	// Stand up an API endpoint that publishes the status of the application
	// for consumption by Kubernetes.
	r := gin.Default()
	// Health check route
	r.GET("/healthz", controllers.GetHealth)
	// Run in a separate goroutine so it doesn't block the queue server below.
	// The error returned by Run is logged rather than dropped, so a health
	// endpoint that fails to start (or stops) is visible in the logs. It is
	// deliberately not fatal: the queue worker keeps processing tasks.
	go func() {
		if err := r.Run(); err != nil {
			log.Printf("health endpoint stopped: %v", err)
		}
	}()

	// Disabled snippet kept for manual use: enqueue an update-from-feed task
	// for a range of series IDs.
	/* for seriesID := 693423; seriesID < 693523; seriesID++ {
		const redisAddr = "queue-redis-master:6379"
		client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: os.Getenv("REDIS_PASSWORD")})
		defer client.Close()
		task, err := tasks.NewUpdatePodcastFromFeedTask(seriesID, "")
		if err != nil {
			log.Fatalf("could not create task: %v", err)
		}
		info, err := client.Enqueue(task)
		if err != nil {
			log.Fatalf("could not enqueue task: %v", err)
		}
		log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
	}
	*/

	// Disabled snippet kept for manual use: enqueue a single WebSub
	// subscribe task.
	/*const redisAddr = "queue-redis-master:6379"
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: os.Getenv("REDIS_PASSWORD")})
	defer client.Close()
	task, err := tasks.NewWebSubSubscribeTask()
	log.Print(task)
	if err != nil {
		log.Fatalf("could not create task: %v", err)
	}
	info, err := client.Enqueue(task)
	if err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}
	log.Printf("enqueued websub task: id=%s queue=%s", info.ID, info.Queue)*/

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: redisAddr, Password: os.Getenv("REDIS_PASSWORD")},
		asynq.Config{
			// Specify how many concurrent workers to use
			Concurrency: 10,
			// Optionally specify multiple queues with different priority.
			Queues: map[string]int{
				"critical": 6,
				"default":  3,
				"low":      1,
			},
			// healthHandler records the server's health in Redis.
			HealthCheckFunc: healthHandler,
		},
	)

	// mux maps a task type to its handler
	mux := asynq.NewServeMux()
	mux.HandleFunc(tasks.TypeUpdatePodcastFromFeed, tasks.HandleUpdatePodcastFromFeedTask)
	mux.HandleFunc(tasks.TypeWebSubSubscribe, tasks.HandleWebSubSubscribeTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}
// healthHandler is registered as the asynq server's HealthCheckFunc. It
// records the reported health in Redis under the key "queueHealthOK" as the
// string "true" (err is nil) or "false" (err is non-nil).
//
// A Redis client is created for each call and closed before returning, so
// repeated health checks do not accumulate open connection pools. Failures to
// write the key are logged and otherwise ignored.
func healthHandler(err error) {
	client := redis.NewClient(&redis.Options{
		Addr:     redisAddr,
		Password: os.Getenv("REDIS_PASSWORD"),
	})
	// Release the client's connections once the status has been written.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Printf("could not close redis client: %v", closeErr)
		}
	}()

	healthy := err == nil
	if !healthy {
		log.Printf("health handler received error: %v", err)
	}

	// fmt.Sprint renders the bool as "true" or "false", the values stored
	// under queueHealthOK.
	value := fmt.Sprint(healthy)
	if setErr := client.Set("queueHealthOK", value, 0).Err(); setErr != nil {
		log.Printf("could not set queueHealthOK to %s: %v", value, setErr)
		return
	}

	// Only report success once the write has actually succeeded.
	if healthy {
		log.Printf("health handler set to ok")
	}
}