package deadlinks import ( "context" "database/sql" "errors" "fmt" "strings" "time" "code.betamike.com/mediocregopher/mediocre-go-lib/miter" _ "github.com/mattn/go-sqlite3" migrate "github.com/rubenv/sql-migrate" ) // Store keeps track of the current status of all discovered Resources, and // links between them. A Resource which is neither pinned nor linked to from // another Resource is considered to not exist. // // An implementation of Store must be thread-safe. type Store interface { // GetByStatus returns all Resources with the given Status. GetByStatus(ResourceStatus) miter.Iterator[Resource] // GetURLsByLastChecked returns the URLs of all Resources with LastChecked // values older than the given timestamp, or which have never been checked. GetURLsByLastChecked(olderThan time.Time) miter.Iterator[URL] // SetPinned overwrites the set of pinned URLs with the given one. SetPinned(context.Context, []URL) error // Update updates the Resource identified by the given URL with the given // arguments. // // Update returns an error if the URL has not been pinned nor referenced as // an outgoing URL of a different Resource. Update( ctx context.Context, now time.Time, url URL, status ResourceStatus, errorString string, outgoing []URL, ) error // GC will garbage collect the store, removing any orphaned Resources. GC(context.Context) error } var migrations = &migrate.MemoryMigrationSource{Migrations: []*migrate.Migration{ { Id: "1", Up: []string{ `CREATE TABLE urls ( id INTEGER NOT NULL PRIMARY KEY, url TEXT NOT NULL, UNIQUE(url) )`, `CREATE TABLE resources ( url_id INTEGER NOT NULL PRIMARY KEY, status INTEGER NOT NULL DEFAULT 0, pinned INTEGER NOT NULL, last_checked INTEGER NOT NULL DEFAULT 0, error_string TEXT NOT NULL DEFAULT '', FOREIGN KEY(url_id) REFERENCES urls(id) ON DELETE CASCADE )`, `CREATE TABLE links ( from_url_id INTEGER NOT NULL, to_url_id INTEGER NOT NULL, FOREIGN KEY(from_url_id) REFERENCES urls(id) ON DELETE CASCADE, FOREIGN KEY(to_url_id) REFERENCES urls(id) ON DELETE CASCADE, PRIMARY KEY(from_url_id, to_url_id) )`, `CREATE INDEX links_outgoing_idx ON links (to_url_id)`, }, }, }} // SQLiteSQLiteStoreOpts are optional fields which can be provided to // NewSQLiteStore. A nil SQLiteSQLiteStoreOpts is equivalent to an empty one. type SQLiteStoreOpts struct { // Path to the database file to use. // // Defaults to ":memory:", indicating an in-memory database will be used. Path string } func (o *SQLiteStoreOpts) withDefaults() *SQLiteStoreOpts { if o == nil { o = new(SQLiteStoreOpts) } if o.Path == "" { o.Path = ":memory:" } return o } type SQLiteStore struct { db *sql.DB } var _ Store = (*SQLiteStore)(nil) // NewSQLiteStore returns a Store implementation which uses an in-memory SQLite // db. func NewSQLiteStore(o *SQLiteStoreOpts) *SQLiteStore { o = o.withDefaults() db, err := sql.Open("sqlite3", o.Path+"?_foreign_keys=1") if err != nil { panic(fmt.Errorf("opening sqlite in memory: %w", err)) } // go-sqlite doesn't support multiple go-routines, this is equivalent to // wrapping each call to the db in a mutex. db.SetMaxOpenConns(1) if _, err := migrate.Exec(db, "sqlite3", migrations, migrate.Up); err != nil { panic(fmt.Errorf("running migrations: %w", err)) } return &SQLiteStore{db} } // Close cleans up all resources held by the SQLiteStore store, if any. It must // be the final method call to the SQLiteStore. func (s *SQLiteStore) Close() error { return s.db.Close() } // iterate is a helper which can be used to read the results of a query in // chunks, producing a single unified Iterator. // // iterate assumes that the rows are being scanned ordered by their row ID, // which must be a number, and that each query call has some kind of limit // applied. func iterate[T any]( db interface { QueryContext(context.Context, string, ...any) (*sql.Rows, error) }, query string, mkArgs func(minRowID int) []any, scan func(*sql.Rows) (T, int, error), // returns scanned value and its rowID ) miter.Iterator[T] { var ( zero T minRowID = -1 res, resBase []T pop = func() T { r := res[0] res = res[1:] return r } ) return miter.FromFunc(func(ctx context.Context) (T, error) { if len(res) > 0 { return pop(), nil } res = resBase rows, err := db.QueryContext(ctx, query, mkArgs(minRowID)...) if err != nil { return zero, fmt.Errorf("executing query: %w", err) } defer rows.Close() for rows.Next() { var r T if r, minRowID, err = scan(rows); err != nil { return zero, fmt.Errorf("scanning row: %w", err) } res = append(res, r) } resBase = res[:0] if len(res) == 0 { return zero, miter.ErrEnd } return pop(), nil }) } const getByStatusLimit = 16 // GetByStatus implements the method for the Store interface. func (s *SQLiteStore) GetByStatus(status ResourceStatus) miter.Iterator[Resource] { const query = ` WITH incoming(url_id, urls) AS ( SELECT to_url_id, GROUP_CONCAT(url, char(0)) FROM links JOIN urls ON (urls.id = links.from_url_id) GROUP BY to_url_id ), outgoing(url_id, urls) AS ( SELECT from_url_id, GROUP_CONCAT(url, char(0)) FROM links JOIN urls ON (urls.id = links.to_url_id) GROUP BY from_url_id ) SELECT resources.url_id, url, status, pinned, last_checked, error_string, incoming.urls, outgoing.urls FROM resources JOIN urls ON (urls.id = resources.url_id) LEFT JOIN incoming ON (incoming.url_id = resources.url_id) LEFT JOIN outgoing ON (outgoing.url_id = resources.url_id) WHERE status = ? AND (pinned OR incoming.urls IS NOT NULL) AND resources.url_id > ? ORDER BY resources.url_id ASC LIMIT ?` return iterate( s.db, query, func(minRowID int) []any { return []any{status, minRowID, getByStatusLimit} }, func(rows *sql.Rows) (Resource, int, error) { var ( r Resource rowID int err error lastChecked int64 incoming, outgoing sql.NullString ) if err := rows.Scan( &rowID, &r.URL, &r.Status, &r.Pinned, &lastChecked, &r.ErrorString, &incoming, &outgoing, ); err != nil { return Resource{}, 0, fmt.Errorf("calling Scan: %w", err) } if lastChecked != 0 { r.LastChecked = time.Unix(lastChecked, 0).UTC() } if incoming.String != "" { if r.IncomingLinkURLs, err = parseURLs( strings.Split(incoming.String, "\x00"), ); err != nil { return Resource{}, 0, fmt.Errorf( "parsing incoming links: %w", err, ) } } if outgoing.String != "" { if r.OutgoingLinkURLs, err = parseURLs( strings.Split(outgoing.String, "\x00"), ); err != nil { return Resource{}, 0, fmt.Errorf( "parsing outgoing links: %w", err, ) } } return r, rowID, nil }, ) } const getURLsByLastCheckedLimit = 64 // GetURLsByLastChecked implements the method for the Store interface. func (s *SQLiteStore) GetURLsByLastChecked( olderThan time.Time, ) miter.Iterator[URL] { const query = ` WITH incoming(url_id, urls) AS ( SELECT to_url_id, COUNT(1) FROM links GROUP BY to_url_id ) SELECT resources.url_id, url FROM resources JOIN urls ON (urls.id = resources.url_id) LEFT JOIN incoming ON (incoming.url_id = resources.url_id) WHERE last_checked < ? AND (pinned OR incoming.urls IS NOT NULL) AND resources.url_id > ? ORDER BY resources.url_id ASC LIMIT ?` return iterate( s.db, query, func(minRowID int) []any { return []any{ olderThan.Unix(), minRowID, getURLsByLastCheckedLimit, } }, func(rows *sql.Rows) (URL, int, error) { var ( urlID int urlStr string ) if err := rows.Scan(&urlID, &urlStr); err != nil { return "", 0, fmt.Errorf("scanning url: %w", err) } url, err := ParseURL(urlStr) if err != nil { return "", 0, fmt.Errorf( "parsing url %q from db: %w", urlStr, err, ) } return url, urlID, nil }, ) } func (s *SQLiteStore) touch(ctx context.Context, urls []URL, pinned bool) ( []int, error, ) { var ( urlsQueryParams = make([]any, len(urls)) resourcesQueryParams = make([]any, 0, (len(urls)*2)+1) ids = make([]int, 0, len(urls)) ) for i := range urls { urlsQueryParams[i] = urls[i] } urlsQuery := ` INSERT INTO urls (url) VALUES ` + joinRepeated("(?)", ",", len(urls)) + ` ON CONFLICT DO UPDATE SET url=url RETURNING id` rows, err := s.db.QueryContext(ctx, urlsQuery, urlsQueryParams...) if err != nil { return nil, fmt.Errorf("inserting into urls: %w", err) } for range urls { if !rows.Next() { return nil, errors.Join( errors.New("expected a returned row"), rows.Close(), ) } var id int if err := rows.Scan(&id); err != nil { return nil, errors.Join( fmt.Errorf("scanning return from insert into urls: %w", err), rows.Close(), ) } resourcesQueryParams = append(resourcesQueryParams, id, pinned) ids = append(ids, id) } rows.Close() resourcesQuery := ` INSERT INTO resources (url_id, pinned) VALUES ` + joinRepeated("(?,?)", ",", len(urls)) + ` ON CONFLICT DO UPDATE SET pinned = ?` resourcesQueryParams = append(resourcesQueryParams, pinned) _, err = s.db.ExecContext(ctx, resourcesQuery, resourcesQueryParams...) if err != nil { return nil, fmt.Errorf("inserting into resources: %w", err) } return ids, nil } // SetPinned implements the method for the Store interface. func (s *SQLiteStore) SetPinned(ctx context.Context, urls []URL) error { _, err := s.db.ExecContext(ctx, `UPDATE resources SET pinned = 0`) if err != nil { return fmt.Errorf("unsetting pinned on all resources: %w", err) } if _, err := s.touch(ctx, urls, true); err != nil { return fmt.Errorf("pinning resources: %w", err) } return err } // Update implements the method for the Store interface. func (s *SQLiteStore) Update( ctx context.Context, now time.Time, url URL, status ResourceStatus, errorString string, outgoing []URL, ) error { const resourcesQuery = ` UPDATE resources SET status = ?, last_checked = ?, error_string = ? WHERE url_id = (SELECT id FROM urls WHERE url = ?) RETURNING url_id` var urlID int err := s.db.QueryRowContext( ctx, resourcesQuery, status, now.Unix(), errorString, url, ).Scan(&urlID) if err != nil { return fmt.Errorf("inserting into resources: %w", err) } _, err = s.db.ExecContext( ctx, `DELETE FROM links WHERE from_url_id = ?`, urlID, ) if err != nil { return fmt.Errorf("deleting from links: %w", err) } if len(outgoing) == 0 { return nil } outgoingIDs, err := s.touch(ctx, outgoing, false) if err != nil { return fmt.Errorf("touching outgoing links: %w", err) } linksQueryParams := make([]any, 0, len(outgoingIDs)*2) for i := range outgoingIDs { linksQueryParams = append(linksQueryParams, urlID, outgoingIDs[i]) } linksQuery := ` INSERT INTO links (from_url_id, to_url_id) VALUES ` + joinRepeated("(?,?)", ",", len(outgoing)) + ` ON CONFLICT DO NOTHING` _, err = s.db.ExecContext(ctx, linksQuery, linksQueryParams...) if err != nil { return fmt.Errorf("inserting into links: %w", err) } return nil } // GC implements the method for the Store interface. func (s *SQLiteStore) GC(ctx context.Context) error { const query = ` WITH orphans AS ( SELECT url_id FROM resources LEFT JOIN links ON (links.to_url_id = resources.url_id) WHERE pinned = 0 AND from_url_id IS NULL ) DELETE FROM urls WHERE id IN orphans ` for { res, err := s.db.ExecContext(ctx, query) if err != nil { return fmt.Errorf("performing delete: %w", err) } else if n, err := res.RowsAffected(); err != nil { return fmt.Errorf("determining rows affected: %w", err) } else if n == 0 { return nil } } } func joinRepeated(str, sep string, n int) string { res := strings.Repeat(str+sep, n) return res[:len(res)-len(sep)] }