You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
105 lines
3.2 KiB
Go
105 lines
3.2 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
|
|
models "code.locsi.com/locsi/api/models"
|
|
"code.locsi.com/locsi/queue/controllers"
|
|
"code.locsi.com/locsi/queue/tasks"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/go-redis/redis"
|
|
"github.com/hibiken/asynq"
|
|
)
|
|
|
|
// redisAddr is the host:port of the Redis instance used both as the Asynq
// server's broker connection in main and by the go-redis client in healthHandler.
const redisAddr = "queue-redis-master:6379"
|
|
|
|
func main() {
|
|
// Connecting to the database here instead of in each task so we can just maintain one connection. Otherwise we could flood the db with connections and block other users.
|
|
models.ConnectDatabase()
|
|
|
|
// Stand up an API endpoint that publishes the status of the application for consumption by Kubernetes
|
|
r := gin.Default()
|
|
// Health check route
|
|
r.GET("/healthz", controllers.GetHealth)
|
|
go r.Run() // Run in a separate go routine so it doesn't block queue server below.
|
|
|
|
/* for seriesID := 693423; seriesID < 693523; seriesID++ {
|
|
const redisAddr = "queue-redis-master:6379"
|
|
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: os.Getenv("REDIS_PASSWORD")})
|
|
defer client.Close()
|
|
task, err := tasks.NewUpdatePodcastFromFeedTask(seriesID, "")
|
|
if err != nil {
|
|
log.Fatalf("could not create task: %v", err)
|
|
}
|
|
info, err := client.Enqueue(task)
|
|
if err != nil {
|
|
log.Fatalf("could not enqueue task: %v", err)
|
|
}
|
|
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
|
|
}
|
|
*/
|
|
/*const redisAddr = "queue-redis-master:6379"
|
|
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: os.Getenv("REDIS_PASSWORD")})
|
|
defer client.Close()
|
|
task, err := tasks.NewWebSubSubscribeTask()
|
|
log.Print(task)
|
|
if err != nil {
|
|
log.Fatalf("could not create task: %v", err)
|
|
}
|
|
info, err := client.Enqueue(task)
|
|
if err != nil {
|
|
log.Fatalf("could not enqueue task: %v", err)
|
|
}
|
|
log.Printf("enqueued websub task: id=%s queue=%s", info.ID, info.Queue)*/
|
|
|
|
srv := asynq.NewServer(
|
|
asynq.RedisClientOpt{Addr: redisAddr, Password: os.Getenv("REDIS_PASSWORD")},
|
|
asynq.Config{
|
|
// Specify how many concurrent workers to use
|
|
Concurrency: 10,
|
|
// Optionally specify multiple queues with different priority.
|
|
Queues: map[string]int{
|
|
"critical": 6,
|
|
"default": 3,
|
|
"low": 1,
|
|
},
|
|
HealthCheckFunc: healthHandler,
|
|
},
|
|
)
|
|
|
|
// mux maps a type to a handler
|
|
mux := asynq.NewServeMux()
|
|
mux.HandleFunc(tasks.TypeUpdatePodcastFromFeed, tasks.HandleUpdatePodcastFromFeedTask)
|
|
mux.HandleFunc(tasks.TypeWebSubSubscribe, tasks.HandleWebSubSubscribeTask)
|
|
|
|
if err := srv.Run(mux); err != nil {
|
|
log.Fatalf("could not run server: %v", err)
|
|
}
|
|
}
|
|
|
|
func healthHandler(err error) {
|
|
client := redis.NewClient(&redis.Options{
|
|
Addr: redisAddr,
|
|
Password: os.Getenv("REDIS_PASSWORD"),
|
|
})
|
|
if err != nil {
|
|
log.Printf("health handler received error: %v", err)
|
|
// Store that the Asynq server is unhealthy for use by our liveness API endpoint.
|
|
err = client.Set("queueHealthOK", "false", 0).Err()
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
log.Printf("could not set queueHealthOK to false: %v", err)
|
|
}
|
|
} else {
|
|
// Store that the Asynq server is healthy.
|
|
err = client.Set("queueHealthOK", "true", 0).Err()
|
|
log.Printf("health handler set to ok")
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
log.Printf("could not set queueHealthOK to true: %v", err)
|
|
}
|
|
}
|
|
}
|