Updated code to grab series data from feed.

master
Michael 1 year ago
parent 86984647a2
commit 6f5cb2139c

1
.gitignore vendored

@ -1 +1,2 @@
locsi_models
tasks/podcastindex_feeds.db

@ -13,19 +13,22 @@ const redisAddr = "localhost:6379"
func main() {
// Connecting to the database here instead of in each task so we can just maintain one connection. Otherwise we could flood the db with connections and block other users.
models.ConnectDatabase()
/* const redisAddr = "localhost:6379"
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
task, err := tasks.NewUpdatePodcastEpisodesTask(169)
if err != nil {
log.Fatalf("could not create task: %v", err)
}
info, err := client.Enqueue(task)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue) */
/* for seriesID := 943118; seriesID < 943119; seriesID++ {
const redisAddr = "localhost:6379"
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
task, err := tasks.NewUpdatePodcastFromFeedTask(seriesID)
if err != nil {
log.Fatalf("could not create task: %v", err)
}
info, err := client.Enqueue(task)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
*/
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
@ -42,7 +45,7 @@ func main() {
// mux maps a type to a handler
mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypeUpdatePodcastEpisodes, tasks.HandleUpdatePodcastEpisodesTask)
mux.HandleFunc(tasks.TypeUpdatePodcastFromFeed, tasks.HandleUpdatePodcastFromFeedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)

@ -0,0 +1,68 @@
package tasks
import (
"testing"
models "locsi.com/server/queue/locsi_models"
)
// expectedPodcastSeriesResult is the expectation attached to a podcast series
// test case.
type expectedPodcastSeriesResult struct {
// value holds the field values validatePodSeries compares against.
value models.PodcastSeries
// isValid is set by the test tables; validatePodSeries does not consult it.
isValid bool
}
// PodcastSeriesTest is one table-driven test case for a podcast series.
type PodcastSeriesTest struct {
// ID identifies the series under test; it is echoed in failure messages.
ID int
// expectedResult is what the series is expected to look like.
expectedResult expectedPodcastSeriesResult
}
// validatePodSeries reports one test error for every field of podSeries that
// differs from the expectation stored in tt.
//
// podSeries is the value under test, tt supplies the series ID used in the
// messages plus the expected values, and t receives the failures. All
// mismatches are reported; the helper never stops the test early.
func validatePodSeries(podSeries models.PodcastSeries, tt PodcastSeriesTest, t *testing.T) {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	want := tt.expectedResult.value

	// Directly comparable fields, checked in a fixed order so the output is
	// stable from run to run.
	checks := []struct {
		field     string
		got, want interface{}
	}{
		{"WebFeed", podSeries.WebFeed, want.WebFeed},
		{"Name", podSeries.Name, want.Name},
		{"URL", podSeries.URL, want.URL},
		{"Explicit", podSeries.Explicit, want.Explicit},
		{"ImageURL", podSeries.ImageURL, want.ImageURL},
		{"ITunesType", podSeries.ITunesType, want.ITunesType},
		{"InLanguage", podSeries.InLanguage, want.InLanguage},
		{"GUID", podSeries.GUID, want.GUID},
		{"Description", podSeries.Description, want.Description},
		{"WebSubHub", podSeries.WebSubHub, want.WebSubHub},
		{"WebSubSelf", podSeries.WebSubSelf, want.WebSubSelf},
	}
	for _, c := range checks {
		if c.got != c.want {
			t.Errorf("for %d, field %s: got %v, want %v", tt.ID, c.field, c.got, c.want)
		}
	}

	// Categories is a slice and needs an element-wise comparison.
	if !EqualArray(podSeries.Categories, want.Categories) {
		t.Errorf("for %d, field Categories: got %v, want %v", tt.ID, podSeries.Categories, want.Categories)
	}
}
// EqualArray reports whether a and b have the same length and hold equal
// elements at every index. Order matters; two empty (or nil) slices are equal.
func EqualArray(a, b []models.Categories) bool {
	if len(a) != len(b) {
		return false
	}
	for idx := 0; idx < len(a); idx++ {
		if a[idx] == b[idx] {
			continue
		}
		return false
	}
	return true
}

@ -1,150 +0,0 @@
package tasks
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"github.com/hibiken/asynq"
"github.com/mmcdole/gofeed"
"gorm.io/gorm/clause"
models "locsi.com/server/queue/locsi_models"
)
// Task type names handled by this file.
const (
// TypeUpdatePodcastEpisodes is the type name given to tasks built by
// NewUpdatePodcastEpisodesTask.
TypeUpdatePodcastEpisodes = "podcastEpisodes:update"
)
// UpdatePodcastEpisodesPayload is the JSON-encoded body of a
// TypeUpdatePodcastEpisodes task.
type UpdatePodcastEpisodesPayload struct {
// PodcastSeriesID is the primary key the handler uses to load the series.
PodcastSeriesID int
}
//----------------------------------------------
// Task constructors (NewXXXTask).
// A task is a type name plus an encoded payload.
//----------------------------------------------

// NewUpdatePodcastEpisodesTask builds a TypeUpdatePodcastEpisodes task whose
// payload is the JSON encoding of podSeriesID. It returns the marshalling
// error, and a nil task, if the payload cannot be encoded.
func NewUpdatePodcastEpisodesTask(podSeriesID int) (*asynq.Task, error) {
	body := UpdatePodcastEpisodesPayload{PodcastSeriesID: podSeriesID}
	encoded, err := json.Marshal(body)
	if err == nil {
		return asynq.NewTask(TypeUpdatePodcastEpisodes, encoded), nil
	}
	return nil, err
}
// HandleUpdatePodcastEpisodesTask loads the podcast series named in the task
// payload, fetches its RSS feed and upserts every feed item that converts to
// an episode (conflicts are resolved on the "guid" column).
//
// Returns:
//   - an error wrapping asynq.SkipRetry when the payload cannot be decoded or
//     the feed cannot be fetched/parsed;
//   - a plain wrapped error when a database read fails;
//   - nil otherwise. Items that cannot be converted, and individual upserts
//     that fail, are logged and skipped so one bad item does not drop the rest
//     of the feed.
func HandleUpdatePodcastEpisodesTask(ctx context.Context, t *asynq.Task) error {
	var p UpdatePodcastEpisodesPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		// A payload that does not decode will never decode; do not retry.
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}
	log.Printf("Updating episodes for: PodcastSeriesID=%d", p.PodcastSeriesID)

	// Below is just in place during the initial crawl phase. (It looks like this might take 45 days?) Remove and have this task enqueue logic managed elsewhere.
	if p.PodcastSeriesID <= 3889935 {
		// Get the highest series id that we have processed.
		var podEpisode models.PodcastEpisode
		if err := models.DB.Where("podcast_series_id <> ?", "693423").Order("podcast_series_id desc").Limit(1).Find(&podEpisode).Error; err != nil {
			return fmt.Errorf("Reading from the database failed: %w", err)
		}
		if p.PodcastSeriesID+1 > podEpisode.PodcastSeriesID {
			// Self-enqueueing of the next series is currently disabled:
			/* fmt.Println("New pod %i", p.PodcastSeriesID+1)
			// We are handling the highest podcast episode currently. Queue a task to handle the next highest series.
			time.Sleep(time.Second)
			const redisAddr = "localhost:6379"
			client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
			defer client.Close()
			task, err := NewUpdatePodcastEpisodesTask(p.PodcastSeriesID + 1)
			if err != nil {
				log.Fatalf("could not create task: %v", err)
			}
			info, err := client.Enqueue(task)
			if err != nil {
				log.Fatalf("could not enqueue task: %v", err)
			}
			log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue) */
		}
	}
	// END initial crawl phase code

	// Grab the podcast series from the database.
	var podSeries models.PodcastSeries
	if err := models.DB.Find(&podSeries, p.PodcastSeriesID).Error; err != nil {
		return fmt.Errorf("Reading from the database failed: %w", err)
	}

	// Grab the RSS feed for this podcast.
	fp := gofeed.NewParser()
	feed, err := fp.ParseURL(podSeries.WebFeed)
	if err != nil {
		fmt.Printf("Failed to get the feed for %v, %v.\n", podSeries.ID, podSeries.WebFeed)
		// TODO: Store that this failed and have the job run again. If it fails
		// many times, decrease poll time or stop polling altogether. Retry is
		// skipped for now, but that is not the behavior we want long term. We
		// also need to know when a series fails to parse in case the code has
		// to be updated to handle it.
		return fmt.Errorf("Failed to get the feed for %s: %v: %w", podSeries.WebFeed, err, asynq.SkipRetry)
	}

	// Update/insert the feed's episodes in our database.
	for _, item := range feed.Items {
		fmt.Printf("Converting to episode. Series id:%v, GUID:%v\n", podSeries.ID, item.GUID)
		podEpisode, err := ToPodEpisode(item, podSeries.ID)
		if err != nil {
			log.Printf("Failed to convert item to episode for %s, %s: %v", podSeries.WebFeed, item.GUID, err)
			continue
		}
		upsert := clause.OnConflict{
			Columns:   []clause.Column{{Name: "guid"}},
			UpdateAll: true,
		}
		if err := models.DB.Clauses(upsert).Create(&podEpisode).Error; err != nil {
			log.Printf("Failed to store episode for %s, %s: %v", podSeries.WebFeed, item.GUID, err)
		}
	}

	// TODO: left off here.
	PodcastUpdateFrequency()
	return nil
}
// ToPodEpisode converts a feed item into a PodcastEpisode that belongs to the
// series podSeriesId.
//
// The first enclosure's URL becomes the episode's ContentURL. An item with no
// enclosure, or whose first enclosure has an empty URL, yields a zero-value
// episode and an error. Duration and image come from the iTunes extension when
// present; the description prefers the item content and falls back to the
// iTunes summary.
func ToPodEpisode(item *gofeed.Item, podSeriesId int) (models.PodcastEpisode, error) {
	if len(item.Enclosures) == 0 || item.Enclosures[0].URL == "" {
		return models.PodcastEpisode{}, errors.New("There is no content for this item.")
	}

	episode := models.PodcastEpisode{
		Name:            item.Title,
		GUID:            item.GUID,
		ContentURL:      item.Enclosures[0].URL,
		PodcastSeriesID: podSeriesId,
	}

	itunes := item.ITunesExt
	if itunes != nil {
		if itunes.Duration != "" {
			episode.Duration = itunes.Duration
		}
		if itunes.Image != "" {
			episode.ImageURL = itunes.Image
		}
	}

	if published := item.PublishedParsed; published != nil {
		episode.DatePublished = *published
	}

	switch {
	case item.Content != "":
		episode.Description = item.Content
	case itunes != nil && itunes.Summary != "":
		episode.Description = itunes.Summary
	}

	return episode, nil
}
/*
PodcastUpdateFrequency is currently an empty placeholder.

TODO: Determine the polling frequency to look for future updates for this
podcast. Base this on when the last episode was released, how frequently new
episodes come out, how popular the podcast is, and whether new episodes are
still being released. One option is to store a "next_crawl_time" value for each
series and query all podcasts that have "next_crawl_time" < "now".

 1. Grab all of the episodes and populate the database so we have some data to base the logic of this function on.
 2. Come up with a first shot at this logic.
 3. With the initial function, see how often we would have to be crawling to better understand the problem and adjust as necessary.
*/
func PodcastUpdateFrequency() {
}

@ -0,0 +1,165 @@
package tasks
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"github.com/hibiken/asynq"
"github.com/mmcdole/gofeed"
"gorm.io/gorm/clause"
models "locsi.com/server/queue/locsi_models"
)
// Task type names handled by this file.
const (
// TypeUpdatePodcastFromFeed is the type name given to tasks built by
// NewUpdatePodcastFromFeedTask.
TypeUpdatePodcastFromFeed = "updatePodcast:fromFeed"
)
// UpdatePodcastFromFeedPayload is the JSON-encoded body of a
// TypeUpdatePodcastFromFeed task.
type UpdatePodcastFromFeedPayload struct {
// PodcastSeriesID is the primary key the handler uses to load the series.
PodcastSeriesID int
}
// NewUpdatePodcastFromFeedTask builds a TypeUpdatePodcastFromFeed task whose
// payload is the JSON encoding of podSeriesID. It returns the marshalling
// error, and a nil task, if the payload cannot be encoded.
func NewUpdatePodcastFromFeedTask(podSeriesID int) (*asynq.Task, error) {
	body := UpdatePodcastFromFeedPayload{PodcastSeriesID: podSeriesID}
	encoded, err := json.Marshal(body)
	if err == nil {
		return asynq.NewTask(TypeUpdatePodcastFromFeed, encoded), nil
	}
	return nil, err
}
// HandleUpdatePodcastFromFeedTask loads the podcast series named in the task
// payload (with its categories), fetches its RSS feed, and then, inside one
// database transaction, upserts the feed's episodes, saves the series fields
// refreshed from the feed, and replaces the series' category associations.
//
// Returns:
//   - an error wrapping asynq.SkipRetry when the payload cannot be decoded or
//     the feed cannot be fetched/parsed;
//   - a wrapped error when a feed item cannot be converted to an episode or
//     when any database step fails. In the latter case the transaction is
//     rolled back, so nothing is partially written;
//   - nil once the transaction has committed.
func HandleUpdatePodcastFromFeedTask(ctx context.Context, t *asynq.Task) error {
	var p UpdatePodcastFromFeedPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		// A payload that does not decode will never decode; do not retry.
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}
	log.Printf("Updating episodes for: PodcastSeriesID=%d", p.PodcastSeriesID)

	// Grab the podcast series from the database.
	var podSeries models.PodcastSeries
	if err := models.DB.Preload("Categories").Find(&podSeries, p.PodcastSeriesID).Error; err != nil {
		return fmt.Errorf("Reading from the database failed: %w", err)
	}

	// Grab the RSS feed for this podcast.
	fp := gofeed.NewParser()
	feed, err := fp.ParseURL(podSeries.WebFeed)
	if err != nil {
		fmt.Printf("Failed to get the feed for %v, %v.\n", podSeries.ID, podSeries.WebFeed)
		return fmt.Errorf("Failed to get the feed for %s: %v: %w", podSeries.WebFeed, err, asynq.SkipRetry)
	}

	// Convert everything before opening the transaction so that a conversion
	// failure cannot leave a transaction (and its connection) dangling.
	podEpisodes := make([]models.PodcastEpisode, 0, len(feed.Items))
	for _, item := range feed.Items {
		fmt.Printf("Converting to episode. Series id:%v, GUID:%v\n", podSeries.ID, item.GUID)
		podEpisode, err := ToPodEpisode(item, podSeries.ID)
		if err != nil {
			return fmt.Errorf("Failed to convert item to episode for %s, %s: %w", podSeries.WebFeed, item.GUID, err)
		}
		podEpisodes = append(podEpisodes, podEpisode)
	}
	updatedPodSeries := ToUpdatedPodSeries(feed, podSeries)

	tx := models.DB.Begin()
	if err := tx.Error; err != nil {
		return fmt.Errorf("Starting the database transaction failed: %w", err)
	}
	committed := false
	defer func() {
		// Any return before a successful Commit undoes the partial work.
		if !committed {
			tx.Rollback()
		}
	}()

	// Update/insert episodes in our database.
	if len(podEpisodes) > 0 {
		fmt.Printf("Inserting %v episodes for %v\n", len(podEpisodes), podSeries.ID)
		upsert := clause.OnConflict{
			Columns:   []clause.Column{{Name: "guid"}},
			UpdateAll: true,
		}
		if err := tx.Clauses(upsert).CreateInBatches(&podEpisodes, 50).Error; err != nil {
			return fmt.Errorf("Writing episodes for series %v failed: %w", podSeries.ID, err)
		}
	}

	// Update the Podcast Series data from the feed too.
	fmt.Printf("Updating series %v\n", podSeries.ID)
	if err := tx.Save(&updatedPodSeries).Error; err != nil {
		return fmt.Errorf("Writing series %v failed: %w", podSeries.ID, err)
	}
	// Update the Podcast Series Categories (removes any that exist in the db that are not in the feed).
	if err := tx.Model(&updatedPodSeries).Association("Categories").Replace(updatedPodSeries.Categories); err != nil {
		return fmt.Errorf("Replacing categories for series %v failed: %w", podSeries.ID, err)
	}

	if err := tx.Commit().Error; err != nil {
		return fmt.Errorf("Committing updates for series %v failed: %w", podSeries.ID, err)
	}
	committed = true
	return nil
}
// ToPodEpisode converts a feed item into a PodcastEpisode that belongs to the
// series podSeriesId.
//
// The first enclosure's URL becomes the episode's ContentURL. An item with no
// enclosure, or whose first enclosure has an empty URL, yields a zero-value
// episode and an error. Duration and image come from the iTunes extension when
// present; the description prefers the item content and falls back to the
// iTunes summary.
func ToPodEpisode(item *gofeed.Item, podSeriesId int) (models.PodcastEpisode, error) {
	if len(item.Enclosures) == 0 || item.Enclosures[0].URL == "" {
		return models.PodcastEpisode{}, errors.New("There is no content for this item.")
	}

	episode := models.PodcastEpisode{
		Name:            item.Title,
		GUID:            item.GUID,
		ContentURL:      item.Enclosures[0].URL,
		PodcastSeriesID: podSeriesId,
	}

	itunes := item.ITunesExt
	if itunes != nil {
		if itunes.Duration != "" {
			episode.Duration = itunes.Duration
		}
		if itunes.Image != "" {
			episode.ImageURL = itunes.Image
		}
	}

	if published := item.PublishedParsed; published != nil {
		episode.DatePublished = *published
	}

	switch {
	case item.Content != "":
		episode.Description = item.Content
	case itunes != nil && itunes.Summary != "":
		episode.Description = itunes.Summary
	}

	return episode, nil
}
// ToUpdatedPodSeries returns a copy of podSeries with its fields refreshed
// from feed. podSeries itself is not modified.
//
// A field is overwritten only when the feed supplies a non-empty value for it,
// so information missing from the feed never blanks what is already stored:
//   - Name, URL, ImageURL, InLanguage and Description come from the matching
//     feed fields;
//   - Explicit follows the iTunes explicit flag ("yes"/"explicit"/"true" or
//     "no"/"clean"/"false", case-insensitive); any other value leaves it as is;
//   - WebSubSelf and WebSubHub come from the atom links with rel "self"/"hub";
//   - Categories is rebuilt from the feed's categories (lower-cased, at most
//     ten) whenever the feed's category list is non-nil.
func ToUpdatedPodSeries(feed *gofeed.Feed, podSeries models.PodcastSeries) models.PodcastSeries {
	// Don't allow more than this many categories for a Podcast Series.
	const maxSeriesCategories = 10

	updatedPodSeries := podSeries

	if feed.Title != "" {
		updatedPodSeries.Name = feed.Title
	}
	if feed.Link != "" {
		updatedPodSeries.URL = feed.Link
	}
	if feed.ITunesExt != nil {
		switch strings.ToLower(feed.ITunesExt.Explicit) {
		case "yes", "explicit", "true":
			updatedPodSeries.Explicit = true
		case "no", "clean", "false":
			updatedPodSeries.Explicit = false
		}
	}
	if feed.Image != nil && feed.Image.URL != "" {
		updatedPodSeries.ImageURL = feed.Image.URL
	}
	if feed.Language != "" {
		updatedPodSeries.InLanguage = feed.Language
	}
	if feed.Description != "" {
		updatedPodSeries.Description = feed.Description
	}

	// Indexing a nil map is safe, so no separate nil check on Extensions.
	for _, ext := range feed.Extensions["atom"]["link"] {
		href := ext.Attrs["href"]
		if href == "" {
			// A link without a target must not wipe out a stored URL.
			continue
		}
		// The comparison is against the incoming series (not the running
		// copy), so a later link that repeats the stored value does not undo
		// an earlier, different one.
		switch ext.Attrs["rel"] {
		case "self":
			if href != podSeries.WebSubSelf {
				updatedPodSeries.WebSubSelf = href
			}
		case "hub":
			if href != podSeries.WebSubHub {
				updatedPodSeries.WebSubHub = href
			}
		}
	}

	// Replace the categories with the ones listed in the feed.
	if feed.Categories != nil {
		podCategories := []models.Categories{}
		for _, cat := range feed.Categories {
			podCategories = append(podCategories, models.Categories{Category: strings.ToLower(cat)})
			if len(podCategories) == maxSeriesCategories {
				break
			}
		}
		updatedPodSeries.Categories = podCategories
	}

	return updatedPodSeries
}

@ -0,0 +1,129 @@
package tasks
import (
"fmt"
"testing"
"time"
"github.com/mmcdole/gofeed"
models "locsi.com/server/queue/locsi_models"
)
// TestToPodEpisode checks ToPodEpisode against specific items of live feeds.
//
// For each case it loads the series from the database, downloads its feed,
// finds the item with the given GUID and converts it. Cases marked isValid
// must convert to exactly the expected episode; cases marked invalid must fail
// to convert. The test needs the database and network access to the feeds.
func TestToPodEpisode(t *testing.T) {
	type expectedResult struct {
		value   models.PodcastEpisode
		isValid bool
	}
	parsedTime1, err := time.Parse(time.RFC3339, "2015-05-29T22:59:00Z")
	if err != nil {
		t.Fatalf("bad fixture timestamp: %v", err)
	}
	// "No publish date" is the zero time. (Parsing "0000-00-00T00:00:00Z"
	// fails, and only happened to yield this value because the error was
	// being discarded.)
	var parsedTime2 time.Time

	var tests = []struct {
		podSeriesId    int
		episodeGUID    string
		expectedResult expectedResult
	}{
		{6, "tag:blogger.com,1999:blog-8429222294807394803.post-5283595917266045004", expectedResult{value: models.PodcastEpisode{
			Name: "June 6-7 2015 Dual Sport Events", DatePublished: parsedTime1, GUID: "tag:blogger.com,1999:blog-8429222294807394803.post-5283595917266045004", Duration: "",
			Description: "<iframe allowfullscreen=\"\" frameborder=\"0\" height=\"30\" mozallowfullscreen=\"true\" src=\"https://archive.org/embed/DualSport662015\" webkitallowfullscreen=\"true\" width=\"500\"></iframe> <enclosure length=\"14,339,473\" type=\"audio/mpeg\" url=\"https://archive.org/embed/DualSport662015\"><a href=\"http://archive.org/details/mark_usdualsports_Fdx\">Link</a></enclosure> <a href=\"http://ia601505.us.archive.org/4/items/DualSport662015/Dual%20Sport%206-6-2015.mp3\">Link</a>",
			ImageURL:    "", ContentURL: "http://ia601505.us.archive.org/4/items/DualSport662015/Dual%20Sport%206-6-2015.mp3", PodcastSeriesID: 6}, isValid: true}},
		{8, "f9b0051099697331aa933922a1cb6739", expectedResult{isValid: false}},
		{120, "f9b0051099697331aa933922a1cb6739", expectedResult{isValid: false}},
		{160, "http://ciep.mx/wp-content/uploads/2015/02/Podcast2_2.mp3", expectedResult{value: models.PodcastEpisode{
			Name: "Podcast CIEP | #2", DatePublished: parsedTime2, GUID: "http://ciep.mx/wp-content/uploads/2015/02/Podcast2_2.mp3", Duration: "5:30",
			Description: "Ajuste del gasto público: por qué se tiene que recortar el gasto y dónde estarán los recortes. Finalmente, se analiza la evolución del presupuesto asignado a la función Protección social y la improbabilidad de alcanzar una pensión universal.",
			ImageURL:    "", ContentURL: "http://ciep.mx/wp-content/uploads/2015/02/Podcast2_2.mp3", PodcastSeriesID: 160}, isValid: true}},
	}

	models.ConnectDatabase()

	for _, tt := range tests {
		testname := fmt.Sprintf("Testing id %v", tt.episodeGUID)
		t.Run(testname, func(t *testing.T) {
			// Get a podcast from the db.
			var podcastSeries models.PodcastSeries
			if err := models.DB.Find(&podcastSeries, tt.podSeriesId).Error; err != nil {
				t.Fatalf("Failed to get podcast from db: %v", err.Error())
			}

			// Grab the RSS feed for this podcast.
			feed, err := gofeed.NewParser().ParseURL(podcastSeries.WebFeed)
			if err != nil {
				t.Fatalf("Failed to get the feed for %s: %v", podcastSeries.WebFeed, err.Error())
			}

			want := tt.expectedResult.value
			found := false
			for _, item := range feed.Items {
				if item.GUID != tt.episodeGUID {
					continue
				}
				// This is the GUID we are trying to test.
				found = true

				podEpisode, err := ToPodEpisode(item, tt.podSeriesId)
				if err != nil {
					if tt.expectedResult.isValid {
						t.Errorf("Failed to convert item to episode for %s, %s: %v", podcastSeries.WebFeed, tt.episodeGUID, err.Error())
					}
					continue
				}
				if !tt.expectedResult.isValid {
					t.Errorf("for %s: conversion succeeded, want an error", tt.episodeGUID)
					continue
				}

				// time.Time values are compared with Equal so that differing
				// location pointers do not cause spurious mismatches.
				if !podEpisode.DatePublished.Equal(want.DatePublished) {
					t.Errorf("for %s, field DatePublished: got %v, want %v", tt.episodeGUID, podEpisode.DatePublished, want.DatePublished)
				}
				checks := []struct {
					field     string
					got, want interface{}
				}{
					{"Name", podEpisode.Name, want.Name},
					{"GUID", podEpisode.GUID, want.GUID},
					{"Duration", podEpisode.Duration, want.Duration},
					{"Description", podEpisode.Description, want.Description},
					{"ImageURL", podEpisode.ImageURL, want.ImageURL},
					{"ContentURL", podEpisode.ContentURL, want.ContentURL},
					{"PodcastSeriesID", podEpisode.PodcastSeriesID, want.PodcastSeriesID},
				}
				for _, c := range checks {
					if c.got != c.want {
						t.Errorf("for %s, field %s: got %v, want %v", tt.episodeGUID, c.field, c.got, c.want)
					}
				}
			}

			if !found {
				if tt.expectedResult.isValid {
					// Without this the case would pass without testing anything.
					t.Errorf("for %s: no item with this GUID in feed %s", tt.episodeGUID, podcastSeries.WebFeed)
				} else {
					t.Logf("for %s: no item with this GUID in feed %s; nothing was checked", tt.episodeGUID, podcastSeries.WebFeed)
				}
			}
		})
	}
}
// TestToUpdatedPodSeries checks ToUpdatedPodSeries against live feeds.
//
// For each case it loads the series (with categories) from the database,
// downloads its feed, merges the two and validates the merged series field by
// field. The test needs the database and network access to the feeds.
func TestToUpdatedPodSeries(t *testing.T) {
	var tests = []PodcastSeriesTest{
		{693423, expectedPodcastSeriesResult{value: models.PodcastSeries{ID: 693423, WebFeed: "https://lexfridman.com/feed/podcast/", Name: "Lex Fridman Podcast", URL: "https://lexfridman.com/", Explicit: false,
			ImageURL: "https://lexfridman.com/wordpress/wp-content/uploads/powerpress/artwork_3000-230.png", ITunesType: "episodic", InLanguage: "en-US", GUID: "89e87657-cd69-5a47-9190-1365d3de138d",
			Description: "Conversations about science, technology, history, philosophy and the nature of intelligence, consciousness, love, and power. Lex is an AI researcher at MIT and beyond.",
			WebSubHub:   "https://pubsubhubbub.appspot.com/", WebSubSelf: "https://lexfridman.com/feed/podcast/",
			Categories: []models.Categories{{Category: "technology"}, {Category: "science"}, {Category: "society & culture"}, {Category: "philosophy"}}}, isValid: true}},
		{943118, expectedPodcastSeriesResult{value: models.PodcastSeries{ID: 943118, WebFeed: "https://audioboom.com/channels/5093219.rss", Name: "The Tim Dillon Show", URL: "https://audioboom.com/channels/5093219", Explicit: false,
			ImageURL: "https://images.theabcdn.com/i/40034741/s=1400x1400/el=1/rt=fill.jpg", ITunesType: "episodic", InLanguage: "en-us", GUID: "b73a3f59-999b-548d-8f3d-c50fa4a754b1",
			Description: "Tim Dillon is a comedian and tour guide. Hes very excited to give you a tour of the end of the world. Each week from a porch in Los Angeles he shares apocalyptic visions with his friends and berates a local diner.",
			WebSubHub:   "https://pubsubhubbub.appspot.com/", WebSubSelf: "https://audioboom.com/channels/5093219.rss",
			Categories: []models.Categories{{Category: "comedy"}}}, isValid: true}},
	}

	// Connect here as well, so the test also works when run on its own
	// (e.g. with -run TestToUpdatedPodSeries) instead of relying on another
	// test having opened the connection first.
	models.ConnectDatabase()

	for _, tt := range tests {
		testname := fmt.Sprintf("Testing series id %v", tt.ID)
		t.Run(testname, func(t *testing.T) {
			var podSeries models.PodcastSeries
			if err := models.DB.Preload("Categories").Find(&podSeries, tt.ID).Error; err != nil {
				t.Fatalf("Failed to get podcast from db: %v", err.Error())
			}

			// Grab the RSS feed for this podcast.
			feed, err := gofeed.NewParser().ParseURL(podSeries.WebFeed)
			if err != nil {
				t.Fatalf("Failed to get the feed for %s: %v", podSeries.WebFeed, err.Error())
			}

			// Run tests to validate the Podcast Series.
			updatedPodSeries := ToUpdatedPodSeries(feed, podSeries)
			validatePodSeries(updatedPodSeries, tt, t)
		})
	}
}

@ -1,112 +0,0 @@
package tasks
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/hibiken/asynq"
models "locsi.com/server/queue/locsi_models"
"locsi.com/server/queue/pod_index_models"
)
// Task type names handled by this file.
const (
// TypeUpdatePodcastSeries is the type name given to tasks built by
// NewUpdatePodcastSeriesTask.
TypeUpdatePodcastSeries = "podcastSeries:update"
)
// UpdatePodcastSeriesPayload is the JSON-encoded body of a
// TypeUpdatePodcastSeries task.
type UpdatePodcastSeriesPayload struct {
// PodcastSeriesID is decoded by HandleUpdatePodcastSeriesTask, which does not
// otherwise read it.
PodcastSeriesID int
}
// NewUpdatePodcastSeriesTask builds a TypeUpdatePodcastSeries task whose
// payload is the JSON encoding of podSeriesID. It returns the marshalling
// error, and a nil task, if the payload cannot be encoded.
func NewUpdatePodcastSeriesTask(podSeriesID int) (*asynq.Task, error) {
	// Carry the caller's ID in the payload; it used to be hard-coded to 0,
	// which silently discarded the argument.
	payload, err := json.Marshal(UpdatePodcastSeriesPayload{PodcastSeriesID: podSeriesID})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeUpdatePodcastSeries, payload), nil
}
// HandleUpdatePodcastSeriesTask copies one window of rows from the pod index
// database into our own database: each row is converted with ToPodSeries and
// POSTed as JSON to the local podcast-series API.
//
// The window is fixed by the Limit/Offset values in the query below. Leave
// Limit stable and increment Offset by the Limit value for each run.
//
// Returns:
//   - an error wrapping asynq.SkipRetry when the payload cannot be decoded or
//     when a row cannot be posted. Processing stops at the first such row, and
//     the task is not retried because a retry would re-post the rows that
//     already went through;
//   - a wrapped error when the pod index database cannot be read;
//   - nil when every row in the window was posted.
func HandleUpdatePodcastSeriesTask(ctx context.Context, t *asynq.Task) error {
	var p UpdatePodcastSeriesPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}
	log.Printf("Updating Series")

	// Get some rows from the pod index database.
	var pod_index_podcasts []pod_index_models.Podcasts
	if err := pod_index_models.DB.Limit(25000).Offset(3925000).Find(&pod_index_podcasts).Error; err != nil {
		return fmt.Errorf("Reading from the pod index database failed: %w", err)
	}

	// Convert each row from the pod index struct to our struct and insert it
	// into the postgresql db using the API.
	for _, pod := range pod_index_podcasts {
		if err := postPodcastSeries(ToPodSeries(pod)); err != nil {
			return fmt.Errorf("Failed to insert pod index row %v: %v: %w", pod.ID, err, asynq.SkipRetry)
		}
		fmt.Printf("Inserted %v.\n", pod.ID)
	}
	return nil
}

// postPodcastSeries sends one series to the local podcast-series API as JSON
// and returns an error unless the API answers 200 OK.
func postPodcastSeries(podSeries models.PodcastSeries) error {
	podSeriesJson, err := json.Marshal(podSeries)
	if err != nil {
		return fmt.Errorf("encoding the series failed: %w", err)
	}

	resp, err := http.Post("http://localhost:8080/podcast-series", "application/json", bytes.NewBuffer(podSeriesJson))
	if err != nil {
		// resp is nil here and must not be touched.
		fmt.Println(string(podSeriesJson))
		return fmt.Errorf("posting the series failed: %w", err)
	}
	defer resp.Body.Close()

	// Reading the body to the end also lets the connection be reused.
	resBody, readErr := ioutil.ReadAll(resp.Body)
	if resp.StatusCode != http.StatusOK {
		fmt.Printf("Failed to post the item below with response code %v.\n", resp.StatusCode)
		fmt.Println(string(podSeriesJson))
		fmt.Printf("client: response body: %s\n", resBody)
		return fmt.Errorf("unexpected response code %v", resp.StatusCode)
	}
	if readErr != nil {
		return fmt.Errorf("reading the response failed: %w", readErr)
	}
	return nil
}
// ToPodSeries converts a pod index row into our PodcastSeries model.
//
// The ten category columns are collapsed into a Categories slice that keeps
// the column order and skips empty columns (the slice is empty, not nil, when
// every column is empty). Explicit is true for any non-zero value of the row's
// explicit column.
func ToPodSeries(pod pod_index_models.Podcasts) models.PodcastSeries {
	// Get the categories for this podcast series.
	categoryColumns := [...]string{
		pod.Category1, pod.Category2, pod.Category3, pod.Category4, pod.Category5,
		pod.Category6, pod.Category7, pod.Category8, pod.Category9, pod.Category10,
	}
	podCategories := []models.Categories{}
	for _, name := range categoryColumns {
		if name == "" {
			continue
		}
		podCategories = append(podCategories, models.Categories{Category: name})
	}

	isExplicit := pod.Explicit != 0

	return models.PodcastSeries{
		WebFeed:     pod.URL,
		Name:        pod.Title,
		URL:         pod.Link,
		Explicit:    isExplicit,
		ImageURL:    pod.ImageUrl,
		ITunesType:  pod.ItunesType,
		InLanguage:  pod.Language,
		GUID:        pod.PodcastGuid,
		Description: pod.Description,
		Categories:  podCategories,
	}
}

@ -1,212 +0,0 @@
package tasks
import (
"fmt"
"testing"
"time"
"github.com/mmcdole/gofeed"
models "locsi.com/server/queue/locsi_models"
"locsi.com/server/queue/pod_index_models"
)
/*????? Outliers to write tests for:
-Write a test for pod series #27, #71, #80, #90 that have an empty webfeed.
-#126 is failing?
-review failed tasks in asynq to find more tests to write
-query the db to find series that don't have any episodes and write tests if there are crawl failures associated with that
*/
// TestToPodSeries checks ToPodSeries against specific rows of the pod index
// database: each row is loaded by ID, converted, and compared field by field
// with the expected series. The test needs the pod index database.
func TestToPodSeries(t *testing.T) {
	var tests = []struct {
		rowID          int
		expectedResult models.PodcastSeries
	}{
		{150, models.PodcastSeries{
			ID: 150, WebFeed: "https://www.podcasts.com/rss_feed/82efb3f4ebafda0e6d0f6b186ae52a67", Name: "Metal For Dummies", URL: "https://www.podcasts.com/metal-for-dummies-1f55c63b0", Explicit: true,
			ImageURL: "https://s3.amazonaws.com/podcasts-image-uploads/metal-for-dummies-1f55c63b0-large.jpg", ITunesType: "Episodic", InLanguage: "", GUID: "fc548de5-4f10-50d3-b27a-f4aaa71e0ce9",
			Description: "Weekly podcast for fans of hard rock and heavy metal", Categories: []models.Categories{{Category: "music"}}}},
		{6046471, models.PodcastSeries{
			ID: 6046471, WebFeed: "https://feeds.transistor.fm/songs", Name: "La musica Sacra Disc 2", URL: "", Explicit: false,
			ImageURL: "", ITunesType: "serial", InLanguage: "en", GUID: "90b311a3-4724-58e0-b2ea-8330308dbcd0",
			Description: "Compiled Christian music", Categories: []models.Categories{}}},
		// Candidate rows for further cases: 192 ("WeR1 Podcasts", categories
		// religion/spirituality/christianity) and 108874 ("Trabajo",
		// categories fiction/science).
	}

	pod_index_models.ConnectDatabase()

	for _, tt := range tests {
		testname := fmt.Sprintf("Testing id %d", tt.rowID)
		t.Run(testname, func(t *testing.T) {
			// Get a podcast from the db. Stop on failure: comparing a
			// zero-value row would only produce misleading mismatches.
			var row pod_index_models.Podcasts
			if err := pod_index_models.DB.Find(&row, tt.rowID).Error; err != nil {
				t.Fatalf("Failed to get podcast from db: %v", err.Error())
			}

			podSeries := ToPodSeries(row)
			want := tt.expectedResult

			checks := []struct {
				field     string
				got, want interface{}
			}{
				{"WebFeed", podSeries.WebFeed, want.WebFeed},
				{"Name", podSeries.Name, want.Name},
				{"URL", podSeries.URL, want.URL},
				{"Explicit", podSeries.Explicit, want.Explicit},
				{"ImageURL", podSeries.ImageURL, want.ImageURL},
				{"ITunesType", podSeries.ITunesType, want.ITunesType},
				{"InLanguage", podSeries.InLanguage, want.InLanguage},
				{"GUID", podSeries.GUID, want.GUID},
				{"Description", podSeries.Description, want.Description},
			}
			for _, c := range checks {
				if c.got != c.want {
					t.Errorf("for %d, field %s: got %v, want %v", tt.rowID, c.field, c.got, c.want)
				}
			}
			if !EqualArray(podSeries.Categories, want.Categories) {
				t.Errorf("for %d, field Categories: got %v, want %v", tt.rowID, podSeries.Categories, want.Categories)
			}
		})
	}
}
// BenchmarkToPodSeries measures ToPodSeries on podcasts picked at random from
// the podcast index database.
//
// Connecting to the database and running the sampling query are setup work,
// so the timer is reset once they are done and only the conversions are
// timed. The timed loop always performs exactly b.N conversions, cycling
// through the sample when it holds fewer than b.N rows.
func BenchmarkToPodSeries(b *testing.B) {
	// Cap on the number of rows fetched. The harness may raise b.N into the
	// millions, and the query itself is not what is being measured.
	const maxSample = 1000

	pod_index_models.ConnectDatabase()

	limit := b.N
	if limit > maxSample {
		limit = maxSample
	}

	var podcasts []pod_index_models.Podcasts
	if err := pod_index_models.DB.Order("RANDOM()").Limit(limit).Find(&podcasts).Error; err != nil {
		b.Fatalf("could not load podcasts to benchmark: %v", err)
	}
	if len(podcasts) == 0 {
		// Nothing to convert; indexing into the empty sample would panic.
		b.Skip("no podcasts available to benchmark")
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ToPodSeries(podcasts[i%len(podcasts)])
	}
}
// EqualArray reports whether a and b hold the same categories in the same
// order: the slices must have equal length and compare equal element by
// element.
func EqualArray(a, b []models.Categories) bool {
	if len(a) != len(b) {
		return false
	}
	for idx := 0; idx < len(a); idx++ {
		if a[idx] != b[idx] {
			return false
		}
	}
	return true
}
// TestToPodEpisode checks ToPodEpisode against items taken from live RSS feeds.
//
// Each case names a podcast series ID and the GUID of one item in that series'
// feed. The series is loaded through models.DB, its WebFeed is fetched and
// parsed with gofeed, and every item carrying the requested GUID is converted.
// When the case is marked valid the converted episode is compared field by
// field with the expected value; when it is marked invalid a conversion error
// is accepted.
func TestToPodEpisode(t *testing.T) {
	type expectedResult struct {
		value   models.PodcastEpisode
		isValid bool
	}

	parsedTime1, err := time.Parse(time.RFC3339, "2015-05-29T22:59:00Z")
	if err != nil {
		t.Fatalf("could not parse expected publish date: %v", err)
	}
	// The second expected date used to come from parsing
	// "0000-00-00T00:00:00Z" with the error discarded. That string can never
	// parse (month and day 0 are out of range), so the value was always the
	// zero time; declare it as such explicitly.
	var parsedTime2 time.Time

	var tests = []struct {
		podSeriesID    int
		episodeGUID    string
		expectedResult expectedResult
	}{
		{6, "tag:blogger.com,1999:blog-8429222294807394803.post-5283595917266045004", expectedResult{value: models.PodcastEpisode{
			Name: "June 6-7 2015 Dual Sport Events", DatePublished: parsedTime1, GUID: "tag:blogger.com,1999:blog-8429222294807394803.post-5283595917266045004", Duration: "",
			Description: "<iframe allowfullscreen=\"\" frameborder=\"0\" height=\"30\" mozallowfullscreen=\"true\" src=\"https://archive.org/embed/DualSport662015\" webkitallowfullscreen=\"true\" width=\"500\"></iframe> <enclosure length=\"14,339,473\" type=\"audio/mpeg\" url=\"https://archive.org/embed/DualSport662015\"><a href=\"http://archive.org/details/mark_usdualsports_Fdx\">Link</a></enclosure> <a href=\"http://ia601505.us.archive.org/4/items/DualSport662015/Dual%20Sport%206-6-2015.mp3\">Link</a>",
			ImageURL:    "", ContentURL: "http://ia601505.us.archive.org/4/items/DualSport662015/Dual%20Sport%206-6-2015.mp3", PodcastSeriesID: 6}, isValid: true}},
		{8, "f9b0051099697331aa933922a1cb6739", expectedResult{isValid: false}},
		{120, "f9b0051099697331aa933922a1cb6739", expectedResult{isValid: false}},
		{160, "http://ciep.mx/wp-content/uploads/2015/02/Podcast2_2.mp3", expectedResult{value: models.PodcastEpisode{
			Name: "Podcast CIEP | #2", DatePublished: parsedTime2, GUID: "http://ciep.mx/wp-content/uploads/2015/02/Podcast2_2.mp3", Duration: "5:30",
			Description: "Ajuste del gasto público: por qué se tiene que recortar el gasto y dónde estarán los recortes. Finalmente, se analiza la evolución del presupuesto asignado a la función Protección social y la improbabilidad de alcanzar una pensión universal.",
			ImageURL:    "", ContentURL: "http://ciep.mx/wp-content/uploads/2015/02/Podcast2_2.mp3", PodcastSeriesID: 160}, isValid: true}},
	}

	models.ConnectDatabase()
	for _, tt := range tests {
		testname := fmt.Sprintf("Testing id %v", tt.episodeGUID)
		t.Run(testname, func(t *testing.T) {
			// Get a podcast from the db. Without it there is no feed URL to
			// fetch, so stop this case here instead of parsing an empty URL.
			var podcastSeries models.PodcastSeries
			if err := models.DB.Find(&podcastSeries, tt.podSeriesID).Error; err != nil {
				t.Fatalf("Failed to get podcast from db: %v", err.Error())
			}

			// Grab the RSS feed for this podcast.
			feed, err := gofeed.NewParser().ParseURL(podcastSeries.WebFeed)
			if err != nil {
				t.Fatalf("Failed to get the feed for %s: %v", podcastSeries.WebFeed, err.Error())
			}

			want := tt.expectedResult.value
			found := false
			for _, item := range feed.Items {
				if item.GUID != tt.episodeGUID {
					continue
				}
				// This is the GUID we are trying to test.
				found = true
				podEpisode, err := ToPodEpisode(item, tt.podSeriesID)
				if err != nil {
					// A conversion error is only a failure when the case
					// expected a valid episode.
					if tt.expectedResult.isValid {
						t.Errorf("Failed to convert item to episode for %s, %s: %v", podcastSeries.WebFeed, tt.episodeGUID, err.Error())
					}
					continue
				}
				if podEpisode.Name != want.Name {
					t.Errorf("for %s, field Name: got %v, want %v", tt.episodeGUID, podEpisode.Name, want.Name)
				}
				// Compare instants with Equal: != on time.Time also compares
				// the location, which can differ for the same moment.
				if !podEpisode.DatePublished.Equal(want.DatePublished) {
					t.Errorf("for %s, field DatePublished: got %v, want %v", tt.episodeGUID, podEpisode.DatePublished, want.DatePublished)
				}
				if podEpisode.GUID != want.GUID {
					t.Errorf("for %s, field GUID: got %v, want %v", tt.episodeGUID, podEpisode.GUID, want.GUID)
				}
				if podEpisode.Duration != want.Duration {
					t.Errorf("for %s, field Duration: got %v, want %v", tt.episodeGUID, podEpisode.Duration, want.Duration)
				}
				if podEpisode.Description != want.Description {
					t.Errorf("for %s, field Description: got %v, want %v", tt.episodeGUID, podEpisode.Description, want.Description)
				}
				if podEpisode.ImageURL != want.ImageURL {
					t.Errorf("for %s, field ImageURL: got %v, want %v", tt.episodeGUID, podEpisode.ImageURL, want.ImageURL)
				}
				if podEpisode.ContentURL != want.ContentURL {
					t.Errorf("for %s, field ContentURL: got %v, want %v", tt.episodeGUID, podEpisode.ContentURL, want.ContentURL)
				}
				if podEpisode.PodcastSeriesID != want.PodcastSeriesID {
					t.Errorf("for %s, field PodcastSeriesID: got %v, want %v", tt.episodeGUID, podEpisode.PodcastSeriesID, want.PodcastSeriesID)
				}
			}

			// If the expected-valid item is missing from the feed, nothing was
			// verified; report it instead of passing vacuously.
			if !found && tt.expectedResult.isValid {
				t.Errorf("feed %s has no item with GUID %s", podcastSeries.WebFeed, tt.episodeGUID)
			}
		})
	}
}
Loading…
Cancel
Save